# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

# A test on its own
import os
import pytest
import json
import hjson
import argparse

from deepspeed.runtime.zero.config import DeepSpeedZeroConfig
from deepspeed.accelerator import get_accelerator

from unit.common import DistributedTest, get_test_path
from unit.simple_model import SimpleModel, create_config_from_dict, random_dataloader

import deepspeed.comm as dist

# A test on its own
import deepspeed
from deepspeed.runtime.config import DeepSpeedConfig, get_bfloat16_enabled
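

# DistributedTest (from unit.common) runs each test method under the number
# of ranks given by the class attribute world_size.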
class TestBasicConfig(DistributedTest):
    world_size = 1

    def test_accelerator(self):
        assert (get_accelerator().is_available())

    def test_check_version(self):
        assert hasattr(deepspeed, "__git_hash__")
        assert hasattr(deepspeed, "__git_branch__")
        assert hasattr(deepspeed, "__version__")
        assert hasattr(deepspeed, "__version_major__")
        assert hasattr(deepspeed, "__version_minor__")
        assert hasattr(deepspeed, "__version_patch__")


@pytest.fixture
def base_config():
    config_dict = {
        "train_batch_size": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "fp16": {
            "enabled": True
        }
    }
    return config_dict
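

# DeepSpeed requires train_batch_size == train_micro_batch_size_per_gpu *
# gradient_accumulation_steps * world_size. _configure_train_batch_size()
# infers any missing value from the others and raises an AssertionError
# when the supplied combination cannot satisfy this identity.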
def _run_batch_config(ds_config, train_batch=None, micro_batch=None, gas=None):
    ds_config.train_batch_size = train_batch
    ds_config.train_micro_batch_size_per_gpu = micro_batch
    ds_config.gradient_accumulation_steps = gas
    success = True
    try:
        ds_config._configure_train_batch_size()
    except AssertionError:
        success = False
    return success


def _batch_assert(status, ds_config, batch, micro_batch, gas, success):
    if not success:
        assert not status
        print("Failed as expected; all is well")
        return

    assert ds_config.train_batch_size == batch
    assert ds_config.train_micro_batch_size_per_gpu == micro_batch
    assert ds_config.gradient_accumulation_steps == gas
    print("All is well")


# Tests the different batch configurations that can be provided in the DeepSpeed JSON file
@pytest.mark.parametrize('num_ranks,batch,micro_batch,gas,success',
                         [(2,32,16,1,True),
                          (2,32,8,2,True),
                          (2,33,17,2,False),
                          (2,32,18,1,False)])  # yapf: disable
class TestBatchConfig(DistributedTest):
    world_size = 2

    def test(self, num_ranks, batch, micro_batch, gas, success):
        assert dist.get_world_size() == num_ranks, \
            f'The test assumes a world size of {num_ranks}'

        ds_batch_config = get_test_path('ds_batch_config.json')
        ds_config = DeepSpeedConfig(ds_batch_config)

        # Test cases when all parameters are provided
        status = _run_batch_config(ds_config, train_batch=batch, micro_batch=micro_batch, gas=gas)
        _batch_assert(status, ds_config, batch, micro_batch, gas, success)

        # Test cases when two out of three parameters are provided
        status = _run_batch_config(ds_config, train_batch=batch, micro_batch=micro_batch)
        _batch_assert(status, ds_config, batch, micro_batch, gas, success)

        if success:
            # When gas is provided with one more parameter
            status = _run_batch_config(ds_config, train_batch=batch, gas=gas)
            _batch_assert(status, ds_config, batch, micro_batch, gas, success)

            status = _run_batch_config(ds_config, micro_batch=micro_batch, gas=gas)
            _batch_assert(status, ds_config, batch, micro_batch, gas, success)

            # Test the case when only micro_batch or train_batch is provided
            if gas == 1:
                status = _run_batch_config(ds_config, micro_batch=micro_batch)
                _batch_assert(status, ds_config, batch, micro_batch, gas, success)

                status = _run_batch_config(ds_config, train_batch=batch)
                _batch_assert(status, ds_config, batch, micro_batch, gas, success)
        else:
            # When only gas is provided
            status = _run_batch_config(ds_config, gas=gas)
            _batch_assert(status, ds_config, batch, micro_batch, gas, success)

            # When gas is provided with something else and gas does not divide batch
            if gas != 1:
                status = _run_batch_config(ds_config, train_batch=batch, gas=gas)
                _batch_assert(status, ds_config, batch, micro_batch, gas, success)


def test_temp_config_json(tmpdir):
    config_dict = {
        "train_batch_size": 1,
    }
    config_path = create_config_from_dict(tmpdir, config_dict)
    with open(config_path, 'r') as fp:
        config_json = json.load(fp)
    assert 'train_batch_size' in config_json
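

# "stage3_gather_fp16_weights_on_model_save" is the deprecated spelling of
# "stage3_gather_16bit_weights_on_model_save"; DeepSpeedZeroConfig maps both
# keys onto the same gather_16bit_weights_on_model_save field.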
@pytest.mark.parametrize("gather_weights_key",
                         ["stage3_gather_16bit_weights_on_model_save", "stage3_gather_fp16_weights_on_model_save"])
def test_gather_16bit_params_on_model_save(gather_weights_key):
    config_dict = {
        gather_weights_key: True,
    }
    config = DeepSpeedZeroConfig(**config_dict)

    assert config.gather_16bit_weights_on_model_save == True
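

# Both the legacy "bfloat16" section name and the newer "bf16" one are
# recognized when reading the bfloat16 settings from a config dict.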
@pytest.mark.parametrize("bf16_key", ["bf16", "bfloat16"])
def test_get_bfloat16_enabled(bf16_key):
    cfg = {
        bf16_key: {
            "enabled": True,
        },
    }
    assert get_bfloat16_enabled(cfg) == True


class TestConfigLoad(DistributedTest):
    world_size = 1

    def test_dict(self, base_config):
        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=base_config, model=model, model_parameters=model.parameters())

    def test_json(self, base_config, tmpdir):
        config_path = os.path.join(tmpdir, "config.json")
        with open(config_path, 'w') as fp:
            json.dump(base_config, fp)
        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=config_path, model=model, model_parameters=model.parameters())

    def test_hjson(self, base_config, tmpdir):
        config_path = os.path.join(tmpdir, "config.json")
        with open(config_path, 'w') as fp:
            hjson.dump(base_config, fp)
        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=config_path, model=model, model_parameters=model.parameters())
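

# `deepscale_config` is the deprecated predecessor of `deepspeed_config`;
# deepspeed.initialize() still accepts it for backward compatibility.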
class TestDeprecatedDeepScaleConfig(DistributedTest):
    world_size = 1

    def test(self, base_config, tmpdir):
        config_path = create_config_from_dict(tmpdir, base_config)
        parser = argparse.ArgumentParser()
        args = parser.parse_args(args='')
        args.deepscale_config = config_path
        args.local_rank = 0

        hidden_dim = 10
        model = SimpleModel(hidden_dim)

        model, _, _, _ = deepspeed.initialize(args=args, model=model, model_parameters=model.parameters())
        data_loader = random_dataloader(model=model, total_samples=5, hidden_dim=hidden_dim, device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()


class TestDistInit(DistributedTest):
    world_size = 1

    def test(self, base_config):
        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=base_config,
                                              model=model,
                                              model_parameters=model.parameters(),
                                              dist_init_required=True)
        data_loader = random_dataloader(model=model, total_samples=5, hidden_dim=hidden_dim, device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
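

# Without an "optimizer" section (and no client optimizer), the engine can
# still run forward passes, but backward() and step() require an optimizer
# and are expected to raise an AssertionError.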
class TestInitNoOptimizer(DistributedTest):
    world_size = 1

    def test(self, base_config):
        del base_config["optimizer"]
        hidden_dim = 10

        model = SimpleModel(hidden_dim=hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=base_config, model=model)
        data_loader = random_dataloader(model=model, total_samples=5, hidden_dim=hidden_dim, device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            with pytest.raises(AssertionError):
                model.backward(loss)
            with pytest.raises(AssertionError):
                model.step()


class TestArgs(DistributedTest):
    world_size = 1

    def test_none_args(self, base_config):
        model = SimpleModel(hidden_dim=10)
        model, _, _, _ = deepspeed.initialize(args=None, model=model, config=base_config)
        data_loader = random_dataloader(model=model, total_samples=5, hidden_dim=10, device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])

    def test_no_args(self, base_config):
        model = SimpleModel(hidden_dim=10)
        model, _, _, _ = deepspeed.initialize(model=model, config=base_config)
        data_loader = random_dataloader(model=model, total_samples=5, hidden_dim=10, device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])


class TestNoModel(DistributedTest):
    world_size = 1

    def test(self, base_config):
        model = SimpleModel(hidden_dim=10)
        with pytest.raises(AssertionError):
            model, _, _, _ = deepspeed.initialize(model=None, config=base_config)

        with pytest.raises(AssertionError):
            # The first positional parameter of deepspeed.initialize is `args`,
            # so passing the model positionally leaves `model=None` and fails.
            model, _, _, _ = deepspeed.initialize(model, config=base_config)