# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import torch
import os
import deepspeed
from deepspeed.accelerator import get_accelerator
from unit.common import DistributedTest
from unit.simple_model import Curriculum_SimpleModel, SimpleModel, random_dataloader, random_dataset
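

# Minimal MPU (model-parallel unit) stub: partitions the world into
# tensor-parallel groups of size tp_world_size and the complementary
# data-parallel groups, and exposes the rank/size/group accessors that
# DeepSpeed expects from the mpu object passed to deepspeed.initialize().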
class MPU():

    def __init__(self, tp_world_size):
        self.rank = deepspeed.comm.get_rank()
        self.world_size = deepspeed.comm.get_world_size()
        self.tp_world_size = tp_world_size

        for i in range(0, self.world_size, tp_world_size):
            ranks = range(i, i + tp_world_size)
            group = deepspeed.comm.new_group(ranks)
            if self.rank in ranks:
                self.tp_group = group

        for i in range(0, tp_world_size):
            ranks = range(i, self.world_size, tp_world_size)
            group = deepspeed.comm.new_group(ranks)
            if self.rank in ranks:
                self.dp_group = group

    def get_model_parallel_rank(self):
        return self.rank % self.tp_world_size

    def get_model_parallel_world_size(self):
        return self.tp_world_size

    def get_data_parallel_rank(self):
        return self.rank // self.tp_world_size

    def get_data_parallel_world_size(self):
        return self.world_size // self.tp_world_size

    def get_data_parallel_group(self):
        return self.dp_group

    def get_model_parallel_group(self):
        return self.tp_group
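

# End-to-end check of the data_efficiency curriculum-learning pipeline: the
# config registers a single "dummy_metric" curriculum metric, and the test
# verifies that its current difficulty is propagated into the data
# post-process callback while training runs.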
class TestDataEfficiency(DistributedTest):
    world_size = 2

    def test_curriculum_learning(self):
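        # fixed_root with root_degree 1 is effectively a linear schedule:
        # difficulty grows from min_difficulty to max_difficulty over
        # total_curriculum_step steps, in multiples of difficulty_step.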
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015,
                    "weight_decay": 0.01
                }
            },
            "gradient_clipping": 1.0,
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "initial_scale_power": 16
            },
            "data_efficiency": {
                "enabled": True,
                "seed": 1234,
                "data_sampling": {
                    "enabled": True,
                    "num_workers": 0,
                    "curriculum_learning": {
                        "enabled": True,
                        "data_cluster_path": "/tmp",
                        "curriculum_metrics": {
                            "dummy_metric": {
                                "index_to_sample_path": "dummy",
                                "index_to_metric_path": "dummy",
                                "difficulty_type": "value",
                                "clustering_type": "single_cluster",
                                "min_difficulty": 2,
                                "max_difficulty": 10,
                                "schedule_type": "fixed_root",
                                "schedule_config": {
                                    "total_curriculum_step": 8,
                                    "difficulty_step": 2,
                                    "root_degree": 1
                                }
                            }
                        }
                    }
                }
            }
        }
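
        # DeepSpeed invokes this hook on every fetched batch; the sampler
        # state must carry the current difficulty of every registered metric.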
        def data_post_process(data, data_sampler_state_dict):
            assert 'dummy_metric' in data_sampler_state_dict['current_difficulties']
            return data

        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        dataset = random_dataset(20, hidden_dim, torch.device('cpu'), dtype=torch.half)
        model, _, data_loader, _ = deepspeed.initialize(config=config_dict,
                                                        model=model,
                                                        training_data=dataset,
                                                        model_parameters=model.parameters(),
                                                        mpu=MPU(1))
        # Only rank 0 of each data-parallel group needs to create the
        # directory backing data_cluster_path.
        if model.mpu.get_data_parallel_rank() == 0 and not os.path.exists('/tmp'):
            os.makedirs('/tmp')
        model.set_data_post_process_func(data_post_process)
        for n, batch in enumerate(data_loader):
            x = batch[0].to(get_accelerator().current_device_name())
            y = batch[1].to(get_accelerator().current_device_name())
            loss = model(x, y)
            model.backward(loss)
            model.step()
            if n >= 10:
                break
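

# Tests for the legacy top-level "curriculum_learning" config section, which
# predates the data_efficiency pipeline and adjusts sequence length directly.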
class TestLegacyCurriculumScheduler(DistributedTest):
    world_size = 2

    def test_fixed_discrete(self):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015,
                    "weight_decay": 0.01
                }
            },
            "gradient_clipping": 1.0,
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "initial_scale_power": 16
            },
            "curriculum_learning": {
                "enabled": True,
                "curriculum_type": "seqlen",
                "min_difficulty": 1,
                "max_difficulty": 5,
                "schedule_type": "fixed_discrete",
                "schedule_config": {
                    "difficulty": [1, 2, 3, 4, 5],
                    "max_step": [2, 4, 6, 8]
                }
            }
        }
        hidden_dim = 10
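        # Expected seqlen per step: difficulty[i] applies through max_step[i],
        # so steps 1-2 -> 1, 3-4 -> 2, 5-6 -> 3, 7-8 -> 4, and 5 afterwards.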
        ground_truths = {1: 1, 2: 1, 3: 2, 4: 2, 5: 3, 6: 3, 7: 4, 8: 4}
        model = Curriculum_SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
        data_loader = random_dataloader(model=model, total_samples=20, hidden_dim=hidden_dim, device=model.device)
        for n, batch in enumerate(data_loader):
            loss, seqlen = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
            true_seqlen = 5
            if n + 1 in ground_truths:
                true_seqlen = ground_truths[n + 1]
            assert seqlen == true_seqlen, f"Incorrect curriculum schedule at step {n + 1}: expected seqlen {true_seqlen}, got {seqlen}"

    def test_fixed_linear(self):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015,
                    "weight_decay": 0.01
                }
            },
            "gradient_clipping": 1.0,
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "initial_scale_power": 16
            },
            "curriculum_learning": {
                "enabled": True,
                "curriculum_type": "seqlen",
                "min_difficulty": 2,
                "max_difficulty": 10,
                "schedule_type": "fixed_linear",
                "schedule_config": {
                    "total_curriculum_step": 8,
                    "difficulty_step": 2
                }
            }
        }
        hidden_dim = 10
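        # Expected seqlen per step, assuming the scheduler floors the linear
        # interpolation min + (max - min) * step / total_curriculum_step to a
        # multiple of difficulty_step, then holds at max_difficulty:
        # e.g. step 3 -> floor(2 + 8 * 3/8) = 5 -> 4.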
        ground_truths = {1: 2, 2: 4, 3: 4, 4: 6, 5: 6, 6: 8, 7: 8, 8: 10, 9: 10, 10: 10}
        model = Curriculum_SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
        data_loader = random_dataloader(model=model, total_samples=20, hidden_dim=hidden_dim, device=model.device)
        for n, batch in enumerate(data_loader):
            loss, seqlen = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
            if n + 1 in ground_truths:
                true_seqlen = ground_truths[n + 1]
                assert seqlen == true_seqlen, f"Incorrect curriculum schedule at step {n + 1}: expected seqlen {true_seqlen}, got {seqlen}"