123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- # Copyright (c) Microsoft Corporation.
- # SPDX-License-Identifier: Apache-2.0
- # DeepSpeed Team
- import json
- from .constants import *
class ElasticityError(Exception):
    """Root of the elasticity exception hierarchy.

    Every elasticity related error derives from this class, so catching it
    covers all of them.
    """
class ElasticityConfigError(ElasticityError):
    """Signals a problem with the elasticity configuration.

    ``ElasticityConfig.__init__`` raises it when a required entry is missing
    or a supplied value fails validation.
    """
class ElasticityIncompatibleWorldSize(ElasticityError):
    """Signals an attempt to run with a world size that is incompatible with
    a given elastic config.
    """
class ElasticityConfig:
    """
    Elastic config object, constructed from a param dictionary that only contains elastic
    config parameters, example below:

    If elasticity is enabled, user must specify (at least) max_train_batch_size
    and micro_batch_sizes.

    {
        "enabled": true,
        "max_train_batch_size": 2000,
        "micro_batch_sizes": [2,4,6],
        "min_gpus": 1,
        "max_gpus": 10000,
        "min_time": 20,
        "ignore_non_elastic_batch_info": false,
        "version": 0.1
    }
    """

    def __init__(self, param_dict):
        """Read the elasticity settings from ``param_dict`` and validate them.

        Args:
            param_dict: mapping of elasticity options. When the ``ENABLED`` entry
                is truthy, the ``MAX_ACCEPTABLE_BATCH_SIZE`` and ``MICRO_BATCHES``
                entries are mandatory; every other entry (and those two when
                elasticity is disabled) falls back to its ``*_DEFAULT`` constant.

        Raises:
            ElasticityConfigError: if a mandatory entry is missing, if the micro
                batches are not a list of positive integers, if min/max gpus are
                below 1 or max is smaller than min, if the model-parallel size or
                the number of GPUs per node is below 1, or if min time is negative.
        """
        self.enabled = param_dict.get(ENABLED, ENABLED_DEFAULT)
        if self.enabled:
            # With elasticity switched on these two settings have no fallback value.
            for required_key in (MAX_ACCEPTABLE_BATCH_SIZE, MICRO_BATCHES):
                if required_key not in param_dict:
                    raise ElasticityConfigError(f"Elasticity config missing {required_key}")
            self.max_acceptable_batch_size = param_dict[MAX_ACCEPTABLE_BATCH_SIZE]
            self.micro_batches = param_dict[MICRO_BATCHES]
        else:
            self.max_acceptable_batch_size = param_dict.get(MAX_ACCEPTABLE_BATCH_SIZE,
                                                            MAX_ACCEPTABLE_BATCH_SIZE_DEFAULT)
            self.micro_batches = param_dict.get(MICRO_BATCHES, MICRO_BATCHES_DEFAULT)

        # The micro batch checks run whether or not elasticity is enabled.
        if not isinstance(self.micro_batches, list):
            raise ElasticityConfigError(
                f"Elasticity expected value of {MICRO_BATCHES} to be a "
                f"list of micro batches, instead is: {type(self.micro_batches)}, containing: {self.micro_batches}")

        if not all(isinstance(m, int) for m in self.micro_batches):
            raise ElasticityConfigError(f"Elasticity expected {MICRO_BATCHES} to only contain a list of integers, "
                                        f"instead contains: {self.micro_batches}")

        if not all(m > 0 for m in self.micro_batches):
            raise ElasticityConfigError(f"Elasticity expected {MICRO_BATCHES} to only contain positive integers, "
                                        f"instead contains: {self.micro_batches}")

        self.min_gpus = param_dict.get(MIN_GPUS, MIN_GPUS_DEFAULT)
        self.max_gpus = param_dict.get(MAX_GPUS, MAX_GPUS_DEFAULT)

        if self.min_gpus < 1 or self.max_gpus < 1:
            raise ElasticityConfigError("Elasticity min/max gpus must be > 0, "
                                        f"given min_gpus: {self.min_gpus}, max_gpus: {self.max_gpus}")
        if self.max_gpus < self.min_gpus:
            raise ElasticityConfigError("Elasticity min_gpus cannot be greater than max_gpus, "
                                        f"given min_gpus: {self.min_gpus}, max_gpus: {self.max_gpus}")

        self.model_parallel_size = param_dict.get(MODEL_PARALLEL_SIZE, MODEL_PARALLEL_SIZE_DEFAULT)
        if self.model_parallel_size < 1:
            raise ElasticityConfigError("Model-Parallel size cannot be less than 1, "
                                        f"given model-parallel size: {self.model_parallel_size}")

        self.num_gpus_per_node = param_dict.get(NUM_GPUS_PER_NODE, NUM_GPUS_PER_NODE_DEFAULT)
        if self.num_gpus_per_node < 1:
            raise ElasticityConfigError("Number of GPUs per node cannot be less than 1, "
                                        f"given number of GPUs per node: {self.num_gpus_per_node}")

        self.min_time = param_dict.get(MIN_TIME, MIN_TIME_DEFAULT)
        if self.min_time < 0:
            raise ElasticityConfigError(f"Elasticity min time needs to be >= 0: given {self.min_time}")

        # The remaining options are stored as given, without validation.
        self.version = param_dict.get(VERSION, VERSION_DEFAULT)
        self.prefer_larger_batch_size = param_dict.get(PREFER_LARGER_BATCH, PREFER_LARGER_BATCH_DEFAULT)
        self.ignore_non_elastic_batch_info = param_dict.get(IGNORE_NON_ELASTIC_BATCH_INFO,
                                                            IGNORE_NON_ELASTIC_BATCH_INFO_DEFAULT)

    def repr(self):
        """Return the instance attribute dictionary itself (``__dict__``, not a copy)."""
        return self.__dict__

    def __repr__(self):
        """Return the instance attributes as a JSON string with sorted keys and 4-space indent."""
        return json.dumps(self.__dict__, sort_keys=True, indent=4)
|