__init__.py

'''
Copyright 2020 The Microsoft DeepSpeed Team
'''
import sys
import types
from typing import Optional, Union

import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import _LRScheduler
from packaging import version as pkg_version

from . import ops
from . import module_inject

from .runtime.engine import DeepSpeedEngine, DeepSpeedOptimizerCallable, DeepSpeedSchedulerCallable
from .runtime.engine import ADAM_OPTIMIZER, LAMB_OPTIMIZER
from .runtime.pipe.engine import PipelineEngine
from .inference.engine import InferenceEngine
from .runtime.lr_schedules import add_tuning_arguments
from .runtime.config import DeepSpeedConfig, DeepSpeedConfigError
from .runtime.activation_checkpointing import checkpointing
from .ops.transformer import DeepSpeedTransformerLayer, DeepSpeedTransformerConfig
from .module_inject import replace_transformer_layer, revert_transformer_layer
from .utils import log_dist
from .utils.distributed import init_distributed
from .runtime import zero
from .pipe import PipelineModule
from .git_version_info import version, git_hash, git_branch


def _parse_version(version_str):
    '''Parse a version string and extract the major, minor, and patch versions.'''
    ver = pkg_version.parse(version_str)
    return ver.major, ver.minor, ver.micro


# Export version information
__version__ = version
__version_major__, __version_minor__, __version_patch__ = _parse_version(__version__)
__git_hash__ = git_hash
__git_branch__ = git_branch


def initialize(args=None,
               model: torch.nn.Module = None,
               optimizer: Optional[Union[Optimizer,
                                         DeepSpeedOptimizerCallable]] = None,
               model_parameters: Optional[torch.nn.Module] = None,
               training_data: Optional[torch.utils.data.Dataset] = None,
               lr_scheduler: Optional[Union[_LRScheduler,
                                            DeepSpeedSchedulerCallable]] = None,
               mpu=None,
               dist_init_required: Optional[bool] = None,
               collate_fn=None,
               config=None,
               config_params=None):
    """Initialize the DeepSpeed Engine.

    Arguments:
        args: an object containing local_rank and deepspeed_config fields.
            This is optional if `config` is passed.
        model: Required: nn.module class before applying any wrappers.
        optimizer: Optional: a user defined Optimizer or Callable that returns an Optimizer object.
            This overrides any optimizer definition in the DeepSpeed json config.
        model_parameters: Optional: An iterable of torch.Tensors or dicts.
            Specifies which Tensors should be optimized.
        training_data: Optional: Dataset of type torch.utils.data.Dataset.
        lr_scheduler: Optional: Learning Rate Scheduler Object or a Callable that takes an Optimizer
            and returns a Scheduler object. The scheduler object should define get_lr(), step(),
            state_dict(), and load_state_dict() methods.
        mpu: Optional: A model parallelism unit object that implements
            get_{model,data}_parallel_{rank,group,world_size}().
        dist_init_required: Optional: None will auto-initialize torch.distributed if needed,
            otherwise the user can force it to be initialized or not via a boolean.
        collate_fn: Optional: Merges a list of samples to form a mini-batch of Tensor(s).
            Used when using batched loading from a map-style dataset.
        config: Optional: Instead of requiring args.deepspeed_config, you can pass your deepspeed
            config as an argument instead, as a path or a dictionary.
        config_params: Optional: Same as `config`, kept for backwards compatibility.

    Returns:
        A tuple of ``engine``, ``optimizer``, ``training_dataloader``, ``lr_scheduler``

        * ``engine``: DeepSpeed runtime engine which wraps the client model for distributed training.

        * ``optimizer``: Wrapped optimizer if a user defined ``optimizer`` is supplied, or if an
          optimizer is specified in the json config, else ``None``.

        * ``training_dataloader``: DeepSpeed dataloader if ``training_data`` was supplied,
          otherwise ``None``.

        * ``lr_scheduler``: Wrapped lr scheduler if a user ``lr_scheduler`` is passed, or if an
          ``lr_scheduler`` is specified in the JSON configuration. Otherwise ``None``.
    """
    log_dist("DeepSpeed info: version={}, git-hash={}, git-branch={}".format(
        __version__,
        __git_hash__,
        __git_branch__),
             ranks=[0])

    assert model is not None, "deepspeed.initialize requires a model"

    if not isinstance(model, PipelineModule):
        engine = DeepSpeedEngine(args=args,
                                 model=model,
                                 optimizer=optimizer,
                                 model_parameters=model_parameters,
                                 training_data=training_data,
                                 lr_scheduler=lr_scheduler,
                                 mpu=mpu,
                                 dist_init_required=dist_init_required,
                                 collate_fn=collate_fn,
                                 config=config,
                                 config_params=config_params)
    else:
        assert mpu is None, "mpu must be None with pipeline parallelism"
        engine = PipelineEngine(args=args,
                                model=model,
                                optimizer=optimizer,
                                model_parameters=model_parameters,
                                training_data=training_data,
                                lr_scheduler=lr_scheduler,
                                mpu=model.mpu(),
                                dist_init_required=dist_init_required,
                                collate_fn=collate_fn,
                                config=config,
                                config_params=config_params)

    return_items = [
        engine,
        engine.optimizer,
        engine.training_dataloader,
        engine.lr_scheduler
    ]

    return tuple(return_items)
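

# A minimal usage sketch for ``initialize``, kept as a comment so that importing
# this module has no side effects. ``MyModel``, ``data_loader``, and the config
# values below are illustrative placeholders, not part of this package:
#
#   import deepspeed
#
#   model = MyModel()
#   ds_config = {"train_batch_size": 8,
#                "optimizer": {"type": "Adam", "params": {"lr": 1e-4}}}
#   engine, optimizer, _, _ = deepspeed.initialize(model=model,
#                                                  model_parameters=model.parameters(),
#                                                  config=ds_config)
#   for batch in data_loader:
#       loss = engine(batch)      # forward pass (assumes the model returns a loss)
#       engine.backward(loss)     # DeepSpeed-managed backward pass
#       engine.step()             # optimizer (and lr scheduler) step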


def _add_core_arguments(parser):
    r"""Helper (internal) function to update an argument parser with an argument group of the core DeepSpeed arguments.
        The core set of DeepSpeed arguments include the following:
        1) --deepspeed: boolean flag to enable DeepSpeed
        2) --deepspeed_config <json file path>: path of a json configuration file to configure DeepSpeed runtime.

        This is a helper function to the public add_config_arguments().

    Arguments:
        parser: argument parser
    Return:
        parser: Updated Parser
    """
    group = parser.add_argument_group('DeepSpeed', 'DeepSpeed configurations')

    group.add_argument(
        '--deepspeed',
        default=False,
        action='store_true',
        help='Enable DeepSpeed (helper flag for user code, no impact on DeepSpeed backend)')

    group.add_argument('--deepspeed_config',
                       default=None,
                       type=str,
                       help='DeepSpeed json configuration file.')

    group.add_argument(
        '--deepscale',
        default=False,
        action='store_true',
        help='Deprecated flag to enable DeepSpeed (helper flag for user code, no impact on DeepSpeed backend)')

    group.add_argument('--deepscale_config',
                       default=None,
                       type=str,
                       help='Deprecated DeepSpeed json configuration file.')

    group.add_argument(
        '--deepspeed_mpi',
        default=False,
        action='store_true',
        help="Run via MPI; this will attempt to discover the necessary variables to initialize "
        "torch distributed from the MPI environment")

    return parser


def add_config_arguments(parser):
    r"""Update the argument parser to enable parsing of DeepSpeed command line arguments.
        The set of DeepSpeed arguments include the following:
        1) --deepspeed: boolean flag to enable DeepSpeed
        2) --deepspeed_config <json file path>: path of a json configuration file to configure DeepSpeed runtime.

    Arguments:
        parser: argument parser
    Return:
        parser: Updated Parser
    """
    parser = _add_core_arguments(parser)

    return parser
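

# Illustrative argparse integration, again as a comment-only sketch. A training
# script typically extends its own parser with the DeepSpeed flags and forwards
# the resulting ``args`` to ``deepspeed.initialize``; ``--local_rank`` below is
# the script's own argument, not one added by this helper:
#
#   import argparse
#   import deepspeed
#
#   parser = argparse.ArgumentParser(description='My training script')
#   parser.add_argument('--local_rank', type=int, default=-1)
#   parser = deepspeed.add_config_arguments(parser)
#   args = parser.parse_args()  # e.g. --deepspeed --deepspeed_config ds_config.json
#
#   engine, optimizer, _, _ = deepspeed.initialize(args=args,
#                                                  model=model,
#                                                  model_parameters=model.parameters())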


def init_inference(model,
                   mp_size=1,
                   mpu=None,
                   checkpoint=None,
                   module_key='module',
                   dtype=None,
                   injection_policy=None,
                   replace_method='auto',
                   quantization_setting=None,
                   replace_with_kernel_inject=False,
                   return_tuple=True):
    """Initialize the DeepSpeed InferenceEngine.

    Arguments:
        model: Required: nn.module class before applying any wrappers.
        mp_size: Optional: Desired model parallel size, default is 1 meaning no
            model parallelism.
        mpu: Optional: A model parallelism unit object that implements
            get_{model,data}_parallel_{rank,group,world_size}().
        checkpoint: Optional: Path to a deepspeed compatible checkpoint or path to a
            JSON with load policy.
        dtype: Optional: Desired model data type, will convert the model to this type.
            Supported target types: torch.half, torch.int8, torch.float
        injection_policy: Optional: Dictionary mapping a client nn.Module to its corresponding
            injection policy, e.g., {BertLayer : deepspeed.inference.HFBertLayerPolicy}
        replace_method: Optional: If 'auto', DeepSpeed will automatically try to replace
            model modules with its optimized versions. If an injection_policy is set, it
            overrides the automatic replacement behavior.
        quantization_setting: Optional: Quantization settings used for quantizing the model with MoQ.
            The setting can be a single element or a tuple. A single value is taken as the number of
            groups used in quantization. A tuple requests extra grouping for the MLP part of a
            Transformer layer, e.g. (True, 8) quantizes the model using 8 groups for all of the
            network except the MLP part, which uses 8 extra groups.
        replace_with_kernel_inject: If set, kernel injection is applied while initializing the
            inference engine.

    Returns:
        A deepspeed.InferenceEngine wrapped model.
    """
    log_dist("DeepSpeed info: version={}, git-hash={}, git-branch={}".format(
        __version__,
        __git_hash__,
        __git_branch__),
             ranks=[0])

    if isinstance(model, PipelineModule):
        raise NotImplementedError("pipeline module support is not implemented yet")
    else:
        engine = InferenceEngine(model,
                                 mp_size,
                                 mpu,
                                 checkpoint,
                                 dtype,
                                 injection_policy,
                                 return_tuple,
                                 replace_method,
                                 quantization_setting,
                                 replace_with_kernel_inject)

    return engine
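

# Comment-only inference usage sketch; ``model`` stands for an already constructed
# client nn.Module and the argument values are examples, not requirements:
#
#   import torch
#   import deepspeed
#
#   ds_engine = deepspeed.init_inference(model,
#                                        mp_size=1,
#                                        dtype=torch.half,
#                                        replace_method='auto',
#                                        replace_with_kernel_inject=True)
#   outputs = ds_engine.module(inputs)  # run inference through the injected module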