multinode_runner.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. # Copyright (c) Microsoft Corporation.
  2. # SPDX-License-Identifier: Apache-2.0
  3. # DeepSpeed Team
  4. import os
  5. import sys
  6. import shutil
  7. import subprocess
  8. import warnings
  9. from shlex import split
  10. from abc import ABC, abstractmethod
  11. from deepspeed.accelerator import get_accelerator
  12. from ..utils import logger, get_numactl_cmd
  13. from .constants import PDSH_MAX_FAN_OUT, MVAPICH_TMP_HOSTFILE
  class MultiNodeRunner(ABC):
      """Common interface for launchers that start a DeepSpeed job on several nodes.

      Concrete runners implement ``backend_exists`` and ``get_cmd`` and may
      override ``validate_args`` / ``parse_user_args`` to reject or rewrite the
      parsed launcher arguments.
      """

      def __init__(self, args, world_info_base64):
          # ``args`` is stored before the (overridable) hooks run because both
          # of them read it; validate_args() runs before the user args are parsed.
          self.args = args
          self.validate_args()
          self.user_arguments = self.parse_user_args()
          self.user_script = args.user_script
          self.world_info_base64 = world_info_base64
          # name -> value pairs registered through add_export().
          self.exports = dict()

      @abstractmethod
      def backend_exists(self):
          """Report whether this backend's launcher tooling is available."""

      @abstractmethod
      def get_cmd(self, environment, active_resources):
          """Build the command that launches the job on the nodes."""

      def add_export(self, key, var):
          """Record ``key`` -> ``var`` in ``self.exports``, stripping surrounding whitespace from both."""
          value = var.strip()
          self.exports[key.strip()] = value

      def parse_user_args(self):
          """Return the user script's arguments; the default performs no rewriting."""
          return self.args.user_args

      @property
      def name(self):
          """Backend name; defaults to the concrete class name."""
          return type(self).__name__

      def validate_args(self):
          """Hook for rejecting unsupported arguments; the default accepts everything."""
  class PDSHRunner(MultiNodeRunner):
      """Launch ``deepspeed.launcher.launch`` on every active worker through ``pdsh`` over SSH."""

      def __init__(self, args, world_info_base64):
          super().__init__(args, world_info_base64)

      def backend_exists(self):
          """Return the path of the ``pdsh`` executable, or ``None`` if it is not on PATH."""
          return shutil.which('pdsh')

      def parse_user_args(self):
          """Double-quote user arguments that contain spaces (escaping embedded '"')."""
          processed_args = []
          for arg in self.args.user_args:
              # With pdsh, if we are passing a string as an argument, it will get
              # split on whitespace. To avoid this and support strings that
              # contain '"', we do this extra processing step:
              if " " in arg:
                  arg = '"{}"'.format(arg.replace('"', '\\"'))
              processed_args.append(arg)
          return processed_args

      @property
      def name(self):
          return "pdsh"

      def get_cmd(self, environment, active_resources):
          """Build the pdsh launch command.

          Args:
              environment: environment mapping for the pdsh process. It is updated
                  in place: PDSH_RCMD_TYPE is set, and PDSH_SSH_ARGS_APPEND is set
                  when ``ssh_port`` is given.
              active_resources: mapping whose keys are the worker hostnames to
                  launch on.

          Returns:
              ``(cmd, kill_command, environment)``. ``cmd`` is the pdsh command
              that starts ``deepspeed.launcher.launch`` on every worker.
              ``kill_command`` is a pdsh command that runs ``pkill -f`` on the
              same workers. ``environment`` is the updated environment.
          """
          environment['PDSH_RCMD_TYPE'] = 'ssh'
          if self.args.ssh_port is not None:  # only specify ssh port if it is specified
              # Append to any existing value without requiring the caller to have
              # set PDSH_SSH_ARGS_APPEND already (a bare ``+=`` raised KeyError).
              ssh_args = environment.get("PDSH_SSH_ARGS_APPEND", "")
              environment["PDSH_SSH_ARGS_APPEND"] = f"{ssh_args} -p {self.args.ssh_port}"

          active_workers = ",".join(active_resources.keys())
          logger.info("Running on the following workers: %s", active_workers)

          # PDSH flags for max node fan out and specific hosts to launch on
          # See https://linux.die.net/man/1/pdsh for flag details
          pdsh_cmd_args = ['pdsh', '-S', '-f', str(PDSH_MAX_FAN_OUT), '-w', active_workers] + split(
              self.args.launcher_args)

          # Registered exports are prepended as shell statements for the remote command.
          exports = "".join(f"export {key}={val}; " for key, val in self.exports.items())

          # https://linux.die.net/man/1/pdsh
          # %n will be replaced by pdsh command
          deepspeed_launch = [
              exports, f"cd {os.path.abspath('.')};", sys.executable, "-u", "-m", "deepspeed.launcher.launch",
              f'--world_info={self.world_info_base64}', "--node_rank=%n", f"--master_addr={self.args.master_addr}",
              f"--master_port={self.args.master_port}"
          ]
          if self.args.no_python:
              deepspeed_launch.append("--no_python")
          if self.args.module:
              deepspeed_launch.append("--module")
          if self.args.no_local_rank:
              deepspeed_launch.append("--no_local_rank")
          if self.args.save_pid:
              deepspeed_launch += ["--save_pid", f"{os.getpid()}"]
          if self.args.elastic_training:
              deepspeed_launch.append("--enable_elastic_training")
              deepspeed_launch.append(f"--max_elastic_nodes={self.args.max_elastic_nodes}")
              deepspeed_launch.append(f"--min_elastic_nodes={self.args.min_elastic_nodes}")

          # deepspeed_launch[2:6] is [python, "-u", "-m", "deepspeed.launcher.launch"].
          # Each piece gets a trailing backslash so the spaces in the joined pattern
          # are escaped. The [:-2] slice drops the final backslash AND the last
          # character of the module name. The shortened pattern presumably still
          # matches because pkill -f searches the whole command line with an
          # unanchored pattern (NOTE(review): confirm [:-1] was not intended).
          cmd_to_search = [i + "\\" for i in deepspeed_launch[2:6]]
          kill_command = pdsh_cmd_args + ["pkill -f ", " ".join(cmd_to_search)[:-2]]
          return pdsh_cmd_args + deepspeed_launch + [self.user_script] + self.user_arguments, kill_command, environment
  class OpenMPIRunner(MultiNodeRunner):
      """Launch every rank with Open MPI's ``mpirun`` using the user-supplied hostfile."""

      def __init__(self, args, world_info_base64, resource_pool):
          super().__init__(args, world_info_base64)
          # host -> number of processes to start there; only the sum is used below.
          self.resource_pool = resource_pool
          # Exported to the ranks through ``-x UCX_TLS=tcp``.
          self.add_export('UCX_TLS', 'tcp')

      def backend_exists(self):
          # TODO: if IB is available we should suggest mvapich
          return shutil.which('ompi_info')

      @property
      def name(self):
          return "openmpi"

      def validate_args(self):
          """Reject worker include/exclude and num_nodes/num_gpus limits, which this backend cannot honor.

          Raises:
              ValueError: if any of those options differs from its default.
          """
          super().validate_args()
          # TODO: Allow for include/exclude at node-level but not gpu-level
          if not (self.args.include == "" and self.args.exclude == ""):
              raise ValueError(f"{self.name} backend does not support worker include/exclusion")
          if not (self.args.num_nodes == -1 and self.args.num_gpus == -1):
              raise ValueError(f"{self.name} backend does not support limiting num nodes/gpus")

      def get_cmd(self, environment, active_resources):
          """Return the ``mpirun`` argument list for the whole job.

          ``environment`` and ``active_resources`` are not used by this backend.
          """
          num_ranks = sum(self.resource_pool.values())
          launcher = ['mpirun', '-n', str(num_ranks), '-hostfile', str(self.args.hostfile)]
          # Exclude the openib BTL and restrict the TCP BTL to the eth0 interface.
          launcher += ['--mca', 'btl', '^openib', '--mca', 'btl_tcp_if_include', 'eth0']
          launcher += split(self.args.launcher_args)

          exported = [token for key, value in self.exports.items() for token in ('-x', f"{key}={value}")]

          interpreter = []
          if not self.args.no_python:
              interpreter = [sys.executable, "-u"] + (["-m"] if self.args.module else [])
          return launcher + exported + interpreter + [self.user_script] + self.user_arguments
  class MPICHRunner(MultiNodeRunner):
      """Launch every rank with MPICH's ``mpirun``, running the user script via launcher_helper.py."""

      def __init__(self, args, world_info_base64, resource_pool):
          super().__init__(args, world_info_base64)
          # host -> number of processes (devices) to start on that host
          self.resource_pool = resource_pool

      def backend_exists(self):
          # TODO: if IB is available we should suggest mpich
          return shutil.which('mpirun')  #mpich_info

      @property
      def name(self):
          return "mpich"

      def validate_args(self):
          """Reject worker include/exclude and num_nodes/num_gpus limits, which this backend cannot honor.

          Raises:
              ValueError: if any of those options differs from its default.
          """
          super().validate_args()
          #TODO: Allow for include/exclude at node-level but not gpu-level
          if self.args.include != "" or self.args.exclude != "":
              raise ValueError(f"{self.name} backend does not support worker include/exclusion")
          if self.args.num_nodes != -1 or self.args.num_gpus != -1:
              raise ValueError(f"{self.name} backend does not support limiting num nodes/gpus")

      def get_cmd(self, environment, active_resources):
          """Return the ``mpirun`` argument list for the whole job.

          Every host must have the same device count, which becomes ``-ppn``.
          ``environment`` and ``active_resources`` are not used by this backend.

          Raises:
              ValueError: if hosts in ``resource_pool`` have different device counts.
          """
          devices_per_node = list(self.resource_pool.values())
          total_process_count = sum(devices_per_node)
          process_per_node = devices_per_node[0]
          if any(n != process_per_node for n in devices_per_node):
              raise ValueError("MPICH requires same number of devices per node")

          mpirun_cmd = [
              'mpirun',
              '-n',
              f'{total_process_count}',
              '-ppn',
              f'{process_per_node}',
          ] + split(self.args.launcher_args)

          export_cmd = []
          for k, v in self.exports.items():
              export_cmd += ['-genv', f"{k}={v}"]
          export_cmd += ['-genv', 'MASTER_ADDR', str(self.args.master_addr)]
          export_cmd += ['-genv', 'MASTER_PORT', str(self.args.master_port)]
          export_cmd += ['-genv', 'WORLD_SIZE', str(total_process_count)]
          export_cmd += ['-genv', 'LOCAL_SIZE', str(process_per_node)]
          export_cmd += ['-hosts', ",".join(f"{host}" for host in self.resource_pool)]

          helper_args = ["--launcher", self.args.launcher]
          python_exec = []
          if not self.args.no_python:
              python_exec = [sys.executable, "-u"]
              if self.args.module:
                  # Do NOT add "-m" to this outer interpreter: it runs
                  # launcher_helper.py by file path, and ``python -m <path>.py``
                  # fails because -m expects a module name. The flag is forwarded
                  # to the helper instead (presumably it applies -m to the user
                  # script -- see launcher_helper.py).
                  helper_args.append("--module")
          else:
              helper_args.append("--no_python")

          helper_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'launcher_helper.py')
          helper_cmd = [helper_path] + helper_args + [self.user_script] + self.user_arguments
          return mpirun_cmd + export_cmd + python_exec + helper_cmd
  class IMPIRunner(MultiNodeRunner):
      """Launch every rank with Intel MPI's ``mpirun``, one ':'-separated argument set per rank."""

      def __init__(self, args, world_info_base64, resource_pool):
          super().__init__(args, world_info_base64)
          # host -> number of processes (devices) to start on that host
          self.resource_pool = resource_pool

      def backend_exists(self):
          # TODO: if IB is available we should suggest mpich
          return shutil.which('mpirun')  #mpich_info

      @property
      def name(self):
          return "impi"

      def validate_args(self):
          """Reject worker include/exclude and num_nodes/num_gpus limits, which this backend cannot honor.

          Raises:
              ValueError: if any of those options differs from its default.
          """
          super().validate_args()
          #TODO: Allow for include/exclude at node-level but not gpu-level
          if self.args.include != "" or self.args.exclude != "":
              raise ValueError(f"{self.name} backend does not support worker include/exclusion")
          if self.args.num_nodes != -1 or self.args.num_gpus != -1:
              raise ValueError(f"{self.name} backend does not support limiting num nodes/gpus")

      def get_cmd(self, environment, active_resources):
          """Return the ``mpirun`` argument list for the whole job.

          Each rank gets its own ``-n 1`` argument set carrying RANK/LOCAL_RANK
          (and a numactl prefix when ``bind_cores_to_rank`` is set). The sets
          are chained with ':'. ``environment`` and ``active_resources`` are not
          used by this backend.

          Raises:
              ValueError: if hosts in ``resource_pool`` have different device counts.
          """
          devices_per_node = list(self.resource_pool.values())
          total_process_count = sum(devices_per_node)
          process_per_node = devices_per_node[0]
          if any(n != process_per_node for n in devices_per_node):
              raise ValueError("Intel MPI requires same number of devices per node")

          mpirun_cmd = [
              'mpirun',
              '-ppn',
              f'{process_per_node}',
          ] + split(self.args.launcher_args)

          export_cmd = []
          for k, v in self.exports.items():
              export_cmd += ['-genv', f'{k}', f'{v}']
          if self.args.bind_cores_to_rank:
              cores_per_rank, _ = get_numactl_cmd(self.args.bind_core_list, process_per_node, 0)
              export_cmd += ['-genv', 'OMP_NUM_THREADS', str(cores_per_rank)]
          export_cmd += ['-genv', 'MASTER_ADDR', str(self.args.master_addr)]
          export_cmd += ['-genv', 'MASTER_PORT', str(self.args.master_port)]
          export_cmd += ['-genv', 'WORLD_SIZE', str(total_process_count)]
          export_cmd += ['-genv', 'LOCAL_SIZE', str(process_per_node)]
          # turn off IMPI core binding, use deepspeed's own core binding
          export_cmd += ['-genv', 'I_MPI_PIN', '0']
          export_cmd += ['-hosts', ",".join(f"{host}" for host in self.resource_pool)]

          per_host_cmd = []
          for i in range(total_process_count):
              local_rank = i % process_per_node
              python_exec = []
              if self.args.bind_cores_to_rank:
                  _, numactl_cmd = get_numactl_cmd(self.args.bind_core_list, process_per_node, local_rank)
                  python_exec += numactl_cmd
              if not self.args.no_python:
                  python_exec += [sys.executable, "-u"]
                  if self.args.module:
                      python_exec.append("-m")
              env_mapping = ['-env', 'RANK', str(i), '-env', 'LOCAL_RANK', str(local_rank)]
              if i > 0:
                  per_host_cmd.append(':')
              # Extend in place. Rebinding ``per_host_cmd = per_host_cmd + [...]``
              # copied the whole list for every rank (quadratic in process count).
              per_host_cmd += ['-n', '1'] + env_mapping + python_exec + [self.user_script] + self.user_arguments

          cmd = mpirun_cmd + export_cmd + per_host_cmd
          # Report through the module logger instead of a bare print() to stdout.
          logger.info("Intel MPI launch command: %s", cmd)
          return cmd
  class SlurmRunner(MultiNodeRunner):
      """Launch the job through Slurm's ``srun``."""

      def __init__(self, args, world_info_base64, resource_pool):
          super().__init__(args, world_info_base64)
          # host -> number of processes to start there; only the sum is used below.
          self.resource_pool = resource_pool

      def backend_exists(self):
          return shutil.which('sinfo')

      @property
      def name(self):
          return 'slurm'

      def get_cmd(self, environment, active_resources):
          """Return the ``srun`` argument list for the whole job.

          Honors ``--no_python`` and ``--module`` the same way the other runners
          do. ``environment`` and ``active_resources`` are not used by this backend.

          Raises:
              AssertionError: if ``detect_nvlink_pairs`` is set (note that
                  ``assert`` is skipped under ``python -O``).
          """
          assert not getattr(self.args, 'detect_nvlink_pairs',
                             False), "slurm backend does not support remapping visible devices"
          total_process_count = sum(self.resource_pool.values())
          srun_cmd = [
              'srun',
              '-n',
              f'{total_process_count}',
          ] + split(self.args.launcher_args)

          if getattr(self.args, 'slurm_comment', ''):
              srun_cmd += ['--comment', self.args.slurm_comment]

          # NOTE(review): srun does not appear to document an ``--include`` option
          # (node selection is ``--nodelist``/``-w``), and DeepSpeed's
          # include/exclude syntax may not match Slurm's hostlist syntax -- confirm.
          if self.args.include != "":
              srun_cmd.append('--include')
              srun_cmd.append(f'{self.args.include}')
          if self.args.exclude != "":
              srun_cmd.append('--exclude')
              srun_cmd.append(f'{self.args.exclude}')
          if self.args.num_nodes > 0:
              srun_cmd.append('--nodes')
              srun_cmd.append(f'{self.args.num_nodes}')
          if self.args.num_gpus > 0:
              srun_cmd.append('--gpus')
              srun_cmd.append(f'{self.args.num_gpus}')

          # Keep the caller's environment (ALL) and add the registered exports.
          exports = '--export=ALL'
          for key, val in self.exports.items():
              exports += f",{key}={val}"

          # Previously the interpreter was always prepended and ``--module`` was
          # ignored, so ``--module pkg.mod`` ran ``python -u pkg.mod``.
          # getattr() keeps args objects without these attributes working.
          python_exec = []
          if not getattr(self.args, 'no_python', False):
              python_exec = [sys.executable, "-u"]
              if getattr(self.args, 'module', False):
                  python_exec.append("-m")

          command = srun_cmd + [exports] + python_exec + [self.user_script] + self.user_arguments
          return command
  class MVAPICHRunner(MultiNodeRunner):
      """Launch every rank with MVAPICH2-GDR's ``mpirun`` using a generated hostfile."""

      def __init__(self, args, world_info_base64, resource_pool):
          super().__init__(args, world_info_base64)
          # host -> number of processes (devices) to start on that host
          self.resource_pool = resource_pool

          # Disable the CMA kernel module, not available on Ubuntu systems
          self.add_export('MV2_SMP_USE_CMA', '0')

          # If we fail this will output more verbose logging
          self.add_export('MV2_DEBUG_SHOW_BACKTRACE', '1')

          # Enabled cuda-aware communication
          if get_accelerator().device_name() == 'cuda':
              self.add_export('MV2_USE_CUDA', '1')

          # Support deep learning frameworks: http://hidl.cse.ohio-state.edu/userguide/horovod/
          self.add_export('MV2_SUPPORT_DL', '1')

          # Support MPI_THREAD_MULTIPLE
          self.add_export('MV2_ENABLE_AFFINITY', '0')

          # Performance tuning flags for allgather
          self.add_export('MV2_INTER_ALLGATHER_TUNING', '5')
          self.add_export('MV2_CUDA_USE_NAIVE', '0')

      def backend_exists(self):
          """Return True only if ``mpiname`` is on PATH and its output mentions MVAPICH2-GDR.

          Emits a warning and returns False otherwise. A non-zero exit from
          ``mpiname`` propagates as ``subprocess.CalledProcessError``.
          """
          # TODO: if IB is available we should suggest mvapich
          mpiname_exists = shutil.which('mpiname')
          exists = False
          if not mpiname_exists:
              warnings.warn("mpiname does not exist, mvapich is not installed properly")
          else:
              # Run the binary directly; no shell is needed for a single command.
              results = subprocess.check_output(['mpiname'])
              mpiname_results = results.decode('utf-8').strip()
              if "MVAPICH2-GDR" in mpiname_results:
                  exists = True
              else:
                  warnings.warn(f"Expected MVAPICH2-GDR as return for mpiname but received {mpiname_results}")
          return exists

      @property
      def name(self):
          return "mvapich"

      def validate_args(self):
          """Reject worker include/exclude and num_nodes/num_gpus limits, which this backend cannot honor.

          Raises:
              ValueError: if any of those options differs from its default.
          """
          super().validate_args()
          #TODO: Allow for include/exclude at node-level but not gpu-level
          if self.args.include != "" or self.args.exclude != "":
              raise ValueError(f"{self.name} backend does not support worker include/exclusion")
          if self.args.num_nodes != -1 or self.args.num_gpus != -1:
              raise ValueError(f"{self.name} backend does not support limiting num nodes/gpus")

      def get_cmd(self, environment, active_resources):
          """Write the hostfile and return the ``mpirun`` argument list for the whole job.

          Side effect: overwrites MVAPICH_TMP_HOSTFILE with one host per line.
          ``environment`` and ``active_resources`` are not used by this backend.

          Raises:
              ValueError: if hosts in ``resource_pool`` have different device counts.
          """
          devices_per_node = list(self.resource_pool.values())
          total_process_count = sum(devices_per_node)
          process_per_node = devices_per_node[0]
          if any(n != process_per_node for n in devices_per_node):
              raise ValueError("mvapich requires same number of devices per node")

          with open(MVAPICH_TMP_HOSTFILE, 'w') as fd:
              fd.writelines(f'{host}\n' for host in self.resource_pool.keys())

          mpirun_cmd = [
              'mpirun',
              '-np',
              f'{total_process_count}',
              '-ppn',
              f'{process_per_node}',
              '--hostfile',
              f'{MVAPICH_TMP_HOSTFILE}',
          ] + split(self.args.launcher_args)

          export_cmd = []
          for k, v in self.exports.items():
              export_cmd += ['-env', f"{k}={v}"]

          python_exec = []
          if not self.args.no_python:
              python_exec = [sys.executable, "-u"]
              if self.args.module:
                  python_exec.append("-m")

          return mpirun_cmd + export_cmd + python_exec + [self.user_script] + self.user_arguments