123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619 |
- # Copyright (c) Microsoft Corporation.
- # SPDX-License-Identifier: Apache-2.0
- # DeepSpeed Team
- import deepspeed
- from types import SimpleNamespace
- from deepspeed.ops.op_builder import CPUAdamBuilder
- from deepspeed.checkpoint.utils import clone_tensors_for_torch_save, get_model_ckpt_name_for_rank
- from deepspeed.accelerator import get_accelerator
- from deepspeed.runtime.utils import required_torch_version
- from unit.common import DistributedTest, DistributedFixture
- from unit.simple_model import *
- from unit.checkpoint.common import *
- import pytest
- class TestZeROCheckpoint(DistributedTest):
- world_size = 2
- @pytest.mark.parametrize('zero_stage', [3])
- def test_pipeline_checkpoint_loading(self, tmpdir, zero_stage):
- config_dict = {
- "train_batch_size": 2,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": zero_stage,
- "pipeline_loading_checkpoint": True,
- }
- }
- hidden_dim = 10
- with deepspeed.zero.Init():
- models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]
- checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True)
- @pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer', [(1, False, 'Adam'), (2, False, 'Adam'),
- (2, True, 'deepspeed_adam'),
- (3, False, 'Adam'),
- (3, True, 'deepspeed_adam')])
- def test_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer):
- if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
- pytest.skip("cpu-adam is not compatible")
- config_dict = {
- "train_batch_size": 2,
- "steps_per_print": 1,
- "optimizer": {
- "type": 'Adam',
- "params": {
- "lr": 0.00015,
- "betas": [0.8, 0.999],
- "eps": 1e-8,
- "weight_decay": 3e-7
- }
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "wall_clock_breakdown": True,
- "zero_optimization": {
- "stage": zero_stage,
- "cpu_offload": use_cpu_offload
- }
- }
- hidden_dim = 10
- if zero_stage == 3:
- with deepspeed.zero.Init():
- models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]
- else:
- models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]
- checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=True)
- @pytest.mark.parametrize('zero_stage, use_cpu_offload, adam_optimizer', [(1, False, "Adam"), (2, False, "Adam"),
- (2, True, 'deepspeed_adam'),
- (3, False, 'Adam'),
- (3, True, 'deepspeed_adam')])
- def test_not_load_optimizer_state(self, tmpdir, zero_stage, use_cpu_offload, adam_optimizer):
- if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
- pytest.skip("cpu-adam is not compatible")
- config_dict = {
- "train_batch_size": 2,
- "steps_per_print": 1,
- "optimizer": {
- "type": 'Adam',
- "params": {
- "lr": 0.00015,
- "betas": [0.8, 0.999],
- "eps": 1e-8,
- "weight_decay": 3e-7
- }
- },
- "fp16": {
- "enabled": True
- },
- "zero_optimization": {
- "stage": zero_stage,
- "cpu_offload": use_cpu_offload
- }
- }
- hidden_dim = 10
- if zero_stage == 3:
- global DeepSpeedZeroOptimizer_Stage3
- from deepspeed.runtime.zero.stage3 import DeepSpeedZeroOptimizer_Stage3
- with deepspeed.zero.Init():
- models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]
- else:
- models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]
- checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=False)
- @pytest.mark.parametrize('zero_stage', [1, 2])
- def test_hybrid_optimizer_state(self, tmpdir, zero_stage):
- config_dict = {
- "train_micro_batch_size_per_gpu": 2,
- "gradient_accumulation_steps": 2,
- "steps_per_print": 1,
- "zero_optimization": {
- "stage": zero_stage
- },
- "zero_allow_untested_optimizer": True,
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- }
- }
- hidden_dim = 10
- models = [SimpleModel(hidden_dim=hidden_dim) for _ in range(2)]
- optimizers = [HybridStateOptimizer(model.parameters()) for model in models]
- checkpoint_correctness_verification(config_dict,
- models=models,
- base_optimizers=optimizers,
- hidden_dim=hidden_dim,
- tmpdir=tmpdir,
- load_optimizer_states=True)
- @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3])
- def test_load_module_only(self, tmpdir, zero_stage):
- config_dict = {
- "train_batch_size": 2,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": zero_stage,
- }
- }
- hidden_dim = 10
- if zero_stage == 3:
- with deepspeed.zero.Init():
- models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]
- else:
- models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]
- checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True)
- class ws4_model_checkpoint(DistributedFixture):
- world_size = 4
- def run(self, class_tmpdir, elastic_save, load_optim):
- ds_config = {
- "train_batch_size": 4,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": 2,
- "elastic_checkpoint": elastic_save
- }
- }
- hidden_dim = 10
- model = SimpleModel(hidden_dim)
- model, _, _, _ = deepspeed.initialize(config=ds_config, model=model, model_parameters=model.parameters())
- data_loader = random_dataloader(model=model, total_samples=8, hidden_dim=hidden_dim, device=model.device)
- for n, batch in enumerate(data_loader):
- loss = model(batch[0], batch[1])
- model.backward(loss)
- model.step()
- if load_optim:
- torch.save(model.optimizer.optimizer.state_dict(), os.path.join(class_tmpdir, 'opt-state-dict'))
- model.save_checkpoint(class_tmpdir)
- @pytest.mark.parametrize("elastic_save", [True, False])
- @pytest.mark.parametrize("elastic_load", [True, False])
- @pytest.mark.parametrize("load_optim", [True, False])
- class TestZeROElasticCheckpoint(DistributedTest):
- world_size = 2
- def test_elastic_checkpoint_fixed_dp(self, tmpdir, elastic_save, elastic_load, load_optim):
- ds_config = {
- "train_batch_size": 2,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": 2,
- "elastic_checkpoint": elastic_save
- }
- }
- hidden_dim = 10
- # torch 1.2.* stores raw tensor id numbers in checkpoint state which leads to
- # false positive mismatches in checkpoint state comparisons.
- # Newer torch versions store tensor ids as 0, 1, 2, ...
- expected_mismatch_keys = [] if required_torch_version(min_version=1.4) else ['params']
- models = [SimpleModel(hidden_dim) for _ in range(2)]
- model, _, _, _ = deepspeed.initialize(config=ds_config,
- model=models[0],
- model_parameters=models[0].parameters())
- data_loader = random_dataloader(model=model, total_samples=8, hidden_dim=hidden_dim, device=model.device)
- for n, batch in enumerate(data_loader):
- loss = model(batch[0], batch[1])
- model.backward(loss)
- model.step()
- if load_optim:
- torch.save(model.optimizer.optimizer.state_dict(), os.path.join(tmpdir, 'opt-state-dict'))
- model.save_checkpoint(tmpdir)
- ds_config["zero_optimization"]["elastic_checkpoint"] = elastic_load
- model, _, _, _ = deepspeed.initialize(config=ds_config,
- model=models[1],
- model_parameters=models[1].parameters())
- model.load_checkpoint(tmpdir, load_optimizer_states=load_optim)
- if load_optim:
- saved_sd = torch.load(os.path.join(tmpdir, 'opt-state-dict'))
- curr_sd = model.optimizer.optimizer.state_dict()
- for curr_param_group, saved_param_group in zip(curr_sd['param_groups'], saved_sd['param_groups']):
- compare_state_dicts(curr_param_group, saved_param_group, expected_mismatch_keys)
- data_loader = random_dataloader(model=model, total_samples=8, hidden_dim=hidden_dim, device=model.device)
- for n, batch in enumerate(data_loader):
- loss = model(batch[0], batch[1])
- model.backward(loss)
- model.step()
- def test_elastic_checkpoint_change_dp(self, ws4_model_checkpoint, class_tmpdir, elastic_save, elastic_load,
- load_optim):
- ds_config = {
- "train_batch_size": 4,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": 2,
- "elastic_checkpoint": elastic_load
- }
- }
- hidden_dim = 10
- model = SimpleModel(hidden_dim)
- # Load checkpoint with dp world size = 2
- model, _, _, _ = deepspeed.initialize(config=ds_config, model=model, model_parameters=model.parameters())
- if load_optim:
- with pytest.raises(deepspeed.runtime.zero.utils.ZeRORuntimeException):
- model.load_checkpoint(class_tmpdir, load_optimizer_states=load_optim)
- else:
- model.load_checkpoint(class_tmpdir, load_optimizer_states=load_optim)
- class TestZeROSaveLoadEdgeCase(DistributedTest):
- world_size = 2
- @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3])
- def test_immediate_save_load(self, tmpdir, zero_stage):
- config_dict = {
- "train_batch_size": 4,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": zero_stage,
- }
- }
- hidden_dim = 10
- model = SimpleModel(hidden_dim)
- ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None)
- ds_model.save_checkpoint(tmpdir)
- ds_model.load_checkpoint(tmpdir,
- load_optimizer_states=False,
- load_lr_scheduler_states=False,
- load_module_only=False)
- @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3])
- def test_load_immediate_save(self, tmpdir, zero_stage):
- config_dict = {
- "train_batch_size": 4,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": zero_stage,
- }
- }
- hidden_dim = 10
- model = SimpleModel(hidden_dim)
- # 1. pretrain a model and save it
- dtype = torch.half
- ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None)
- data_loader = random_dataloader(model=ds_model,
- total_samples=1,
- hidden_dim=hidden_dim,
- device=ds_model.device,
- dtype=dtype)
- for _, batch in enumerate(data_loader):
- loss = ds_model(batch[0], batch[1])
- ds_model.backward(loss)
- ds_model.step()
- ds_model.empty_partition_cache()
- ds_model.save_checkpoint(tmpdir)
- # 2. load and immediately save a model with a fresh ds engine
- ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None)
- ds_model.load_checkpoint(tmpdir,
- load_optimizer_states=False,
- load_lr_scheduler_states=False,
- load_module_only=False)
- ds_model.save_checkpoint(tmpdir)
- @pytest.mark.parametrize('zero_stage', [0, 1, 2, 3])
- def test_save_before_accum_grad_is_done(self, tmpdir, zero_stage):
- config_dict = {
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": zero_stage,
- "stage3_gather_fp16_weights_on_model_save": True,
- },
- "gradient_accumulation_steps": 2,
- "train_micro_batch_size_per_gpu": 1,
- "train_batch_size": 4,
- }
- hidden_dim = 10
- model = SimpleModel(hidden_dim)
- # This test reproduces a bug where one tries to retrieve a 16bit model before grad_accum
- # cycle was completed.
- # So we config grad_accum=2 and step only once and save_16bit_model
- ds_model = create_deepspeed_model(config_dict=config_dict, model=model, base_optimizer=None)
- data_loader = random_dataloader(model=ds_model,
- total_samples=2,
- hidden_dim=hidden_dim,
- device=ds_model.device,
- dtype=torch.half)
- batch = next(iter(data_loader))
- loss = ds_model(batch[0], batch[1])
- ds_model.backward(loss)
- ds_model.step()
- ds_model.empty_partition_cache()
- # we stepped only once, and now save 16bit model before gradient_accumulation_steps=2 is complete
- ds_model.save_16bit_model(tmpdir, "model.pt")
- # let's test just as well that we can save the checkpoint too
- ds_model.save_checkpoint(tmpdir)
- class TestZeROCheckpointFrozenWeights(DistributedTest):
- world_size = 2
- @pytest.mark.parametrize('zero_stage', [1, 2, 3])
- def test_load_optimizer_state(self, tmpdir, zero_stage):
- config_dict = {
- "train_batch_size": 2,
- "steps_per_print": 1,
- "optimizer": {
- "type": 'Adam',
- "params": {
- "lr": 0.00015,
- "betas": [0.8, 0.999],
- "eps": 1e-8,
- "weight_decay": 3e-7
- }
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "wall_clock_breakdown": True,
- "zero_optimization": {
- "stage": zero_stage
- }
- }
- hidden_dim = 10
- with deepspeed.zero.Init(enabled=zero_stage == 3):
- models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)]
- checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=True)
- @pytest.mark.parametrize('zero_stage', [1, 2, 3])
- def test_not_load_optimizer_state(self, tmpdir, zero_stage):
- config_dict = {
- "train_batch_size": 2,
- "steps_per_print": 1,
- "optimizer": {
- "type": 'Adam',
- "params": {
- "lr": 0.00015,
- "betas": [0.8, 0.999],
- "eps": 1e-8,
- "weight_decay": 3e-7
- }
- },
- "fp16": {
- "enabled": True
- },
- "zero_optimization": {
- "stage": zero_stage
- }
- }
- hidden_dim = 10
- with deepspeed.zero.Init(enabled=zero_stage == 3):
- models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)]
- checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_optimizer_states=False)
- @pytest.mark.parametrize('zero_stage', [1, 2, 3])
- def test_load_module_only(self, tmpdir, zero_stage):
- config_dict = {
- "train_batch_size": 2,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": zero_stage,
- }
- }
- hidden_dim = 10
- with deepspeed.zero.Init(enabled=zero_stage == 3):
- models = [SimpleFrozenModel(hidden_dim, empty_grad=False) for _ in range(2)]
- checkpoint_correctness_verification(config_dict, models, hidden_dim, tmpdir, load_module_only=True)
- @pytest.mark.parametrize('zero_stage', [1, 2])
- def test_save_exclude_frozen_weights(self, tmpdir, zero_stage):
- world_size = 1
- config_dict = {
- "train_micro_batch_size_per_gpu": 1,
- "optimizer": {
- "type": 'Adam'
- },
- "fp16": {
- "enabled": True,
- "initial_scale_power": 8
- },
- "zero_optimization": {
- "stage": zero_stage,
- }
- }
- hidden_dim = 10
- model = SimpleFrozenModel(hidden_dim, empty_grad=False)
- ds_engine, _, _, _ = deepspeed.initialize(model=model, model_parameters=model.parameters(), config=config_dict)
- # Validate backwards-compatibility of including frozen parameters in checkpoint
- all_ckpt_folder = os.path.join(tmpdir, 'all_params')
- ds_engine.save_checkpoint(all_ckpt_folder)
- all_params_ckpt_file = get_model_ckpt_name_for_rank(os.path.join(all_ckpt_folder, 'global_step0'), '00')
- loaded_all_param_model = torch.load(all_params_ckpt_file)['module']
- all_param_names = set([n for n, p in model.named_parameters()])
- assert set(loaded_all_param_model.keys()) == all_param_names
- # Validate exclusion of frozen parameters
- trainable_ckpt_folder = os.path.join(tmpdir, 'no_frozen_params')
- ds_engine.save_checkpoint(trainable_ckpt_folder, exclude_frozen_parameters=True)
- trainable_ckpt_file = get_model_ckpt_name_for_rank(os.path.join(trainable_ckpt_folder, 'global_step0'), '00')
- # Excluding frozen parameters should reduce checkpoint size
- assert os.path.getsize(all_params_ckpt_file) > os.path.getsize(trainable_ckpt_file)
- loaded_trainable_param_model = torch.load(trainable_ckpt_file)['module']
- frozen_param_names = set([n for n, p in model.named_parameters() if not p.requires_grad])
- loaded_trainable_param_names = set(loaded_trainable_param_model.keys())
- overlap_names = set.intersection(loaded_trainable_param_names, frozen_param_names)
- assert len(overlap_names) == 0
- trainable_param_names = set([n for n, p in model.named_parameters() if p.requires_grad])
- assert loaded_trainable_param_names == trainable_param_names
- class TestSaveTensorClone(DistributedTest):
- world_size = 1
- @pytest.mark.parametrize('zero_stage', [1, 2])
- @pytest.mark.parametrize('use_cpu_device', [True, False])
- def test_save_tensor_clone(self, tmpdir, zero_stage, use_cpu_device):
- ds_config = {
- "optimizer": {
- "type": "AdamW",
- },
- "zero_optimization": {
- "stage": zero_stage
- },
- "train_batch_size": 1,
- "train_micro_batch_size_per_gpu": 1
- }
- hidden_dim = 1024
- model = SimpleModel(hidden_dim, nlayers=4).half()
- ref_model_state_dict = model.state_dict()
- ds_engine, _, _, _ = deepspeed.initialize(model=model, config_params=ds_config)
- clone_device = torch.device('cpu') if use_cpu_device else get_accelerator().current_device()
- clone_state_dict = clone_tensors_for_torch_save(ds_engine.module.state_dict())
- compare_state_dicts(ref_model_state_dict, clone_state_dict)
- ref_ckpt_file = os.path.join(tmpdir, 'ref_ckpt.pt')
- torch.save(ref_model_state_dict, ref_ckpt_file)
- clone_ckpt_file = os.path.join(tmpdir, 'clone_ckpt.pt')
- torch.save(clone_state_dict, clone_ckpt_file)
- compare_state_dicts(torch.load(ref_ckpt_file), torch.load(clone_ckpt_file))
- class TestZeRONonDistributed(DistributedTest):
- world_size = 1
- init_distributed = False
- @pytest.mark.parametrize('zero_stage', [1, 2, 3])
- def test_chmod_exception_handling(self, monkeypatch, zero_stage):
- config_dict = {
- "optimizer": {
- "type": "AdamW"
- },
- "train_batch_size": 1,
- "zero_optimization": {
- "stage": zero_stage
- }
- }
- args = SimpleNamespace(local_rank=0)
- net = SimpleModel(hidden_dim=4)
- engine, _, _, _ = deepspeed.initialize(args=args,
- config=config_dict,
- model=net,
- model_parameters=net.parameters())
- log_called = False
- def mock_logger_info(message, *args, **kwargs):
- nonlocal log_called
- log_called = True
- monkeypatch.setattr("deepspeed.utils.logger.info", mock_logger_info)
- """
- This is presented for use-cases like Azure Storage File Share (where permissions are not allowed)
- We use a fake file for this test (file not existing would present a similar issue as not being able to chmod)
- """
- fake_recovery_script_dst = os.path.join("tmp", "zero_to_fp32.py")
- engine._change_recovery_script_permissions(fake_recovery_script_dst)
- assert log_called, "Expected deepspeed.utils.logger.info to be called."
|