test_pipe.py

import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist

import pytest

import deepspeed
import deepspeed.runtime.utils as ds_utils
from deepspeed.runtime.pipe.topology import PipeDataParallelTopology, PipeModelDataParallelTopology
PipeTopo = PipeDataParallelTopology
import deepspeed.runtime.pipe.module as PipelineModule
from deepspeed.runtime.pipe.module import LayerSpec

from common import distributed_test

pytest.skip(
    "skipping until we can figure out what's causing these to hang inside our CI",
    allow_module_level=True)


def rel_diff(A, B):
    return abs(A - B) / abs(A)
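
# Example: rel_diff(2.0, 1.9) == 0.05, i.e. a 5% difference relative to the
# first argument; the measure is asymmetric and assumes A != 0.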


# All models
from simple_model import args_from_dict


class AlexNet(nn.Module):
    def __init__(self, num_classes=10):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.classifier = nn.Linear(256, num_classes)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, x, y):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return self.loss_fn(x, y)
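

# Note: forward() takes the labels and returns the scalar loss directly, so
# the interface matches the pipeline variant below, whose loss_fn is applied
# by the engine at the final pipeline stage.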


class AlexNetPipe(PipelineModule.PipelineModule):
    def __init__(self, num_classes=10, **kwargs):
        self.num_classes = num_classes
        specs = [
            LayerSpec(nn.Conv2d, 3, 64, kernel_size=11, stride=4, padding=5),
            LayerSpec(nn.ReLU, inplace=True),
            LayerSpec(nn.MaxPool2d, kernel_size=2, stride=2),
            LayerSpec(nn.Conv2d, 64, 192, kernel_size=5, padding=2),
            F.relu,
            LayerSpec(nn.MaxPool2d, kernel_size=2, stride=2),
            LayerSpec(nn.Conv2d, 192, 384, kernel_size=3, padding=1),
            F.relu,
            LayerSpec(nn.Conv2d, 384, 256, kernel_size=3, padding=1),
            F.relu,
            LayerSpec(nn.Conv2d, 256, 256, kernel_size=3, padding=1),
            F.relu,
            LayerSpec(nn.MaxPool2d, kernel_size=2, stride=2),
            lambda x: x.view(x.size(0), -1),
            LayerSpec(nn.Linear, 256, self.num_classes),  # classifier
        ]
        super().__init__(layers=specs, loss_fn=nn.CrossEntropyLoss(), **kwargs)
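

# LayerSpec defers layer construction until PipelineModule partitions the
# model, so each pipeline stage only materializes the layers it owns; bare
# callables (F.relu, the view lambda) serve as stateless stages. A minimal
# usage sketch, assuming 4 GPUs split into 2 pipeline stages x 2 data-parallel
# replicas (matching the parametrized cases at the bottom of this file):
#
#   topo = PipeTopo(num_pp=2, num_dp=2)
#   net = AlexNetPipe(num_classes=10, topology=topo, seed_layers=True)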


def cifar_trainset(fp16=False):
    import torchvision
    import torchvision.transforms as transforms

    transform_list = [
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
    if fp16:
        transform_list.append(torchvision.transforms.Lambda(lambda x: x.half()))

    transform = transforms.Compose(transform_list)

    local_rank = torch.cuda.current_device()

    # Only one rank per machine downloads.
    dist.barrier()
    if local_rank != 0:
        dist.barrier()
    trainset = torchvision.datasets.CIFAR10(root='/tmp/cifar10-data',
                                            train=True,
                                            download=True,
                                            transform=transform)
    if local_rank == 0:
        dist.barrier()
    return trainset
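

# The barrier pairing in cifar_trainset is a rendezvous pattern: all ranks
# enter the first barrier together, non-zero ranks then block on the second
# barrier while rank 0 downloads, and rank 0 releases them once the dataset
# cache under /tmp/cifar10-data is populated.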


def train_cifar(model, args, num_steps=400, average_dp_losses=True, fp16=True, seed=123):
    with torch.random.fork_rng(devices=[torch.cuda.current_device()]):
        ds_utils.set_random_seed(seed)

        trainset = cifar_trainset(fp16=fp16)
        args.local_rank = dist.get_rank()

        engine, _, _, _ = deepspeed.initialize(
            args=args,
            model=model,
            model_parameters=[p for p in model.parameters()],
            training_data=trainset)

        losses = []
        for step in range(num_steps):
            loss = engine.train_batch()
            losses.append(loss.item())
            if step % 50 == 0:
                print(f'STEP={step} LOSS={loss.item()}')

    if average_dp_losses:
        loss_tensor = torch.tensor(losses).cuda()
        dist.all_reduce(loss_tensor)
        loss_tensor /= dist.get_world_size()
        losses = loss_tensor.tolist()

    return losses
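

# dist.all_reduce defaults to SUM, so dividing the reduced tensor by the world
# size in train_cifar yields the mean loss curve across data-parallel ranks.
#
# The parametrized cases below keep the total of 4 GPUs fixed and trade data
# parallelism for pipeline parallelism (num_pp * num_dp == 4), so the baseline
# and test runs should converge to statistically similar losses.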
@pytest.mark.parametrize(
    'base_topo,test_topo',
    [
        (PipeTopo(num_pp=1, num_dp=4), PipeTopo(num_pp=2, num_dp=2)),
        (PipeTopo(num_pp=1, num_dp=4), PipeTopo(num_pp=4, num_dp=1)),
    ])
def test_pipe_cifar10_seedlayers(base_topo, test_topo, tmpdir):
    config_dict = {
        "train_batch_size": 16,
        "train_micro_batch_size_per_gpu": 4,
        "steps_per_print": 20,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.001,
                "betas": [0.9, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "zero_optimization": {
            "stage": 0
        },
        "fp16": {
            "enabled": False
        },
        "pipeline": {
            "seed_layers": True,
            "activation_checkpoint_interval": 1
        }
    }
    args = args_from_dict(tmpdir, config_dict)
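    # args_from_dict (from the simple_model test helpers) serializes
    # config_dict to a JSON file under tmpdir and returns argparse-style args
    # pointing at it, which deepspeed.initialize() consumes in train_cifar.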

    @distributed_test(world_size=4)
    def _helper(base_topo, test_topo, tmpdir, steps=500):
        assert steps >= 100

        base_model = AlexNetPipe(num_classes=10,
                                 topology=base_topo,
                                 seed_layers=config_dict['pipeline']['seed_layers'])
        base_losses = train_cifar(base_model,
                                  args,
                                  num_steps=steps,
                                  fp16=config_dict['fp16']['enabled'])

        test_model = AlexNetPipe(num_classes=10,
                                 topology=test_topo,
                                 seed_layers=config_dict['pipeline']['seed_layers'])
        test_losses = train_cifar(test_model,
                                  args,
                                  num_steps=steps,
                                  fp16=config_dict['fp16']['enabled'])

        abs_diffs = [l0 - l1 for l0, l1 in zip(base_losses, test_losses)]
        rel_diffs = [rel_diff(l0, l1) for l0, l1 in zip(base_losses, test_losses)]
        if dist.get_rank() == 0:
            print(
                f'abs min={min(abs_diffs)} max={max(abs_diffs)} avg={sum(abs_diffs)/len(abs_diffs)}'
            )
            print(
                f'rel min={min(rel_diffs)} max={max(rel_diffs)} avg={sum(rel_diffs)/len(rel_diffs)}'
            )
            print(
                f'first: base={base_losses[0]} test={test_losses[0]} abs={abs_diffs[0]} rel={rel_diffs[0]}'
            )

            for lastX in [1, 10, 100]:
                base_avg = sum(base_losses[-lastX:]) / lastX
                test_avg = sum(test_losses[-lastX:]) / lastX
                print(
                    f'last-{lastX}: base={base_avg} test={test_avg} abs={base_avg - test_avg} rel={rel_diff(base_avg, test_avg)}'
                )

        # The two topologies should converge similarly: require the mean of the
        # last 100 losses to agree within 3% relative difference.
        lastX = 100
        base = base_losses[-lastX:]
        base_avg = sum(base) / len(base)
        test = test_losses[-lastX:]
        test_avg = sum(test) / len(test)
        assert rel_diff(base_avg, test_avg) < 0.03

    _helper(base_topo, test_topo, tmpdir)