# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import torch
import deepspeed
from pytest import approx
from unit.common import DistributedTest
from unit.multi_output_model import MultiOutputModel, multi_output_dataloader
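
# These tests validate that a model returning multiple losses works with the
# DeepSpeed engine: each loss is checked individually, the losses are summed
# for backward(), and the engine scales the summed loss by
# 1/gradient_accumulation_steps.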


class TestTwoOutputModel(DistributedTest):
    world_size = 1

    def test(self, tmpdir):
        grad_accumulation_steps = 2
        micro_batch_size = 1
        world_size = self.world_size
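        # DeepSpeed requires that train_batch_size equals
        # micro_batch_size * gradient_accumulation_steps * world_size.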
        config_dict = {
            "train_micro_batch_size_per_gpu": micro_batch_size,
            "gradient_accumulation_steps": grad_accumulation_steps,
            "train_batch_size": micro_batch_size * grad_accumulation_steps * world_size,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                }
            },
            "fp16": {
                "enabled": True
            }
        }
        hidden_dim = 10
        weight_value = 0.1
        model = MultiOutputModel(hidden_dim, weight_value)
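        # deepspeed.initialize() returns (engine, optimizer, dataloader, lr_scheduler);
        # only the engine is needed here.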
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
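        # 4 samples at micro-batch size 1 give 4 micro-steps, i.e. two full
        # gradient-accumulation windows of 2 steps each.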
        total_samples = 4
        data_loader = multi_output_dataloader(model=model,
                                              total_samples=total_samples,
                                              hidden_dim=hidden_dim,
                                              device=model.device,
                                              inputs=[1.0, 2.0],
                                              targets=[1, 2])
        for n, batch in enumerate(data_loader):
            assert len(batch) % 2 == 0, \
                "multi_output_dataloader failed to return an even number of data samples (input+target)"
            midpoint = len(batch) // 2
            inputs, targets = batch[:midpoint], batch[midpoint:]
            loss_tuple = model(inputs, targets)
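            # With identical constant weights every logit is equal, so each head's
            # loss matches the cross-entropy of a uniform softmax over
            # hidden_dim=10 classes: ln(10) ≈ 2.3026, which rounds to
            # 2.302734375 in fp16.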
            expected_loss = torch.tensor(2.302734375, dtype=torch.half, device=model.device)
            for loss in loss_tuple:
                assert loss.shape == torch.Size([])
                assert loss.item() == approx(expected_loss.item())

            summed_loss = sum(loss_tuple)
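            # engine.backward() returns the loss scaled by 1/gradient_accumulation_steps
            # so that gradients accumulated over a window average rather than sum.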
            scaled_loss = model.backward(summed_loss)
            expected_scaled_loss = summed_loss.float() / grad_accumulation_steps
            assert scaled_loss.item() == approx(expected_scaled_loss.item())
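            # engine.step() only applies the optimizer update once every
            # grad_accumulation_steps micro-steps; intermediate calls just accumulate.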
            model.step()


class TestThreeOutputModel(DistributedTest):
    world_size = 1
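    # Mirrors TestTwoOutputModel, but with three output heads and three
    # gradient-accumulation steps.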

    def test(self, tmpdir):
        grad_accumulation_steps = 3
        micro_batch_size = 1
        world_size = self.world_size
        config_dict = {
            "train_micro_batch_size_per_gpu": micro_batch_size,
            "gradient_accumulation_steps": grad_accumulation_steps,
            "train_batch_size": micro_batch_size * grad_accumulation_steps * world_size,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                }
            },
            "fp16": {
                "enabled": True
            }
        }
        hidden_dim = 10
        weight_value = 0.1
        model = MultiOutputModel(hidden_dim, weight_value)
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
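        # Enough samples for two complete gradient-accumulation windows
        # (3 steps * micro-batch 1 * 2 windows = 6 samples).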
        total_samples = grad_accumulation_steps * micro_batch_size * 2
        data_loader = multi_output_dataloader(model=model,
                                              total_samples=total_samples,
                                              hidden_dim=hidden_dim,
                                              device=model.device,
                                              inputs=[1.0, 2.0, 3.0],
                                              targets=[1, 2, 3])
        for n, batch in enumerate(data_loader):
            assert len(batch) % 2 == 0, \
                "multi_output_dataloader failed to return an even number of data samples (input+target)"

            midpoint = len(batch) // 2
            inputs, targets = batch[:midpoint], batch[midpoint:]
            loss_tuple = model(inputs, targets)
            assert len(loss_tuple) == 3

            expected_loss = torch.tensor(2.302734375, dtype=torch.half, device=model.device)
            for loss in loss_tuple:
                assert loss.shape == torch.Size([])
                assert loss.item() == approx(expected_loss.item())

            summed_loss = sum(loss_tuple)
            scaled_loss = model.backward(summed_loss)
            expected_scaled_loss = summed_loss.float() / grad_accumulation_steps
            assert scaled_loss.item() == approx(expected_scaled_loss.item())
            model.step()