import torch
import apex
import deepspeed
import pytest
from common import distributed_test
from simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict

lamb_available = pytest.mark.skipif(not deepspeed.ops.__installed_ops__['lamb'],
                                    reason="lamb is not installed")
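
# Each test below follows the same pattern: build a DeepSpeed config dict,
# wrap SimpleModel with deepspeed.initialize(), and run a short training
# loop. The inner helper is decorated with @distributed_test (from common),
# which runs it under each of the requested world sizes.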
@lamb_available
def test_lamb_fp32_grad_clip(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Lamb",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[1, 2])
    def _test_lamb_fp32_grad_clip(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_lamb_fp32_grad_clip(args=args, model=model, hidden_dim=hidden_dim)
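
# Same flow as the fp32 test, but with DeepSpeed's native fp16 enabled. No
# explicit dtype is passed to random_dataloader here; it presumably defaults
# to the model's (half) precision, which is why the fp32 tests pass
# dtype=torch.float explicitly.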
@lamb_available
def test_lamb_fp16_basic(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Lamb",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "fp16": {
            "enabled": True
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[1, 2])
    def _test_lamb_fp16_basic(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_lamb_fp16_basic(args=args, model=model, hidden_dim=hidden_dim)
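
# empty_grad=True presumably configures SimpleModel so that some parameters
# never receive gradients (see simple_model); running with world_size=2
# checks that fp16 gradient reduction tolerates missing grads.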
@lamb_available
def test_lamb_fp16_empty_grad(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Lamb",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "fp16": {
            "enabled": True
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=True, rank=args.local_rank)

    @distributed_test(world_size=[2])
    def _test_lamb_fp16_empty_grad(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_lamb_fp16_empty_grad(args=args, model=model, hidden_dim=hidden_dim)
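
# FP32 counterpart of the empty-grad test: fp16 is explicitly disabled and
# the dataloader yields torch.float batches.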
def test_adam_fp32_empty_grad(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "fp16": {
            "enabled": False
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=True, rank=args.local_rank)

    @distributed_test(world_size=[2])
    def _test_adam_fp32_empty_grad(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_adam_fp32_empty_grad(args=args, model=model, hidden_dim=hidden_dim)
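
# The two AdamW tests pass a client-constructed torch.optim.AdamW instance
# directly to deepspeed.initialize() instead of declaring an optimizer in
# the config; the empty-grad variant follows immediately below.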
def test_adamw_fp16_basic(tmpdir):
    config_dict = {
        "train_batch_size": 1,
        "steps_per_print": 1,
        "fp16": {
            "enabled": True
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[1])
    def _test_adamw_fp16_basic(args, model, hidden_dim):
        optimizer = torch.optim.AdamW(params=model.parameters())
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              optimizer=optimizer)
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_adamw_fp16_basic(args=args, model=model, hidden_dim=hidden_dim)

def test_adamw_fp16_empty_grad(tmpdir):
    config_dict = {
        "train_batch_size": 1,
        "steps_per_print": 1,
        "fp16": {
            "enabled": True
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=True)

    @distributed_test(world_size=[1])
    def _test_adamw_fp16_empty_grad(args, model, hidden_dim):
        optimizer = torch.optim.AdamW(params=model.parameters())
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              optimizer=optimizer)
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_adamw_fp16_empty_grad(args=args, model=model, hidden_dim=hidden_dim)
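
# ZeRO stages 1 and 2 (optionally with CPU offload) combined with the
# OneCycle LR scheduler. The cpu_offload variants need the cpu-adam op and
# are skipped when it is not installed.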
@pytest.mark.parametrize('zero_stage, use_cpu_offload',
                         [(1, False),
                          (2, False),
                          (2, True)])
def test_adam_fp16_zero_onecycle_compatibility(tmpdir, zero_stage, use_cpu_offload):
    if use_cpu_offload and not deepspeed.ops.__installed_ops__['cpu-adam']:
        pytest.skip("cpu-adam is not installed")
    config_dict = {
        "train_batch_size": 1,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "scheduler": {
            "type": "OneCycle",
            "params": {
                "cycle_first_step_size": 16000,
                "cycle_first_stair_count": 8000,
                "decay_step_size": 16000,
                "cycle_min_lr": 1e-06,
                "cycle_max_lr": 3e-05,
                "decay_lr_rate": 1e-07,
                "cycle_min_mom": 0.85,
                "cycle_max_mom": 0.99,
                "decay_mom_rate": 0.0
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=True)

    @distributed_test(world_size=[1])
    def _test_adam_fp16_zero_onecycle_compatibility(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_adam_fp16_zero_onecycle_compatibility(args=args,
                                                model=model,
                                                hidden_dim=hidden_dim)
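
# A numeric fp16 "loss_scale" requests static loss scaling: the test asserts
# dynamic scaling is disabled and the configured scale (138) is in effect
# before running a few steps.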
@pytest.mark.parametrize('zero_stage, use_cpu_offload',
                         [(1, False),
                          (2, False),
                          (2, True)])
def test_zero_static_scale(tmpdir, zero_stage, use_cpu_offload):
    if use_cpu_offload and not deepspeed.ops.__installed_ops__['cpu-adam']:
        pytest.skip("cpu-adam is not installed")
    config_dict = {
        "train_batch_size": 4,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "fp16": {
            "enabled": True,
            "loss_scale": 138.
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        }
    }
    args = args_from_dict(tmpdir, config_dict)

    @distributed_test(world_size=2)
    def _test_zero_static_scale(args):
        hidden_dim = 10
        model = SimpleModel(hidden_dim, empty_grad=True)
        model, optim, _, _ = deepspeed.initialize(args=args,
                                                  model=model,
                                                  model_parameters=model.parameters())

        # Ensure the static scaler is configured.
        assert not optim.dynamic_loss_scale
        assert optim.loss_scaler.loss_scale == 138.

        # Now make sure things work.
        data_loader = random_dataloader(model=model,
                                        total_samples=10,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_zero_static_scale(args)
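
# Older configs expressed ZeRO as a plain boolean ("zero_optimization": true)
# instead of a stage dict; this checks the deprecated form still parses and
# behaves like the static-scale test above.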
def test_zero_static_scale_deprecated_format(tmpdir):
    config_dict = {
        "train_batch_size": 4,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "fp16": {
            "enabled": True,
            "loss_scale": 138.
        },
        "zero_optimization": True
    }
    args = args_from_dict(tmpdir, config_dict)

    @distributed_test(world_size=2)
    def _test_zero_static_scale(args):
        hidden_dim = 10
        model = SimpleModel(hidden_dim, empty_grad=True)
        model, optim, _, _ = deepspeed.initialize(args=args,
                                                  model=model,
                                                  model_parameters=model.parameters())

        # Ensure the static scaler is configured.
        assert not optim.dynamic_loss_scale
        assert optim.loss_scaler.loss_scale == 138.

        # Now make sure things work.
        data_loader = random_dataloader(model=model,
                                        total_samples=10,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_zero_static_scale(args)
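
# With zero_allow_untested_optimizer left False, handing ZeRO an arbitrary
# client optimizer (SimpleOptimizer) should fail: deepspeed.initialize() is
# expected to raise an AssertionError.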
@pytest.mark.parametrize('zero_stage, use_cpu_offload',
                         [(1, False),
                          (2, False),
                          (2, True)])
def test_zero_allow_untested_optimizer(tmpdir, zero_stage, use_cpu_offload):
    if use_cpu_offload and not deepspeed.ops.__installed_ops__['cpu-adam']:
        pytest.skip("cpu-adam is not installed")
    config_dict = {
        "train_batch_size": 4,
        "steps_per_print": 1,
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        },
        "zero_allow_untested_optimizer": False
    }
    args = args_from_dict(tmpdir, config_dict)

    @distributed_test(world_size=[1])
    def _test_zero_allow_untested_optimizer(args):
        hidden_dim = 10
        model = SimpleModel(hidden_dim, empty_grad=True)
        optimizer = SimpleOptimizer(model.parameters())
        with pytest.raises(AssertionError):
            model, optim, _, _ = deepspeed.initialize(args=args,
                                                      model=model,
                                                      optimizer=optimizer,
                                                      model_parameters=model.parameters())

    _test_zero_allow_untested_optimizer(args)
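
# With DP=3 but only 2 parameters, at least one rank ends up owning an empty
# ZeRO partition; the tiny bucket sizes (100) presumably force the reduce /
# allgather bucketing paths to trigger even for this one-unit model.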
@pytest.mark.parametrize('zero_stage, use_cpu_offload',
                         [(1, False),
                          (2, False),
                          (2, True)])
def test_zero_empty_partition(tmpdir, zero_stage, use_cpu_offload):
    if use_cpu_offload and not deepspeed.ops.__installed_ops__['cpu-adam']:
        pytest.skip("cpu-adam is not installed")
    config_dict = {
        "train_micro_batch_size_per_gpu": 1,
        "gradient_accumulation_steps": 1,
        "fp16": {
            "enabled": True,
            "initial_scale_power": 8
        },
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload,
            "reduce_bucket_size": 100,
            "allgather_bucket_size": 100
        }
    }
    args = args_from_dict(tmpdir, config_dict)

    @distributed_test(world_size=[3])
    def _test_zero_empty_partition(args):
        hidden_dim = 1
        model = SimpleModel(hidden_dim)
        # Ensure model has 2 parameters, to cause empty partition with DP=3
        assert len(list(model.parameters())) == 2
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())

        # Now make sure things work.
        data_loader = random_dataloader(model=model,
                                        total_samples=1,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_zero_empty_partition(args)
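
# The following tests exercise NVIDIA Apex AMP (the "amp" config block)
# instead of DeepSpeed's native fp16: a client torch Adam optimizer, Lamb
# from the config, and two "O2" opt_level variants.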
def test_adam_amp_basic(tmpdir):
    config_dict = {"train_batch_size": 1, "steps_per_print": 1, "amp": {"enabled": True}}
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[1])
    def _test_adam_amp_basic(args, model, hidden_dim):
        optimizer = torch.optim.Adam(params=model.parameters())
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              optimizer=optimizer)
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_adam_amp_basic(args=args, model=model, hidden_dim=hidden_dim)

@lamb_available
def test_lamb_amp_basic(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Lamb",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "amp": {
            "enabled": True
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[1, 2])
    def _test_lamb_amp_basic(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_lamb_amp_basic(args=args, model=model, hidden_dim=hidden_dim)

def test_adam_amp_o2(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "amp": {
            "enabled": True,
            "opt_level": "O2"
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[1, 2])
    def _test_adam_amp_o2(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_adam_amp_o2(args=args, model=model, hidden_dim=hidden_dim)

def test_adam_amp_o2_empty_grad(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "amp": {
            "enabled": True,
            "opt_level": "O2"
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    # NOTE: despite the test name, empty_grad stays False here (Apex O2 may
    # not tolerate parameters that never receive gradients).
    model = SimpleModel(hidden_dim, empty_grad=False, rank=args.local_rank)

    @distributed_test(world_size=[2])
    def _test_adam_amp_o2_empty_grad(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _test_adam_amp_o2_empty_grad(args=args, model=model, hidden_dim=hidden_dim)
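
# ZeRO accepts certain client optimizers passed directly to initialize();
# this test only checks that wrapping succeeds for torch.optim.Adam and
# apex.optimizers.FusedAdam, without taking a training step.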
@pytest.mark.parametrize('zero_stage, optimizer_constructor',
                         [(1, apex.optimizers.FusedAdam),
                          (2, torch.optim.Adam),
                          (2, apex.optimizers.FusedAdam)])
def test_zero_supported_client_optimizer(tmpdir, zero_stage, optimizer_constructor):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, empty_grad=False)

    @distributed_test(world_size=[1])
    def _test_zero_supported_client_optimizer(args, model, optimizer_constructor):
        client_optimizer = optimizer_constructor(params=model.parameters())
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              optimizer=client_optimizer)

    _test_zero_supported_client_optimizer(args=args,
                                          model=model,
                                          optimizer_constructor=optimizer_constructor)
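
# ZeRO stage 2 with reduce_scatter disabled, presumably falling back to an
# allreduce-based gradient path; this verifies training still runs end to
# end in that mode.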
def test_zero2_reduce_scatter_off(tmpdir):
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "zero_optimization": {
            "stage": 2,
            "contiguous_gradients": True,
            "allgather_bucket_size": 2000000000,
            "reduce_bucket_size": 200000000,
            "overlap_comm": False,
            "reduce_scatter": False
        },
        "fp16": {
            "enabled": True
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim, rank=args.local_rank)

    @distributed_test(world_size=[2])
    def _helper(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

    _helper(args=args, model=model, hidden_dim=hidden_dim)
|