# Copyright (c) Microsoft Corporation. # SPDX-License-Identifier: Apache-2.0 # DeepSpeed Team import json from .constants import * class ElasticityError(Exception): """ Base exception for all elasticity related errors """ class ElasticityConfigError(ElasticityError): """ Elasticity configuration error """ class ElasticityIncompatibleWorldSize(ElasticityError): """ Attempting to run a world size that is incompatible with a given elastic config """ class ElasticityConfig: """ Elastic config object, constructed from a param dictionary that only contains elastic config parameters, example below: If elasticity is enabled, user must specify (at least) max_train_batch_size and micro_batch_sizes. { "enabled": true, "max_train_batch_size": 2000, "micro_batch_sizes": [2,4,6], "min_gpus": 1, "max_gpus" : 10000 "min_time": 20 "ignore_non_elastic_batch_info": false "version": 0.1 } """ def __init__(self, param_dict): self.enabled = param_dict.get(ENABLED, ENABLED_DEFAULT) if self.enabled: if MAX_ACCEPTABLE_BATCH_SIZE in param_dict: self.max_acceptable_batch_size = param_dict[MAX_ACCEPTABLE_BATCH_SIZE] else: raise ElasticityConfigError(f"Elasticity config missing {MAX_ACCEPTABLE_BATCH_SIZE}") if MICRO_BATCHES in param_dict: self.micro_batches = param_dict[MICRO_BATCHES] else: raise ElasticityConfigError(f"Elasticity config missing {MICRO_BATCHES}") else: self.max_acceptable_batch_size = param_dict.get(MAX_ACCEPTABLE_BATCH_SIZE, MAX_ACCEPTABLE_BATCH_SIZE_DEFAULT) self.micro_batches = param_dict.get(MICRO_BATCHES, MICRO_BATCHES_DEFAULT) if not isinstance(self.micro_batches, list): raise ElasticityConfigError( f"Elasticity expected value of {MICRO_BATCHES} to be a " f"list of micro batches, instead is: {type(self.micro_batches)}, containing: {self.micro_batches}") if not all(map(lambda m: isinstance(m, int), self.micro_batches)): raise ElasticityConfigError(f"Elasticity expected {MICRO_BATCHES} to only contain a list of integers, " f"instead contains: f{self.micro_batches}") if not all(map(lambda m: m > 0, self.micro_batches)): raise ElasticityConfigError(f"Elasticity expected {MICRO_BATCHES} to only contain positive integers, " f"instead contains: f{self.micro_batches}") self.min_gpus = param_dict.get(MIN_GPUS, MIN_GPUS_DEFAULT) self.max_gpus = param_dict.get(MAX_GPUS, MAX_GPUS_DEFAULT) if self.min_gpus < 1 or self.max_gpus < 1: raise ElasticityConfigError("Elasticity min/max gpus must be > 0, " f"given min_gpus: {self.min_gpus}, max_gpus: {self.max_gpus}") if self.max_gpus < self.min_gpus: raise ElasticityConfigError("Elasticity min_gpus cannot be greater than max_gpus, " f"given min_gpus: {self.min_gpus}, max_gpus: {self.max_gpus}") self.model_parallel_size = param_dict.get(MODEL_PARALLEL_SIZE, MODEL_PARALLEL_SIZE_DEFAULT) if self.model_parallel_size < 1: raise ElasticityConfigError("Model-Parallel size cannot be less than 1, " f"given model-parallel size: {self.model_parallel_size}") self.num_gpus_per_node = param_dict.get(NUM_GPUS_PER_NODE, NUM_GPUS_PER_NODE_DEFAULT) if self.num_gpus_per_node < 1: raise ElasticityConfigError("Number of GPUs per node cannot be less than 1, " f"given number of GPUs per node: {self.num_gpus_per_node}") self.min_time = param_dict.get(MIN_TIME, MIN_TIME_DEFAULT) if self.min_time < 0: raise ElasticityConfigError(f"Elasticity min time needs to be >= 0: given {self.min_time}") self.version = param_dict.get(VERSION, VERSION_DEFAULT) self.prefer_larger_batch_size = param_dict.get(PREFER_LARGER_BATCH, PREFER_LARGER_BATCH_DEFAULT) self.ignore_non_elastic_batch_info = param_dict.get(IGNORE_NON_ELASTIC_BATCH_INFO, IGNORE_NON_ELASTIC_BATCH_INFO_DEFAULT) def repr(self): return self.__dict__ def __repr__(self): return json.dumps(self.__dict__, sort_keys=True, indent=4)