__init__.py

'''
Copyright 2020 The Microsoft DeepSpeed Team
'''

import sys
import types
import json
from typing import Optional, Union
import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import _LRScheduler
from packaging import version as pkg_version

from . import ops
from . import module_inject

from .runtime.engine import DeepSpeedEngine, DeepSpeedOptimizerCallable, DeepSpeedSchedulerCallable
from .runtime.engine import ADAM_OPTIMIZER, LAMB_OPTIMIZER
from .runtime.pipe.engine import PipelineEngine
from .inference.engine import InferenceEngine
from .inference.config import DeepSpeedInferenceConfig
from .runtime.lr_schedules import add_tuning_arguments
from .runtime.config import DeepSpeedConfig, DeepSpeedConfigError
from .runtime.activation_checkpointing import checkpointing
from .ops.transformer import DeepSpeedTransformerLayer, DeepSpeedTransformerConfig
from .module_inject import replace_transformer_layer, revert_transformer_layer

from .utils import log_dist, OnDevice
from .comm.comm import init_distributed

from .runtime import zero
from .runtime import DeepSpeedOptimizer, ZeROOptimizer

from .pipe import PipelineModule

from .git_version_info import version, git_hash, git_branch


def _parse_version(version_str):
    '''Parse a version string and extract the major, minor, and patch versions.'''
    ver = pkg_version.parse(version_str)
    return ver.major, ver.minor, ver.micro


# Export version information
__version__ = version
__version_major__, __version_minor__, __version_patch__ = _parse_version(__version__)
__git_hash__ = git_hash
__git_branch__ = git_branch
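
# Example (illustrative): _parse_version("0.8.3+cu117") returns (0, 8, 3);
# packaging's Version exposes the PEP 440 major/minor/micro components and
# ignores a local segment such as "+cu117".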


def initialize(args=None,
               model: torch.nn.Module = None,
               optimizer: Optional[Union[Optimizer,
                                         DeepSpeedOptimizerCallable]] = None,
               model_parameters: Optional[torch.nn.Module] = None,
               training_data: Optional[torch.utils.data.Dataset] = None,
               lr_scheduler: Optional[Union[_LRScheduler,
                                            DeepSpeedSchedulerCallable]] = None,
               mpu=None,
               dist_init_required: Optional[bool] = None,
               collate_fn=None,
               config=None,
               config_params=None):
    """Initialize the DeepSpeed Engine.

    Arguments:
        args: an object containing local_rank and deepspeed_config fields.
            This is optional if `config` is passed.

        model: Required: nn.Module instance, before applying any wrappers

        optimizer: Optional: a user-defined Optimizer or Callable that returns an Optimizer object.
            This overrides any optimizer definition in the DeepSpeed json config.

        model_parameters: Optional: An iterable of torch.Tensors or dicts.
            Specifies what Tensors should be optimized.

        training_data: Optional: Dataset of type torch.utils.data.Dataset

        lr_scheduler: Optional: Learning Rate Scheduler Object or a Callable that takes an Optimizer and returns a Scheduler object.
            The scheduler object should define get_lr(), step(), state_dict(), and load_state_dict() methods.

        mpu: Optional: A model parallelism unit object that implements
            get_{model,data}_parallel_{rank,group,world_size}()

        dist_init_required: Optional: None will auto-initialize torch distributed if needed,
            otherwise the user can force it to be initialized or not via boolean.

        collate_fn: Optional: Merges a list of samples to form a
            mini-batch of Tensor(s). Used when using batched loading from a
            map-style dataset.

        config: Optional: Instead of requiring args.deepspeed_config you can pass your deepspeed config
            as an argument instead, as a path or a dictionary.

        config_params: Optional: Same as `config`, kept for backwards compatibility.

    Returns:
        A tuple of ``engine``, ``optimizer``, ``training_dataloader``, ``lr_scheduler``

        * ``engine``: DeepSpeed runtime engine which wraps the client model for distributed training.

        * ``optimizer``: Wrapped optimizer if a user-defined ``optimizer`` is supplied or an
          optimizer is specified in the json config; otherwise ``None``.

        * ``training_dataloader``: DeepSpeed dataloader if ``training_data`` was supplied,
          otherwise ``None``.

        * ``lr_scheduler``: Wrapped lr scheduler if a user ``lr_scheduler`` is passed or an
          ``lr_scheduler`` is specified in the json config; otherwise ``None``.
    """
    log_dist("DeepSpeed info: version={}, git-hash={}, git-branch={}".format(
        __version__,
        __git_hash__,
        __git_branch__),
             ranks=[0])

    # Disable zero.Init context if it's currently enabled
    zero.partition_parameters.shutdown_init_context()

    assert model is not None, "deepspeed.initialize requires a model"

    if not isinstance(model, PipelineModule):
        engine = DeepSpeedEngine(args=args,
                                 model=model,
                                 optimizer=optimizer,
                                 model_parameters=model_parameters,
                                 training_data=training_data,
                                 lr_scheduler=lr_scheduler,
                                 mpu=mpu,
                                 dist_init_required=dist_init_required,
                                 collate_fn=collate_fn,
                                 config=config,
                                 config_params=config_params)
    else:
        assert mpu is None, "mpu must be None with pipeline parallelism"
        engine = PipelineEngine(args=args,
                                model=model,
                                optimizer=optimizer,
                                model_parameters=model_parameters,
                                training_data=training_data,
                                lr_scheduler=lr_scheduler,
                                mpu=model.mpu(),
                                dist_init_required=dist_init_required,
                                collate_fn=collate_fn,
                                config=config,
                                config_params=config_params)

    return_items = [
        engine,
        engine.optimizer,
        engine.training_dataloader,
        engine.lr_scheduler
    ]

    return tuple(return_items)
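
# Example usage (a minimal sketch; `MyModel`, `batch`, and "ds_config.json"
# are hypothetical stand-ins for the user's model, data, and config path):
#
#   model = MyModel()
#   engine, optimizer, _, _ = deepspeed.initialize(
#       model=model,
#       model_parameters=model.parameters(),
#       config="ds_config.json")
#   loss = engine(batch)   # assuming the model's forward returns a loss
#   engine.backward(loss)  # engine handles loss scaling / ZeRO bookkeeping
#   engine.step()          # optimizer step (+ lr scheduler step, if configured)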


def _add_core_arguments(parser):
    r"""Helper (internal) function to update an argument parser with an argument group of the core DeepSpeed arguments.
        The core set of DeepSpeed arguments includes the following:
        1) --deepspeed: boolean flag to enable DeepSpeed
        2) --deepspeed_config <json file path>: path of a json configuration file to configure DeepSpeed runtime.

        This is a helper function to the public add_config_arguments()

    Arguments:
        parser: argument parser
    Return:
        parser: Updated Parser
    """
    group = parser.add_argument_group('DeepSpeed', 'DeepSpeed configurations')

    group.add_argument(
        '--deepspeed',
        default=False,
        action='store_true',
        help=
        'Enable DeepSpeed (helper flag for user code, no impact on DeepSpeed backend)')

    group.add_argument('--deepspeed_config',
                       default=None,
                       type=str,
                       help='DeepSpeed json configuration file.')

    group.add_argument(
        '--deepscale',
        default=False,
        action='store_true',
        help=
        'Deprecated alias for --deepspeed (helper flag for user code, no impact on DeepSpeed backend)'
    )

    group.add_argument('--deepscale_config',
                       default=None,
                       type=str,
                       help='Deprecated alias for --deepspeed_config.')

    group.add_argument(
        '--deepspeed_mpi',
        default=False,
        action='store_true',
        help=
        "Run via MPI; this will attempt to discover the necessary variables to initialize torch "
        "distributed from the MPI environment")

    return parser


def add_config_arguments(parser):
    r"""Update the argument parser to enable parsing of DeepSpeed command line arguments.
        The set of DeepSpeed arguments includes the following:
        1) --deepspeed: boolean flag to enable DeepSpeed
        2) --deepspeed_config <json file path>: path of a json configuration file to configure DeepSpeed runtime.

    Arguments:
        parser: argument parser
    Return:
        parser: Updated Parser
    """
    parser = _add_core_arguments(parser)

    return parser
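
# Example usage (sketch; '--local_rank' is shown only as a typical companion
# argument supplied by the DeepSpeed launcher, not something this helper adds):
#
#   import argparse
#   parser = argparse.ArgumentParser(description='My training script')
#   parser.add_argument('--local_rank', type=int, default=-1)
#   parser = deepspeed.add_config_arguments(parser)
#   args = parser.parse_args()
#   # args.deepspeed and args.deepspeed_config are now available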


def default_inference_config():
    """
        Return a default DeepSpeed inference configuration dictionary.
    """
    return DeepSpeedInferenceConfig().dict()
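
# Example usage (sketch): start from the defaults, override selected fields,
# then hand the dict to init_inference(); "dtype" is a config field used in
# the init_inference() examples below.
#
#   config = deepspeed.default_inference_config()
#   config["dtype"] = torch.half
#   engine = deepspeed.init_inference(model, config=config)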


def init_inference(model, config=None, **kwargs):
    """Initialize the DeepSpeed InferenceEngine.

    Description: all four cases below are valid and supported in the DeepSpeed init_inference() API.

    # Case 1: user provides no config and no kwargs. Default config will be used.

    .. code-block:: python

        generator.model = deepspeed.init_inference(generator.model)
        string = generator("DeepSpeed is")
        print(string)

    # Case 2: user provides a config and no kwargs. User supplied config will be used.

    .. code-block:: python

        generator.model = deepspeed.init_inference(generator.model, config=config)
        string = generator("DeepSpeed is")
        print(string)

    # Case 3: user provides no config and uses keyword arguments (kwargs) only.

    .. code-block:: python

        generator.model = deepspeed.init_inference(generator.model,
                                                   mp_size=world_size,
                                                   dtype=torch.half,
                                                   replace_with_kernel_inject=True)
        string = generator("DeepSpeed is")
        print(string)

    # Case 4: user provides both a config and keyword arguments (kwargs). The two are merged; a key supplied in both must have the same value, otherwise a ValueError is raised.

    .. code-block:: python

        generator.model = deepspeed.init_inference(generator.model,
                                                   config={"dtype": torch.half},
                                                   replace_with_kernel_inject=True)
        string = generator("DeepSpeed is")
        print(string)

    Arguments:
        model: Required: original nn.Module object without any wrappers

        config: Optional: instead of arguments, you can pass in a DS inference config dict or path to a JSON file

    Returns:
        A deepspeed.InferenceEngine wrapped model.
    """
    log_dist("DeepSpeed info: version={}, git-hash={}, git-branch={}".format(
        __version__,
        __git_hash__,
        __git_branch__),
             ranks=[0])

    # Load config_dict from config first
    if config is None:
        config = {}
    if isinstance(config, str):
        with open(config, "r") as f:
            config_dict = json.load(f)
    elif isinstance(config, dict):
        config_dict = config
    else:
        raise ValueError(
            f"'config' argument expected string or dictionary, got {type(config)}")

    # Update with values from kwargs, ensuring no conflicting overlap between config and kwargs
    overlap_keys = set(config_dict.keys()).intersection(kwargs.keys())
    # If there is overlap, error out if values are different
    for key in overlap_keys:
        if config_dict[key] != kwargs[key]:
            raise ValueError(
                f"Conflicting argument '{key}' in 'config':{config_dict[key]} and kwargs:{kwargs[key]}"
            )
    config_dict.update(kwargs)

    ds_inference_config = DeepSpeedInferenceConfig(**config_dict)

    engine = InferenceEngine(model, config=ds_inference_config)

    return engine
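
# Illustration of the merge rule above (sketch): supplying the same key with
# different values via both `config` and kwargs raises rather than overriding:
#
#   deepspeed.init_inference(model,
#                            config={"dtype": torch.float},
#                            dtype=torch.half)
#   # -> ValueError: Conflicting argument 'dtype' ...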