123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- # Copyright (c) Microsoft Corporation.
- # SPDX-License-Identifier: Apache-2.0
- # DeepSpeed Team
- import os
- import sys
- import argparse
- import subprocess
- from deepspeed.utils import logger
- from deepspeed.launcher.constants import MPICH_LAUNCHER
def parse_args(args=None):
    """Parse the command-line arguments of the launcher helper.

    Args:
        args: Optional list of argument strings. When ``None``, argparse
            falls back to the process command line.

    Returns:
        The parsed ``argparse.Namespace`` with the attributes ``launcher``,
        ``module``, ``no_python``, ``user_script``, ``user_args``,
        ``bind_cores_to_rank`` and ``bind_core_list``.
    """
    # NOTE: adjacent string literals are concatenated verbatim, so every
    # fragment that continues a sentence must end with a trailing space.
    parser = argparse.ArgumentParser(description="DeepSpeed launcher helper to map environment variables for "
                                     "multi-node/multi-gpu training jobs.",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument("--launcher",
                        default=MPICH_LAUNCHER,
                        type=str,
                        help="(optional) choose launcher backend for multi-node "
                        "training. Options currently include MPICH.")

    parser.add_argument("--module",
                        action="store_true",
                        help="Change each process to interpret the launch "
                        "script as a Python module, executing with the same "
                        "behavior as 'python -m'.")

    parser.add_argument("--no_python",
                        action="store_true",
                        help="Skip prepending the training script with "
                        "'python' - just execute it directly.")

    parser.add_argument("user_script", type=str, help="User script to launch, followed by any required "
                        "arguments.")

    # Everything after the user script is captured untouched and forwarded to it.
    parser.add_argument('user_args', nargs=argparse.REMAINDER)

    parser.add_argument("--bind_cores_to_rank",
                        action="store_true",
                        help="Bind each rank to different cores of the host")

    parser.add_argument("--bind_core_list",
                        type=str,
                        default=None,
                        help="List of cores to bind to with comma separated list of "
                        "numbers and range. i.e. 1,3-5,7 => [1,3,4,5,7]. When not "
                        "specified, all cores on system would be used for rank binding")

    return parser.parse_args(args=args)
def _first_consistent_value(env, names, label):
    """Return the single value shared by every variable of ``names`` present in ``env``.

    Raises:
        EnvironmentError: if two of the variables disagree, or if none of
            them yields a value. ``label`` is used to build the message.
    """
    value = None
    # ``names`` may be None (the public default); treat that as "no candidates".
    for name in names or ():
        if name not in env:
            continue
        candidate = env.get(name)
        if value is None:
            value = candidate
        elif value != candidate:
            raise EnvironmentError(f"{label} number doesn't match!")
    if value is None:
        raise EnvironmentError(f"{label} number is not in current env!")
    return value


def env_mapping(env, rank_name_list=None, local_rank_name_list=None):
    """Populate ``RANK`` and ``LOCAL_RANK`` in ``env`` from launcher-specific variables.

    Args:
        env: Mapping of environment variables. It is modified in place.
        rank_name_list: Names of variables that may carry the global rank.
        local_rank_name_list: Names of variables that may carry the local rank.

    Returns:
        The same ``env`` object, with ``RANK`` and ``LOCAL_RANK`` set.

    Raises:
        EnvironmentError: if the candidate variables that are present hold
            different values, or if none of them is present (this includes
            the case where a name list is ``None`` or empty).
    """
    env['RANK'] = _first_consistent_value(env, rank_name_list, "rank")
    env['LOCAL_RANK'] = _first_consistent_value(env, local_rank_name_list, "local_rank")
    return env
def main(args=None):
    """Map launcher rank variables to RANK/LOCAL_RANK and run the user script.

    Args:
        args: Optional list of argument strings forwarded to ``parse_args``.

    Returns:
        The exit code of the launched user process, so that the caller can
        propagate a failure instead of always reporting success.

    Raises:
        NotImplementedError: if ``--launcher`` names an unsupported backend.
        EnvironmentError: propagated from ``env_mapping`` when the rank
            variables are missing or inconsistent.
    """
    args = parse_args(args)
    env = os.environ.copy()

    args.launcher = args.launcher.lower()
    if args.launcher == MPICH_LAUNCHER:
        rank_name_list = ["PMIX_RANK", "PMI_RANK"]
        local_rank_name_list = ["PALS_LOCAL_RANKID", "MPI_LOCALRANKID"]
        env = env_mapping(env, rank_name_list=rank_name_list, local_rank_name_list=local_rank_name_list)
    else:
        raise NotImplementedError(f"Unknown launcher {args.launcher}")

    python_exec = []
    if not args.no_python:
        # "-u" keeps the child's stdout/stderr unbuffered.
        python_exec += [sys.executable, "-u"]
        if args.module:
            python_exec.append("-m")
    cmd = python_exec + [args.user_script] + args.user_args

    logger.info(f"launcher_helper cmd = {' '.join(cmd)}")

    result = subprocess.Popen(cmd, env=env, close_fds=False)
    # wait() returns the child's return code; hand it back rather than dropping it.
    return result.wait()


if __name__ == "__main__":
    # Exit with the user script's status so a failing rank is visible to the launcher.
    sys.exit(main())
|