engine.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. # Copyright (c) Microsoft Corporation.
  2. # SPDX-License-Identifier: Apache-2.0
  3. # DeepSpeed Team
  4. import torch
  5. import time
  6. import os
  7. from deepspeed import comm as dist
  8. from deepspeed.utils.logging import log_dist
  9. from torch.nn.modules import Module
  10. from packaging import version as pkg_version
  11. from deepspeed.runtime.checkpoint_engine.torch_checkpoint_engine import TorchCheckpointEngine
  12. from deepspeed.utils.timer import SynchronizedWallClockTimer
  13. from ..runtime.state_dict_factory import SDLoaderFactory
  14. from ..runtime.weight_quantizer import WeightQuantization
  15. from ..module_inject import replace_transformer_layer, generic_injection
  16. from ..comm.comm import init_distributed
  17. from ..pipe import PipelineModule
  18. from ..moe.utils import has_moe_layers
  19. from ..module_inject import LinearAllreduce, LinearLayer, Normalize, ReplaceWithTensorSlicing
  20. from deepspeed.accelerator import get_accelerator
  21. from ..module_inject.policy import TransformerPolicy
  22. from ..module_inject.auto_tp import AutoTP
  23. from ..module_inject.replace_policy import generic_policies
  24. from ..module_inject.auto_tp_model_utils import build_bloom_alibi_tensor, build_mpt_atten_bias_tensor, build_mpt_alibi_tensor
  25. DS_INFERENCE_ENABLED = False
  26. from torch import nn
  27. INFERENCE_MODEL_TIMER = "model-forward-inference"
  28. class InferenceEngine(Module):
  # Tensor-parallel process group cached at class level: `_create_model_parallel_group`
  # creates it on first use and later engines in the same process reuse it.
  inference_mp_group = None
  # Class-level placeholder; no assignment to it is visible in this file.
  inference_ep_group = None
  # Class-level default; `__init__` shadows it with an instance attribute.
  expert_mp_group = None
  def __init__(self, model, config):
      """Wrap ``model`` for inference and apply the requested injection mode.

      The constructor records the configuration, optionally converts the model
      dtype, sets up tensor-parallel / expert-parallel process groups, applies
      one of the three supported injection modes (user policy, kernel
      injection, automatic tensor parallelism) and finally moves the module
      to the current accelerator device.

      Args:
          model: torch.nn.Module
          config: DeepSpeedInferenceConfig
      """
      global DS_INFERENCE_ENABLED
      DS_INFERENCE_ENABLED = True
      super().__init__()
      self.module = model
      self._config = config
      self._get_model_config_generate(config)  # keep for weird backward compatibility
      # patch model generate with ours if model uses it
      if hasattr(self.module, "generate"):
          self.generate = self._generate
      if hasattr(self.module, "config"):
          TransformerPolicy.hf_model_config = self.module.config
      # todo: keep this self.injection_dict because we don't want to change the config.injection_policy API
      # todo: this will get changed when Molly's PR on auto injection dict is merged
      self.injection_dict = config.injection_policy
      # todo: refactor the mp_group and mp_size related in the next refactor
      self.mp_group = config.tensor_parallel.tp_group
      self.mpu = config.tensor_parallel.mpu
      #self._validate_args(self.mpu, config.replace_with_kernel_inject)
      self.quantize_merge_count = 1
      self.quantization_scales = None
      # these are not needed in the config as we are creating them ourselves in the inference engine
      self.ep_group = None  # config.moe.ep_group
      self.expert_mp_group = None  # config.moe.ep_mp_group
      self.cuda_graph_created = False
      self.checkpoint_engine = TorchCheckpointEngine()
      # No quantization setting is read from the config yet, so defaults are used.
      quantization_setting = None
      self._init_quantization_setting(
          quantization_setting)  # todo: update with the new quant config for weight quant
      self.model_profile_enabled = False
      self._model_times = []
      if not self.injection_dict and config.replace_with_kernel_inject:
          # This is a hack to remove the prepare_mask function on HF side for BLOOM architecture
          self.remove_mask_prepare_for_bloom()
      if self.injection_dict or not config.replace_with_kernel_inject:
          # This is a hack to redefine the alibi func due to TP
          if config.tensor_parallel.tp_size > 1:
              self.build_alibi_tensor()
              self.build_attn_bias()
      if get_accelerator().device_name() == 'cuda' and config.enable_cuda_graph:
          assert pkg_version.parse(torch.__version__) >= pkg_version.parse("1.10"), \
              "If you want to use cuda graph, please upgrade torch to at least v1.10"
      # Check if model passed to engine is loaded w/ meta tensors, in which case
      # kernel injection must be enabled.
      # NOTE: This check assumes a Hugging Face hierarchy for the device type i.e. module.device.type
      self.model_meta_device = self.module.device.type == 'meta' if hasattr(self.module, "device") else False
      # convert model to intended dtype
      if config.dtype:
          self._convert_to_dtype(config)
      # A user-supplied mpu takes precedence: its model-parallel group defines
      # both the group and the effective tp_size. Otherwise a group is created
      # only when tensor parallelism was requested.
      if self.mpu:
          config.tensor_parallel.tp_size = dist.get_world_size(group=self.mpu.get_model_parallel_group())
          self.mp_group = self.mpu.get_model_parallel_group()
      elif config.tensor_parallel.tp_size > 1:
          self._create_model_parallel_group(config)
          config.tensor_parallel.tp_group = self.mp_group
      # MoE detection is only attempted on real torch modules.
      if isinstance(self.module, torch.nn.Module):
          moe, _ = has_moe_layers(self.module)
      else:
          moe = False
      if moe and dist.get_world_size() > 1:
          self._create_ep_parallel_group(config.moe.moe_experts)
      # We only support three modes: 1) user specified policy for tensor-parallelism, 2) kernel injection (replace_with_kernel_inject), and 3) automatic tensor parallelism if tp_size > 1.
      if self.injection_dict:
          # 1. User specified Tensor Parallelism
          # Let's relax this constraint a bit, as we can let the user pass an injection dict, i.e. {layer: injection_policy} to inject kernels
          # assert not config.replace_with_kernel_inject, "Cannot use both user specified injection policy and kernel injection"
          for client_module, injection_policy in self.injection_dict.items():
              assert issubclass(client_module,
                                torch.nn.Module), f"{client_module} is not a subclass of torch.nn.Module"
              # construct the tuple and pass that instead of a string or dict.
              if isinstance(injection_policy, str):
                  config.injection_policy_tuple = (injection_policy, )
              else:
                  config.injection_policy_tuple = injection_policy
              # Every policy entry must match the suffix of at least one module
              # name; names are re-read on each iteration, after any earlier injection.
              layer_names = [name for name, _ in self.module.named_modules()]
              for policy in config.injection_policy_tuple:
                  if not any(name.endswith(policy) for name in layer_names):
                      raise ValueError(f"Injection policy layer'{policy}' not valid.")
              self._apply_injection_policy(config, client_module)
      else:
          if config.replace_with_kernel_inject:
              # 2. DeepSpeed Kernel Injection
              self._apply_injection_policy(config)
          elif config.tensor_parallel.tp_size > 1:
              # 3. Automatic Tensor Parallelism
              parser_dict = AutoTP.tp_parser(model)
              # NOTE(review): plain print, so this is emitted by every process.
              print("AutoTP: ", parser_dict)
              for client_module, injection_policy in parser_dict:
                  if isinstance(injection_policy, str):
                      config.injection_policy_tuple = (injection_policy, )
                  else:
                      config.injection_policy_tuple = injection_policy
                  self._apply_injection_policy(config, client_module)
      device = get_accelerator().current_device_name()
      self.module.to(device)
      if config.tensor_parallel.tp_size > 1:
          # Broadcast rank 0's RNG state and install it locally so that all
          # tensor-parallel ranks share the same random stream.
          _rng_state = get_accelerator().get_rng_state().to(get_accelerator().current_device_name())
          dist.broadcast(_rng_state, 0)
          get_accelerator().set_rng_state(_rng_state.cpu())
      if config.tensor_parallel.tp_size > 1:
          assert not config.enable_cuda_graph, "Cuda graph is not supported for model parallelism"
      # Check if local CUDA graphs can be created in replacement modules
      self.local_cuda_graph = self._local_cuda_graph_used(self.module)
  def profile_model_time(self, use_cuda_events=True):
      """Enable collection of per-forward model latencies.

      Forward pre/post hooks are registered on the wrapped module only the
      first time profiling is enabled, and only when CUDA graphs are off
      (with CUDA graphs, ``forward`` does the timing itself).

      Args:
          use_cuda_events: when true, a ``SynchronizedWallClockTimer`` is
              created and used by the hooks; otherwise the hooks fall back
              to ``time.time()`` around an accelerator synchronize.
      """
      needs_hooks = not (self.model_profile_enabled or self._config.enable_cuda_graph)
      if needs_hooks:
          self.module.register_forward_pre_hook(self._pre_forward_hook)
          self.module.register_forward_hook(self._post_forward_hook)

      self.model_profile_enabled = True
      self.use_cuda_events = use_cuda_events
      if use_cuda_events:
          self.timers = SynchronizedWallClockTimer()
  148. # todo: remove this once all the config dicts are centralized from top level pydantic config
  149. def _get_model_config_generate(self, config):
  150. # this is being passed to replace_transformer_layer(config=self.user_model_config_dict)
  151. self.config = getattr(self.module, 'config', None) if config.config is None else config.config
  def remove_mask_prepare_for_bloom(self):
      """Replace ``transformer._prepare_attn_mask`` with a pass-through.

      Does nothing unless the wrapped module has a ``transformer`` attribute
      that itself exposes ``_prepare_attn_mask``.
      """
      if not hasattr(self.module, 'transformer'):
          return
      transformer = self.module.transformer
      if not hasattr(transformer, '_prepare_attn_mask'):
          return
      # The replacement ignores every extra argument and hands the mask back untouched.
      transformer._prepare_attn_mask = lambda attention_mask, *args, **kwargs: attention_mask
  def build_alibi_tensor(self):
      """Swap the transformer's alibi builders for the auto-TP versions.

      ``build_alibi_tensor`` is overridden on the transformer instance.
      ``build_mpt_alibi_tensor`` is overridden on the transformer's class,
      after the current attribute is saved on the instance as
      ``build_mpt_alibi_tensor_orig``. Missing attributes are left alone.
      """
      if not hasattr(self.module, 'transformer'):
          return
      transformer = self.module.transformer
      if hasattr(transformer, 'build_alibi_tensor'):
          transformer.build_alibi_tensor = build_bloom_alibi_tensor
      if hasattr(transformer, 'build_mpt_alibi_tensor'):
          # Keep a handle on the original before patching the class attribute.
          transformer.build_mpt_alibi_tensor_orig = transformer.build_mpt_alibi_tensor
          transformer.__class__.build_mpt_alibi_tensor = build_mpt_alibi_tensor
  def build_attn_bias(self):
      """Swap the transformer's ``_attn_bias`` for the auto-TP MPT version.

      The current attribute is saved on the instance as ``_attn_bias_orig``
      and the replacement is installed on the transformer's class. Does
      nothing when the module has no ``transformer`` or it has no ``_attn_bias``.
      """
      if not hasattr(self.module, 'transformer'):
          return
      transformer = self.module.transformer
      if not hasattr(transformer, '_attn_bias'):
          return
      transformer._attn_bias_orig = transformer._attn_bias
      transformer.__class__._attn_bias = build_mpt_atten_bias_tensor
  def _pre_forward_hook(self, module, *inputs, **kwargs):
      """Forward pre-hook: start timing one forward pass of the module."""
      if not self.use_cuda_events:
          # Wall-clock path: synchronize first, then record the start time.
          get_accelerator().synchronize()
          self._start = time.time()
          return
      self.timers(INFERENCE_MODEL_TIMER).start()
  def _post_forward_hook(self, module, input, output):
      """Forward hook: stop timing and record the elapsed time.

      The measured value is appended to ``self._model_times``. On the
      wall-clock path it is converted from seconds to milliseconds.
      """
      if self.use_cuda_events:
          self.timers(INFERENCE_MODEL_TIMER).stop()
          elapsed = self.timers(INFERENCE_MODEL_TIMER).elapsed(reset=True)
      else:
          get_accelerator().synchronize()
          self._end = time.time()
          elapsed = 1e3 * (self._end - self._start)  # seconds -> milliseconds
      self._model_times.append(elapsed)
  def _create_model_parallel_group(self, config):
      """Set ``self.mp_group``, creating the tensor-parallel group if needed.

      The group is cached on the class, so only the first engine in a process
      initializes distributed communication and builds the group. Later
      engines reuse the cached one.
      """
      cached_group = InferenceEngine.inference_mp_group
      if cached_group is not None:
          self.mp_group = cached_group
          return

      # Call the init process
      init_distributed()
      local_rank = int(os.getenv('LOCAL_RANK', '0'))
      get_accelerator().set_device(local_rank)

      # The group spans ranks 0 .. tp_size - 1.
      tp_ranks = list(range(config.tensor_parallel.tp_size))
      self.mp_group = dist.new_group(tp_ranks)
      InferenceEngine.inference_mp_group = self.mp_group
  def _create_ep_parallel_group(self, moe_experts):
      """Build expert-parallel and expert-model-parallel process groups.

      Populates two dicts keyed by each value in ``moe_experts``:
      ``self.ep_group`` and ``self.expert_mp_group``. Entries start as ``None``
      and are filled with the group that contains the calling rank.

      Args:
          moe_experts: a single value or a list of values; each is used as an
              expert-parallel group size (``moe_ep_size``) below.
      """
      # Call the init process
      self.ep_group = {}
      self.expert_mp_group = {}
      moe_experts = moe_experts if type(moe_experts) is list else [moe_experts]
      for e in moe_experts:
          self.ep_group.update({e: None})
          self.expert_mp_group.update({e: None})
      # Only existing keys are overwritten inside the loop, so the dict size
      # does not change while iterating over its keys.
      for moe_ep_size in self.ep_group.keys():
          num_ep_groups = dist.get_world_size() // moe_ep_size
          # Expert-parallel groups: consecutive blocks of `moe_ep_size` ranks.
          # new_group is called for every block on every rank; only the block
          # containing this rank is kept.
          for i in range(num_ep_groups):
              ep_cnt = i * moe_ep_size
              # NOTE(review): when moe_ep_size > world size, num_ep_groups is 0 and
              # this loop body never runs, so the world-size fallback here looks
              # unreachable -- confirm the intended behaviour.
              size = dist.get_world_size() if moe_ep_size > dist.get_world_size() else moe_ep_size
              ranks = list(range(ep_cnt, ep_cnt + size))
              _ep_group = dist.new_group(ranks)
              if dist.get_rank() in ranks:
                  self.ep_group.update({moe_ep_size: _ep_group})
          if dist.get_world_size() > moe_ep_size:
              # Expert model-parallel groups: ranks strided by `moe_ep_size`,
              # i.e. the i-th rank of every expert-parallel block.
              num_expert_mp_groups = dist.get_world_size() // num_ep_groups
              expert_mp_size = dist.get_world_size() // moe_ep_size
              for i in range(num_expert_mp_groups):
                  expert_mp_comm_ranks = [i + nr * moe_ep_size for nr in range(expert_mp_size)]
                  _expert_mp_group = dist.new_group(expert_mp_comm_ranks)
                  if dist.get_rank() in expert_mp_comm_ranks:
                      self.expert_mp_group.update({moe_ep_size: _expert_mp_group})
  def _init_quantization_setting(self, quantization_setting):
      """Initialise quantization attributes and log them on rank 0.

      Args:
          quantization_setting: ``None`` keeps the defaults; a tuple is
              unpacked as ``(mlp_extra_grouping, quantize_groups)``; any
              other value is taken as ``quantize_groups``.
      """
      # Defaults, possibly overridden below.
      self.quantize_bits = 8
      self.mlp_extra_grouping = False
      self.quantize_groups = 1

      if type(quantization_setting) is tuple:
          self.mlp_extra_grouping, self.quantize_groups = quantization_setting
      elif quantization_setting is not None:
          self.quantize_groups = quantization_setting

      summary = (f"quantize_bits = {self.quantize_bits} "
                 f"mlp_extra_grouping = {self.mlp_extra_grouping}, "
                 f"quantize_groups = {self.quantize_groups}")
      log_dist(summary, [0])
  232. # TODO: remove this function and add this functionality to pydantic config checking
  233. def _validate_args(self, mpu, replace_with_kernel_inject):
  234. # TODO: to support SD pipeline we need to avoid this check for now
  235. if replace_with_kernel_inject and not isinstance(self.module, Module):
  236. raise ValueError(f"model must be a torch.nn.Module, got {type(self.module)}")
  237. if not isinstance(self._config.tensor_parallel.tp_size, int) or self._config.tensor_parallel.tp_size < 1:
  238. raise ValueError(f"mp_size must be an int >= 1, got {self._config.tensor_parallel.tp_size}")
  239. if mpu:
  240. methods = ["get_model_parallel_group", "get_data_parallel_group"]
  241. for method in methods:
  242. if not hasattr(mpu, method):
  243. raise ValueError(f"mpu is missing {method}")
  244. if self._config.checkpoint is not None and not isinstance(self._config.checkpoint, (str, dict)):
  245. raise ValueError(f"checkpoint must be None, str or dict, got {type(self._config.checkpoint)}")
  246. supported_dtypes = [None, torch.half, torch.int8, torch.float]
  247. if self._config.dtype not in supported_dtypes:
  248. raise ValueError(f"{self._config.dtype} not supported, valid dtype: {supported_dtypes}")
  249. if self.injection_dict is not None and not isinstance(self.injection_dict, dict):
  250. raise ValueError(f"injection_dict must be None or a dict, got: {self.injection_dict}")
  def load_model_with_checkpoint(self, r_module):
      """Copy weights from ``self.sd`` into ``r_module``, layer by layer.

      Relies on ``self.sd`` (the state dict being loaded) and ``self.key_list``
      (its keys) having been set by the caller. Walks the module tree, and for
      every supported layer type whose prefix appears in ``self.key_list``
      copies weight (and bias, when present) through ``ReplaceWithTensorSlicing``.
      Finally, if ``lm_head.weight`` is still a meta tensor, it is pointed at
      the embedding weight.

      Args:
          r_module: root module whose children are populated in place.
      """
      self.mp_replace = ReplaceWithTensorSlicing(
          mp_group=self.mp_group, mp_size=self._config.tensor_parallel.tp_size)  #, out_dim=0, in_dim=1)
      error_msgs = []

      def load(module, state_dict, prefix):
          # Load one layer's weight/bias from `state_dict` entries under `prefix`.
          # NOTE(review): `args` (and therefore `error_msgs`) is built but never used.
          args = (state_dict, prefix, {}, True, [], [], error_msgs)
          if hasattr(module, 'weight'):
              if module.weight.data.is_meta:
                  # meta tensor cannot be casted or copied to, so we need to replace it with a normal tensor here
                  module.weight = torch.nn.parameter.Parameter(data=torch.empty_like(module.weight.data,
                                                                                     device="cpu"),
                                                               requires_grad=module.weight.data.requires_grad)
              if 'query_key_value' in prefix:
                  # Fused QKV weights are copied with a 3-way strided split.
                  module.weight = self.mp_replace.strided_copy(module.weight.data,
                                                               state_dict[prefix + 'weight'],
                                                               num_splits=3)
              else:
                  module.weight = self.mp_replace.copy(module.weight.data, state_dict[prefix + 'weight'])
          else:
              # No direct `weight`: the parameters live on `module.norm` instead.
              if module.norm.weight.data.is_meta:
                  # meta tensor cannot be casted or copied to, so we need to replace it with a normal tensor here
                  module.norm.weight = torch.nn.parameter.Parameter(
                      data=torch.empty_like(module.norm.weight.data, device="cpu"),
                      requires_grad=module.norm.weight.data.requires_grad)
              module.norm.weight = self.mp_replace.copy(module.norm.weight.data, state_dict[prefix + 'weight'])
          # Bias is optional: only handled when the checkpoint has a matching key.
          if prefix + 'bias' in self.key_list:
              if hasattr(module, 'norm'):
                  if module.norm.bias.data.is_meta:
                      # meta tensor cannot be casted or copied to, so we need to replace it with a normal tensor here
                      module.norm.bias = torch.nn.parameter.Parameter(
                          data=torch.empty_like(module.norm.bias.data, device="cpu"),
                          requires_grad=module.norm.bias.data.requires_grad)
                  module.norm.bias = self.mp_replace.copy(module.norm.bias, state_dict[prefix + 'bias'])
              else:
                  if module.bias.data.is_meta:
                      # meta tensor cannot be casted or copied to, so we need to replace it with a normal tensor here
                      module.bias = torch.nn.parameter.Parameter(data=torch.empty_like(module.bias.data,
                                                                                       device="cpu"),
                                                                 requires_grad=module.bias.data.requires_grad)
                  # The checkpoint bias is moved to the current device before copying.
                  data = state_dict[prefix + 'bias']
                  data = data.to(get_accelerator().current_device_name())
                  module.bias = self.mp_replace.copy(module.bias, data)

      # Layer classes that are loaded directly; matching is by exact class.
      layer_policies = {
          nn.Linear: load,
          nn.Embedding: load,
          nn.LayerNorm: load,
          LinearLayer: load,
          LinearAllreduce: load
      }

      def load_module_recursive(module, prefix='', level=0):
          # Depth-first walk; the top-level child's name is not added to the prefix.
          for name, child in module.named_children():
              if child.__class__ in layer_policies:
                  checking_key = prefix + name + '.'
                  # Skip layers for which this checkpoint holds no keys.
                  if not any(checking_key in item for item in self.key_list):
                      continue
                  # A child whose first parameter is empty but whose weight has a
                  # 1-D `ds_shape` is swapped for a `Normalize` layer before loading.
                  if len(list(child.parameters())) > 0 and list(child.parameters())[0].numel() == 0:
                      if len(child.weight.ds_shape) == 1:
                          child = Normalize(dim=child.weight.ds_shape[-1], dtype=child.weight.dtype, eps=child.eps)
                          setattr(module, name, child)
                  load(child, self.sd, prefix + name + '.')
              else:
                  load_module_recursive(child, prefix if level == 0 else prefix + name + '.', level + 1)

      load_module_recursive(r_module)
      embedding_weight = None
      # The last parameter whose name matches one of the embedding patterns wins.
      for n, p in r_module.named_parameters():
          if "word_embeddings." in n or "embed_tokens." in n or "wte." in n:
              embedding_weight = p
      if embedding_weight is not None and hasattr(r_module, "lm_head") and hasattr(
              r_module.lm_head, "weight") and r_module.lm_head.weight.is_meta:
          r_module.lm_head.weight = embedding_weight
  def _apply_injection_policy(self, config, client_module=None):
      """Run generic injection and transformer-layer replacement on the module.

      Args:
          config: the DeepSpeedInferenceConfig for this engine.
          client_module: layer class to replace; only passed when using the
              injection_dict method.
      """
      checkpoint_dir = config.checkpoint
      if checkpoint_dir is None:
          checkpoint = None
      else:
          checkpoint = SDLoaderFactory.get_sd_loader_json(checkpoint_dir, self.checkpoint_engine)

      generic_injection(self.module, dtype=config.dtype, enable_cuda_graph=config.enable_cuda_graph)

      if not isinstance(self.module, torch.nn.Module):
          return
      # config is our DeepSpeedInferenceConfig and self.config is the HF model config
      replace_transformer_layer(client_module, self.module, checkpoint, config, self.config)
  330. def _get_all_ckpt_names(self, checkpoints_path, tag):
  331. ckpt_file_pattern = self._get_ckpt_name(checkpoints_path, tag, mp_placeholder="*")
  332. import glob
  333. ckpt_files = glob.glob(ckpt_file_pattern)
  334. ckpt_files.sort()
  335. return ckpt_files
  336. def _get_ckpt_name(self, checkpoints_path, tag, mp_placeholder=None):
  337. if mp_placeholder is not None:
  338. mp_rank_str = mp_placeholder
  339. else:
  340. mp_rank = 0 if self.mpu is None else self.mpu.get_model_parallel_rank()
  341. mp_rank_str = "{:02d}".format(mp_rank)
  342. ckpt_name = os.path.join(
  343. checkpoints_path,
  344. "mp_rank_" + mp_rank_str + "_model_states.pt",
  345. )
  346. return ckpt_name
  def _load_checkpoint(self, load_dir, load_module_strict=True, tag=None):
      """Load model weights from a checkpoint directory or checkpoint description.

      Args:
          load_dir: a directory path containing ``mp_rank_*_model_states.pt``
              files, or any other value accepted by
              ``SDLoaderFactory.get_sd_loader_json`` (it may be a dict).
          load_module_strict: forwarded as ``strict`` to ``load_state_dict``.
          tag: checkpoint tag; when ``None`` and ``load_dir`` is a directory,
              it is read from the ``latest`` file if one exists.

      Raises:
          RuntimeError: if the wrapped module is a ``PipelineModule``.
      """
      is_pipe_parallel = isinstance(self.module, PipelineModule)
      if is_pipe_parallel:
          raise RuntimeError('pipeline parallelism is currently not supported in inference.')
      if not isinstance(load_dir, dict) and os.path.isdir(load_dir):
          if tag is None:
              latest_path = os.path.join(load_dir, "latest")
              if os.path.isfile(latest_path):
                  with open(latest_path, "r") as fd:
                      tag = fd.read().strip()
          # NOTE(review): `_get_ckpt_name` does not use `tag` when building paths.
          ckpt_list = self._get_all_ckpt_names(load_dir, tag)
          sd_loader = SDLoaderFactory.get_sd_loader(ckpt_list, self.checkpoint_engine)
      else:
          sd_loader = SDLoaderFactory.get_sd_loader_json(load_dir, self.checkpoint_engine)
      checkpoint = sd_loader['checkpoints']
      if type(checkpoint) is list:
          # List of checkpoint files: load each one in turn through
          # `load_model_with_checkpoint`, which reads `self.sd` / `self.key_list`.
          # NOTE(review): torch.load unpickles the file; only use trusted checkpoints.
          self.sd = torch.load(checkpoint[0], map_location='cpu')
          self.key_list = list(self.sd.keys())
          self.load_model_with_checkpoint(self.module)
          for i in range(1, len(checkpoint)):
              # Progress is printed once (rank 0, or when not distributed).
              if not dist.is_initialized() or dist.get_rank() == 0:
                  print(f"loading checkpoint ({i})")
              # Unlike the first file, the remaining files are mapped to the accelerator device.
              self.sd = torch.load(checkpoint[i], map_location=get_accelerator().device_name())
              self.key_list = list(self.sd.keys())
              self.load_model_with_checkpoint(self.module)
      else:
          # Single merged/split state dict produced by the loader for this rank.
          mp_rank = 0 if self.mpu is None else self.mpu.get_model_parallel_rank()
          load_path, checkpoint, quantize_config = sd_loader.load(self._config.tensor_parallel.tp_size,
                                                                  mp_rank,
                                                                  is_pipe_parallel=is_pipe_parallel,
                                                                  quantize=(self._config.dtype is torch.int8),
                                                                  quantize_groups=self.quantize_groups,
                                                                  mlp_extra_grouping=self.mlp_extra_grouping)
          self.quantization_scales, self.quantize_merge_count = quantize_config
          moe, _ = has_moe_layers(self.module)
          if moe:
              # Imported lazily, only when the model has MoE layers.
              from deepspeed.runtime.engine import DeepSpeedEngine
              # A non-list `num_experts` entry selects the old MoE loading path.
              old_moe_load = False
              if not isinstance(checkpoint['num_experts'], list):
                  old_moe_load = True
              DeepSpeedEngine.load_moe_state_dict(load_dir,
                                                  tag,
                                                  state_dict=checkpoint[self._choose_module_key(checkpoint)],
                                                  old_moe_load=old_moe_load,
                                                  model=self.module,
                                                  mpu=self.mpu,
                                                  checkpoint_engine=self.checkpoint_engine)
          self.module.load_state_dict(state_dict=checkpoint[self._choose_module_key(checkpoint)],
                                      strict=load_module_strict)
  396. def _choose_module_key(self, sd):
  397. assert not ('module' in sd
  398. and 'model' in sd), "checkpoint has both 'model' and 'module' keys, not sure how to proceed"
  399. assert 'module' in sd or 'model' in sd, "checkpoint contains neither 'model' or 'module' keys, not sure how to proceed"
  400. if 'module' in sd:
  401. return 'module'
  402. elif 'model' in sd:
  403. return 'model'
  404. def _convert_to_dtype(self, config):
  405. if not isinstance(self.module, torch.nn.Module):
  406. return
  407. if False: #config.dtype is torch.int8 and self.quantization_scales is None:
  408. quantizer = WeightQuantization(mlp_extra_grouping=self.mlp_extra_grouping)
  409. model, self.quantization_scales = quantizer.model_quantize(self.module, self.injection_dict,
  410. self.quantize_bits, self.quantize_groups)
  411. elif config.dtype == torch.half:
  412. self.module.half()
  413. elif config.dtype == torch.bfloat16:
  414. self.module.bfloat16()
  415. elif config.dtype == torch.float:
  416. self.module.float()
  def _create_cuda_graph(self, *inputs, **kwargs):
      """Warm the module up, then capture one forward pass into a CUDA graph.

      The given ``inputs`` / ``kwargs`` become the static buffers that
      ``_graph_replay`` copies new values into, and the captured result is
      kept as ``self.static_output``.
      """
      # warmup to create the workspace and cublas handle
      warmup_stream = get_accelerator().Stream()
      warmup_stream.wait_stream(get_accelerator().current_stream())
      with get_accelerator().stream(warmup_stream):
          for _step in range(3):
              _warmup_out = self.module(*inputs, **kwargs)
      get_accelerator().current_stream().wait_stream(warmup_stream)

      # create cuda_graph and assign static_inputs and static_outputs
      self._cuda_graphs = torch.cuda.CUDAGraph()
      self.static_inputs = inputs
      self.static_kwargs = kwargs

      with torch.cuda.graph(self._cuda_graphs):
          self.static_output = self.module(*self.static_inputs, **self.static_kwargs)

      self.cuda_graph_created = True
  432. def _graph_replay(self, *inputs, **kwargs):
  433. for i in range(len(inputs)):
  434. if torch.is_tensor(inputs[i]):
  435. self.static_inputs[i].copy_(inputs[i])
  436. for k in kwargs:
  437. if torch.is_tensor(kwargs[k]):
  438. self.static_kwargs[k].copy_(kwargs[k])
  439. self._cuda_graphs.replay()
  440. return self.static_output
  def model_times(self):
      """Return the collected forward latencies and reset the buffer.

      Returns:
          The list of recorded times; a fresh empty list replaces it on
          the engine.

      Raises:
          AssertionError: if profiling was never enabled.
          ValueError: if CUDA graphs are enabled and nothing was recorded.
      """
      assert self.model_profile_enabled, "model profiling is not enabled"

      collected = self._model_times
      nothing_recorded = len(self._model_times) == 0
      if self._config.enable_cuda_graph and nothing_recorded:
          raise ValueError("Model times are empty and cuda graph is enabled. If "
                           "this is a GPT-style model this combo is not supported. If this is a "
                           "BERT-style model this is a bug, please report it. "
                           f"Model type is: {type(self.module)}")

      self._model_times = []
      return collected
  def _module_match(self, module):
      """Return True if any generic policy reports ``module`` as replaced.

      Each entry of ``generic_policies`` is instantiated and asked via
      ``match_replaced``; checking stops at the first policy that matches.
      """
      return any(policy_cls().match_replaced(module) for policy_cls in generic_policies)
  457. def _local_cuda_graph_used(self, module):
  458. if isinstance(module, torch.nn.Module):
  459. return False
  460. else:
  461. sub_module_cuda_graph = False
  462. for name in module.__dict__.keys():
  463. sub_module = getattr(module, name)
  464. if self._module_match(sub_module) and hasattr(sub_module, "enable_cuda_graph"):
  465. sub_module_cuda_graph = True
  466. return sub_module_cuda_graph
  def forward(self, *inputs, **kwargs):
      """Execute forward propagation

      When CUDA graphs are enabled on a CUDA device (and the replaced
      sub-modules do not manage their own graphs), the first call captures a
      graph and every call replays it. Otherwise the wrapped module is
      called directly.

      Arguments:
          *inputs: Variable length input list
          **kwargs: variable length keyword arguments

      Returns:
          The output of the wrapped module, or the static graph output.
      """
      graph_on_cuda = get_accelerator().device_name() == 'cuda' and self._config.enable_cuda_graph

      # With CUDA graphs the forward hooks are not registered (see
      # `profile_model_time`), so the timing is done here instead.
      start = None
      if self.model_profile_enabled and graph_on_cuda:
          get_accelerator().synchronize()
          start = time.time()

      if graph_on_cuda and not self.local_cuda_graph:
          if not self.cuda_graph_created:
              self._create_cuda_graph(*inputs, **kwargs)
          outputs = self._graph_replay(*inputs, **kwargs)
      else:
          outputs = self.module(*inputs, **kwargs)

      # Only record a duration when a start time was actually taken. The old
      # check ignored the device, so on a non-CUDA accelerator with
      # enable_cuda_graph set it computed `time.time() - None` and raised TypeError.
      if start is not None:
          get_accelerator().synchronize()
          duration = (time.time() - start) * 1e3  # convert seconds to ms
          self._model_times.append(duration)

      return outputs
  490. def _generate(self, *inputs, **kwargs):
  491. # Reset KV-cache at the beginning of generate
  492. if hasattr(self.module, 'reset_cache'):
  493. self.module.reset_cache()
  494. num_beams = 1
  495. if "generation_config" in kwargs:
  496. gen_config = kwargs["generation_config"]
  497. num_beams = getattr(gen_config, "num_beams", 1)
  498. if "num_beams" in kwargs:
  499. num_beams = kwargs["num_beams"]
  500. if num_beams > 1:
  501. raise NotImplementedError("DeepSpeed does not support `num_beams` > 1, if this is important to you please "
  502. "add your request to: https://github.com/microsoft/DeepSpeed/issues/2506")
  503. return self.module.generate(*inputs, **kwargs)