# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import torch
import os
import deepspeed
from deepspeed.accelerator import get_accelerator
from unit.common import DistributedTest
from unit.simple_model import Curriculum_SimpleModel, SimpleModel, random_dataloader, random_dataset
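

# Minimal model-parallel unit (MPU) stub for these tests: each rank is assigned
# one tensor-parallel (TP) group of contiguous ranks and one data-parallel (DP)
# group of strided ranks. For example, with 4 ranks and tp_world_size=2:
#   rank 0 -> TP rank 0, DP rank 0    rank 1 -> TP rank 1, DP rank 0
#   rank 2 -> TP rank 0, DP rank 1    rank 3 -> TP rank 1, DP rank 1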
class MPU():

    def __init__(self, tp_world_size):
        self.rank = deepspeed.comm.get_rank()
        self.world_size = deepspeed.comm.get_world_size()
        self.tp_world_size = tp_world_size

        # Tensor-parallel groups: contiguous blocks of tp_world_size ranks.
        for i in range(0, self.world_size, tp_world_size):
            ranks = range(i, i + tp_world_size)
            group = deepspeed.comm.new_group(ranks)
            if self.rank in ranks:
                self.tp_group = group

        # Data-parallel groups: ranks strided by tp_world_size.
        for i in range(0, tp_world_size):
            ranks = range(i, self.world_size, tp_world_size)
            group = deepspeed.comm.new_group(ranks)
            if self.rank in ranks:
                self.dp_group = group

    def get_model_parallel_rank(self):
        return self.rank % self.tp_world_size

    def get_model_parallel_world_size(self):
        return self.tp_world_size

    def get_data_parallel_rank(self):
        return self.rank // self.tp_world_size

    def get_data_parallel_world_size(self):
        return self.world_size // self.tp_world_size

    def get_data_parallel_group(self):
        return self.dp_group

    def get_model_parallel_group(self):
        return self.tp_group
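

# End-to-end check of the data_efficiency curriculum pipeline: difficulty-based
# data sampling plus a user-supplied data_post_process callback, run under a
# trivial MPU with tp_world_size=1.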
class TestDataEfficiency(DistributedTest):
    world_size = 2

    def test_curriculum_learning(self):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015,
                    "weight_decay": 0.01
                }
            },
            "gradient_clipping": 1.0,
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "initial_scale_power": 16
            },
            "data_efficiency": {
                "enabled": True,
                "seed": 1234,
                "data_sampling": {
                    "enabled": True,
                    "num_workers": 0,
                    "curriculum_learning": {
                        "enabled": True,
                        "data_cluster_path": "/tmp",
                        "curriculum_metrics": {
                            "dummy_metric": {
                                "index_to_sample_path": "dummy",
                                "index_to_metric_path": "dummy",
                                "difficulty_type": "value",
                                "clustering_type": "single_cluster",
                                "min_difficulty": 2,
                                "max_difficulty": 10,
                                "schedule_type": "fixed_root",
                                "schedule_config": {
                                    "total_curriculum_step": 8,
                                    "difficulty_step": 2,
                                    "root_degree": 1
                                }
                            }
                        }
                    }
                }
            }
        }
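
        # With root_degree 1 the fixed_root schedule should reduce to a linear
        # ramp: difficulty climbs from min 2 to max 10 across the 8 curriculum
        # steps, moving in multiples of difficulty_step (2).
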
        # Callback handed to the engine; the sampler state passed in should
        # expose the current difficulty for every configured curriculum metric.
        def data_post_process(data, data_sampler_state_dict):
            assert 'dummy_metric' in data_sampler_state_dict['current_difficulties']
            return data

        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        dataset = random_dataset(20, hidden_dim, torch.device('cpu'), dtype=torch.half)
        model, _, data_loader, _ = deepspeed.initialize(config=config_dict,
                                                        model=model,
                                                        training_data=dataset,
                                                        model_parameters=model.parameters(),
                                                        mpu=MPU(1))
        # Make sure the configured data_cluster_path exists before the sampler uses it.
        if model.mpu.get_data_parallel_rank() == 0 and not os.path.exists('/tmp'):
            os.makedirs('/tmp')
        model.set_data_post_process_func(data_post_process)
        for n, batch in enumerate(data_loader):
            x = batch[0].to(get_accelerator().current_device_name())
            y = batch[1].to(get_accelerator().current_device_name())
            loss = model(x, y)
            model.backward(loss)
            model.step()
            if n >= 10:
                break
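

# The legacy (pre-data_efficiency) curriculum scheduler is configured through a
# top-level "curriculum_learning" config section. Curriculum_SimpleModel returns
# the scheduler's current seqlen alongside the loss, so each step's difficulty
# can be checked against the expected schedule.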
class TestLegacyCurriculumScheduler(DistributedTest):
    world_size = 2

    def test_fixed_discrete(self):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015,
                    "weight_decay": 0.01
                }
            },
            "gradient_clipping": 1.0,
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "initial_scale_power": 16
            },
            "curriculum_learning": {
                "enabled": True,
                "curriculum_type": "seqlen",
                "min_difficulty": 1,
                "max_difficulty": 5,
                "schedule_type": "fixed_discrete",
                "schedule_config": {
                    "difficulty": [1, 2, 3, 4, 5],
                    "max_step": [2, 4, 6, 8]
                }
            }
        }
        hidden_dim = 10
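        # fixed_discrete holds difficulty[i] until step max_step[i]: seqlen 1
        # through step 2, 2 through step 4, 3 through step 6, 4 through step 8,
        # and the final difficulty 5 afterwards.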
        ground_truths = {1: 1, 2: 1, 3: 2, 4: 2, 5: 3, 6: 3, 7: 4, 8: 4}
        model = Curriculum_SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
        data_loader = random_dataloader(model=model, total_samples=20, hidden_dim=hidden_dim, device=model.device)
        for n, batch in enumerate(data_loader):
            loss, seqlen = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
            true_seqlen = 5
            if n + 1 in ground_truths:
                true_seqlen = ground_truths[n + 1]
            assert seqlen == true_seqlen, f"Incorrect curriculum schedule: expected seqlen {true_seqlen} at step {n + 1}, got {seqlen}"

    def test_fixed_linear(self):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015,
                    "weight_decay": 0.01
                }
            },
            "gradient_clipping": 1.0,
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "initial_scale_power": 16
            },
            "curriculum_learning": {
                "enabled": True,
                "curriculum_type": "seqlen",
                "min_difficulty": 2,
                "max_difficulty": 10,
                "schedule_type": "fixed_linear",
                "schedule_config": {
                    "total_curriculum_step": 8,
                    "difficulty_step": 2
                }
            }
        }
        hidden_dim = 10
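        # fixed_linear ramps seqlen linearly from min 2 to max 10 across the 8
        # curriculum steps, moving in multiples of difficulty_step (2); from
        # step 8 onward it stays at the maximum of 10.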
        ground_truths = {1: 2, 2: 4, 3: 4, 4: 6, 5: 6, 6: 8, 7: 8, 8: 10, 9: 10, 10: 10}
        model = Curriculum_SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
        data_loader = random_dataloader(model=model, total_samples=20, hidden_dim=hidden_dim, device=model.device)
        for n, batch in enumerate(data_loader):
            loss, seqlen = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
            if n + 1 in ground_truths:
                true_seqlen = ground_truths[n + 1]
                assert seqlen == true_seqlen, f"Incorrect curriculum schedule: expected seqlen {true_seqlen} at step {n + 1}, got {seqlen}"