# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import os
import sys
import shutil
import subprocess
import warnings
from shlex import split
from abc import ABC, abstractmethod

from deepspeed.accelerator import get_accelerator
from ..utils import logger, get_numactl_cmd
from .constants import PDSH_MAX_FAN_OUT, MVAPICH_TMP_HOSTFILE


class MultiNodeRunner(ABC):

    def __init__(self, args, world_info_base64):
        self.args = args
        self.validate_args()
        self.user_arguments = self.parse_user_args()
        self.user_script = args.user_script
        self.world_info_base64 = world_info_base64
        self.exports = {}

    @abstractmethod
    def backend_exists(self):
        """Return whether the corresponding backend exists"""

    @abstractmethod
    def get_cmd(self, environment, active_resources):
        """Return the command to execute on node"""

    def add_export(self, key, var):
        self.exports[key.strip()] = var.strip()

    def parse_user_args(self):
        return self.args.user_args

    @property
    def name(self):
        """Return the name of the backend"""
        return self.__class__.__name__

    def validate_args(self):
        """Validate self.args"""


class PDSHRunner(MultiNodeRunner):

    def __init__(self, args, world_info_base64):
        super().__init__(args, world_info_base64)

    def backend_exists(self):
        return shutil.which('pdsh')

    @property
    def name(self):
        return "pdsh"

    def parse_user_args(self):
        # Quote user arguments that are not flags so they survive the remote shell.
        return list(map(lambda x: x if x.startswith("-") else f"'{x}'", self.args.user_args))

    def get_cmd(self, environment, active_resources):
        environment['PDSH_RCMD_TYPE'] = 'ssh'
        if self.args.ssh_port is not None:  # only specify the ssh port if it was given
            # Use .get() so a missing PDSH_SSH_ARGS_APPEND does not raise a KeyError.
            environment["PDSH_SSH_ARGS_APPEND"] = environment.get("PDSH_SSH_ARGS_APPEND", "") + f" -p {self.args.ssh_port}"
        active_workers = ",".join(active_resources.keys())
        logger.info("Running on the following workers: %s" % active_workers)

        # PDSH flags for the max node fan-out and the specific hosts to launch on.
        # See https://linux.die.net/man/1/pdsh for flag details.
        pdsh_cmd_args = ['pdsh', '-S', '-f', str(PDSH_MAX_FAN_OUT), '-w', active_workers] + split(
            self.args.launcher_args)

        exports = ""
        for key, val in self.exports.items():
            exports += "export {}={}; ".format(key, val)

        # https://linux.die.net/man/1/pdsh
        # pdsh expands %n to the rank of the remote node within the host list.
        deepspeed_launch = [
            exports, f"cd {os.path.abspath('.')};", sys.executable, "-u", "-m", "deepspeed.launcher.launch",
            f'--world_info={self.world_info_base64}', "--node_rank=%n", f"--master_addr={self.args.master_addr}",
            f"--master_port={self.args.master_port}"
        ]
        if self.args.no_python:
            deepspeed_launch.append("--no_python")
        if self.args.module:
            deepspeed_launch.append("--module")
        if self.args.no_local_rank:
            deepspeed_launch.append("--no_local_rank")
        if self.args.save_pid:
            deepspeed_launch += ["--save_pid", f"{os.getpid()}"]
        if self.args.elastic_training:
            deepspeed_launch.append("--enable_elastic_training")
            deepspeed_launch.append(f"--max_elastic_nodes={self.args.max_elastic_nodes}")
            deepspeed_launch.append(f"--min_elastic_nodes={self.args.min_elastic_nodes}")

        # Build a pkill pattern from the interpreter/module portion of the launch
        # command so stray worker processes can be cleaned up on failure.
        cmd_to_search = [i + "\\" for i in deepspeed_launch[2:6]]
        kill_command = pdsh_cmd_args + ["pkill -f ", " ".join(cmd_to_search)[:-2]]
        return pdsh_cmd_args + deepspeed_launch + [self.user_script] + self.user_arguments, kill_command, environment
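
# For illustration, with two workers the assembled launch command looks roughly
# like the following (hostnames, port, and the export are made-up examples):
#
#   pdsh -S -f <PDSH_MAX_FAN_OUT> -w worker-0,worker-1 \
#       export NCCL_DEBUG=INFO; cd /path/to/job; python -u -m deepspeed.launcher.launch \
#       --world_info=<base64> --node_rank=%n --master_addr=worker-0 --master_port=29500 \
#       train.py <user args>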


class OpenMPIRunner(MultiNodeRunner):

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool
        self.add_export('UCX_TLS', 'tcp')

    def backend_exists(self):
        #TODO: if IB is available we should suggest mvapich
        return shutil.which('ompi_info')

    @property
    def name(self):
        return "openmpi"

    def validate_args(self):
        super().validate_args()
        #TODO: Allow for include/exclude at node-level but not gpu-level
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(f"{self.name} backend does not support worker inclusion/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(f"{self.name} backend does not support limiting the number of nodes/gpus")
    def get_cmd(self, environment, active_resources):
        total_process_count = sum(self.resource_pool.values())
        # Exclude the openib BTL and route TCP traffic over eth0.
        mpirun_cmd = [
            'mpirun',
            '-n',
            f'{total_process_count}',
            '-hostfile',
            f'{self.args.hostfile}',
            '--mca',
            'btl',
            '^openib',
            '--mca',
            'btl_tcp_if_include',
            'eth0',
        ] + split(self.args.launcher_args)

        export_cmd = []
        for k, v in self.exports.items():
            export_cmd += ['-x', "{}={}".format(k, v)]

        python_exec = []
        if not self.args.no_python:
            python_exec = [sys.executable, "-u"]
            if self.args.module:
                python_exec.append("-m")

        return mpirun_cmd + export_cmd + python_exec + [self.user_script] + self.user_arguments
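
# For illustration, on two 8-GPU nodes this yields roughly the following
# (the hostfile path is a made-up example):
#
#   mpirun -n 16 -hostfile /job/hostfile --mca btl ^openib \
#       --mca btl_tcp_if_include eth0 -x UCX_TLS=tcp \
#       python -u train.py <user args>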


class MPICHRunner(MultiNodeRunner):

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

    def backend_exists(self):
        #TODO: if IB is available we should suggest mpich
        return shutil.which('mpirun')  #mpich_info

    @property
    def name(self):
        return "mpich"

    def validate_args(self):
        super().validate_args()
        #TODO: Allow for include/exclude at node-level but not gpu-level
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(f"{self.name} backend does not support worker inclusion/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(f"{self.name} backend does not support limiting the number of nodes/gpus")
    def get_cmd(self, environment, active_resources):
        devices_per_node = self.resource_pool.values()
        total_process_count = sum(devices_per_node)
        process_per_node = list(devices_per_node)[0]
        if not all([n == process_per_node for n in devices_per_node]):
            raise ValueError("MPICH requires the same number of devices per node")

        mpirun_cmd = [
            'mpirun',
        ] + split(self.args.launcher_args)

        export_cmd = []
        for k, v in self.exports.items():
            export_cmd += ['-genv', "{}={}".format(k, v)]

        export_cmd += ['-genv', 'MASTER_ADDR', str(self.args.master_addr)]
        export_cmd += ['-genv', 'MASTER_PORT', str(self.args.master_port)]
        export_cmd += ['-genv', 'WORLD_SIZE', str(total_process_count)]
        export_cmd += ['-genv', 'LOCAL_SIZE', str(process_per_node)]

        hosts = list(self.resource_pool.keys())
        per_host_cmd = []
        host_id = 0
        host_count = 0
        # Build one ':'-separated MPMD block per rank, assigning ranks to hosts
        # in order and advancing to the next host every process_per_node ranks.
        for i in range(total_process_count):
            local_rank = i % process_per_node
            python_exec = []
            if not self.args.no_python:
                python_exec += [sys.executable, "-u"]
                if self.args.module:
                    python_exec.append("-m")
            env_mapping = ['-env', 'RANK', str(i)]
            env_mapping += ['-env', 'LOCAL_RANK', str(local_rank)]
            if i == 0:
                per_host_cmd = ['-n', '1', '-host', hosts[host_id]] + env_mapping + python_exec \
                    + [self.user_script] + self.user_arguments
            else:
                per_host_cmd = per_host_cmd + [':', '-n', '1', '-host', hosts[host_id]] + env_mapping \
                    + python_exec + [self.user_script] + self.user_arguments
            host_count = host_count + 1
            if host_count == process_per_node:
                host_id = host_id + 1
                host_count = 0

        return mpirun_cmd + export_cmd + per_host_cmd
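
# For illustration, with two single-device hosts the MPMD-style command looks
# roughly like this (hostnames are made-up; -genv exports abbreviated):
#
#   mpirun -genv MASTER_ADDR node-0 ... \
#       -n 1 -host node-0 -env RANK 0 -env LOCAL_RANK 0 python -u train.py <args> : \
#       -n 1 -host node-1 -env RANK 1 -env LOCAL_RANK 0 python -u train.py <args>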


class IMPIRunner(MultiNodeRunner):

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

    def backend_exists(self):
        #TODO: if IB is available we should suggest mpich
        return shutil.which('mpirun')  #mpich_info

    @property
    def name(self):
        return "impi"

    def validate_args(self):
        super().validate_args()
        #TODO: Allow for include/exclude at node-level but not gpu-level
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(f"{self.name} backend does not support worker inclusion/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(f"{self.name} backend does not support limiting the number of nodes/gpus")
    def get_cmd(self, environment, active_resources):
        devices_per_node = self.resource_pool.values()
        total_process_count = sum(devices_per_node)
        process_per_node = list(devices_per_node)[0]
        if not all([n == process_per_node for n in devices_per_node]):
            raise ValueError("Intel MPI requires the same number of devices per node")

        mpirun_cmd = [
            'mpirun',
            '-ppn',
            f'{process_per_node}',
        ] + split(self.args.launcher_args)

        export_cmd = []
        for k, v in self.exports.items():
            export_cmd += ['-genv', f'{k}', f'{v}']

        if self.args.bind_cores_to_rank:
            cores_per_rank, _ = get_numactl_cmd(self.args.bind_core_list, process_per_node, 0)
            export_cmd += ['-genv', 'OMP_NUM_THREADS', str(cores_per_rank)]

        export_cmd += ['-genv', 'MASTER_ADDR', str(self.args.master_addr)]
        export_cmd += ['-genv', 'MASTER_PORT', str(self.args.master_port)]
        export_cmd += ['-genv', 'WORLD_SIZE', str(total_process_count)]
        export_cmd += ['-genv', 'LOCAL_SIZE', str(process_per_node)]

        export_cmd += ['-hosts', ",".join(self.resource_pool.keys())]

        # Build one ':'-separated MPMD block per rank, optionally prefixed with
        # a numactl command when core binding is requested.
        per_host_cmd = []
        for i in range(total_process_count):
            local_rank = i % process_per_node
            python_exec = []
            if self.args.bind_cores_to_rank:
                _, numactl_cmd = get_numactl_cmd(self.args.bind_core_list, process_per_node, local_rank)
                python_exec += numactl_cmd
            if not self.args.no_python:
                python_exec += [sys.executable, "-u"]
                if self.args.module:
                    python_exec.append("-m")
            env_mapping = ['-env', 'RANK', str(i)]
            env_mapping += ['-env', 'LOCAL_RANK', str(local_rank)]
            if i == 0:
                per_host_cmd = ['-n', '1'] + env_mapping + python_exec + [self.user_script] + self.user_arguments
            else:
                per_host_cmd = per_host_cmd + [':', '-n', '1'] + env_mapping + python_exec \
                    + [self.user_script] + self.user_arguments
        # Log the assembled command instead of printing to stdout.
        logger.info("IMPI launch command: %s", mpirun_cmd + export_cmd + per_host_cmd)
        return mpirun_cmd + export_cmd + per_host_cmd
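
# For illustration, with core binding disabled and two single-device hosts the
# command is roughly (hostnames made-up; -genv exports abbreviated):
#
#   mpirun -ppn 1 -genv MASTER_ADDR node-0 ... -hosts node-0,node-1 \
#       -n 1 -env RANK 0 -env LOCAL_RANK 0 python -u train.py <args> : \
#       -n 1 -env RANK 1 -env LOCAL_RANK 0 python -u train.py <args>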


class SlurmRunner(MultiNodeRunner):

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

    def backend_exists(self):
        return shutil.which('sinfo')

    @property
    def name(self):
        return 'slurm'

    def get_cmd(self, environment, active_resources):
        assert not getattr(self.args, 'detect_nvlink_pairs',
                           False), "slurm backend does not support remapping visible devices"
        total_process_count = sum(self.resource_pool.values())
        srun_cmd = [
            'srun',
            '-n',
            f'{total_process_count}',
        ] + split(self.args.launcher_args)

        if getattr(self.args, 'slurm_comment', ''):
            srun_cmd += ['--comment', self.args.slurm_comment]
        if self.args.include != "":
            # srun restricts a job to specific hosts via --nodelist, not --include.
            srun_cmd.append('--nodelist')
            srun_cmd.append(f'{self.args.include}')
        if self.args.exclude != "":
            srun_cmd.append('--exclude')
            srun_cmd.append(f'{self.args.exclude}')
        if self.args.num_nodes > 0:
            srun_cmd.append('--nodes')
            srun_cmd.append(f'{self.args.num_nodes}')
        if self.args.num_gpus > 0:
            srun_cmd.append('--gpus')
            srun_cmd.append(f'{self.args.num_gpus}')

        exports = '--export=ALL'
        for key, val in self.exports.items():
            exports += f",{key}={val}"

        python_exec = [sys.executable, "-u"]
        command = srun_cmd + [exports] + python_exec + [self.user_script] + self.user_arguments
        return command
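
# For illustration, a 16-process run with one exported variable looks roughly
# like this (the variable is a made-up example):
#
#   srun -n 16 --export=ALL,NCCL_DEBUG=INFO python -u train.py <user args>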


class MVAPICHRunner(MultiNodeRunner):

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

        # Disable the CMA kernel module, which is not available on Ubuntu systems
        self.add_export('MV2_SMP_USE_CMA', '0')

        # If we fail, this will emit more verbose logging
        self.add_export('MV2_DEBUG_SHOW_BACKTRACE', '1')

        # Enable CUDA-aware communication
        if get_accelerator().device_name() == 'cuda':
            self.add_export('MV2_USE_CUDA', '1')

        # Support deep learning frameworks: http://hidl.cse.ohio-state.edu/userguide/horovod/
        self.add_export('MV2_SUPPORT_DL', '1')

        # Support MPI_THREAD_MULTIPLE
        self.add_export('MV2_ENABLE_AFFINITY', '0')

        # Performance tuning flags for allgather
        self.add_export('MV2_INTER_ALLGATHER_TUNING', '5')
        self.add_export('MV2_CUDA_USE_NAIVE', '0')
    def backend_exists(self):
        #TODO: if IB is available we should suggest mvapich
        mpiname_exists = shutil.which('mpiname')
        exists = False
        if not mpiname_exists:
            warnings.warn("mpiname was not found; mvapich is not installed properly")
        else:
            results = subprocess.check_output('mpiname', shell=True)
            mpiname_results = results.decode('utf-8').strip()
            if "MVAPICH2-GDR" in mpiname_results:
                exists = True
            else:
                warnings.warn(f"Expected MVAPICH2-GDR as return for mpiname but received {mpiname_results}")
        return exists

    @property
    def name(self):
        return "mvapich"

    def validate_args(self):
        super().validate_args()
        #TODO: Allow for include/exclude at node-level but not gpu-level
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(f"{self.name} backend does not support worker inclusion/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(f"{self.name} backend does not support limiting the number of nodes/gpus")
    def get_cmd(self, environment, active_resources):
        devices_per_node = self.resource_pool.values()
        total_process_count = sum(devices_per_node)
        process_per_node = list(devices_per_node)[0]
        if not all([n == process_per_node for n in devices_per_node]):
            raise ValueError("mvapich requires the same number of devices per node")

        # Write the hosts to a temporary hostfile for mpirun to consume.
        with open(MVAPICH_TMP_HOSTFILE, 'w') as fd:
            for host in self.resource_pool.keys():
                fd.write(f'{host}\n')

        mpirun_cmd = [
            'mpirun',
            '-np',
            f'{total_process_count}',
            '-ppn',
            f'{process_per_node}',
            '--hostfile',
            f'{MVAPICH_TMP_HOSTFILE}',
        ] + split(self.args.launcher_args)

        export_cmd = []
        for k, v in self.exports.items():
            export_cmd += ['-env', "{}={}".format(k, v)]

        python_exec = []
        if not self.args.no_python:
            python_exec = [sys.executable, "-u"]
            if self.args.module:
                python_exec.append("-m")

        return mpirun_cmd + export_cmd + python_exec + [self.user_script] + self.user_arguments
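
# For illustration, on two 8-GPU nodes the assembled command looks roughly like
# this (exports abbreviated; the hostfile path comes from MVAPICH_TMP_HOSTFILE):
#
#   mpirun -np 16 -ppn 8 --hostfile <MVAPICH_TMP_HOSTFILE> \
#       -env MV2_USE_CUDA=1 ... python -u train.py <user args>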