123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- '''
- Copyright 2020 The Microsoft DeepSpeed Team
- '''
- import os
- import torch
- from datetime import timedelta
- from .logging import logger
- from ..constants import TORCH_DISTRIBUTED_DEFAULT_PORT, default_pg_timeout
- def init_distributed(dist_backend="nccl",
- auto_mpi_discovery=True,
- distributed_port=TORCH_DISTRIBUTED_DEFAULT_PORT,
- verbose=True,
- timeout=default_pg_timeout,
- init_method=None):
- """Initialize torch.distributed backend, potentially performing MPI discovery if needed
- Arguments:
- dist_backend: Optional (str). torch distributed backend, e.g., nccl, mpi, gloo
- auto_mpi_discovery Optional (bool). if distributed environment variables are not set, attempt to discover them from MPI
- distributed_port: Optional (int). torch distributed backend port
- verbose: Optional (bool). verbose logging
- timeout: Optional (timedelta). Timeout for operations executed against the process group. Default value equals 30 minutes.
- init_method: Optional (string). Torch distributed, URL specifying how to initialize the process group. Default is “env://” if no init_method or store is specified.
- """
- required_env = ["RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT", "LOCAL_RANK"]
- if auto_mpi_discovery and not all(map(lambda v: v in os.environ, required_env)):
- if verbose:
- logger.info(
- "Not using the DeepSpeed or torch.distributed launchers, attempting to detect MPI environment..."
- )
- if in_aml() and not in_dlts():
- patch_aml_env_for_torch_nccl_backend(verbose=verbose)
- else:
- mpi_discovery(distributed_port=distributed_port, verbose=verbose)
- if not torch.distributed.is_initialized():
- if verbose:
- logger.info(
- "Initializing torch distributed with backend: {}".format(dist_backend))
- assert isinstance(timeout, timedelta)
- torch.distributed.init_process_group(backend=dist_backend,
- timeout=timeout,
- init_method=init_method)
- def mpi_discovery(distributed_port=TORCH_DISTRIBUTED_DEFAULT_PORT, verbose=True):
- """
- Discovery MPI environment via mpi4py and map to relevant torch.distributed state
- """
- from mpi4py import MPI
- import subprocess
- comm = MPI.COMM_WORLD
- rank = comm.Get_rank()
- world_size = comm.Get_size()
- master_addr = None
- if rank == 0:
- hostname_cmd = ["hostname -I"]
- result = subprocess.check_output(hostname_cmd, shell=True)
- master_addr = result.decode('utf-8').split()[0]
- master_addr = comm.bcast(master_addr, root=0)
- # Determine local rank by assuming hostnames are unique
- proc_name = MPI.Get_processor_name()
- all_procs = comm.allgather(proc_name)
- local_rank = sum([i == proc_name for i in all_procs[:rank]])
- os.environ['RANK'] = str(rank)
- os.environ['WORLD_SIZE'] = str(world_size)
- os.environ['LOCAL_RANK'] = str(local_rank)
- os.environ['MASTER_ADDR'] = master_addr
- os.environ['MASTER_PORT'] = str(distributed_port)
- if verbose:
- logger.info(
- "Discovered MPI settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}"
- .format(os.environ['RANK'],
- os.environ['LOCAL_RANK'],
- os.environ['WORLD_SIZE'],
- os.environ['MASTER_ADDR'],
- os.environ['MASTER_PORT']))
- if torch.distributed.is_initialized():
- assert torch.distributed.get_rank() == rank, "MPI rank {} does not match torch rank {}".format(
- rank, torch.distributed.get_rank())
- assert torch.distributed.get_world_size() == world_size, "MPI world size {} does not match torch world size {}".format(
- world_size, torch.distributed.get_world_size())
- def in_aml():
- # Are we running inside an Azure Machine Learning (AML) environment?
- return 'AZUREML_EXPERIMENT_ID' in os.environ
- def in_dlts():
- # Are we running on a DLTS cluster?
- return 'DLTS_JOB_ID' in os.environ
- def patch_aml_env_for_torch_nccl_backend(master_port=6105, verbose=True):
- """Helper routine to get and set environment variables.
- This is adapted from Azure ML's documentation available from:
- https://azure.github.io/azureml-web/docs/cheatsheet/distributed-training/#environment-variables-from-openmpi
- """
- os.environ["RANK"] = os.environ["OMPI_COMM_WORLD_RANK"]
- os.environ["WORLD_SIZE"] = os.environ["OMPI_COMM_WORLD_SIZE"]
- single_node = int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"]) == int(
- os.environ["WORLD_SIZE"])
- if not single_node:
- master_node_params = os.environ["AZ_BATCH_MASTER_NODE"].split(":")
- os.environ["MASTER_ADDR"] = master_node_params[0]
- # Do not overwrite master port with that defined in AZ_BATCH_MASTER_NODE
- if "MASTER_PORT" not in os.environ:
- os.environ["MASTER_PORT"] = str(master_port)
- else:
- os.environ["MASTER_ADDR"] = os.environ["AZ_BATCHAI_MPI_MASTER_NODE"]
- os.environ["MASTER_PORT"] = "54965"
- if verbose:
- logger.info("NCCL_SOCKET_IFNAME original value = {}".format(
- os.environ["NCCL_SOCKET_IFNAME"]))
- os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo"
- os.environ['LOCAL_RANK'] = os.environ["OMPI_COMM_WORLD_LOCAL_RANK"]
- if verbose:
- logger.info(
- "Discovered AzureML settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}"
- .format(os.environ['RANK'],
- os.environ['LOCAL_RANK'],
- os.environ['WORLD_SIZE'],
- os.environ['MASTER_ADDR'],
- os.environ['MASTER_PORT']))
|