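"""Unit tests for DeepSpeed checkpoint save/load correctness.

Each test trains a small model for a few steps, saves a checkpoint, reloads it
into a fresh engine, and verifies that module weights and (optionally)
optimizer and LR-scheduler states survive the round trip, across fp16/fp32
training, ZeRO stages, and pipeline-parallel modules and topologies.
"""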

import torch
import torch.distributed as dist

import deepspeed
from deepspeed.runtime.zero.stage2 import FP16_DeepSpeedZeroOptimizer
from deepspeed.runtime.zero.stage1 import FP16_DeepSpeedZeroOptimizer_Stage1
from deepspeed.runtime.fp16.fused_optimizer import FP16_Optimizer
from deepspeed.runtime.fp16.unfused_optimizer import FP16_UnfusedOptimizer
from deepspeed.runtime.pipe.topology import PipeDataParallelTopology

PipeTopo = PipeDataParallelTopology

import os
import numbers
import pytest

from common import distributed_test
from simple_model import *


def compare_deepspeed_states(saved_model, loaded_model):
    # These are compared in more depth in other places
    assert hasattr(loaded_model, 'module')
    assert saved_model.csr_tensor_module_names == loaded_model.csr_tensor_module_names
    assert saved_model.skipped_steps == loaded_model.skipped_steps
    assert saved_model.global_steps == loaded_model.global_steps


def compare_model_states(saved_model, loaded_model, compare_optimizer=True):
    compare_deepspeed_states(saved_model, loaded_model)

    for p0, p1 in zip(saved_model.module.parameters(),
                      loaded_model.module.parameters()):
        assert torch.allclose(p0, p1, atol=1e-07), \
            f"FP16 model state {p0} is not equal to {p1}"

    if not compare_optimizer:
        return

    if isinstance(saved_model.optimizer, FP16_DeepSpeedZeroOptimizer):
        for p0, p1 in zip(saved_model.optimizer.single_partition_of_fp32_groups,
                          loaded_model.optimizer.single_partition_of_fp32_groups):
            assert torch.allclose(p0, p1, atol=1e-07), \
                f"FP32 model state {p0} is not equal to {p1}"
    elif isinstance(saved_model.optimizer, FP16_DeepSpeedZeroOptimizer_Stage1):
        for partition0, partition1 in zip(
                saved_model.optimizer.local_sub_partitions_of_fp32_groups,
                loaded_model.optimizer.local_sub_partitions_of_fp32_groups):
            for p0, p1 in zip(partition0, partition1):
                assert torch.allclose(p0, p1, atol=1e-07), \
                    f"FP32 model state {p0} is not equal to {p1}"
    elif isinstance(saved_model.optimizer, FP16_Optimizer):
        for p0, p1 in zip(saved_model.optimizer.fp32_groups_flat,
                          loaded_model.optimizer.fp32_groups_flat):
            assert torch.allclose(p0, p1, atol=1e-07), \
                f"FP32 model state {p0} is not equal to {p1}"
    elif isinstance(saved_model.optimizer, FP16_UnfusedOptimizer):
        for params0, params1 in zip(saved_model.optimizer.fp32_groups,
                                    loaded_model.optimizer.fp32_groups):
            for p0, p1 in zip(params0, params1):
                assert torch.allclose(p0, p1, atol=1e-07), \
                    f"FP32 model state {p0} is not equal to {p1}"
    elif isinstance(saved_model.optimizer, torch.optim.Optimizer):
        # A plain torch optimizer keeps no separate FP32 master copy to compare.
        pass
    else:
        assert False, f'Unexpected Optimizer Type: {saved_model.optimizer}'
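

# With fp16 enabled, the DeepSpeed engine wraps the base optimizer, so the
# underlying torch optimizer sits one level down at `optimizer.optimizer`;
# with fp32 training the engine's optimizer is used directly.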
def compare_optimizer_states(saved_model, loaded_model, hidden_dim, fp16=True):
    saved_optimizer = saved_model.optimizer.optimizer if fp16 else saved_model.optimizer
    loaded_optimizer = loaded_model.optimizer.optimizer if fp16 else loaded_model.optimizer

    for state0, state1 in zip(saved_optimizer.state.values(),
                              loaded_optimizer.state.values()):
        for s0, s1 in zip(state0.values(), state1.values()):
            if isinstance(s0, torch.Tensor) and isinstance(s1, torch.Tensor):
                assert torch.equal(s0, s1)
            else:
                assert s0 == s1


def compare_lr_scheduler_states(saved_model, loaded_model):
    assert hasattr(saved_model, 'lr_scheduler')
    assert hasattr(loaded_model, 'lr_scheduler')

    saved_scheduler = saved_model.lr_scheduler
    loaded_scheduler = loaded_model.lr_scheduler
    assert hasattr(saved_scheduler, 'state_dict')
    assert hasattr(loaded_scheduler, 'state_dict')

    saved_sd = saved_scheduler.state_dict()
    loaded_sd = loaded_scheduler.state_dict()
    print(f"saved_sd = {saved_sd}")
    print(f"loaded_sd = {loaded_sd}")

    assert saved_sd.keys() == loaded_sd.keys()
    for state0, state1 in zip(saved_sd.values(), loaded_sd.values()):
        if isinstance(state0, numbers.Number) and isinstance(state1, numbers.Number):
            assert state0 == state1
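

# Train briefly, save a checkpoint, reload it into a fresh engine, and compare
# module weights plus (optionally) optimizer and LR-scheduler state.
# `train_batch=True` switches to the pipeline-engine loop, which drives its
# own dataloader via `set_dataloader` / `train_batch`.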
def checkpoint_correctness_verification(args,
                                        model,
                                        hidden_dim,
                                        tmpdir,
                                        load_optimizer_states=False,
                                        load_lr_scheduler_states=False,
                                        fp16=True,
                                        train_batch=False):
    dtype = torch.half if fp16 else torch.float32
    ds_model, _, _, _ = deepspeed.initialize(args=args,
                                             model=model,
                                             model_parameters=model.parameters())
    data_loader = random_dataloader(model=ds_model,
                                    total_samples=50,
                                    hidden_dim=hidden_dim,
                                    device=ds_model.device,
                                    dtype=dtype)
    if train_batch:
        ds_model.set_dataloader(data_loader)
        for _ in data_loader:
            loss = ds_model.train_batch()
    else:
        for batch in data_loader:
            loss = ds_model(batch[0], batch[1])
            ds_model.backward(loss)
            ds_model.step()

    trained_model = ds_model

    save_folder = os.path.join(tmpdir, 'saved_checkpoint')
    save_tag = '1'
    trained_model.save_checkpoint(save_folder, save_tag)

    loaded_model, _, _, _ = deepspeed.initialize(args=args,
                                                 model=model,
                                                 model_parameters=model.parameters())
    loaded_model.load_checkpoint(save_folder,
                                 save_tag,
                                 load_optimizer_states=load_optimizer_states,
                                 load_lr_scheduler_states=load_lr_scheduler_states)

    compare_model_states(trained_model, loaded_model)

    if load_optimizer_states:
        compare_optimizer_states(trained_model, loaded_model, hidden_dim, fp16)

    if load_lr_scheduler_states:
        compare_lr_scheduler_states(trained_model, loaded_model)
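

# Lamb under fp16 exercises the unfused fp16-optimizer checkpoint path
# (FP16_UnfusedOptimizer above); the test is skipped when the lamb op
# is not built.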
@pytest.mark.skipif(not deepspeed.ops.__installed_ops__['lamb'],
                    reason="lamb is not installed")
def test_checkpoint_unfused_optimizer(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Lamb",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "fp16": {
            "enabled": True
        },
        "scheduler": {
            "type": "OneCycle",
            "params": {
                "cycle_first_step_size": 1000,
                "cycle_first_stair_count": 500,
                "cycle_second_step_size": 1000,
                "cycle_second_stair_count": 500,
                "decay_step_size": 1000,
                "cycle_min_lr": 0.0001,
                "cycle_max_lr": 0.0010,
                "decay_lr_rate": 0.001,
                "cycle_min_mom": 0.85,
                "cycle_max_mom": 0.99,
                "decay_mom_rate": 0.0
            }
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[2])
    def _test_checkpoint_unfused_optimizer(args, model, hidden_dim, load_optimizer_states):
        checkpoint_correctness_verification(args,
                                            model,
                                            hidden_dim,
                                            tmpdir,
                                            load_optimizer_states=load_optimizer_states)

    _test_checkpoint_unfused_optimizer(args=args,
                                       model=model,
                                       hidden_dim=hidden_dim,
                                       load_optimizer_states=True)
    _test_checkpoint_unfused_optimizer(args=args,
                                       model=model,
                                       hidden_dim=hidden_dim,
                                       load_optimizer_states=False)


def test_checkpoint_fused_optimizer(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": True
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[2])
    def _test_checkpoint_fused_optimizer(args, model, hidden_dim, load_optimizer_states):
        checkpoint_correctness_verification(args,
                                            model,
                                            hidden_dim,
                                            tmpdir,
                                            load_optimizer_states=load_optimizer_states)

    _test_checkpoint_fused_optimizer(args=args,
                                     model=model,
                                     hidden_dim=hidden_dim,
                                     load_optimizer_states=True)
    _test_checkpoint_fused_optimizer(args=args,
                                     model=model,
                                     hidden_dim=hidden_dim,
                                     load_optimizer_states=False)
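

# ZeRO checkpointing: stages 1 and 2 partition fp32 optimizer state across
# data-parallel ranks, and `cpu_offload` keeps that state in host memory,
# which requires the cpu-adam op (hence `deepspeed_adam` and the skip below).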
@pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer',
                         [
                             (1, False, 'Adam'),
                             (2, False, 'Adam'),
                             (2, True, 'deepspeed_adam'),
                         ])
def test_checkpoint_zero_optimizer(tmpdir, zero_stage, use_cpu_offload, adam_optimizer):
    if use_cpu_offload and not deepspeed.ops.__installed_ops__['cpu-adam']:
        pytest.skip("cpu-adam is not installed")

    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": adam_optimizer,
            "params": {
                "lr": 0.00015,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[2])
    def _test_checkpoint_zero_optimizer(args, model, hidden_dim, load_optimizer_states):
        checkpoint_correctness_verification(args,
                                            model,
                                            hidden_dim,
                                            tmpdir,
                                            load_optimizer_states=load_optimizer_states)

    _test_checkpoint_zero_optimizer(args=args,
                                    model=model,
                                    hidden_dim=hidden_dim,
                                    load_optimizer_states=True)


@pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer',
                         [
                             (1, False, 'Adam'),
                             (2, False, 'Adam'),
                             (2, True, 'deepspeed_adam'),
                         ])
def test_checkpoint_zero_no_optimizer(tmpdir, zero_stage, use_cpu_offload, adam_optimizer):
    if use_cpu_offload and not deepspeed.ops.__installed_ops__['cpu-adam']:
        pytest.skip("cpu-adam is not installed")

    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": adam_optimizer,
            "params": {
                "lr": 0.00015,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[2])
    def _test_checkpoint_zero_no_optimizer(args, model, hidden_dim, load_optimizer_states):
        checkpoint_correctness_verification(args,
                                            model,
                                            hidden_dim,
                                            tmpdir,
                                            load_optimizer_states=load_optimizer_states)

    _test_checkpoint_zero_no_optimizer(args=args,
                                       model=model,
                                       hidden_dim=hidden_dim,
                                       load_optimizer_states=False)


@pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer',
                         [
                             (0, False, 'Adam'),
                             (1, False, 'Adam'),
                             (2, False, 'Adam'),
                             (2, True, 'deepspeed_adam'),
                         ])
def test_checkpoint_lr_scheduler(tmpdir, zero_stage, use_cpu_offload, adam_optimizer):
    if use_cpu_offload and not deepspeed.ops.__installed_ops__['cpu-adam']:
        pytest.skip("cpu-adam is not installed")

    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": adam_optimizer,
            "params": {
                "lr": 0.00015,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000
            }
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[2])
    def _test_checkpoint_lr_scheduler(args,
                                      model,
                                      hidden_dim,
                                      load_optimizer_states,
                                      load_lr_scheduler_states):
        checkpoint_correctness_verification(
            args,
            model,
            hidden_dim,
            tmpdir,
            load_optimizer_states=load_optimizer_states,
            load_lr_scheduler_states=load_lr_scheduler_states)

    _test_checkpoint_lr_scheduler(args=args,
                                  model=model,
                                  hidden_dim=hidden_dim,
                                  load_optimizer_states=False,
                                  load_lr_scheduler_states=True)


@pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer',
                         [
                             (0, False, 'Adam'),
                             (1, False, 'Adam'),
                             (2, False, 'Adam'),
                             (2, True, 'deepspeed_adam'),
                         ])
def test_checkpoint_no_lr_scheduler(tmpdir, zero_stage, use_cpu_offload, adam_optimizer):
    if use_cpu_offload and not deepspeed.ops.__installed_ops__['cpu-adam']:
        pytest.skip("cpu-adam is not installed")

    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": adam_optimizer,
            "params": {
                "lr": 1e-5
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000
            }
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[2])
    def _test_checkpoint_no_lr_scheduler(args,
                                         model,
                                         hidden_dim,
                                         load_optimizer_states,
                                         load_lr_scheduler_states):
        checkpoint_correctness_verification(
            args,
            model,
            hidden_dim,
            tmpdir,
            load_optimizer_states=load_optimizer_states,
            load_lr_scheduler_states=load_lr_scheduler_states)

    _test_checkpoint_no_lr_scheduler(args=args,
                                     model=model,
                                     hidden_dim=hidden_dim,
                                     load_optimizer_states=False,
                                     load_lr_scheduler_states=False)


def test_checkpoint_fp32_optimizer(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": False
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[2])
    def _test_checkpoint_fp32_optimizer(args, model, hidden_dim):
        checkpoint_correctness_verification(args, model, hidden_dim, tmpdir, fp16=False)

    _test_checkpoint_fp32_optimizer(args=args, model=model, hidden_dim=hidden_dim)


@pytest.mark.parametrize("zero_stage", [0, 1])
def test_checkpoint_pipe_engine(zero_stage, tmpdir, stages=2):
    config_dict = {
        "train_batch_size": 2,
        "train_micro_batch_size_per_gpu": 1,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 1e-5
            }
        },
        "zero_optimization": {
            "stage": zero_stage
        },
        "fp16": {
            "enabled": zero_stage > 0
        },
        "scheduler": {
            "type": "OneCycle",
            "params": {
                "cycle_first_step_size": 1000,
                "cycle_first_stair_count": 500,
                "cycle_second_step_size": 1000,
                "cycle_second_stair_count": 500,
                "decay_step_size": 1000,
                "cycle_min_lr": 0.0001,
                "cycle_max_lr": 0.0010,
                "decay_lr_rate": 0.001,
                "cycle_min_mom": 0.85,
                "cycle_max_mom": 0.99,
                "decay_mom_rate": 0.0
            }
        }
    }

    @distributed_test(world_size=4)
    def _test(save_folder, num_stages):
        args = args_from_dict(tmpdir, config_dict)
        model = LinearStackPipe(num_stages=num_stages)
        checkpoint_correctness_verification(args=args,
                                            model=model,
                                            hidden_dim=model.hidden_dim,
                                            tmpdir=save_folder,
                                            fp16=config_dict['fp16']['enabled'],
                                            load_optimizer_states=True,
                                            load_lr_scheduler_states=True,
                                            train_batch=True)

    _test(tmpdir, num_stages=stages)
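

# A pipeline module saved under one (pipeline, data)-parallel topology should
# be loadable under a different topology; layers are compared individually
# because each configuration partitions them across stages differently.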
@pytest.mark.parametrize("base_topo,test_topo",
                         [
                             (PipeTopo(num_pp=1, num_dp=4), PipeTopo(num_pp=4, num_dp=1)),
                             (PipeTopo(num_pp=2, num_dp=2), PipeTopo(num_pp=2, num_dp=2)),
                             (PipeTopo(num_pp=4, num_dp=1), PipeTopo(num_pp=2, num_dp=2)),
                         ])
def test_checkpoint_pipe_module(base_topo, test_topo, tmpdir):
    @distributed_test(world_size=4)
    def _test(base_topo, test_topo, save_folder):
        base_model = LinearStackPipe(topology=base_topo)
        base_model.save_state_dict(save_folder)

        dist.barrier()

        test_model = LinearStackPipe(topology=test_topo)
        test_model.load_state_dir(save_folder)

        # Base and test can have different lengths, so make sure we map from the
        # smaller to the larger model
        if len(base_model.forward_funcs) < len(test_model.forward_funcs):
            A = base_model
            B = test_model
        else:
            A = test_model
            B = base_model

        # Compare layers individually since the partitions are different
        for idx, A_layer in enumerate(A.forward_funcs):
            if not hasattr(A_layer, 'parameters'):
                # Skip functionals, etc.
                continue

            # Find the corresponding layer in B
            global_idx = idx + A._local_start
            B_local_idx = global_idx - B._local_start
            B_layer = B.forward_funcs[B_local_idx]

            # Compare layer parameters
            for p0, p1 in zip(A_layer.parameters(), B_layer.parameters()):
                assert torch.allclose(p0, p1, atol=1e-07), \
                    f"Model state {p0} is not equal to {p1}"

    _test(base_topo, test_topo, save_folder=tmpdir)