test_pipeline.py

# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import pytest
import torch

import deepspeed.comm as dist
from deepspeed.runtime.checkpoint_engine.torch_checkpoint_engine import TorchCheckpointEngine

from unit.common import DistributedTest
from unit.simple_model import *
from unit.checkpoint.common import checkpoint_correctness_verification
from unit.util import skip_on_arch


class TestPipelineCheckpoint(DistributedTest):
    world_size = 4

    @pytest.mark.parametrize("zero_stage", [0, 1])
    def test_checkpoint_pipe_engine(self, zero_stage, tmpdir):
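        # Skip this test on GPUs with a compute capability below 7.0.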
        skip_on_arch(min_arch=7)

        config_dict = {
            "train_batch_size": 2,
            "train_micro_batch_size_per_gpu": 1,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 1e-5
                }
            },
            "zero_optimization": {
                "stage": zero_stage
            },
            "fp16": {
                "enabled": zero_stage > 0
            },
            "scheduler": {
                "type": "OneCycle",
                "params": {
                    "cycle_first_step_size": 1000,
                    "cycle_first_stair_count": 500,
                    "cycle_second_step_size": 1000,
                    "cycle_second_stair_count": 500,
                    "decay_step_size": 1000,
                    "cycle_min_lr": 0.0001,
                    "cycle_max_lr": 0.0010,
                    "decay_lr_rate": 0.001,
                    "cycle_min_mom": 0.85,
                    "cycle_max_mom": 0.99,
                    "decay_mom_rate": 0.0
                }
            }
        }
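
        # Two identical two-stage pipeline models: checkpoint_correctness_verification
        # trains one, saves a checkpoint, loads it into the other, and verifies that
        # the restored state matches.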
        models = [LinearStackPipe(num_stages=2) for _ in range(2)]
        checkpoint_correctness_verification(config_dict=config_dict,
                                            models=models,
                                            hidden_dim=models[0].hidden_dim,
                                            tmpdir=tmpdir,
                                            fp16=config_dict['fp16']['enabled'],
                                            load_optimizer_states=True,
                                            load_lr_scheduler_states=True,
                                            train_batch=True)
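
    # Every topology pair below is commented out, so the parameter set is empty
    # and pytest reports this test as skipped.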
    @pytest.mark.parametrize(
        "base_topo,test_topo",
        [
            #(PipeTopo(num_pp=1,
            #          num_dp=4),
            # PipeTopo(num_pp=4,
            #          num_dp=1)),
            #(PipeTopo(num_pp=2,
            #          num_dp=2),
            # PipeTopo(num_pp=2,
            #          num_dp=2)),
            #(PipeTopo(num_pp=4,
            #          num_dp=1),
            # PipeTopo(num_pp=2,
            #          num_dp=2)),
        ])
    def test_checkpoint_pipe_module(self, base_topo, test_topo, tmpdir):
        checkpoint_engine = TorchCheckpointEngine()

        base_model = LinearStackPipe(topology=base_topo)
        base_model.save_state_dict(tmpdir, checkpoint_engine=checkpoint_engine)
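
        # Wait until every rank has finished writing its partition of the
        # checkpoint before any rank tries to load it.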
        dist.barrier()

        test_model = LinearStackPipe(topology=test_topo)
        test_model.load_state_dir(tmpdir, checkpoint_engine=checkpoint_engine)

        # Base and test can have different lengths, so make sure we map from the
        # smaller to larger model
        if len(base_model.forward_funcs) < len(test_model.forward_funcs):
            A = base_model
            B = test_model
        else:
            A = test_model
            B = base_model

        # Compare layers individually since partitions are different
        for idx, A_layer in enumerate(A.forward_funcs):
            if not hasattr(A_layer, 'parameters'):
                # Skip functionals, etc.
                continue

            # Find the corresponding layer in B
            global_idx = idx + A._local_start
            B_local_idx = global_idx - B._local_start
            B_layer = B.forward_funcs[B_local_idx]

            # Compare layer parameters
            for p0, p1 in zip(A_layer.parameters(), B_layer.parameters()):
                assert torch.allclose(p0, p1, atol=1e-07), f"Model state {p0} is not equal to {p1}"