123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import copy
- import torch
- import torch.nn as nn
- import torch.distributed as dist
- import pytest
- import deepspeed
- from deepspeed.runtime.pipe.topology import PipeDataParallelTopology, PipeModelDataParallelTopology
- PipeTopo = PipeDataParallelTopology
- from deepspeed.pipe import PipelineModule, LayerSpec
- from deepspeed.utils import RepeatingLoader
- from common import distributed_test
- from simple_model import args_from_dict
- HIDDEN_DIM = 32
- LAYERS = 8
- @pytest.fixture
- def sequential_model():
- model = torch.nn.Sequential(
- *[nn.Linear(HIDDEN_DIM,
- HIDDEN_DIM) for _ in range(LAYERS)],
- nn.Linear(HIDDEN_DIM,
- 1),
- )
- return model
- @pytest.fixture
- def simple_args(tmpdir):
- config_dict = {
- "train_batch_size": 1,
- "train_micro_batch_size_per_gpu": 1,
- "steps_per_print": 1,
- "optimizer": {
- "type": "Adam",
- "params": {
- "lr": 0.001,
- "betas": [0.9,
- 0.999],
- "eps": 1e-8,
- "weight_decay": 3e-7
- }
- },
- "pipeline": {
- "activation_checkpoint_interval": 1
- }
- }
- args = args_from_dict(tmpdir, config_dict)
- return args
- def test_pipe_module_sequential(sequential_model, simple_args):
- batch_input = torch.randn(1, HIDDEN_DIM)
- @distributed_test(world_size=4)
- def _helper():
- base_model = copy.deepcopy(sequential_model)
- base_input = batch_input.clone().detach()
- base_output = base_model(base_input)
- base_output = base_output
- base_params = sum(p.numel() for p in base_model.parameters())
- pipe_model = copy.deepcopy(sequential_model)
- pipe_model = PipelineModule(layers=pipe_model, num_stages=4)
- # Ensure all parameters are accounted for.
- my_params = sum(p.numel() for p in pipe_model.parameters())
- total_pipe_params = torch.LongTensor([my_params]).to('cuda')
- dist.all_reduce(total_pipe_params)
- total_pipe_params = total_pipe_params.item()
- assert total_pipe_params == base_params
- pipe_model, _, _, _ = deepspeed.initialize(
- args=simple_args,
- model=pipe_model,
- model_parameters=[p for p in pipe_model.parameters()])
- if pipe_model.is_first_stage or pipe_model.is_last_stage:
- pipe_input = base_input.clone().detach().to('cuda')
- # label 0 is meaningless
- dataset = [(pipe_input, 0)]
- loader = RepeatingLoader(dataset)
- data_iter = iter(loader)
- else:
- data_iter = None
- pipe_output = pipe_model.eval_batch(data_iter=data_iter)
- base_output = base_output.to('cpu')
- pipe_output = pipe_output.to('cpu')
- assert torch.allclose(base_output, pipe_output)
- _helper()
|