- import copy
- import json
- import os
- from random import sample
- import shutil
- import subprocess
- import hjson
- import torch
- import time
- import datetime
- import math
- from ..runtime.config_utils import dict_raise_error_on_duplicate_keys
- from ..runtime.constants import *
- from ..runtime.zero.constants import *
- from ..utils import logger
- from .config import DeepSpeedAutotuningConfig
- from .constants import *
- from .scheduler import ResourceManager, run_experiment
- from .tuner import GridSearchTuner, RandomTuner, ModelBasedTuner
- from .utils import *
- try:
- from tabulate import tabulate
- except ImportError:
- tabulate = None
- class Autotuner:
- """The DeepSpeed Autotuner automatically discovers the optimal DeepSpeed configuration that delivers good training speed. The Autotuner uses model information, system information, and heuristics to efficiently tune system knobs that affect compute and memory efficiencies, such as ZeRO optimization stages, micro-batch sizes, and many other ZeRO optimization configurations. It not only reduces the time and resources user spend on tuning, but also can discover configurations better than hand-tuned methods.
- Autotuning with DeepSpeed requires no code change from DeepSpeed users. Please refer to the README for usage details.
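- A typical launch looks roughly like the following (an illustrative sketch based on the autotuning README; exact flags may differ across DeepSpeed versions):
- deepspeed --autotuning tune --num_gpus=8 <user_script.py> <user_args> --deepspeed ds_config.json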
- """
- def __init__(self, args, active_resources):
- self.args = args
- self.selected_exp_dir = None
- assert tabulate is not None, "Missing required package `tabulate`, please install with `pip install deepspeed[autotuning]`."
- logger.debug(f"autotunning args={args}")
- self.user_config = self._get_user_config(args.user_args)
- assert self.user_config is not None, "DeepSpeed configuration is not provided"
- self.autotuning_config = DeepSpeedAutotuningConfig(self.user_config)
- self.exps_dir = DEFAULT_EXPRS_DIR
- if self.autotuning_config.exps_dir and self.autotuning_config.exps_dir != "":
- self.exps_dir = self.autotuning_config.exps_dir
- if self.autotuning_config.overwrite and os.path.exists(self.exps_dir):
- shutil.rmtree(self.exps_dir, ignore_errors=True)
- if not os.path.exists(self.exps_dir):
- os.makedirs(self.exps_dir, exist_ok=True)
- self.results_dir = DEFAULT_RESULTS_DIR
- if self.autotuning_config.results_dir and self.autotuning_config.results_dir != "":
- self.results_dir = self.autotuning_config.results_dir
- if self.autotuning_config.overwrite and os.path.exists(self.results_dir):
- shutil.rmtree(self.results_dir, ignore_errors=True)
- if not os.path.exists(self.results_dir):
- os.makedirs(self.results_dir, exist_ok=True)
- # set the active resource for the autotuner resource manager
- self.rm = self._get_resource_manager(active_resources)
- # get resource requirement for each autotuning experiment
- self.exp_num_nodes, self.exp_num_gpus = self._get_exp_resources(args)
- assert self.exp_num_gpus <= self.rm.num_gpus_per_node, "--num_gpus in the train script, if any, must not exceed the number of GPUs per node in the available resources"
- assert self.exp_num_nodes <= len(
- self.rm.nodes), "--num_nodes in the train script, if any, must not exceed the number of nodes in the available resources"
- self.records = {}
- def print_tuning_results(self):
- """Print the autotuning results in tabular format.
- """
- best_space_records = self.get_best_space_records()
- tab = []
- if best_space_records:
- for key, val in best_space_records.items():
- if not val:
- continue
- row = []
- row.append(key)
- num_exps = 0
- if key == GLOBAL_TUNING_SPACE:
- cnt = 0
- for k, v in best_space_records.items():
- if k != GLOBAL_TUNING_SPACE:
- cnt += v[2]
- num_exps = cnt
- else:
- num_exps = val[2]
- row.append(num_exps)
- row.append(val[1])
- row.append(val[0]['name'])
- tab.append(row)
- summary = tabulate(tab,
- headers=[
- "tuning_space",
- "num_experiments",
- "best_metric_val",
- "best_exp_name"
- ],
- tablefmt="pipe")
- print(summary)
- with open(os.path.join(self.results_dir,
- 'summary.txt'),
- 'w',
- buffering=BUFSIZE) as fd:
- fd.write(summary)
- fd.flush()
- os.fsync(fd)
- if GLOBAL_TUNING_SPACE in best_space_records:
- best_exp, best_metric_val, total_num_exps = best_space_records[GLOBAL_TUNING_SPACE]
- if best_exp:
- logger.info(
- f"{best_exp['name']} is the optimal setup after tuning. The exp result is at {best_exp['result_dir']}."
- )
- else:
- logger.info(
- "No optimal setup was found. Please check that the experiments ran successfully."
- )
- tuning_duration = datetime.timedelta(seconds=(time.time() - self.start_time))
- logger.info(f"Tuning completed in {tuning_duration}")
- with open(os.path.join(self.results_dir, 'summary.txt'), 'a') as f:
- f.write(
- f"\n\nTuning completed in {tuning_duration}. Total number of experiments: {self.rm.experiment_count - 1}."
- )
- f.flush()
- def _get_user_config(self, user_args):
- """Get DeepSpeed configuration from the user arguments passed to the launcher.
- Args:
- user_args ([list]): user arguments passed to the DeepSpeed launcher
- Returns:
- [dict]: DeepSpeed configuration dictionary
- """
- user_config_file = None
- if "--deepspeed_config" in user_args:
- idx = user_args.index("--deepspeed_config")
- assert ".json" in user_args[idx +
- 1], "DeepSpeed --deepspeed_config requires a json file to specify the configuration"
- user_config_file = user_args[idx + 1]
- elif "--deepspeed" in user_args:
- idx = user_args.index("--deepspeed")
- if ".json" in user_args[idx + 1]:
- user_config_file = user_args[idx + 1]
- logger.debug(f"user_config_file = {user_config_file}")
- if user_config_file is not None:
- assert os.path.isfile(
- user_config_file
- ), "DeepSpeed configuration file: {} is not an existing file".format(
- user_config_file
- )
- with open(user_config_file, "r") as uf:
- return json.load(uf, object_pairs_hook=dict_raise_error_on_duplicate_keys)
- return None
- def _get_resource_manager(self, active_resources):
- """Initialize and return a resource manager
- Args:
- active_resources ([dict]): A dictionary of hostname and its slots (GPUs), e.g. {"worker-0": "0,1,2,3,4,5,6,7,8"}
- Raises:
- RuntimeError: raises the error if no GPU is available
- Returns:
- [ResourceManager]: A resource manager that schedules and runs autotuning experiments.
- """
- logger.info(f"active_resources = {active_resources}")
- hosts = []
- ngpus_per_node = 100  # start high; reduced below to the smallest slot count across hosts
- for hostname, slots in active_resources.items():
- hosts.append(hostname)
- ngpus_per_node = min(len(slots), ngpus_per_node)
- assert ngpus_per_node > 0, "no gpu is available"
- return ResourceManager(args=self.args,
- hosts=hosts,
- num_gpus_per_node=ngpus_per_node,
- results_dir=self.results_dir,
- exps_dir=self.exps_dir,
- arg_mappings=self.autotuning_config.arg_mappings)
- def _get_exp_resources(self, args):
- """Get resource requirement for each autotuning experiment
- Args:
- args (dict): user args
- Returns:
- num_nodes, num_gpus: the number of nodes and the number of GPUs used in the autotuning experiments
- """
- if args.num_nodes > 0:
- num_nodes = args.num_nodes
- else:
- num_nodes = len(self.rm.nodes)
- if args.num_gpus > 0:
- num_gpus = args.num_gpus
- else:
- num_gpus = self.rm.num_gpus_per_node
- return num_nodes, num_gpus
- def metric(self):
- return self.autotuning_config.metric
- def fast_enabled(self):
- return self.autotuning_config.fast
- def max_train_batch_size(self):
- return self.autotuning_config.max_train_batch_size
- def mp_size(self):
- return self.autotuning_config.mp_size
- def max_train_micro_batch_size_per_gpu(self):
- if self.max_train_batch_size() and self.max_train_batch_size(
- ) > 0: # if the user specifies a max_train_batch_size
- max_train_micro_batch_size = self.max_train_batch_size() * self.mp_size(
- ) // (self.exp_num_gpus * self.exp_num_nodes
- ) # gradient accumulation steps >=1
- return min(self.autotuning_config.max_train_micro_batch_size_per_gpu,
- max_train_micro_batch_size)
- else:
- return self.autotuning_config.max_train_micro_batch_size_per_gpu
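- # Worked example for the cap above (illustrative numbers, not from a real run): with
- # max_train_batch_size = 256, mp_size = 1, exp_num_gpus = 8 and exp_num_nodes = 1, the derived
- # per-GPU micro batch size cap is 256 * 1 // (8 * 1) = 32; the smaller of 32 and the
- # max_train_micro_batch_size_per_gpu value from the autotuning config is returned.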
- def min_train_micro_batch_size_per_gpu(self):
- return self.autotuning_config.min_train_micro_batch_size_per_gpu
- def num_tuning_micro_batch_sizes(self):
- return self.autotuning_config.num_tuning_micro_batch_sizes
- def fp16_enabled(self):
- if FP16 in self.user_config.keys():
- return self.user_config[FP16].get(FP16_ENABLED, FP16_ENABLED_DEFAULT)
- else:
- return False
- def get_gpu_memory_info(self):
- return torch.cuda.get_device_properties(0).total_memory
- def get_activation_memory_per_gpu(self):
- if self.model_info and "activation_mem_per_gpu" in self.model_info:
- return self.model_info["activation_mem_per_gpu"]
- def get_instantiation_memory_required_per_gpu(self, zero_stage):
- num_params = self.get_model_num_params()
- total_gpus = self.exp_num_nodes * self.exp_num_gpus
- fp16_enabled = self.fp16_enabled()
- if not num_params:
- return 0
- # assume the model uses Adam optimizer
- # ZERO_OPTIMIZATION_DISABLED:
- params_mem = num_params * (2 if fp16_enabled else 4)
- gradients_mem = num_params * (2 if fp16_enabled else 4)
- optimizer_mem = num_params * (16 if fp16_enabled else 8)
- if zero_stage >= ZERO_OPTIMIZATION_OPTIMIZER_STATES:
- optimizer_mem = optimizer_mem / total_gpus
- if zero_stage >= ZERO_OPTIMIZATION_GRADIENTS:
- gradients_mem = gradients_mem / total_gpus
- if zero_stage >= ZERO_OPTIMIZATION_WEIGHTS:
- params_mem = params_mem / total_gpus
- mem_per_gpu = (params_mem + gradients_mem + optimizer_mem) / self.mp_size()
- return mem_per_gpu
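- # Worked example of the estimate above (illustrative numbers, assuming fp16 and Adam as in the code):
- # for a 1.3e9-parameter model on 16 total GPUs with mp_size = 1,
- #   params_mem    = 1.3e9 * 2  = 2.6 GB
- #   gradients_mem = 1.3e9 * 2  = 2.6 GB
- #   optimizer_mem = 1.3e9 * 16 = 20.8 GB
- # With ZeRO stage 1 only optimizer_mem is partitioned across the 16 GPUs (~1.3 GB each),
- # so the per-GPU instantiation estimate is roughly 2.6 + 2.6 + 1.3 = 6.5 GB.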
- def _generate_experiments(self, tuning_space, max_train_batch_size_per_gpu):
- """Generates a list of autotuning experiments given a tuning_space.
- The corresponding parameter values are replaced by user-defined values in the DeepSpeed configuration file.
- Args:
- tuning_space ([dict]): A DeepSpeed configuration dictionary where a value can be a list (called a tuning parameter). For example,
- {
- "zero_optimization": {
- "stage": 1,
- "reduce_bucket_size": [5e7,
- 5e8,
- 1e9],
- "allgather_bucket_size": [5e7,
- 5e8,
- 1e9],
- }
- }
- reduce_bucket_size and allgather_bucket_size are the tuning parameters in this tuning space.
- Returns:
- [list]: a list of experiments generated by taking combinations of values of the tuning space. The above tuning space generates 3*3 = 9 experiments if the user DeepSpeed configuration file does not overwrite the two tuning parameters or define more tuning parameters.
- """
- exps = []
- # each zero stage uses a different template configuration file
- config_zero = tuning_space.get(ZERO_OPTIMIZATION, {})
- stage = config_zero.get(ZERO_OPTIMIZATION_STAGE, None)
- template_config = {}
- if stage == 0:
- template_path = DEFAULT_TEMPLATE_PATH_ZERO_0
- template_config = hjson.load(open(template_path, 'r'))
- prefix = "z0_"
- elif stage == 1:
- template_path = DEFAULT_TEMPLATE_PATH_ZERO_1
- template_config = hjson.load(open(template_path, 'r'))
- prefix = "z1_"
- elif stage == 2:
- template_path = DEFAULT_TEMPLATE_PATH_ZERO_2
- template_config = hjson.load(open(template_path, 'r'))
- prefix = "z2_"
- elif stage == 3:
- template_path = DEFAULT_TEMPLATE_PATH_ZERO_3
- template_config = hjson.load(open(template_path, 'r'))
- model_info = self.model_info
- if model_info and "hidden_size" in model_info:
- hs = model_info["hidden_size"]
- template_config[ZERO_OPTIMIZATION][
- ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE] = hs * hs
- template_config[ZERO_OPTIMIZATION][
- ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE] = 0.9 * hs * hs
- template_config[ZERO_OPTIMIZATION][
- ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD] = 10 * hs
- prefix = "z3_"
- else:
- return exps
- # replace the corresponding parameter values if the user specifies them in the DeepSpeed configuration file
- replace_dict(tuning_space,
- self.user_config,
- [ZERO_OPTIMIZATION,
- TRAIN_MICRO_BATCH_SIZE_PER_GPU])
- logger.debug(f"tuning_space = {json.dumps(tuning_space)}")
- all_configs = get_all_configs(tuning_space)
- tuning_keys = get_tuning_keys(tuning_space)
- logger.debug(f"tuning_keys = {tuning_keys}")
- logger.debug(f"before prunning total configs = {len(all_configs)}")
- pruned_list = prune_configs(all_configs)
- logger.debug(f"after prunning total configs = {len(pruned_list)}")
- for config in pruned_list:
- exp_config = copy.deepcopy(template_config)
- # fill the template with the expr config
- replace_dict(exp_config, config)
- # if the config does not use offloading, remove the offloading section
- config_zero = config.get(ZERO_OPTIMIZATION, None)
- if config_zero:
- if OFFLOAD_OPTIMIZER not in config_zero and OFFLOAD_OPTIMIZER in exp_config[
- ZERO_OPTIMIZATION]:
- del exp_config[ZERO_OPTIMIZATION][OFFLOAD_OPTIMIZER]
- if OFFLOAD_PARAM not in config_zero and OFFLOAD_PARAM in exp_config[
- ZERO_OPTIMIZATION]:
- del exp_config[ZERO_OPTIMIZATION][OFFLOAD_PARAM]
- # set gradient accumulation steps according to max_train_batch_size_per_gpu
- mbs = exp_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU]
- gas = max_train_batch_size_per_gpu // mbs
- exp_config[GRADIENT_ACCUMULATION_STEPS] = gas
- exp_config[TRAIN_BATCH_SIZE] = mbs * gas * \
- self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
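- # Illustrative check of the two assignments above (assumed numbers): with
- # max_train_batch_size_per_gpu = 32, mbs = 8, 8 GPUs on 1 node and mp_size = 1,
- # gas = 32 // 8 = 4 and the effective train_batch_size = 8 * 4 * 8 * 1 // 1 = 256.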
- exp = {}
- # generate the expr name
- exp_name = canonical_name(exp_config, tuning_keys, prefix)
- exp['name'] = exp_name
- exp[DS_CONFIG] = exp_config
- exp['num_gpus'] = self.exp_num_gpus
- exp['num_nodes'] = self.exp_num_nodes
- exps.append(exp)
- return exps
- def tune(self):
- """ Tunes Zero stages, micro batch size per GPU, and other Zero configurations. Performance metrics of different tuning spaces are recorded in self.records.
- """
- self.start_time = time.time()
- if self.fast_enabled():
- logger.info(f"Fast mode is enabled. Tuning micro batch size only.")
- # model info profile run with DEFAULT_MIN_MEM_CONFIG
- model_info = self.model_info_profile_run()
- if model_info:
- self.model_info = model_info
- else:
- return
- logger.info(
- f"The model has {number_to_string(self.get_model_num_params())} parameters.")
- self.gpu_mem = self.get_gpu_memory_info()
- logger.info(
- f"Memory per GPU in the system is {memory_to_string(self.gpu_mem, postfix='B')}."
- )
- self.activation_mem = self.get_activation_memory_per_gpu()
- logger.info(
- f"The model requires at least {memory_to_string(self.activation_mem, postfix='B')} activation memory for micro batch size 1."
- )
- stage = self.user_config.get(ZERO_OPTIMIZATION,
- {}).get(ZERO_OPTIMIZATION_STAGE,
- "all")
- user_zero_stages = [stage] if not isinstance(stage, list) else stage
- logger.info(f"User-defined zero stages are {stage}.")
- mbs = 0
- max_mbs = 0
- metric_val = 0
- required_gpu_mem = self.get_instantiation_memory_required_per_gpu(
- ZERO_OPTIMIZATION_DISABLED) + self.activation_mem
- if self.gpu_mem > required_gpu_mem:
- if "all" in user_zero_stages or ZERO_OPTIMIZATION_DISABLED in user_zero_stages:
- logger.info(
- f"The model might be runable with ZERO 0 (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory with mbs = 1), adding DEFAULT_TUNING_SPACE_ZERO_0 to the global tuning space"
- )
- next_max_mbs, next_mbs, next_metric_val = self.tune_space(
- DEFAULT_TUNING_SPACE_ZERO_0)
- if next_mbs > mbs:
- mbs = next_mbs
- max_mbs = next_max_mbs
- metric_val = next_metric_val
- else:
- logger.info(
- f"The model is not runable with ZERO stage {ZERO_OPTIMIZATION_DISABLED} (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory with mbs = 1)"
- )
- required_gpu_mem = self.get_instantiation_memory_required_per_gpu(
- ZERO_OPTIMIZATION_OPTIMIZER_STATES) + self.activation_mem
- if self.gpu_mem > required_gpu_mem:
- if "all" in user_zero_stages or ZERO_OPTIMIZATION_OPTIMIZER_STATES in user_zero_stages:
- logger.info(
- f"The model might be runable with ZERO 1 (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory), adding DEFAULT_TUNING_SPACE_ZERO_1 to the global tuning space"
- )
- next_max_mbs, next_mbs, next_metric_val = self.tune_space(
- DEFAULT_TUNING_SPACE_ZERO_1, prev_max_mbs = max_mbs, prev_best_mbs=mbs, prev_best_metric_val=metric_val)
- if next_mbs > mbs:
- mbs = next_mbs
- max_mbs = next_max_mbs
- metric_val = next_metric_val
- else:
- logger.info(
- f"The model is not runable with ZERO stage {ZERO_OPTIMIZATION_OPTIMIZER_STATES} (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory with mbs = 1)"
- )
- required_gpu_mem = self.get_instantiation_memory_required_per_gpu(
- ZERO_OPTIMIZATION_GRADIENTS) + self.activation_mem
- if self.gpu_mem > required_gpu_mem:
- if "all" in user_zero_stages or ZERO_OPTIMIZATION_GRADIENTS in user_zero_stages:
- logger.info(
- f"The model might be runable with ZERO 2 (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory), adding DEFAULT_TUNING_SPACE_ZERO_2 to the global tuning space"
- )
- next_max_mbs, next_mbs, next_metric_val = self.tune_space(
- DEFAULT_TUNING_SPACE_ZERO_2, prev_max_mbs = max_mbs, prev_best_mbs=mbs, prev_best_metric_val=metric_val)
- if next_mbs > mbs:
- mbs = next_mbs
- max_mbs = next_max_mbs
- metric_val = next_metric_val
- else:
- logger.info(
- f"The model is not runable with ZERO stage {ZERO_OPTIMIZATION_GRADIENTS} (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory with mbs = 1)"
- )
- required_gpu_mem = self.get_instantiation_memory_required_per_gpu(
- ZERO_OPTIMIZATION_WEIGHTS) + self.activation_mem
- if self.gpu_mem > required_gpu_mem:
- if "all" in user_zero_stages or ZERO_OPTIMIZATION_WEIGHTS in user_zero_stages:
- logger.info(
- f"The model might be runable with ZERO 3 (which requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory), adding DEFAULT_TUNING_SPACE_ZERO_3 to the global tuning space"
- )
- _, _, _ = self.tune_space(
- DEFAULT_TUNING_SPACE_ZERO_3, prev_max_mbs = max_mbs, prev_best_mbs=mbs, prev_best_metric_val=metric_val)
- else:
- logger.info(
- f"The model has {self.get_model_num_params()} parameters and requires at least {memory_to_string(required_gpu_mem, postfix='B')} memory per GPU with DeepSpeed Zero stage {ZERO_OPTIMIZATION_WEIGHTS} optimization. Memory per GPU in system is {memory_to_string(self.gpu_mem)}. No tuning is performed."
- )
- return
- def tune_space(self,
- tuning_space,
- prev_max_mbs=0,
- prev_best_mbs=0,
- prev_best_metric_val=0):
- config_zero = tuning_space.get(ZERO_OPTIMIZATION, {})
- stage = config_zero.get(ZERO_OPTIMIZATION_STAGE, ZERO_OPTIMIZATION_STAGE_DEFAULT)
- tuning_space_name = TUNING_MICRO_BATCH_SIZE_PREFIX + str(stage)
- tuning_micro_batch_sizes = []
- max_train_batch_size_per_gpu = 0
- tuning_micro_batch_sizes_overwritten = False
- # calculate the max micro batch size using GPU memory, model instantiation memory, and activation memory
- # calculated_max_micro_batch_size = (memory_per_gpu - instantiation_memory) // activation_memory_micro_batch_size_1
- calculated_max_micro_batch_size = int(
- self.gpu_mem -
- self.get_instantiation_memory_required_per_gpu(stage)) // self.activation_mem
- logger.info(
- f"Start tuning for space {tuning_space_name}, calculated_max_micro_batch_size = {calculated_max_micro_batch_size}"
- )
- if calculated_max_micro_batch_size < prev_max_mbs:
- logger.info(
- f"No need to tune Zero stage {stage}. End tuning for space {tuning_space_name}"
- )
- return 0, 0, 0
- if TRAIN_MICRO_BATCH_SIZE_PER_GPU in self.user_config and isinstance(
- self.user_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU],
- list):
- # a user-specified list of micro batch sizes per GPU overrides the default tuning behavior
- tuning_micro_batch_sizes = [
- s for s in self.user_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU]
- if isinstance(s,
- int)
- ]
- gas = self.get_gas_from_user_config()
- min_micro_batch_size = min(tuning_micro_batch_sizes)
- max_micro_batch_size = max(tuning_micro_batch_sizes)
- max_train_batch_size_per_gpu = max_micro_batch_size * gas
- tuning_micro_batch_sizes_overwritten = True
- else:
- # auto-detects the list of micro batch sizes to tune
- min_micro_batch_size, max_micro_batch_size = self.get_min_max_micro_batch_size(
- stage, prev_max_mbs, calculated_max_micro_batch_size)
- if max_micro_batch_size < prev_max_mbs:
- logger.info(
- f"No need to tune Zero stage {stage}. End tuning for space {tuning_space_name}"
- )
- return 0, 0, 0
- tuning_micro_batch_sizes, max_train_batch_size_per_gpu = self.get_tuning_micro_batch_size_list(
- min_micro_batch_size,
- max_micro_batch_size,
- num_tuning_micro_batch_sizes=self.num_tuning_micro_batch_sizes())
- logger.info(
- f"tuning_micro_batch_sizes = {tuning_micro_batch_sizes}, max_train_batch_size_per_gpu = {max_train_batch_size_per_gpu}"
- )
- # return if the tuning_micro_batch_sizes list is empty
- if not tuning_micro_batch_sizes:
- logger.info(f"End tuning for space {tuning_space_name}")
- return 0, 0, 0
- # tune micro batch sizes and gradient accumulation steps given max_train_batch_size_per_gpu
- tuning_micro_batch_sizes = self.run_tuning_micro_batch_sizes(
- tuning_micro_batch_sizes,
- max_train_batch_size_per_gpu,
- min_micro_batch_size,
- stage,
- tuning_micro_batch_sizes_overwritten)
- fast_best_record = self.get_best_space_record(tuning_space_name)
- fast_best_metric_val = fast_best_record[1] if fast_best_record else 0
- fast_best_mbs = fast_best_record[0][DS_CONFIG][
- TRAIN_MICRO_BATCH_SIZE_PER_GPU] if fast_best_record else 0
- logger.info(
- f"fast_best_mbs = {fast_best_mbs}, name = {fast_best_record[0]['name'] if fast_best_record else None}")
- if self.fast_enabled() or stage == 0:
- logger.info(f"End tuning for space: {tuning_space_name}")
- return max_micro_batch_size, fast_best_mbs, fast_best_metric_val
- # if the best metric, or the micro batch size that achieves it, in the current ZeRO stage after tuning micro batch sizes is no better than the corresponding value in the previous ZeRO stage, return without tuning the other ZeRO configuration parameters
- if stage > 0:
- if fast_best_mbs <= prev_best_mbs or fast_best_metric_val < prev_best_metric_val:
- logger.info(
- f"End tuning for space: {tuning_space_name}. No need to tune other Zero configuration paramerts."
- )
- return max_micro_batch_size, fast_best_mbs, fast_best_metric_val
- tuning_space[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = tuning_micro_batch_sizes
- tuning_space_name = canonical_name(tuning_space,
- tuning_keys=get_tuning_keys(tuning_space),
- prefix="z" + str(stage) + "_",
- omit_val=True)
- logger.info(f'Tuning space is {tuning_space}')
- logger.info(f'Tuning space name is {tuning_space_name}')
- exps = self._generate_experiments(tuning_space, max_train_batch_size_per_gpu)
- logger.info(f'Tuner type is {self.autotuning_config.tuner_type}')
- if self.autotuning_config.tuner_type == AUTOTUNING_TUNER_MODELBASED:
- t = ModelBasedTuner(exps, self.rm, self.metric(), tuning_space)
- elif self.autotuning_config.tuner_type == AUTOTUNING_TUNER_RANDOM:
- t = RandomTuner(exps, self.rm, self.metric())
- else:
- t = GridSearchTuner(exps, self.rm, self.metric())
- sample_size = len(self.rm.nodes) * self.rm.num_gpus_per_node // (
- self.exp_num_gpus * self.exp_num_nodes)
- num_exps = t.tune(sample_size=sample_size,
- n_trials=self.autotuning_config.tuner_num_trials,
- early_stopping=self.autotuning_config.tuner_early_stopping)
- exp = t.best_exp
- metric_val = t.best_metric_val
- if exp:
- self.update_records(tuning_space_name, exp, metric_val, num_exps)
- full_best_record = self.get_best_space_record(tuning_space_name)
- full_best_metric_val = full_best_record[1] if full_best_record else -1
- if full_best_metric_val > fast_best_metric_val:
- best_metric_val = full_best_metric_val
- best_mbs = full_best_record[0][DS_CONFIG][
- TRAIN_MICRO_BATCH_SIZE_PER_GPU] if full_best_record else -1
- else:
- best_metric_val = fast_best_metric_val
- best_mbs = fast_best_mbs
- logger.info(f"End tuning for space: {tuning_space_name}")
- return max_micro_batch_size, best_mbs, best_metric_val
- def get_plauteu_mbs(self, tuning_space_name):
- if tuning_space_name not in self.records:
- return 0
- space_records = self.records[tuning_space_name]
- sorted_space_records = sorted(
- space_records,
- key=lambda x: x[0][DS_CONFIG][TRAIN_MICRO_BATCH_SIZE_PER_GPU])
- prev_metric_val = None
- prev_micro_batch_size = 0
- for (exp, metric_val, _) in sorted_space_records:
- if prev_metric_val:
- if metric_val < prev_metric_val:
- break
- if (metric_val >= prev_metric_val
- and (metric_val - prev_metric_val) / prev_metric_val <
- METRIC_PERCENT_DIFF_CONST):
- break
- prev_metric_val = metric_val
- prev_micro_batch_size = exp[DS_CONFIG][TRAIN_MICRO_BATCH_SIZE_PER_GPU]
- plateau_mbs = prev_micro_batch_size
- return plateau_mbs
- def get_model_num_params(self):
- if self.model_info and "num_params" in self.model_info:
- return self.model_info["num_params"]
- def model_info_profile_run(self):
- """Does a model information profling experiment that collects the number of model parameters and activation memory.\
- The experiment produces a "profile_model_info" folder under self.results_dir.
- Returns:
- [dict]: a model information dictionary, e.g., {"num_params": 335144976, "trainable_num_params": 335144976, "activation_mem_per_gpu": 324358144, "rank": 0}
- """
- logger.info("Starting model info profile run.")
- model_info = self.autotuning_config.model_info
- if model_info and MODEL_INFO_NUM_PARAMS in model_info:
- return model_info
- ds_config = copy.deepcopy(self.user_config)
- replace_dict(ds_config, DEFAULT_MIN_MEM_CONFIG)
- model_info_path = os.path.join(self.results_dir,
- "profile_model_info",
- "model_info.json")
- ds_config[AUTOTUNING] = {
- "enabled": True,
- "model_info_path": model_info_path,
- "model_info": {
- "profile": True
- }
- }
- exp_config = {}
- exp_name = "profile_model_info"
- exp_config['name'] = exp_name
- exp_config[DS_CONFIG] = ds_config
- exp_config['num_gpus'] = self.exp_num_gpus
- exp_config['num_nodes'] = self.exp_num_nodes
- exp_path = os.path.join(self.exps_dir, f'{exp_name}.json')
- with open(exp_path, 'w', buffering=BUFSIZE) as fd:
- json.dump(exp_config, fd)
- fd.flush()
- os.fsync(fd)
- self.rm.schedule_experiments([exp_path])
- self.rm.run()
- for exp_id, (exp_json, err) in self.rm.finished_experiments.items():
- self.rm.clear()
- if err:
- logger.error(
- f"The model is not runnable with DeepSpeed with error = {err}")
- return None
- if os.path.exists(model_info_path):
- with open(model_info_path, 'r') as f:
- model_info = hjson.load(f)
- return model_info
- def update_records(self, space_name, exp, metric_val, num_exps):
- if space_name not in self.records:
- self.records[space_name] = [(exp, metric_val, num_exps)]
- else:
- self.records[space_name].append((exp, metric_val, num_exps))
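- # Layout of self.records (as used above and below): a tuning-space name maps to a list of
- # (exp, metric_val, num_exps) tuples; get_best_space_record() reduces such a list to a single
- # (best_exp, best_metric_val, total_num_exps) tuple for that space.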
- def get_best_space_record(self, space_name):
- if space_name not in self.records:
- return None
- space_records = self.records[space_name]
- best_space_record = None
- space_num_exps = 0
- for (exp, metric_val, num_exps) in space_records:
- space_num_exps += num_exps
- if best_space_record is None or metric_val > best_space_record[1]:
- best_space_record = (exp, metric_val)
- if best_space_record:
- best_space_record = best_space_record + (space_num_exps, )
- return best_space_record
- def get_best_space_records(self):
- best_space_records = {}
- global_best_record = None
- for space_name, space_records in self.records.items():
- best_space_record = self.get_best_space_record(space_name)
- if best_space_record:
- best_space_records[space_name] = best_space_record
- if not global_best_record or best_space_record[1] > global_best_record[1]:
- global_best_record = best_space_record
- if global_best_record:
- best_space_records[GLOBAL_TUNING_SPACE] = global_best_record
- return best_space_records
- def run_tuning_micro_batch_sizes(self,
- tuning_micro_batch_sizes,
- max_train_batch_size_per_gpu,
- min_micro_batch_size,
- stage,
- tuning_micro_batch_sizes_overwritten):
- assert tuning_micro_batch_sizes, "the tuning micro batch size list is empty"
- tuning_micro_batch_sizes.sort()
- max_micro_batch_size = tuning_micro_batch_sizes[-1]
- max_micro_batch_size_metric_val = 0
- ds_config = get_first_config(self.user_config)
- ds_config[ZERO_OPTIMIZATION] = {ZERO_OPTIMIZATION_STAGE: stage}
- tuning_space_name = TUNING_MICRO_BATCH_SIZE_PREFIX + str(stage)
- exp_paths = []
- for mbs in tuning_micro_batch_sizes:
- ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
- gas = max_train_batch_size_per_gpu // mbs
- ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
- ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
- self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
- exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
- exp_config = {}
- exp_config['name'] = exp_name
- exp_config[DS_CONFIG] = ds_config
- exp_config['num_gpus'] = self.exp_num_gpus
- exp_config['num_nodes'] = self.exp_num_nodes
- exp_path = os.path.join(self.exps_dir, f'{exp_name}.json')
- with open(exp_path, 'w', buffering=BUFSIZE) as fd:
- json.dump(exp_config, fd)
- fd.flush()
- os.fsync(fd)
- exp_paths.append(exp_path)
- self.rm.schedule_experiments(exp_paths)
- self.rm.run()
- for exp_id, (exp, err) in self.rm.finished_experiments.items():
- if exp:
- metric_file = exp[DS_CONFIG][AUTOTUNING][AUTOTUNING_METRIC_PATH]
- if os.path.exists(metric_file):
- with open(metric_file, 'r') as f:
- results = hjson.load(f)
- metric_val = results[self.metric()]
- self.update_records(tuning_space_name, exp, metric_val, 1)
- if max_micro_batch_size == exp[DS_CONFIG][
- TRAIN_MICRO_BATCH_SIZE_PER_GPU]:
- max_micro_batch_size_metric_val = metric_val
- else:
- self.update_records(tuning_space_name, exp, 0, 1)
- else:
- mbs = exp[DS_CONFIG][TRAIN_MICRO_BATCH_SIZE_PER_GPU]
- logger.info(f"micro batch size = {mbs} was not run successfully")
- self.rm.clear()
- if tuning_micro_batch_sizes_overwritten:
- return tuning_micro_batch_sizes
- # in an auto-detected tuning_micro_batch_sizes list, max_micro_batch_size might not be performant because its memory consumption is close to the limit
- # try smaller values while gas stays the same
- # if a more performant mbs value is found, use it to replace max_micro_batch_size in the list
- min_micro_batch_size_with_same_gas = (
- tuning_micro_batch_sizes[-2] +
- 1) if len(tuning_micro_batch_sizes) > 1 else min_micro_batch_size
- prev_best_metric_val = max_micro_batch_size_metric_val
- prev_best_mbs = max_micro_batch_size
- stride = (max_micro_batch_size - min_micro_batch_size_with_same_gas) // 3
- if stride == 0:
- stride = 1
- for mbs in reversed(
- range(min_micro_batch_size_with_same_gas,
- max_micro_batch_size,
- stride)):
- ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
- gas = max_train_batch_size_per_gpu // mbs
- ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
- ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
- self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
- exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
- exp, metric_val = self.run_ds_config(ds_config, exp_name)
- if metric_val:
- self.update_records(tuning_space_name, exp, metric_val, 1)
- if metric_val > prev_best_metric_val * (1 + METRIC_PERCENT_DIFF_CONST):
- prev_best_metric_val = metric_val
- prev_best_mbs = mbs
- else:
- break
- else:
- self.update_records(tuning_space_name, exp, 0, 1)
- break
- if prev_best_mbs != max_micro_batch_size:
- tuning_micro_batch_sizes[-1] = prev_best_mbs
- return tuning_micro_batch_sizes
- def get_min_max_micro_batch_size(self,
- stage,
- min_micro_batch_size,
- calculated_max_micro_batch_size):
- # get min and max micro batch size with gradient accumulation steps = 1
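- # Outline of the search below: (1) establish a runnable min micro batch size (from the user
- # config, the user args, or min_train_micro_batch_size_per_gpu); (2) probe a few candidates
- # around calculated_max_micro_batch_size; (3) binary-search between min and max, stopping early
- # when the metric plateaus (gain below METRIC_PERCENT_DIFF_CONST).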
- if min_micro_batch_size > calculated_max_micro_batch_size:
- return -1, -1
- used_micro_batch_sizes = []
- tuning_space_name = TUNING_MICRO_BATCH_SIZE_PREFIX + str(stage)
- ds_config = get_first_config(self.user_config)
- ds_config[ZERO_OPTIMIZATION] = {ZERO_OPTIMIZATION_STAGE: stage}
- gas = self.get_gas_from_user_config()
- ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
- # search for the min micro batch size
- if min_micro_batch_size < 1:
- if TRAIN_MICRO_BATCH_SIZE_PER_GPU in self.user_config and isinstance(
- self.user_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU],
- int):
- # user specifies train_micro_batch_size_per_gpu as an int
- mbs = int(self.user_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU])
- else:
- # user does not specify train_micro_batch_size_per_gpu or sets it to "auto" when using Hugging Face
- val = self.get_val_from_user_args(TRAIN_MICRO_BATCH_SIZE_PER_GPU)
- if val:
- mbs = int(val)
- else:
- mbs = 1
- assert mbs > 0, "The micro batch size per GPU must be greater than 0."
- ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
- ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
- ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
- self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
- exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
- exp, metric_val = self.run_ds_config(ds_config, exp_name)
- if metric_val:
- self.update_records(tuning_space_name, exp, metric_val, 1)
- used_micro_batch_sizes.append(mbs)
- min_micro_batch_size = mbs
- else:
- self.update_records(tuning_space_name, exp, 0, 1)
- logger.info(
- f"User-specified micro batch size per GPU {mbs} does not run")
- if self.min_train_micro_batch_size_per_gpu() == mbs:
- return -1, -1
- mbs = self.min_train_micro_batch_size_per_gpu()
- ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
- ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
- ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
- self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
- exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
- exp, metric_val = self.run_ds_config(ds_config, exp_name)
- if not metric_val:
- self.update_records(tuning_space_name, exp, 0, 1)
- logger.info(
- f"min_train_micro_batch_size_per_gpu {mbs} is not runnable.")
- return -1, -1
- self.update_records(tuning_space_name, exp, metric_val, 1)
- min_micro_batch_size = mbs
- used_micro_batch_sizes.append(mbs)
- else:
- ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = min_micro_batch_size
- ds_config[GRADIENT_ACCUMULATION_STEPS] = gas
- ds_config[TRAIN_BATCH_SIZE] = min_micro_batch_size * gas * \
- self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
- exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(
- min_micro_batch_size)
- exp, metric_val = self.run_ds_config(ds_config, exp_name)
- if metric_val:
- self.update_records(tuning_space_name, exp, metric_val, 1)
- used_micro_batch_sizes.append(min_micro_batch_size)
- else:
- self.update_records(tuning_space_name, exp, 0, 1)
- return -1, -1
- # search for the max micro batch size
- max_micro_batch_size = min(calculated_max_micro_batch_size,
- self.max_train_micro_batch_size_per_gpu())
- for mbs in [
- math.ceil(1.05 * max_micro_batch_size),
- max_micro_batch_size,
- int(0.95 * max_micro_batch_size)
- ]:
- if mbs > self.max_train_micro_batch_size_per_gpu():
- continue
- if mbs in used_micro_batch_sizes:
- return min_micro_batch_size, mbs
- ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mbs
- ds_config[TRAIN_BATCH_SIZE] = mbs * gas * \
- self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
- exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mbs)
- exp, metric_val = self.run_ds_config(ds_config, exp_name)
- if metric_val:
- logger.info(f"mbs = {mbs} is found as max mbs")
- self.update_records(tuning_space_name, exp, metric_val, 1)
- used_micro_batch_sizes.append(mbs)
- return min_micro_batch_size, mbs
- else:
- self.update_records(tuning_space_name, exp, 0, 1)
- space_records = self.records[
- tuning_space_name] if tuning_space_name in self.records else []
- if space_records:
- prev_idx = min(range(len(space_records)),
- key=lambda i: abs(space_records[i][0][DS_CONFIG][
- TRAIN_MICRO_BATCH_SIZE_PER_GPU] - min_micro_batch_size))
- prev_metric_val = space_records[prev_idx][1]
- else:
- prev_metric_val = None
- low = min_micro_batch_size
- high = max_micro_batch_size
- while low < high:
- mid = int((low + high) // 2)
- logger.debug(f"trying mbs = {mid}, low = {low}, high = {high}")
- if mid == low:
- break
- if mid not in used_micro_batch_sizes:
- ds_config[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = mid
- ds_config[TRAIN_BATCH_SIZE] = mid * gas * \
- self.exp_num_gpus * self.exp_num_nodes // self.mp_size()
- exp_name = tuning_space_name + "_gas" + str(gas) + "_tmbspg" + str(mid)
- exp, metric_val = self.run_ds_config(ds_config, exp_name)
- if metric_val:
- low = mid
- self.update_records(tuning_space_name, exp, metric_val, 1)
- used_micro_batch_sizes.append(mid)
- if prev_metric_val and ((metric_val - prev_metric_val) /
- prev_metric_val) < METRIC_PERCENT_DIFF_CONST:
- logger.info(f"performance plateaus at mbs = {low}")
- break
- prev_metric_val = metric_val
- else:
- self.update_records(tuning_space_name, exp, 0, 1)
- high = mid - 1
- else:
- low = mid
- max_micro_batch_size = low
- logger.info(
- f"min_micro_batch_size = {min_micro_batch_size}, max_micro_batch_size = {max_micro_batch_size}."
- )
- return min_micro_batch_size, max_micro_batch_size
- def get_gas_from_user_config(self):
- gas = 1
- if GRADIENT_ACCUMULATION_STEPS in self.user_config:
- gas_in_config = self.user_config[GRADIENT_ACCUMULATION_STEPS]
- if isinstance(gas_in_config, int):
- gas = gas_in_config
- elif gas_in_config == "auto": # GRADIENT_ACCUMULATION_STEPS: "auto"
- val = self.get_val_from_config(GRADIENT_ACCUMULATION_STEPS)
- if val:
- gas = int(val)
- elif isinstance(gas_in_config, list):
- logger.info(
- f"Specifying a list of {GRADIENT_ACCUMULATION_STEPS} to tune is not supported. 1 would be used."
- )
- assert gas > 0, "Gradient accumulation steps must be positive."
- return gas
- def get_val_from_user_args(self, ds_name):
- arg_mappings = self.autotuning_config.arg_mappings
- user_args = self.args.user_args
- if arg_mappings and ds_name in arg_mappings:
- arg_name = arg_mappings[ds_name]
- if arg_name in user_args:
- idx = user_args.index(arg_name)
- if user_args[idx + 1].isnumeric():
- return user_args[idx + 1]
- return None
- def get_tuning_micro_batch_size_list(self,
- min_micro_batch_size,
- max_micro_batch_size,
- num_tuning_micro_batch_sizes):
- """Get a list of micro batch sizes to tune based on min and max values, as well as the size of the list.
- Args:
- min_micro_batch_size ([int]): min micro batch size per GPU
- max_micro_batch_size ([int]): max micro batch size per GPU
- num_tuning_micro_batch_sizes (int): the number of items in the returned list
- Returns:
- [list], int: a list of micro batch sizes to tune and the max train batch size per GPU used to derive gradient accumulation steps.
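- Example (illustrative numbers): with min_micro_batch_size = 2, max_micro_batch_size = 8,
- num_tuning_micro_batch_sizes = 3, gas = 1 and mp_size = 1, the min is first raised to
- 8 // 2 = 4, the stride becomes (8 - 4) // 3 = 1, candidates whose implied gradient
- accumulation steps equal min_gas are skipped, and the result is [4, 8] with
- max_train_batch_size_per_gpu = 8.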
- """
- if min_micro_batch_size <= 0 or max_micro_batch_size <= 0:
- logger.info(
- f"min_micro_batch_size = {min_micro_batch_size}, max_micro_batch_size = {max_micro_batch_size}"
- )
- return [], 0
- # NUM_GPUS=$(( ${NUM_WORKERS} * ${NUM_GPUS_PER_WORKER} ))
- # DP_SIZE=$(( ${NUM_GPUS} / (${PP_SIZE} * ${MP_SIZE}) ))
- # GRAD_ACC_STEPS=$(( ${TARGET_GLOBAL_BATCH_SIZE} / (${BATCH_SIZE} * ${DP_SIZE}) ))
- if self.max_train_batch_size() and self.max_train_batch_size(
- ) > 0: # if the user specifies a max_train_batch_size
- max_train_batch_size_per_gpu = self.max_train_batch_size() * self.mp_size(
- ) // (self.exp_num_gpus * self.exp_num_nodes)
- else:
- gas = self.get_gas_from_user_config()
- max_train_batch_size_per_gpu = max_micro_batch_size * gas // self.mp_size()
- logger.info(f"max_train_batch_size_per_gpu = {max_train_batch_size_per_gpu}")
- if min_micro_batch_size < max_micro_batch_size // 2:
- min_micro_batch_size = max_micro_batch_size // 2
- # constant stride
- stride = (max_micro_batch_size -
- min_micro_batch_size) // num_tuning_micro_batch_sizes
- if stride == 0:
- stride = 1
- ls = []
- min_gas = max_train_batch_size_per_gpu // max_micro_batch_size
- # if gas is the same as min_gas, do not add mbs to the tuning list
- for mbs in range(min_micro_batch_size, max_micro_batch_size, stride):
- if max_micro_batch_size // mbs != min_gas:
- ls.append(mbs)
- ls.append(max_micro_batch_size)
- return ls, max_train_batch_size_per_gpu
- def run_ds_config(self, ds_config, exp_name):
- exp_config = {}
- exp_config['name'] = exp_name
- exp_config[DS_CONFIG] = ds_config
- exp_config['num_gpus'] = self.exp_num_gpus
- exp_config['num_nodes'] = self.exp_num_nodes
- exp_path = os.path.join(self.exps_dir, f'{exp_name}.json')
- logger.debug(f'run_ds_config exp_name = {exp_name}')
- with open(exp_path, 'w', buffering=BUFSIZE) as fd:
- json.dump(exp_config, fd)
- fd.flush()
- os.fsync(fd)
- self.rm.schedule_experiments([exp_path])
- self.rm.run()
- exp, metric_val = self.rm.parse_results(self.metric())
- self.rm.clear()
- return exp, metric_val
- def run_after_tuning(self):
- """ Launches the training with the optmimal DeepSpeed configuration found through the autotuning process.
- "ds_config_optimal.json" describing the optmimal DeepSpeed configuration as well the command used to launch training "cmd_optimal.txt" are saved to self.results_dir.
- """
- best_space_records = self.get_best_space_records()
- if GLOBAL_TUNING_SPACE not in best_space_records:
- return
- best_exp, best_metric_val, _ = best_space_records[GLOBAL_TUNING_SPACE]
- if best_exp:
- logger.info(
- "Start training with the optmimal DeepSpeed configuration found through the tuning process"
- )
- exp_dir = best_exp["result_dir"]
- cmd = None
- with open(os.path.join(exp_dir, "cmd.txt"), "r") as f:
- cmd = [str(i) for i in f.read().split()]
- ds_config = hjson.load(open(os.path.join(exp_dir, "ds_config.json"), "r"))
- ds_config.pop(AUTOTUNING)
- ds_config_path = os.path.join(self.results_dir, "ds_config_optimal.json")
- json.dump(ds_config, open(ds_config_path, "w"))
- idx = cmd.index(os.path.join(exp_dir, "ds_config.json"))
- cmd[idx] = ds_config_path
- cmd_path = os.path.join(self.results_dir, "cmd_optimal.txt")
- with open(cmd_path, "w") as fd:
- fd.write(" ".join(cmd))
- fd.write("\n")
- fd.flush()
- result = subprocess.Popen(cmd)
- result.wait()
- logger.info(
- f"Done running with the optimal DeepSpeed configuration found by autotuning: {ds_config_path}"
- )
|