autotuner.py 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194
  1. '''Copyright The Microsoft DeepSpeed Team'''
  2. import shutil
  3. import subprocess
  4. import time
  5. import datetime
  6. import math
  7. import hjson
  8. from ..runtime.config_utils import dict_raise_error_on_duplicate_keys
  9. from ..runtime.constants import *
  10. from ..runtime.zero.config import ZERO_OPTIMIZATION, ZeroStageEnum
  11. from ..utils import logger
  12. from .config import DeepSpeedAutotuningConfig
  13. from .constants import *
  14. from .scheduler import ResourceManager
  15. from .tuner import GridSearchTuner, RandomTuner, ModelBasedTuner
  16. from .utils import *
  17. from deepspeed.accelerator import get_accelerator
  18. try:
  19. from tabulate import tabulate
  20. except ImportError:
  21. tabulate = None
  22. try:
  23. import mlflow
  24. has_mlflow = True
  25. except Exception as e:
  26. has_mlflow = False
  # Key holding the ZeRO stage inside a "zero_optimization" config section.
  ZERO_OPTIMIZATION_STAGE: str = "stage"
  # Keys of the offload sub-sections of "zero_optimization"; generated experiments
  # drop them when the experiment's tuning config does not set them.
  OFFLOAD_OPTIMIZER: str = "offload_optimizer"
  OFFLOAD_PARAM: str = "offload_param"
  # Stage assumed when a tuning space does not specify one.
  ZERO_OPTIMIZATION_STAGE_DEFAULT = ZeroStageEnum.disabled
  31. class Autotuner:
  32. """The DeepSpeed Autotuner automatically discovers the optimal DeepSpeed configuration that delivers good training speed. The Autotuner uses model information, system information, and heuristics to efficiently tune system knobs that affect compute and memory efficiencies, such as ZeRO optimization stages, micro-batch sizes, and many other ZeRO optimization configurations. It not only reduces the time and resources user spend on tuning, but also can discover configurations better than hand-tuned methods.
  33. Autotuning with DeepSpeed requires no code change from DeepSpeed users. Please refer to the README for usage details.
  34. """
  def __init__(self, args, active_resources):
      """Set up the autotuner from launcher arguments and the job's resources.

      Args:
          args: parsed launcher arguments; ``user_args``, ``num_nodes`` and
              ``num_gpus`` are read here.
          active_resources (dict): hostname -> GPU slots available to the job.
      """
      self.args = args
      self.selected_exp_dir = None
      assert tabulate is not None, "Missing required package `tabulate`, please install with `pip install deepspeed[autotuning]`."

      logger.debug(f"autotunning args={args}")

      self.user_config = self._get_user_config(args.user_args)
      assert self.user_config is not None, "DeepSpeed configuration is not provided"

      self.autotuning_config = DeepSpeedAutotuningConfig(self.user_config)
      # Once parsed into self.autotuning_config, drop the directory settings from the
      # stored user config. Use .get(): a config without an "autotuning" section
      # would otherwise raise KeyError here.
      user_autotuning = self.user_config.get(AUTOTUNING)
      if user_autotuning:
          user_autotuning.pop(AUTOTUNING_EXPS_DIR, None)
          user_autotuning.pop(AUTOTUNING_RESULTS_DIR, None)

      self.exps_dir = self.autotuning_config.exps_dir
      self._prepare_output_dir(self.exps_dir, "exps_dir", "experiments")

      self.results_dir = self.autotuning_config.results_dir
      self._prepare_output_dir(self.results_dir, "results_dir", "results")

      # set the active resource for the autotuner resource manager
      self.rm = self._get_resource_manager(active_resources)

      # get resource requirement for each autotuning experiment
      self.exp_num_nodes, self.exp_num_gpus = self._get_exp_resources(args)

      assert self.exp_num_gpus <= self.rm.num_gpus_per_node, "num_gpus in the autotuning configuration must not be less than the --num_gpus value in the train script if any"
      assert self.exp_num_nodes <= len(
          self.rm.nodes), "num_nodes in the autotuning configuration must not be less than the --num_nodes value in the train script if any"

      self.records = {}
      self.optimal_cmd = None
      self.optmal_ds_config = None
      self.mlflow_parent_id = None
      # Filled in by tune(); initialised so the model-info accessors can be called
      # before tuning without raising AttributeError.
      self.model_info = None

  def _prepare_output_dir(self, path, config_key, description):
      """Create an autotuning output directory, wiping it first if ``overwrite`` is set.

      Args:
          path (str): directory to prepare.
          config_key (str): autotuning config key that set ``path`` (used in the error message).
          description (str): kind of directory, used in the log message.

      Exits the process with status -1 if the directory cannot be created.
      """
      if self.autotuning_config.overwrite and os.path.exists(path):
          shutil.rmtree(path, ignore_errors=True)
      if os.path.exists(path):
          return
      try:
          os.makedirs(path, exist_ok=True)
      except OSError:
          logger.error(
              f"Failed to create {path}, please check `{config_key}` in the autotuning config file is accessible by all the nodes in the job."
          )
          exit(-1)
      logger.info(f"Created autotuning {description} directory: {path}")
  def print_tuning_results(self):
      """Print the autotuning results in tabular format and save them to ``summary.txt``.

      One row is emitted per tuning space that has a best record: the space name,
      the number of experiments run in it, the best metric value and the name of
      the best experiment. The ``GLOBAL_TUNING_SPACE`` row reports the total number
      of experiments across all other spaces. When a global record exists, the
      optimal setup is logged and the tuning duration is appended to the summary.
      """
      best_space_records = self.get_best_space_records()
      tab = []
      if best_space_records:
          # Spaces without a successful experiment have an empty record. Skip them
          # both as table rows and when totalling experiments for the global space.
          total_num_exps = sum(record[2] for space, record in best_space_records.items()
                               if space != GLOBAL_TUNING_SPACE and record)
          for space, record in best_space_records.items():
              if not record:
                  continue
              num_exps = total_num_exps if space == GLOBAL_TUNING_SPACE else record[2]
              tab.append([space, num_exps, record[1], record[0]['name']])

          summary = tabulate(tab,
                             headers=["tuning_space", "num_experiments", "best_metric_val", "best_exp_name"],
                             tablefmt="pipe")
          print(summary)
          with open(os.path.join(self.results_dir, 'summary.txt'), 'w', buffering=BUFSIZE) as fd:
              fd.write(summary)
              fd.flush()
              os.fsync(fd)

      if best_space_records and GLOBAL_TUNING_SPACE in best_space_records:
          global_record = best_space_records[GLOBAL_TUNING_SPACE]
          best_exp = global_record[0] if global_record else None
          if best_exp:
              logger.info(
                  f"{best_exp['name']} is the optimal setup after tuning. The exp result is at {best_exp['result_dir']}."
              )
          else:
              logger.info("No optimal setup is found. Please check that experiments were run successfully.")
          tuning_duration = datetime.timedelta(seconds=(time.time() - self.start_time))

          logger.info(f"Tuning completed in {tuning_duration}")
          with open(os.path.join(self.results_dir, 'summary.txt'), 'a') as f:
              f.write(
                  f"\n\nTuning completed in {tuning_duration}. Total number of experiments: {self.rm.experiment_count - 1}."
              )
              f.flush()
  def _get_user_config(self, user_args):
      """Get DeepSpeed configuration from the user arguments passed to the launcher.

      The config path comes from ``--deepspeed_config <file>``. Otherwise it comes
      from the argument after ``--deepspeed``, if that argument names a ``.json`` file.

      Args:
          user_args ([list]): user arguments passed to the DeepSpeed launcher

      Returns:
          [dict]: DeepSpeed configuration dictionary, or None when no config file is given.

      Raises:
          AssertionError: if ``--deepspeed_config`` is not followed by a ``.json``
              path, or the config path is not an existing file.
      """
      user_config_file = None
      if "--deepspeed_config" in user_args:
          idx = user_args.index("--deepspeed_config")
          # A trailing flag yields "" and fails the assertion below instead of
          # raising IndexError.
          value = user_args[idx + 1] if idx + 1 < len(user_args) else ""
          assert ".json" in value, "DeepSpeed --deepspeed_config requires a json file to specify the configuration"
          user_config_file = value
      elif "--deepspeed" in user_args:
          idx = user_args.index("--deepspeed")
          # Only a following .json argument is taken as the config path.
          if idx + 1 < len(user_args) and ".json" in user_args[idx + 1]:
              user_config_file = user_args[idx + 1]

      logger.debug(f"user_config_file = {user_config_file}")
      if user_config_file is None:
          return None
      assert os.path.isfile(
          user_config_file), "DeepSpeed configuration file: {} is not an existing file".format(user_config_file)
      # Still checked explicitly so a missing file returns None when asserts are disabled (-O).
      if not os.path.exists(user_config_file):
          return None
      with open(user_config_file, "r") as fd:
          return json.load(fd, object_pairs_hook=dict_raise_error_on_duplicate_keys)
  def _get_resource_manager(self, active_resources):
      """Initialize and return a resource manager.

      Args:
          active_resources ([dict]): hostname -> GPU slots. Each value must be a
              sequence with one entry per GPU, because ``len(slots)`` is used as the
              GPU count, e.g. {"worker-0": [0, 1, 2, 3, 4, 5, 6, 7]}.

      Raises:
          AssertionError: if no GPU is available, including when ``active_resources`` is empty.

      Returns:
          [ResourceManager]: A resource manager that schedules and runs autotuning experiments.
      """
      logger.info(f"active_resources = {active_resources}")
      hosts = list(active_resources.keys())
      # Use the smallest per-host slot count so that number of GPUs exists on every
      # host. default=0 makes an empty resource dict fail the check below rather
      # than passing with a placeholder value.
      ngpus_per_node = min((len(slots) for slots in active_resources.values()), default=0)
      assert ngpus_per_node > 0, "no gpu is available"

      return ResourceManager(args=self.args,
                             hosts=hosts,
                             num_gpus_per_node=ngpus_per_node,
                             results_dir=self.results_dir,
                             exps_dir=self.exps_dir,
                             arg_mappings=self.autotuning_config.arg_mappings)
  191. def _get_exp_resources(self, args):
  192. """Get resource requirement for each autotuning experiment
  193. Args:
  194. args (dict): user args
  195. Returns:
  196. num_nodes, num_gpus: the number of gpus and number of nodes used in the autotuning experiments
  197. """
  198. if args.num_nodes > 0:
  199. num_nodes = args.num_nodes
  200. else:
  201. num_nodes = len(self.rm.nodes)
  202. if args.num_gpus > 0:
  203. num_gpus = args.num_gpus
  204. else:
  205. num_gpus = self.rm.num_gpus_per_node
  206. return num_nodes, num_gpus
  def metric(self):
      """Return the configured tuning metric (passed to the tuners and used in MLflow metric names)."""
      config = self.autotuning_config
      return config.metric
  def fast_enabled(self):
      """Return whether fast mode (tune the micro batch size only) is enabled."""
      config = self.autotuning_config
      return config.fast
  def max_train_batch_size(self):
      """Return the configured global batch size limit.

      Callers treat a falsy or non-positive value as "no limit".
      """
      config = self.autotuning_config
      return config.max_train_batch_size
  def mp_size(self):
      """Return ``autotuning_config.mp_size``.

      Used as a divisor when converting between per-GPU and global batch sizes and
      memory estimates.
      """
      config = self.autotuning_config
      return config.mp_size
  def max_train_micro_batch_size_per_gpu(self):
      """Upper bound on the micro batch size per GPU to explore.

      When ``max_train_batch_size()`` is positive, the configured bound is further
      capped at ``max_train_batch_size * mp_size // (num_gpus * num_nodes)``. That is
      the largest micro batch size that keeps gradient accumulation steps >= 1.
      """
      configured_cap = self.autotuning_config.max_train_micro_batch_size_per_gpu
      global_batch_cap = self.max_train_batch_size()
      if not (global_batch_cap and global_batch_cap > 0):
          return configured_cap
      total_gpus = self.exp_num_gpus * self.exp_num_nodes
      return min(configured_cap, global_batch_cap * self.mp_size() // total_gpus)
  def min_train_micro_batch_size_per_gpu(self):
      """Return the configured lower bound on the micro batch size per GPU."""
      config = self.autotuning_config
      return config.min_train_micro_batch_size_per_gpu
  def num_tuning_micro_batch_sizes(self):
      """Return how many micro batch sizes the autotuning config asks to try."""
      config = self.autotuning_config
      return config.num_tuning_micro_batch_sizes
  def fp16_enabled(self):
      """Report whether the user's DeepSpeed config turns on fp16.

      Without an fp16 section this is False. With one, its ``enabled`` flag is used,
      falling back to ``FP16_ENABLED_DEFAULT`` when the flag is absent.
      """
      if FP16 not in self.user_config:
          return False
      return self.user_config[FP16].get(FP16_ENABLED, FP16_ENABLED_DEFAULT)
  def get_gpu_memory_info(self):
      """Return the total device memory reported by the current accelerator."""
      accelerator = get_accelerator()
      return accelerator.total_memory()
  def get_activation_memory_per_gpu(self):
      """Return the profiled activation memory per GPU for micro batch size 1, or None.

      Returns None when no model info has been collected yet (``self.model_info``
      is unset or empty) or when it lacks ``"activation_mem_per_gpu"``.
      """
      # getattr: model_info is only assigned once tune() has profiled the model.
      model_info = getattr(self, "model_info", None)
      if model_info and "activation_mem_per_gpu" in model_info:
          return model_info["activation_mem_per_gpu"]
      return None
  def get_instantiation_memory_required_per_gpu(self, zero_stage):
      """Estimate per-GPU bytes needed for parameters, gradients and optimizer state.

      Assumes an Adam optimizer. Bytes per parameter are 2/2/16 (params/grads/
      optimizer state) with fp16 and 4/4/8 otherwise. ZeRO stages are cumulative:
      >= optimizer_states shards optimizer state, >= gradients also shards
      gradients, and >= weights also shards parameters, across all experiment
      GPUs. The total is divided by ``mp_size()``. Returns 0 when the parameter
      count is unknown.
      """
      num_params = self.get_model_num_params()
      total_gpus = self.exp_num_nodes * self.exp_num_gpus
      fp16_enabled = self.fp16_enabled()
      if not num_params:
          return 0

      bytes_per_param = 2 if fp16_enabled else 4
      optimizer_bytes_per_param = 16 if fp16_enabled else 8
      params_mem = num_params * bytes_per_param
      gradients_mem = num_params * bytes_per_param
      optimizer_mem = num_params * optimizer_bytes_per_param

      if zero_stage >= ZeroStageEnum.optimizer_states:
          optimizer_mem /= total_gpus
      if zero_stage >= ZeroStageEnum.gradients:
          gradients_mem /= total_gpus
      if zero_stage >= ZeroStageEnum.weights:
          params_mem /= total_gpus

      return (params_mem + gradients_mem + optimizer_mem) / self.mp_size()
  def _generate_experiments(self, tuning_space, max_train_batch_size_per_gpu):
      """Generates a list of autotuning experiments given a tuning_space.

      The corresponding parameter values are replaced by user-defined values in the
      DeepSpeed configuration file. ``replace_dict`` is called on ``tuning_space``
      itself, so it is presumably modified in place.

      Args:
          tuning_space ([dict]): A DeepSpeed configuration dictionary where a value can be a list (called a tuning parameter). For example,
              {
                  "zero_optimization": {
                      "stage": 1,
                      "reduce_bucket_size": [5e7, 5e8, 1e9],
                      "allgather_bucket_size": [5e7, 5e8, 1e9],
                  }
              }
              reduce_bucket_size and allgather_bucket_size are the tuning parameters in this tuning space.
          max_train_batch_size_per_gpu (int): per-GPU batch size (micro batch size x
              gradient accumulation steps) each experiment's gradient accumulation
              steps are derived from.

      Returns:
          [list]: a list of experiments generated by taking combinations of values of the tuning space. The above tuning space generates 3*3 = 9 experiments if the user DeepSpeed configuration file does not overwrite the two tuning parameters or define more tuning parameters. Empty when the ZeRO stage is not 0-3.
      """
      exps = []

      # each zero stage uses a different template configuration file
      config_zero = tuning_space.get(ZERO_OPTIMIZATION, {})
      stage = config_zero.get(ZERO_OPTIMIZATION_STAGE, ZERO_OPTIMIZATION_STAGE_DEFAULT)
      template_config, prefix = self._load_stage_template(stage)
      if template_config is None:
          return exps

      # replace the corresponding parameter values if the user specifies them in the DeepSpeed configuration file
      replace_dict(tuning_space, self.user_config, [ZERO_OPTIMIZATION, TRAIN_MICRO_BATCH_SIZE_PER_GPU])

      logger.debug(f"tuning_space = {json.dumps(tuning_space)}")

      all_configs = get_all_configs(tuning_space, ignore_keys=["optimizer"])

      tuning_keys = get_tuning_keys(tuning_space)

      logger.debug(f"tuning_keys = {tuning_keys}")

      logger.debug(f"before pruning total configs = {len(all_configs)}")

      pruned_list = prune_configs(all_configs)

      logger.debug(f"after pruning total configs = {len(pruned_list)}")

      for config in pruned_list:
          exp_config = copy.deepcopy(template_config)
          # fill the template with the expr config
          replace_dict(exp_config, config)

          # if the config does not use offloading, remove the offloading section
          config_zero = config.get(ZERO_OPTIMIZATION, None)
          if config_zero:
              for offload_key in (OFFLOAD_OPTIMIZER, OFFLOAD_PARAM):
                  if offload_key not in config_zero and offload_key in exp_config[ZERO_OPTIMIZATION]:
                      del exp_config[ZERO_OPTIMIZATION][offload_key]

          # set gradient accumulation steps according to max_train_batch_size_per_gpu
          mbs = exp_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU]
          gas = max_train_batch_size_per_gpu // mbs
          exp_config[GRADIENT_ACCUMULATION_STEPS] = gas
          exp_config[TRAIN_BATCH_SIZE] = mbs * gas * \
              self.exp_num_gpus * self.exp_num_nodes // self.mp_size()

          exps.append({
              'name': canonical_name(exp_config, tuning_keys, prefix),
              DS_CONFIG: exp_config,
              'num_gpus': self.exp_num_gpus,
              'num_nodes': self.exp_num_nodes,
          })

      return exps

  def _load_stage_template(self, stage):
      """Load the default template config for a ZeRO stage.

      Args:
          stage: ZeRO stage (an int or a ZeroStageEnum member, compared with ``==``).

      Returns:
          tuple: ``(template_config, prefix)`` where prefix is ``"z<stage>_"``, or
          ``(None, None)`` for a stage other than 0-3.
      """
      stage_templates = (
          (0, DEFAULT_TEMPLATE_PATH_ZERO_0),
          (1, DEFAULT_TEMPLATE_PATH_ZERO_1),
          (2, DEFAULT_TEMPLATE_PATH_ZERO_2),
          (3, DEFAULT_TEMPLATE_PATH_ZERO_3),
      )
      for known_stage, template_path in stage_templates:
          if stage == known_stage:
              break
      else:
          return None, None

      # Context manager so the template file handle is closed immediately.
      with open(template_path, 'r') as fd:
          template_config = hjson.load(fd)

      if known_stage == 3:
          model_info = getattr(self, "model_info", None)
          if model_info and "hidden_size" in model_info:
              hs = model_info["hidden_size"]
              zero_config = template_config[ZERO_OPTIMIZATION]
              zero_config['reduce_bucket_size'] = hs * hs
              # 0.9 * hs * hs is generally fractional; keep the bucket size an integer
              # byte count (DeepSpeed validates this field as an int).
              zero_config['stage3_prefetch_bucket_size'] = int(0.9 * hs * hs)
              zero_config['stage3_param_persistence_threshold'] = 10 * hs

      return template_config, f"z{known_stage}_"
  def tune(self):
      """ Tunes Zero stages, micro batch size per GPU, and other Zero configurations. Performance metrics of different tuning spaces are recorded in self.records.

      When MLflow is installed, the run named by the ``MLFLOW_RUN_ID`` environment
      variable is resumed for the duration of tuning. It is ended afterwards, also
      when tuning stops early.
      """
      if has_mlflow:
          self.mlflow_parent_id = os.environ['MLFLOW_RUN_ID']
          mlflow.start_run(run_id=self.mlflow_parent_id)

      self.start_time = time.time()
      self._tune_zero_stages()

      # Reached on early exits too (failed profiling, model too large for ZeRO-3),
      # which previously left the MLflow run open.
      if has_mlflow:
          mlflow.end_run()

  def _tune_zero_stages(self):
      """Profile the model, then tune every ZeRO stage whose memory estimate fits on a GPU.

      Returns early, without tuning, when the model-info profile run yields nothing
      or when the model does not fit in GPU memory even with ZeRO stage 3.
      """
      if self.fast_enabled():
          logger.info("Fast mode is enabled. Tuning micro batch size only.")

      # model info profile run with DEFAULT_MIN_MEM_CONFIG
      model_info = self.model_info_profile_run()
      if not model_info:
          return
      self.model_info = model_info

      logger.info(f"The model has {number_to_string(self.get_model_num_params())} parameters.")

      self.gpu_mem = self.get_gpu_memory_info()
      logger.info(f"Memory per GPU in the system is {memory_to_string(self.gpu_mem, postfix='B')}.")

      self.activation_mem = self.get_activation_memory_per_gpu()
      logger.info(
          f"The model requires at least {memory_to_string(self.activation_mem, postfix='B')} activation memory for micro batch size 1."
      )

      # TODO: FIX THIS. The user's "zero_optimization.stage" is not honored yet;
      # every stage is considered.
      stage = "all"
      user_zero_stages = [stage] if not isinstance(stage, list) else stage
      logger.info(f"User-defined zero stages are {stage}.")

      # Best results so far; they are passed to the next stage's tune_space call
      # as the bar that stage has to beat.
      mbs = 0
      max_mbs = 0
      metric_val = 0

      stages = (
          (ZeroStageEnum.disabled, 0, DEFAULT_TUNING_SPACE_ZERO_0),
          (ZeroStageEnum.optimizer_states, 1, DEFAULT_TUNING_SPACE_ZERO_1),
          (ZeroStageEnum.gradients, 2, DEFAULT_TUNING_SPACE_ZERO_2),
      )
      for zero_stage, stage_id, tuning_space in stages:
          required_gpu_mem = self.get_instantiation_memory_required_per_gpu(zero_stage) + self.activation_mem
          if not self.gpu_mem > required_gpu_mem:
              logger.info(
                  f"The model is not runable with ZERO stage {zero_stage} (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory with mbs = 1)"
              )
              continue
          if "all" not in user_zero_stages and zero_stage not in user_zero_stages:
              continue
          mbs_note = " with mbs = 1" if stage_id == 0 else ""
          logger.info(
              f"The model might be runable with ZERO {stage_id} (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory{mbs_note}), adding DEFAULT_TUNING_SPACE_ZERO_{stage_id} to the global tuning space"
          )
          # For stage 0 the previous-best values are still 0, i.e. tune_space's defaults.
          next_max_mbs, next_mbs, next_metric_val = self.tune_space(
              tuning_space, prev_max_mbs=max_mbs, prev_best_mbs=mbs, prev_best_metric_val=metric_val)
          if next_mbs > mbs:
              mbs, max_mbs, metric_val = next_mbs, next_max_mbs, next_metric_val
          if has_mlflow:
              mlflow.log_metric(f"z{stage_id}{self.metric()}", next_metric_val)

      # ZeRO stage 3 is the last option: if the model does not fit here, stop.
      required_gpu_mem = self.get_instantiation_memory_required_per_gpu(
          ZeroStageEnum.weights) + self.activation_mem
      if not self.gpu_mem > required_gpu_mem:
          logger.info(
              f"The model has {self.get_model_num_params()} parameters and requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory per GPU with DeepSpeed Zero stage {ZeroStageEnum.weights} optimization. Memory per GPU in system is {memory_to_string(self.gpu_mem)}. No tuning is performed."
          )
          return
      if "all" in user_zero_stages or ZeroStageEnum.weights in user_zero_stages:
          logger.info(
              f"The model might be runable with ZERO 3 (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory), adding DEFAULT_TUNING_SPACE_ZERO_3 to the global tuning space"
          )
          _, _, next_metric_val = self.tune_space(
              DEFAULT_TUNING_SPACE_ZERO_3, prev_max_mbs=max_mbs, prev_best_mbs=mbs, prev_best_metric_val=metric_val)
          if has_mlflow:
              mlflow.log_metric(f"z3{self.metric()}", next_metric_val)
  def tune_space(self,
                 tuning_space,
                 prev_max_mbs=0,
                 prev_best_mbs=0,
                 prev_best_metric_val=0):
      """Tune one ZeRO-stage tuning space.

      First the micro batch size per GPU is tuned. Unless fast mode is on or the
      space is ZeRO stage 0, the remaining parameters of the space are then tuned
      with the configured tuner. This second step only runs if the micro batch size
      tuning beat the previous stage's results.

      Args:
          tuning_space (dict): tuning space for a single ZeRO stage; it is copied, not modified.
          prev_max_mbs (int): max micro batch size returned by the previous stage.
          prev_best_mbs (int): best micro batch size of the previous stage.
          prev_best_metric_val: best metric value of the previous stage.

      Returns:
          tuple: ``(max_micro_batch_size, best_mbs, best_metric_val)``, or
          ``(0, 0, 0)`` when the space is skipped.
      """
      import copy

      # tune() passes the shared DEFAULT_TUNING_SPACE_ZERO_* objects. Work on a copy
      # so that setting the micro batch sizes below, and the user-value substitution
      # in _generate_experiments, do not leak into them.
      tuning_space = copy.deepcopy(tuning_space)

      config_zero = tuning_space.get(ZERO_OPTIMIZATION, {})
      stage = config_zero.get(ZERO_OPTIMIZATION_STAGE, None)
      tuning_space_name = TUNING_MICRO_BATCH_SIZE_PREFIX + str(stage)
      max_train_batch_size_per_gpu = 0
      tuning_micro_batch_sizes_overwritten = False

      # calculate max micro batch size using gpu memory, model instantiation memory and activation memory
      # calculated_max_micro_batch_size = (memory_per_gpu - instantiation_memory) // activation_memory_micro_batch_size_1
      calculated_max_micro_batch_size = int(
          self.gpu_mem - self.get_instantiation_memory_required_per_gpu(stage)) // self.activation_mem
      logger.info(
          f"Start tuning for space {tuning_space_name}, calculated_max_micro_batch_size = {calculated_max_micro_batch_size}"
      )

      if calculated_max_micro_batch_size < prev_max_mbs:
          logger.info(f"No need to tune Zero stage {stage}. End tuning for space {tuning_space_name}")
          return 0, 0, 0

      user_micro_batch_sizes = self.user_config.get(TRAIN_MICRO_BATCH_SIZE_PER_GPU)
      if isinstance(user_micro_batch_sizes, list):
          # user-specified micro batch size per gpu is a list which overwrites the default tuning behavior
          tuning_micro_batch_sizes = [s for s in user_micro_batch_sizes if isinstance(s, int)]
          if not tuning_micro_batch_sizes:
              # Nothing usable to tune; min()/max() below would fail on an empty list.
              logger.info(f"End tuning for space {tuning_space_name}")
              return 0, 0, 0
          gas = self.get_gas_from_user_config()
          min_micro_batch_size = min(tuning_micro_batch_sizes)
          max_micro_batch_size = max(tuning_micro_batch_sizes)
          max_train_batch_size_per_gpu = max_micro_batch_size * gas
          tuning_micro_batch_sizes_overwritten = True
      else:
          # auto-detects the list of micro batch sizes to tune
          min_micro_batch_size, max_micro_batch_size = self.get_min_max_micro_batch_size(
              stage, prev_max_mbs, calculated_max_micro_batch_size)

          if max_micro_batch_size < prev_max_mbs:
              logger.info(f"No need to tune Zero stage {stage}. End tuning for space {tuning_space_name}")
              return 0, 0, 0

          tuning_micro_batch_sizes, max_train_batch_size_per_gpu = self.get_tuning_micro_batch_size_list(
              min_micro_batch_size,
              max_micro_batch_size,
              num_tuning_micro_batch_sizes=self.num_tuning_micro_batch_sizes())

      logger.info(
          f"tuning_micro_batch_sizes = {tuning_micro_batch_sizes}, max_train_batch_size_per_gpu = {max_train_batch_size_per_gpu}"
      )

      # return if the tuning_micro_batch_sizes list is empty
      if not tuning_micro_batch_sizes:
          logger.info(f"End tuning for space {tuning_space_name}")
          return 0, 0, 0

      # tune micro batch sizes and gradient accumulation steps given max_train_batch_size_per_gpu
      tuning_micro_batch_sizes = self.run_tuning_micro_batch_sizes(tuning_micro_batch_sizes,
                                                                   max_train_batch_size_per_gpu,
                                                                   min_micro_batch_size,
                                                                   stage,
                                                                   tuning_micro_batch_sizes_overwritten)

      fast_best_record = self.get_best_space_record(tuning_space_name)
      if fast_best_record:
          fast_best_metric_val = fast_best_record[1]
          fast_best_mbs = fast_best_record[0][DS_CONFIG][TRAIN_MICRO_BATCH_SIZE_PER_GPU]
          fast_best_name = fast_best_record[0]['name']
      else:
          # No successful experiment in this space; the name lookup used to crash here.
          fast_best_metric_val = 0
          fast_best_mbs = 0
          fast_best_name = None
      logger.info(f"fast_best_mbs = {fast_best_mbs}, name = {fast_best_name}")

      if self.fast_enabled() or stage == 0:
          logger.info(f"End tuning for space: {tuning_space_name}")
          return max_micro_batch_size, fast_best_mbs, fast_best_metric_val

      # if the best metric or the micro batch size for that best metric in the current Zero stage after tuning micro batch size is less than the corresponding value in the previous Zero stage, return, do not tune other Zero configuration parameters
      if stage > 0:
          if fast_best_mbs <= prev_best_mbs or fast_best_metric_val < prev_best_metric_val:
              logger.info(
                  f"End tuning for space: {tuning_space_name}. No need to tune other Zero configuration parameters."
              )
              return max_micro_batch_size, fast_best_mbs, fast_best_metric_val

      tuning_space[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = tuning_micro_batch_sizes
      tuning_space_name = canonical_name(tuning_space,
                                         tuning_keys=get_tuning_keys(tuning_space),
                                         prefix="z" + str(stage) + "_",
                                         omit_val=True)

      logger.info(f'Tuning space is {tuning_space}')
      logger.info(f'Tuning space name is {tuning_space_name}')

      exps = self._generate_experiments(tuning_space, max_train_batch_size_per_gpu)

      logger.info(f'Tuner type is {self.autotuning_config.tuner_type}')
      if self.autotuning_config.tuner_type == AUTOTUNING_TUNER_MODELBASED:
          t = ModelBasedTuner(exps, self.rm, self.metric(), tuning_space)
      elif self.autotuning_config.tuner_type == AUTOTUNING_TUNER_RANDOM:
          t = RandomTuner(exps, self.rm, self.metric())
      else:
          t = GridSearchTuner(exps, self.rm, self.metric())

      sample_size = len(self.rm.nodes) * self.rm.num_gpus_per_node // (self.exp_num_gpus * self.exp_num_nodes)
      num_exps = t.tune(sample_size=sample_size,
                        n_trials=self.autotuning_config.tuner_num_trials,
                        early_stopping=self.autotuning_config.tuner_early_stopping)
      exp = t.best_exp
      metric_val = t.best_metric_val
      if exp:
          self.update_records(tuning_space_name, exp, metric_val, num_exps)

      full_best_record = self.get_best_space_record(tuning_space_name)
      full_best_metric_val = full_best_record[1] if full_best_record else -1

      if full_best_metric_val > fast_best_metric_val:
          best_metric_val = full_best_metric_val
          best_mbs = full_best_record[0][DS_CONFIG][TRAIN_MICRO_BATCH_SIZE_PER_GPU] if full_best_record else -1
      else:
          best_metric_val = fast_best_metric_val
          best_mbs = fast_best_mbs

      logger.info(f"End tuning for space: {tuning_space_name}")
      return max_micro_batch_size, best_mbs, best_metric_val
  def get_plauteu_mbs(self, tuning_space_name):
      """Find the micro batch size at which the metric plateaus within a tuning space.

      Records of the space are walked in increasing micro batch size. The walk stops
      at the first record whose metric is lower than its predecessor's, or higher by
      a relative margin below ``METRIC_PERCENT_DIFF_CONST``. The micro batch size of
      the last record accepted before stopping is returned; 0 when the space has no
      records.
      """
      if tuning_space_name not in self.records:
          return 0

      def micro_batch_size(record):
          return record[0][DS_CONFIG][TRAIN_MICRO_BATCH_SIZE_PER_GPU]

      plateau_mbs = 0
      last_metric_val = None
      for exp, metric_val, _ in sorted(self.records[tuning_space_name], key=micro_batch_size):
          # A falsy previous value (None before the first record, or 0) skips the check.
          if last_metric_val:
              if (metric_val < last_metric_val
                      or (metric_val - last_metric_val) / last_metric_val < METRIC_PERCENT_DIFF_CONST):
                  break
          last_metric_val = metric_val
          plateau_mbs = exp[DS_CONFIG][TRAIN_MICRO_BATCH_SIZE_PER_GPU]
      return plateau_mbs
  596. def get_model_num_params(self):
  597. if self.model_info and "num_params" in self.model_info:
  598. return self.model_info["num_params"]
  599. def model_info_profile_run(self):
  600. """Does a model information profling experiment that collects the number of model parameters and activation memory.\
  601. The experiment produces a "profile_model_info" folder under self.results_dir.
  602. Returns:
  603. [dict]: a model information dictionary, e.g., {"num_params": 335144976, "trainable_num_params": 335144976, "activation_mem_per_gpu": 324358144, "rank": 0}
  604. """
  605. logger.info("Starting model info profile run.")
  606. model_info = self.autotuning_config.model_info
  607. if model_info and MODEL_INFO_NUM_PARAMS in model_info:
  608. return model_info
  609. ds_config = copy.deepcopy(self.user_config)
  610. replace_dict(ds_config, DEFAULT_MIN_MEM_CONFIG)
  611. model_info_path = os.path.join(self.results_dir,
  612. "profile_model_info",
  613. "model_info.json")
  614. ds_config[AUTOTUNING] = {
  615. "enabled": True,
  616. "model_info_path": model_info_path,
  617. "model_info": {
  618. "profile": True
  619. }
  620. }
  621. exp_config = {}
  622. exp_name = "profile_model_info"
  623. exp_config['name'] = exp_name
  624. exp_config[DS_CONFIG] = ds_config
  625. exp_config['num_gpus'] = self.exp_num_gpus
  626. exp_config['num_nodes'] = self.exp_num_nodes
  627. exp_path = os.path.join(self.exps_dir, f'{exp_name}.json')
  628. with open(exp_path, 'w', buffering=BUFSIZE) as fd:
  629. json.dump(exp_config, fd)
  630. fd.flush()
  631. os.fsync(fd)
  632. self.rm.schedule_experiments([exp_path])
  633. self.rm.run()
  634. for exp_id, (exp_json, err) in self.rm.finished_experiments.items():
  635. self.rm.clear()
  636. if err:
  637. logger.error(
  638. f"The model is not runnable with DeepSpeed with error = {err}")
  639. return None
  640. if os.path.exists(model_info_path):
  641. with open(model_info_path, 'r') as f:
  642. model_info = hjson.load(f)
  643. return model_info
  644. def update_records(self, space_name, exp, metric_val, num_exps):
  645. if space_name not in self.records:
  646. self.records[space_name] = [(exp, metric_val, num_exps)]
  647. else:
  648. self.records[space_name].append((exp, metric_val, num_exps))
  649. def get_best_space_record(self, space_name):
  650. if space_name not in self.records:
  651. return None
  652. space_records = self.records[space_name]
  653. best_space_record = None
  654. space_num_exps = 0
  655. for (exp, metric_val, num_exps) in space_records:
  656. space_num_exps += num_exps
  657. if best_space_record is None or metric_val > best_space_record[1]:
  658. best_space_record = (exp, metric_val)
  659. if best_space_record:
  660. best_space_record = best_space_record + (space_num_exps, )
  661. return best_space_record
  662. def get_best_space_records(self):
  663. best_space_records = {}
  664. global_best_record = None
  665. for space_name, space_records in self.records.items():
  666. best_space_record = self.get_best_space_record(space_name)
  667. if best_space_record:
  668. best_space_records[space_name] = best_space_record
  669. if not global_best_record or best_space_record[1] > global_best_record[1]:
  670. global_best_record = best_space_record
  671. if global_best_record:
  672. best_space_records[GLOBAL_TUNING_SPACE] = global_best_record
  673. return best_space_records
  674. def run_tuning_micro_batch_sizes(self,
  675. tuning_micro_batch_sizes,
  676. max_train_batch_size_per_gpu,
  677. min_micro_batch_size,
  678. stage,
  679. tuning_micro_batch_sizes_overwritten):
  680. assert tuning_micro_batch_sizes, "the tuning micro batch size list is empty"
  681. tuning_micro_batch_sizes.sort()
  682. max_micro_batch_size = tuning_micro_batch_sizes[-1]
  683. max_micro_batch_size_metric_val = 0
  684. ds_config = get_first_config(self.user_config)
  685. ds_config[ZERO_OPTIMIZATION] = {ZERO_OPTIMIZATION_STAGE: stage}
  686. tuning_space_name = TUNING_MICRO_BATCH_SIZE_PREFIX + str(stage)
  687. exp_paths = []
  688. for mbs in tuning_micro_batch_sizes:
  689. ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
  690. gas = max_train_batch_size_per_gpu // mbs
  691. ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
  692. ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
  693. self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
  694. exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
  695. exp_config = {}
  696. exp_config['name'] = exp_name
  697. exp_config[DS_CONFIG] = ds_config
  698. exp_config['num_gpus'] = self.exp_num_gpus
  699. exp_config['num_nodes'] = self.exp_num_nodes
  700. exp_path = os.path.join(self.exps_dir, f'{exp_name}.json')
  701. with open(exp_path, 'w', buffering=BUFSIZE) as fd:
  702. json.dump(exp_config, fd)
  703. fd.flush()
  704. os.fsync(fd)
  705. exp_paths.append(exp_path)
  706. self.rm.schedule_experiments(exp_paths)
  707. self.rm.run()
  708. for exp_id, (exp, err) in self.rm.finished_experiments.items():
  709. if exp:
  710. metric_file = exp[DS_CONFIG][AUTOTUNING][AUTOTUNING_METRIC_PATH]
  711. if os.path.exists(metric_file):
  712. with open(metric_file, 'r') as f:
  713. results = hjson.load(f)
  714. metric_val = results[self.metric()]
  715. self.update_records(tuning_space_name, exp, metric_val, 1)
  716. if max_micro_batch_size == exp[DS_CONFIG][
  717. TRAIN_MICRO_BATCH_SIZE_PER_GPU]:
  718. max_micro_batch_size_metric_val = metric_val
  719. if has_mlflow:
  720. os.environ.pop('MLFLOW_RUN_ID')
  721. mlflow.start_run(nested=True, run_name=exp['name'])
  722. for metric in results:
  723. mlflow.log_metric(metric, results[metric])
  724. mlflow.end_run()
  725. os.environ['MLFLOW_RUN_ID'] = self.mlflow_parent_id
  726. else:
  727. self.update_records(tuning_space_name, exp, 0, 1)
  728. else:
  729. mbs = exp[DS_CONFIG][TRAIN_MICRO_BATCH_SIZE_PER_GPU]
  730. logger.info(f"micro batch size = {mbs} was not run successfully")
  731. self.rm.clear()
  732. if tuning_micro_batch_sizes_overwritten:
  733. return tuning_micro_batch_sizes
  734. # in a auto-detected tuning_micro_batch_sizs list, max_micro_batch_size might not be performant as the memory consumption is close to max
  735. # try smaller values while gas stays the same
  736. # if finding a more performant mbs value, use it to replace max_micro_batch_size in the list
  737. min_micro_batch_size_with_same_gas = (
  738. tuning_micro_batch_sizes[-2] +
  739. 1) if len(tuning_micro_batch_sizes) > 1 else min_micro_batch_size
  740. prev_best_metric_val = max_micro_batch_size_metric_val
  741. prev_best_mbs = max_micro_batch_size
  742. stride = (max_micro_batch_size - min_micro_batch_size_with_same_gas) // 3
  743. if stride == 0:
  744. stride = 1
  745. for mbs in reversed(
  746. range(min_micro_batch_size_with_same_gas,
  747. max_micro_batch_size,
  748. stride)):
  749. ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
  750. gas = max_train_batch_size_per_gpu // mbs
  751. ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
  752. ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
  753. self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
  754. exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
  755. exp, metric_val = self.run_ds_config(ds_config, exp_name)
  756. if metric_val:
  757. with open(metric_file, 'r') as f:
  758. results = hjson.load(f)
  759. metric_val = results[self.metric()]
  760. if has_mlflow:
  761. os.environ.pop('MLFLOW_RUN_ID')
  762. mlflow.start_run(nested=True, run_name=exp_name)
  763. for metric in results:
  764. mlflow.log_metric(metric, results[metric])
  765. mlflow.end_run()
  766. os.environ['MLFLOW_RUN_ID'] = self.mlflow_parent_id
  767. self.update_records(tuning_space_name, exp, metric_val, 1)
  768. if metric_val > prev_best_metric_val * (1 + METRIC_PERCENT_DIFF_CONST):
  769. prev_best_metric_val = metric_val
  770. prev_best_mbs = mbs
  771. else:
  772. break
  773. else:
  774. self.update_records(tuning_space_name, exp, 0, 1)
  775. break
  776. if prev_best_mbs != max_micro_batch_size:
  777. tuning_micro_batch_sizes[-1] = prev_best_mbs
  778. return tuning_micro_batch_sizes
  779. def get_min_max_micro_batch_size(self,
  780. stage,
  781. min_micro_batch_size,
  782. calculated_max_micro_batch_size):
  783. # get min and max micro batch size with gradient accumulation steps = 1
  784. if min_micro_batch_size > calculated_max_micro_batch_size:
  785. return -1, -1
  786. used_micro_batch_sizes = []
  787. tuning_space_name = TUNING_MICRO_BATCH_SIZE_PREFIX + str(stage)
  788. ds_config = get_first_config(self.user_config)
  789. ds_config[ZERO_OPTIMIZATION] = {ZERO_OPTIMIZATION_STAGE: stage}
  790. gas = self.get_gas_from_user_config()
  791. ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
  792. # search for the min micro batch size
  793. if min_micro_batch_size < 1:
  794. if TRAIN_MICRO_BATCH_SIZE_PER_GPU in self.user_config and isinstance(
  795. self.user_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU],
  796. int):
  797. # user specifies train_micro_batch_size_per_gpu as an int
  798. mbs = int(self.user_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU])
  799. else:
  800. # user does not specify train_micro_batch_size_per_gpu or sets it to "auto" when using Hugging Face
  801. val = self.get_val_from_user_args(TRAIN_MICRO_BATCH_SIZE_PER_GPU)
  802. if val:
  803. mbs = int(val)
  804. else:
  805. mbs = 1
  806. assert mbs > 0, "The micro batch size per GPU must be greater than 0."
  807. ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
  808. ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
  809. ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
  810. self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
  811. exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
  812. exp, metric_val = self.run_ds_config(ds_config, exp_name)
  813. if metric_val:
  814. self.update_records(tuning_space_name, exp, metric_val, 1)
  815. used_micro_batch_sizes.append(mbs)
  816. min_micro_batch_size = mbs
  817. else:
  818. self.update_records(tuning_space_name, exp, 0, 1)
  819. logger.info(
  820. f"User-specified micro batch size per GPU {mbs} does not run")
  821. if self.min_train_micro_batch_size_per_gpu() == mbs:
  822. return -1, -1
  823. mbs = self.min_train_micro_batch_size_per_gpu()
  824. ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
  825. ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
  826. ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
  827. self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
  828. exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
  829. exp, metric_val = self.run_ds_config(ds_config, exp_name)
  830. if not metric_val:
  831. self.update_records(tuning_space_name, exp, 0, 1)
  832. logger.info(
  833. f"min_train_micro_batch_size_per_gpu {mbs} is not runnable.")
  834. return -1, -1
  835. self.update_records(tuning_space_name, exp, metric_val, 1)
  836. min_micro_batch_size = mbs
  837. used_micro_batch_sizes.append(mbs)
  838. else:
  839. ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = min_micro_batch_size
  840. ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
  841. ds_config[TRAIN_BATCH_SIZE] = min_micro_batch_size * gas * \
  842. self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
  843. exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(
  844. min_micro_batch_size)
  845. exp, metric_val = self.run_ds_config(ds_config, exp_name)
  846. if metric_val:
  847. self.update_records(tuning_space_name, exp, metric_val, 1)
  848. used_micro_batch_sizes.append(min_micro_batch_size)
  849. else:
  850. self.update_records(tuning_space_name, exp, 0, 1)
  851. return -1, -1
  852. # search for the max micro batch size
  853. max_micro_batch_size = min(calculated_max_micro_batch_size,
  854. self.max_train_micro_batch_size_per_gpu())
  855. for mbs in [
  856. math.ceil(1.05 * max_micro_batch_size),
  857. max_micro_batch_size,
  858. int(0.95 * max_micro_batch_size)
  859. ]:
  860. if mbs > self.max_train_micro_batch_size_per_gpu():
  861. continue
  862. if mbs in used_micro_batch_sizes:
  863. return min_micro_batch_size, mbs
  864. ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
  865. ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
  866. self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
  867. exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
  868. exp, metric_val = self.run_ds_config(ds_config, exp_name)
  869. if metric_val:
  870. logger.info(f"mbs = {mbs} is found as max mbs")
  871. self.update_records(tuning_space_name, exp, metric_val, 1)
  872. used_micro_batch_sizes.append(mbs)
  873. return min_micro_batch_size, mbs
  874. else:
  875. self.update_records(tuning_space_name, exp, 0, 1)
  876. space_records = self.records[
  877. tuning_space_name] if tuning_space_name in self.records else []
  878. if space_records:
  879. prev_idx = min(range(len(space_records)),
  880. key=lambda i: abs(space_records[i][0][DS_CONFIG][
  881. TRAIN_MICRO_BATCH_SIZE_PER_GPU] - min_micro_batch_size))
  882. prev_metric_val = space_records[prev_idx][1]
  883. else:
  884. prev_metric_val = None
  885. low = min_micro_batch_size
  886. high = max_micro_batch_size
  887. # binary search until low is the smallest micro batch size that OOMs.
  888. while low <= high:
  889. mid = int((low + high) // 2)
  890. logger.debug(f"trying mbs = {mid}, low = {low}, high = {high}")
  891. if mid not in used_micro_batch_sizes:
  892. ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mid
  893. ds_config[TRAIN_BATCH_SIZE] = mid * gas * \
  894. self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
  895. exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mid)
  896. exp, metric_val = self.run_ds_config(ds_config, exp_name)
  897. if metric_val:
  898. low = mid + 1
  899. self.update_records(tuning_space_name, exp, metric_val, 1)
  900. used_micro_batch_sizes.append(mid)
  901. if prev_metric_val and ((metric_val - prev_metric_val) /
  902. prev_metric_val) < METRIC_PERCENT_DIFF_CONST:
  903. logger.info(f"performance plateaus at mbs = {low}")
  904. break
  905. prev_metric_val = metric_val
  906. else:
  907. self.update_records(tuning_space_name, exp, 0, 1)
  908. high = mid - 1
  909. else:
  910. low = mid + 1
  911. max_micro_batch_size = low - 1
  912. logger.info(
  913. f"min_micro_batch_size = {min_micro_batch_size}, max_micro_batch_size = {max_micro_batch_size}."
  914. )
  915. return min_micro_batch_size, max_micro_batch_size
  916. def get_gas_from_user_config(self):
  917. gas = 1
  918. if GRADIENT_ACCUMULATION_STEPS in self.user_config:
  919. gas_in_config = self.user_config[GRADIENT_ACCUMULATION_STEPS]
  920. if isinstance(gas_in_config, int):
  921. gas = gas_in_config
  922. elif gas_in_config == "auto": # GRADIENT_ACCUMULATION_STEPS: "auto"
  923. val = self.get_val_from_config(GRADIENT_ACCUMULATION_STEPS)
  924. if val:
  925. gas = int(val)
  926. elif isinstance(gas_in_config, list):
  927. logger.info(
  928. f"Specifying a list of {GRADIENT_ACCUMULATION_STEPS} to tune is not supported. 1 would be used."
  929. )
  930. assert gas > 0, "Gradient accumulation steps must be positive."
  931. return gas
  932. def get_val_from_user_args(self, ds_name):
  933. arg_mappings = self.autotuning_config.arg_mappings
  934. user_args = self.args.user_args
  935. if arg_mappings and ds_name in arg_mappings:
  936. arg_name = arg_mappings[ds_name]
  937. if arg_name in user_args:
  938. idx = user_args.index(arg_name)
  939. if user_args[idx + 1].isnumeric():
  940. return (user_args[idx + 1])
  941. return None
  942. def get_tuning_micro_batch_size_list(self,
  943. min_micro_batch_size,
  944. max_micro_batch_size,
  945. num_tuning_micro_batch_sizes):
  946. """Get a list of micro batch sizes to tune based on min and max values, as well as the size of the list.
  947. Args:
  948. min_micro_batch_size ([int]): min micro batch size per GPU
  949. max_micro_batch_size ([int]): max micro batch size per GPU
  950. num_tuning_micro_batch_sizes (int): the number of items in the returned list
  951. Returns:
  952. [list]: a list of micro batch sizes to tune.
  953. """
  954. if min_micro_batch_size <= 0 or max_micro_batch_size <= 0:
  955. logger.info(
  956. f"min_micro_batch_size = {min_micro_batch_size}, max_micro_batch_size = {max_micro_batch_size}"
  957. )
  958. return [], 0
  959. # NUM_GPUS=$(( ${NUM_WORKERS} * ${NUM_GPUS_PER_WORKER} ))
  960. # DP_SIZE=$(( ${NUM_GPUS} / (${PP_SIZE} * ${MP_SIZE}) ))
  961. # GRAD_ACC_STEPS=$(( ${TARGET_GLOBAL_BATCH_SIZE} / (${BATCH_SIZE} * ${DP_SIZE}) ))
  962. if self.max_train_batch_size() and self.max_train_batch_size(
  963. ) > 0: # if the user specifies a max_train_batch_size
  964. max_train_batch_size_per_gpu = self.max_train_batch_size() * self.mp_size(
  965. ) // (self.exp_num_gpus * self.exp_num_nodes)
  966. else:
  967. gas = self.get_gas_from_user_config()
  968. max_train_batch_size_per_gpu = max_micro_batch_size * gas // self.mp_size()
  969. logger.info(f"max_train_batch_size_per_gpu = {max_train_batch_size_per_gpu}")
  970. if min_micro_batch_size < max_micro_batch_size // 2:
  971. min_micro_batch_size = max_micro_batch_size // 2
  972. # constant stride
  973. stride = (max_micro_batch_size -
  974. min_micro_batch_size) // num_tuning_micro_batch_sizes
  975. if stride == 0:
  976. stride = 1
  977. ls = []
  978. min_gas = max_train_batch_size_per_gpu // max_micro_batch_size
  979. # if gas is the same as min_gas, do not add mbs to the tuning list
  980. for mbs in range(min_micro_batch_size, max_micro_batch_size, stride):
  981. if max_micro_batch_size // mbs != min_gas:
  982. ls.append(mbs)
  983. ls.append(max_micro_batch_size)
  984. return ls, max_train_batch_size_per_gpu
  985. def run_ds_config(self, ds_config, exp_name):
  986. exp_config = {}
  987. exp_config['name'] = exp_name
  988. exp_config[DS_CONFIG] = ds_config
  989. exp_config['num_gpus'] = self.exp_num_gpus
  990. exp_config['num_nodes'] = self.exp_num_nodes
  991. exp_path = os.path.join(self.exps_dir, f'{exp_name}.json')
  992. logger.debug(f'run_ds_config exp_name = {exp_name}')
  993. with open(exp_path, 'w', buffering=BUFSIZE) as fd:
  994. json.dump(exp_config, fd)
  995. fd.flush()
  996. os.fsync(fd)
  997. self.rm.schedule_experiments([exp_path])
  998. self.rm.run()
  999. exp, metric_val = self.rm.parse_results(self.metric())
  1000. self.rm.clear()
  1001. return exp, metric_val
  1002. def write_optimal_config(self):
  1003. best_space_records = self.get_best_space_records()
  1004. if GLOBAL_TUNING_SPACE not in best_space_records:
  1005. return
  1006. best_exp, best_metric_val, _ = best_space_records[GLOBAL_TUNING_SPACE]
  1007. if best_exp:
  1008. exp_dir = best_exp["result_dir"]
  1009. cmd = None
  1010. with open(os.path.join(exp_dir, "cmd.txt"), "r") as f:
  1011. cmd = [str(i) for i in f.read().split()]
  1012. ds_config = hjson.load(open(os.path.join(exp_dir, "ds_config.json"), "r"))
  1013. ds_config.pop(AUTOTUNING)
  1014. ds_config_path = os.path.join(self.results_dir, "ds_config_optimal.json")
  1015. json.dump(ds_config, open(ds_config_path, "w"))
  1016. cmd_path = os.path.join(self.results_dir, "cmd_optimal.txt")
  1017. with open(cmd_path, "w") as fd:
  1018. fd.write(" ".join(cmd))
  1019. fd.write("\n")
  1020. fd.flush()
  1021. self.optimal_cmd = cmd
  1022. self.optmal_ds_config = ds_config
  1023. logger.info(
  1024. f"Wrote the optimal DeepSpeed configuration found by autotuning to {ds_config_path}, and the corresponding DeepSpeed command to {cmd_path}"
  1025. )
  1026. def run_after_tuning(self):
  1027. """ Launches the training with the optimal DeepSpeed configuration found through the autotuning process.
  1028. "ds_config_optimal.json" describing the optmimal DeepSpeed configuration as well the command used to launch training "cmd_optimal.txt" are saved to self.results_dir.
  1029. """
  1030. if self.optimal_cmd:
  1031. result = subprocess.Popen(self.optimal_cmd)
  1032. result.wait()
  1033. logger.info(
  1034. f"Done running with the optimal DeepSpeed configuration using {self.optimal_cmd}"
  1035. )
  1036. else:
  1037. logger.info(f"No optimal DeepSpeed configuration found by autotuning.")