test_sparse_grads.py

# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import torch
import pytest
import deepspeed
from unit.common import DistributedTest
from deepspeed.accelerator import get_accelerator
import deepspeed.utils.groups as groups

if get_accelerator().device_name() == 'hpu':
    pytest.skip("sparse_gradients not supported by HPU.", allow_module_level=True)
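

# Toy model: a sparse-gradient EmbeddingBag feeding a dense linear head, so a
# single training step produces both sparse and dense gradients.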
class Model(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.emb = torch.nn.EmbeddingBag(10, 3, mode="sum", sparse=True)
        self.linear = torch.nn.Linear(3, 1)

    def forward(self, x, offsets):
        return self.linear(self.emb(x, offsets))
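

# Hybrid optimizer: a plain torch.optim.Adam steps the dense (linear)
# parameters while torch.optim.SparseAdam steps the sparse (embedding)
# parameters; both are driven through a single Optimizer interface.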
class Adam(torch.optim.Optimizer):

    def __init__(self, dense_params, sparse_params):
        super().__init__(dense_params + sparse_params, defaults={})
        self.adam = torch.optim.Adam(dense_params)
        self.adam_sparse = torch.optim.SparseAdam(sparse_params)

    @torch.no_grad()
    def step(self, closure=None):
        loss_1 = self.adam.step(closure)
        loss_2 = self.adam_sparse.step(closure)
        if loss_1 is not None and loss_2 is not None:
            return loss_1 + loss_2
        return loss_1 or loss_2
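

# Runs one training step on two ranks with "sparse_gradients": True, then
# verifies that the embedding weights are identical on every data-parallel
# rank afterwards.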
class TestSparseAdam(DistributedTest):
    world_size = 2

    def test(self):
        config_dict = {"train_batch_size": 2, "steps_per_print": 1, "sparse_gradients": True}
        model = Model()
        optimizer = Adam(list(model.linear.parameters()), list(model.emb.parameters()))
        engine, _, _, _ = deepspeed.initialize(model=model, optimizer=optimizer, config=config_dict)
        loss = torch.nn.BCEWithLogitsLoss()
        x = torch.tensor([1, 2, 4, 5, 4, 3, 2, 9], dtype=torch.long, device=engine.device)
        offsets = torch.tensor([0, 4], dtype=torch.long, device=engine.device)
        y = torch.tensor([[1.0], [0.0]], device=engine.device)
        res = engine(x, offsets)
        engine.backward(loss(res, y))
        engine.step()
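
        # Gather each embedding parameter from every rank in the data-parallel
        # group; after the sparse gradient all-reduce the copies should match.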
        results = [engine.all_gather_scalar(i, groups._get_data_parallel_group()) for i in model.emb.parameters()]
        for gathered in results:
            assert torch.allclose(gathered[0], gathered[1])
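
# To run on two ranks (a sketch; the path assumes the DeepSpeed repo's
# unit-test layout): pytest tests/unit/runtime/sparse_tensor/test_sparse_grads.py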