123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- import torch
- import os
- import sys
- import math
- from .common import get_test_path
- from deepspeed.pipe import PipelineModule, LayerSpec
- def get_megatron_version():
- p = os.popen("pip list --format=columns | grep megatron-lm")
- pip_list = p.read()
- assert 'megatron-lm' in pip_list, 'Please install Megatron-LM before getting its version'
- ver_str = pip_list.split()[1]
- return float(ver_str[0])
- def get_gpt2_model(args_others, mp_size=1):
- from megatron.model import GPT2Model
- from megatron.initialize import initialize_megatron
- args_defaults = {
- 'vocab_file': get_test_path('gpt2-vocab.json'),
- 'merge_file': get_test_path('gpt2-merges.txt'),
- 'tokenizer_type': 'GPT2BPETokenizer',
- }
- args_defaults.update(args_others)
- # setting "make-vocab-size-divisible-by" to avoid word-embedding size change in resizing testing.
- sys.argv.extend([
- '--model-parallel-size',
- str(mp_size),
- '--make-vocab-size-divisible-by',
- str(1)
- ])
- initialize_megatron(args_defaults=args_defaults, ignore_unknown_args=True)
- model = GPT2Model(num_tokentypes=0, parallel_output=False)
- model.cuda()
- from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
- from megatron import mpu
- i = torch.cuda.current_device()
- model = torchDDP(model,
- device_ids=[i],
- output_device=i,
- process_group=mpu.get_data_parallel_group())
- return model
- class MockGPT2ModelPipe(PipelineModule):
- def __init__(self, num_layers, mp_size, args_others, topo, **kwargs):
- from megatron.initialize import initialize_megatron
- args_defaults = {
- 'vocab_file': get_test_path('gpt2-vocab.json'),
- 'merge_file': get_test_path('gpt2-merges.txt'),
- 'tokenizer_type': 'GPT2BPETokenizer',
- }
- args_defaults.update(args_others)
- # setting "make-vocab-size-divisible-by" to avoid word-embedding size change in resizing testing.
- sys.argv.extend([
- '--model-parallel-size',
- str(mp_size),
- '--make-vocab-size-divisible-by',
- str(1)
- ])
- initialize_megatron(args_defaults=args_defaults, ignore_unknown_args=True)
- from megatron.model.transformer import ParallelTransformerLayer
- class ParallelTransformerLayerPipe(ParallelTransformerLayer):
- def forward(self, args):
- # hardcode attn mask for testing, PP requires the attn_mask to be stashed
- attention_mask = torch.tensor([[True]],
- device=torch.cuda.current_device())
- return super().forward(args, attention_mask)
- layers = []
- for x in range(num_layers):
- layers.append(
- LayerSpec(ParallelTransformerLayerPipe,
- self.gpt2_attention_mask_func,
- self.init_method_normal(0.02),
- self.scaled_init_method_normal(0.02,
- num_layers),
- x))
- super().__init__(layers=layers,
- loss_fn=torch.nn.CrossEntropyLoss(),
- topology=topo,
- **kwargs)
- def gpt2_attention_mask_func(self, attention_scores, ltor_mask):
- attention_scores.masked_fill_(ltor_mask, -10000.0)
- return attention_scores
- def init_method_normal(self, sigma):
- """Init method based on N(0, sigma)."""
- def init_(tensor):
- return torch.nn.init.normal_(tensor, mean=0.0, std=sigma)
- return init_
- def scaled_init_method_normal(self, sigma, num_layers):
- """Init method based on N(0, sigma/sqrt(2*num_layers)."""
- std = sigma / math.sqrt(2.0 * num_layers)
- def init_(tensor):
- return torch.nn.init.normal_(tensor, mean=0.0, std=std)
- return init_
|