aio_bench_perf_sweep.py

# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Performance sweep of the functionality for swapping optimizer tensors to/from (NVMe) storage devices.
"""

import os
import sys
import argparse
import json
import itertools
import subprocess
import shutil

from test_ds_aio_utils import refine_integer_value
from perf_sweep_utils import READ_OP_DESC, WRITE_OP_DESC, BENCH_LOG_DIR, \
    READ_IO_DIR, WRITE_IO_DIR, READ_LOG_DIR, WRITE_LOG_DIR
from deepspeed.ops.op_builder import AsyncIOBuilder

OTHER_OPTIONS = '--handle'
PERF_SCRIPT = 'test_ds_aio.py'
DEFAULT_SWEEP_CONFIG = {
    "block_size": ["128K", "256K"],
    "queue_depth": [4, 16, 32],
    "overlap_events": [True, False],
    "io_parallel": [2, 8],
    "single_submit": [False]
}
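
# A custom search space can be supplied via --sweep_config as a JSON file shaped like
# DEFAULT_SWEEP_CONFIG above. A sketch of such a file (the values are arbitrary examples):
#
#   {
#       "block_size": ["256K", "1M"],
#       "queue_depth": [8, 32],
#       "overlap_events": [true],
#       "io_parallel": [1, 4],
#       "single_submit": [false]
#   }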

class Job(object):
    """Wrapper for a shell command with optional output-file redirection and working directory."""

    def __init__(self, cmd_line, output_file=None, work_dir=None):
        self.cmd_line = cmd_line
        self.output_file = output_file
        self.work_dir = work_dir
        self.output_fd = None

    def cmd(self):
        return self.cmd_line

    def get_stdout(self):
        return self.output_fd

    def get_stderr(self):
        return self.output_fd

    def get_cwd(self):
        return self.work_dir

    def open_output_file(self):
        if self.output_file is not None:
            self.output_fd = open(self.output_file, 'w')

    def close_output_file(self):
        if self.output_fd is not None:
            self.output_fd.close()
            self.output_fd = None

class SweepConfig(object):
    """Sweep settings derived from the parsed command-line arguments."""

    def __init__(self, args):
        self.nvme_dir = args.nvme_dir
        self.io_size = args.io_size
        self.search_space = get_sweep_config_dict(args.sweep_config)
        self.read = not args.no_read
        self.write = not args.no_write
        self.flush_cache = not args.no_sudo
        self.log_dir = args.log_dir
        self.loops = args.loops
        self.other_options = f'{OTHER_OPTIONS} --loops {args.loops}'

def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument('--nvme_dir',
                        required=True,
                        type=str,
                        help='Directory in which to perform I/O tests; a writeable directory on an NVMe device.')
    parser.add_argument('--sweep_config', type=str, default=None, help='Performance sweep configuration json file.')
    parser.add_argument('--no_read', action='store_true', help='Disable read performance measurements.')
    parser.add_argument('--no_write', action='store_true', help='Disable write performance measurements.')
    parser.add_argument('--io_size',
                        type=str,
                        default="400M",
                        help='Number of I/O bytes to read/write for performance measurements.')
    parser.add_argument(
        '--no_sudo',
        action='store_true',
        help='Run without sudo access. Page cache will not be flushed and reported read speeds may be higher than actual.')
    parser.add_argument(
        '--log_dir',
        type=str,
        default=BENCH_LOG_DIR,
        help=f'Output directory for performance log files. Default is {os.path.join(".", BENCH_LOG_DIR)}')
    parser.add_argument('--loops', type=int, default=1, help='Count of operation repetitions.')

    args = parser.parse_args()
    print(f'args = {args}')
    return args

def dump_cmd_lines(cmd_lines):
    print(f'cmd line count = {len(cmd_lines)}')
    for i, cmd in enumerate(cmd_lines):
        print(f'{i}: {cmd}')


def get_sweep_config_dict(sweep_config_json):
    if sweep_config_json is None:
        return DEFAULT_SWEEP_CONFIG

    with open(sweep_config_json) as fp:
        sweep_config = json.load(fp)

    return sweep_config

def get_sweep_cmd_lines(sweep_config_dict):

    def flatten_options(key, value_list):
        flat_list = []
        for v in value_list:
            if not isinstance(v, bool):
                flat_list.append(f'--{key} {v}')
            elif v:
                flat_list.append(f'--{key}')
            else:
                flat_list.append(' ')

        return flat_list

    flat_list = [flatten_options(key, value) for key, value in sweep_config_dict.items()]
    cmd_list = list(itertools.product(*flat_list))
    cmd_list = [list(cmd) for cmd in cmd_list]
    #dump_cmd_lines(cmd_list)
    return cmd_list
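
# Illustration of what get_sweep_cmd_lines() yields: each element of the cartesian product
# is a list of option strings, with a boolean False contributing a blank placeholder.
# For DEFAULT_SWEEP_CONFIG, one such element is:
#
#   ['--block_size 128K', '--queue_depth 4', '--overlap_events', '--io_parallel 2', ' ']
#
# where the trailing ' ' comes from single_submit=False.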

def run_job(job):
    args = ' '.join(job.cmd())
    print(f'args = {args}')
    job.open_output_file()
    proc = subprocess.run(args=args, shell=True, stdout=job.get_stdout(), stderr=job.get_stderr(), cwd=job.get_cwd())
    job.close_output_file()
    assert proc.returncode == 0, \
        f"This command failed: {job.cmd()}"

def launch_sweep(sweep_jobs, sync_job, flush_cache_job):
    for perf_job in sweep_jobs:
        if flush_cache_job is not None:
            run_job(sync_job)
            run_job(flush_cache_job)

        run_job(perf_job)

        run_job(sync_job)

def create_cmd_tags(cmd_line):
    tags = {}
    for param_value in cmd_line:
        fields = param_value.split()
        if len(fields) == 1:
            tags[fields[0]] = None
        elif len(fields) == 2:
            tags[fields[0]] = fields[1]

    return tags

def get_log_file(io_op_desc, cmd_line):
    QUEUE_DEPTH = "--queue_depth"
    BLOCK_SIZE = "--block_size"
    SINGLE_SUBMIT = "--single_submit"
    OVERLAP_EVENTS = "--overlap_events"
    THREAD_COUNT = "--threads"
    IO_PARALLEL = "--io_parallel"

    tag_map = {
        QUEUE_DEPTH: "d",
        BLOCK_SIZE: "bs",
        SINGLE_SUBMIT: "single",
        OVERLAP_EVENTS: "overlap",
        THREAD_COUNT: "t",
        IO_PARALLEL: "p"
    }

    tag_default = {
        QUEUE_DEPTH: 1,
        BLOCK_SIZE: "1M",
        SINGLE_SUBMIT: "block",
        OVERLAP_EVENTS: "sequential",
        THREAD_COUNT: 1,
        IO_PARALLEL: 1
    }

    def get_default_value(tag):
        value = tag_default[tag]
        if tag in [SINGLE_SUBMIT, OVERLAP_EVENTS]:
            return value
        return f'{tag_map[tag]}{value}'

    def get_config_value(tag, value):
        tag_key = tag_map[tag]
        if value is None:
            return tag_key
        return f'{tag_key}{value}'

    tag_list = [SINGLE_SUBMIT, OVERLAP_EVENTS, THREAD_COUNT, IO_PARALLEL, QUEUE_DEPTH, BLOCK_SIZE]
    log_tags = [io_op_desc]
    cmd_tags = create_cmd_tags(cmd_line)
    for tag in tag_list:
        if tag in cmd_tags:
            log_tags.append(get_config_value(tag, cmd_tags[tag]))
        else:
            log_tags.append(get_default_value(tag))

    log_file = '_'.join(log_tags)
    log_file += '.txt'

    return log_file
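
# Example of the naming scheme, assuming READ_OP_DESC is the string 'read' (the actual
# value comes from perf_sweep_utils): for the command line
# ['--queue_depth 4', '--block_size 128K', '--overlap_events', '--io_parallel 2']
# get_log_file() returns 'read_block_overlap_t1_p2_d4_bs128K.txt', where 'block' and 't1'
# are the defaults for --single_submit and --threads.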

def create_perf_jobs(io_op_desc, log_dir, cmd_lines):
    py_cmd = ['python', os.path.join(script_path(), PERF_SCRIPT)]

    perf_jobs = []
    for cmd in cmd_lines:
        log_file = os.path.join(log_dir, get_log_file(io_op_desc, cmd))
        job = Job(cmd_line=py_cmd + cmd, output_file=log_file)
        perf_jobs.append(job)

    return perf_jobs


def script_path():
    return os.path.dirname(os.path.realpath(sys.argv[0]))


def async_io_setup():
    return AsyncIOBuilder().is_compatible()

def get_block_size_and_count(io_bytes):
    block_size = 1
    block_count = io_bytes
    bytes_in_KB = 1024

    while block_count % bytes_in_KB == 0:
        block_size *= bytes_in_KB
        block_count /= bytes_in_KB

    return int(block_size), int(block_count)
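
# Worked example: for --io_size 400M, refine_integer_value() presumably yields
# 400 * 2**20 = 419430400 bytes; the loop then factors out 1024 twice, giving
# block_size=1048576 and block_count=400, i.e. 'dd ... bs=1048576 count=400'.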

def create_read_file(sweep_config):
    read_folder = os.path.join(sweep_config.nvme_dir, f'{READ_IO_DIR}')
    os.makedirs(read_folder, exist_ok=True)
    read_file_name = os.path.join(read_folder, f'random_{sweep_config.io_size}B.pt')
    block_size, block_count = get_block_size_and_count(refine_integer_value(sweep_config.io_size))
    dd_job = Job(cmd_line=[f'dd if=/dev/urandom of={read_file_name} bs={block_size} count={block_count}'])
    print(f'[Start] Create read file of {sweep_config.io_size} bytes by running {dd_job.cmd()} ....')
    run_job(dd_job)
    print(f'[Done] Create read file of {sweep_config.io_size} bytes by running {dd_job.cmd()} ....')
    return read_folder, read_file_name


def remove_folder(folder):
    assert os.path.isdir(folder), f"Error: cannot remove {folder} - folder not found"
    shutil.rmtree(folder)

def run_read_sweep(sweep_config, flush_cache_job, sync_job, cmd_lines):
    read_folder, read_file_name = create_read_file(sweep_config)
    read_option = f'--read_file {read_file_name}'
    read_cmd_lines = [[f'{read_option} {sweep_config.other_options}'] + cmd for cmd in cmd_lines]
    #dump_cmd_lines(read_cmd_lines)

    log_folder = os.path.join(sweep_config.log_dir, f'{READ_LOG_DIR}')
    os.makedirs(log_folder, exist_ok=True)

    perf_jobs = create_perf_jobs(io_op_desc=READ_OP_DESC, log_dir=log_folder, cmd_lines=read_cmd_lines)

    launch_sweep(sweep_jobs=perf_jobs, sync_job=sync_job, flush_cache_job=flush_cache_job)

    remove_folder(read_folder)


def run_write_sweep(sweep_config, flush_cache_job, sync_job, cmd_lines):
    write_folder = os.path.join(sweep_config.nvme_dir, f'{WRITE_IO_DIR}')
    os.makedirs(write_folder, exist_ok=True)
    write_file_name = os.path.join(write_folder, f'random_{sweep_config.io_size}B.pt')
    write_option = f'--write_size {sweep_config.io_size} --write_file {write_file_name}'
    write_cmd_lines = [[f'{write_option} {sweep_config.other_options}'] + cmd for cmd in cmd_lines]
    #dump_cmd_lines(write_cmd_lines)

    log_folder = os.path.join(sweep_config.log_dir, f'{WRITE_LOG_DIR}')
    os.makedirs(log_folder, exist_ok=True)

    perf_jobs = create_perf_jobs(io_op_desc=WRITE_OP_DESC, log_dir=log_folder, cmd_lines=write_cmd_lines)

    launch_sweep(sweep_jobs=perf_jobs, sync_job=sync_job, flush_cache_job=flush_cache_job)

    remove_folder(write_folder)
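
# After a sweep completes, per-configuration logs are left under the read and write
# subdirectories of --log_dir (one '.txt' file per swept configuration, named by
# get_log_file()), while the temporary data folders on the NVMe device are removed.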

def main():
    print("Running performance sweep of deepspeed nvme library")

    if not async_io_setup():
        error_msg = """
            Failing because the environment is not properly configured for the deepspeed async i/o module.
            Possible fix: apt install libaio-dev.
        """
        print(error_msg)
        quit()

    args = parse_arguments()

    sweep_config = SweepConfig(args)
    cmd_lines = get_sweep_cmd_lines(sweep_config.search_space)

    if sweep_config.flush_cache:
        flush_cache_job = Job(cmd_line=['sudo', 'bash -c', "'echo 1 > /proc/sys/vm/drop_caches'"])
    else:
        flush_cache_job = None

    sync_job = Job(cmd_line=['sync'])

    if sweep_config.read:
        run_read_sweep(sweep_config, flush_cache_job, sync_job, cmd_lines)

    if sweep_config.write:
        run_write_sweep(sweep_config, flush_cache_job, sync_job, cmd_lines)


if __name__ == "__main__":
    main()