# multinode_runner.py
  1. '''Copyright The Microsoft DeepSpeed Team'''
  2. import os
  3. import sys
  4. import shutil
  5. import subprocess
  6. import warnings
  7. from shlex import split
  8. from abc import ABC, abstractmethod
  9. from deepspeed.accelerator import get_accelerator
  10. from ..utils import logger
  11. from .constants import PDSH_MAX_FAN_OUT, MVAPICH_TMP_HOSTFILE
  12. class MultiNodeRunner(ABC):
  13. def __init__(self, args, world_info_base64):
  14. self.args = args
  15. self.validate_args()
  16. self.user_arguments = self.parse_user_args()
  17. self.user_script = args.user_script
  18. self.world_info_base64 = world_info_base64
  19. self.exports = {}
  20. @abstractmethod
  21. def backend_exists(self):
  22. """Return whether the corresponding backend exists"""
  23. @abstractmethod
  24. def get_cmd(self, environment, active_resources):
  25. """Return the command to execute on node"""
  26. def add_export(self, key, var):
  27. self.exports[key.strip()] = var.strip()
  28. def parse_user_args(self):
  29. return self.args.user_args
  30. @property
  31. def name(self):
  32. """Return the name of the backend"""
  33. return self.__class__.__name__
  34. def validate_args(self):
  35. """Validate self.args"""
  36. class PDSHRunner(MultiNodeRunner):
  37. def __init__(self, args, world_info_base64):
  38. super().__init__(args, world_info_base64)
  39. def backend_exists(self):
  40. return shutil.which('pdsh')
  41. @property
  42. def name(self):
  43. return "pdsh"
  44. def parse_user_args(self):
  45. return list(
  46. map(lambda x: x if x.startswith("-") else f"'{x}'",
  47. self.args.user_args))
  48. def get_cmd(self, environment, active_resources):
  49. environment['PDSH_RCMD_TYPE'] = 'ssh'
  50. active_workers = ",".join(active_resources.keys())
  51. logger.info("Running on the following workers: %s" % active_workers)
  52. # PDSH flags for max node fan out and specific hosts to launch on
  53. # See https://linux.die.net/man/1/pdsh for flag details
  54. pdsh_cmd_args = [
  55. 'pdsh',
  56. '-S',
  57. '-f',
  58. str(PDSH_MAX_FAN_OUT),
  59. '-w',
  60. active_workers
  61. ] + split(self.args.launcher_args)
  62. exports = ""
  63. for key, val in self.exports.items():
  64. exports += "export {}={}; ".format(key, val)
  65. # https://linux.die.net/man/1/pdsh
  66. # %n will be replaced by pdsh command
  67. deepspeed_launch = [
  68. exports,
  69. f"cd {os.path.abspath('.')};",
  70. sys.executable,
  71. "-u",
  72. "-m",
  73. "deepspeed.launcher.launch",
  74. f'--world_info={self.world_info_base64}',
  75. "--node_rank=%n",
  76. f"--master_addr={self.args.master_addr}",
  77. f"--master_port={self.args.master_port}"
  78. ]
  79. if self.args.no_python:
  80. deepspeed_launch.append("--no_python")
  81. if self.args.module:
  82. deepspeed_launch.append("--module")
  83. if self.args.no_local_rank:
  84. deepspeed_launch.append("--no_local_rank")
  85. if self.args.save_pid:
  86. deepspeed_launch += ["--save_pid", f"{os.getpid()}"]
  87. if self.args.elastic_training:
  88. deepspeed_launch.append("--enable_elastic_training")
  89. deepspeed_launch.append(f"--max_elastic_nodes={self.args.max_elastic_nodes}")
  90. deepspeed_launch.append(f"--min_elastic_nodes={self.args.min_elastic_nodes}")
  91. cmd_to_search = [i + "\\" for i in deepspeed_launch[2:6]]
  92. kill_command = pdsh_cmd_args + ["pkill -f ", " ".join(cmd_to_search)[:-2]]
  93. return pdsh_cmd_args + deepspeed_launch + [self.user_script
  94. ] + self.user_arguments, kill_command
  95. class OpenMPIRunner(MultiNodeRunner):
  96. def __init__(self, args, world_info_base64, resource_pool):
  97. super().__init__(args, world_info_base64)
  98. self.resource_pool = resource_pool
  99. self.add_export('UCX_TLS', 'tcp')
  100. def backend_exists(self):
  101. #TODO: if IB is available we should suggestion mvapich
  102. return shutil.which('ompi_info')
  103. @property
  104. def name(self):
  105. return "openmpi"
  106. def validate_args(self):
  107. super().validate_args()
  108. #TODO: Allow for include/exclude at node-level but not gpu-level
  109. if self.args.include != "" or self.args.exclude != "":
  110. raise ValueError(
  111. f"{self.name} backend does not support worker include/exclusion")
  112. if self.args.num_nodes != -1 or self.args.num_gpus != -1:
  113. raise ValueError(
  114. f"{self.name} backend does not support limiting num nodes/gpus")
  115. def get_cmd(self, environment, active_resources):
  116. total_process_count = sum(self.resource_pool.values())
  117. mpirun_cmd = [
  118. 'mpirun',
  119. '-n',
  120. f'{total_process_count}',
  121. '-hostfile',
  122. f'{self.args.hostfile}',
  123. '--mca',
  124. 'btl',
  125. '^openib',
  126. '--mca',
  127. 'btl_tcp_if_include',
  128. 'eth0',
  129. ] + split(self.args.launcher_args)
  130. export_cmd = []
  131. for k, v in self.exports.items():
  132. export_cmd += ['-x', "{}={}".format(k, v)]
  133. python_exec = []
  134. if not self.args.no_python:
  135. python_exec = [sys.executable, "-u"]
  136. if self.args.module:
  137. python_exec.append("-m")
  138. return mpirun_cmd + export_cmd + python_exec + [self.user_script
  139. ] + self.user_arguments
  140. class MPICHRunner(MultiNodeRunner):
  141. def __init__(self, args, world_info_base64, resource_pool):
  142. super().__init__(args, world_info_base64)
  143. self.resource_pool = resource_pool
  144. def backend_exists(self):
  145. #TODO: if IB is available we should suggestion mpich
  146. return shutil.which('mpirun') #mpich_info
  147. @property
  148. def name(self):
  149. return "mpich"
  150. def validate_args(self):
  151. super().validate_args()
  152. #TODO: Allow for include/exclude at node-level but not gpu-level
  153. if self.args.include != "" or self.args.exclude != "":
  154. raise ValueError(
  155. f"{self.name} backend does not support worker include/exclusion")
  156. if self.args.num_nodes != -1 or self.args.num_gpus != -1:
  157. raise ValueError(
  158. f"{self.name} backend does not support limiting num nodes/gpus")
  159. def get_cmd(self, environment, active_resources):
  160. devices_per_node = self.resource_pool.values()
  161. total_process_count = sum(devices_per_node)
  162. process_per_node = list(devices_per_node)[0]
  163. mpirun_cmd = [
  164. 'mpirun',
  165. '-n',
  166. f'{total_process_count}',
  167. '-ppn',
  168. f'{process_per_node}',
  169. ] + split(self.args.launcher_args)
  170. export_cmd = []
  171. for k, v in self.exports.items():
  172. export_cmd += ['-x', "{}={}".format(k, v)]
  173. python_exec = []
  174. if not self.args.no_python:
  175. python_exec = [sys.executable, "-u"]
  176. if self.args.module:
  177. python_exec.append("-m")
  178. return mpirun_cmd + python_exec + [self.user_script] + self.user_arguments
  179. class SlurmRunner(MultiNodeRunner):
  180. def __init__(self, args, world_info_base64, resource_pool):
  181. super().__init__(args, world_info_base64)
  182. self.resource_pool = resource_pool
  183. def backend_exists(self):
  184. return shutil.which('sinfo')
  185. @property
  186. def name(self):
  187. return 'slurm'
  188. def get_cmd(self, environment, active_resources):
  189. assert not getattr(self.args, 'detect_nvlink_pairs', False), "slurm backend does not support remapping visible devices"
  190. total_process_count = sum(self.resource_pool.values())
  191. srun_cmd = [
  192. 'srun',
  193. '-n',
  194. f'{total_process_count}',
  195. ] + split(self.args.launcher_args)
  196. if getattr(self.args, 'slurm_comment', ''):
  197. srun_cmd += ['--comment', self.args.slurm_comment]
  198. if self.args.include != "":
  199. srun_cmd.append('--include')
  200. srun_cmd.append(f'{self.args.include}')
  201. if self.args.exclude != "":
  202. srun_cmd.append('--exclude')
  203. srun_cmd.append(f'{self.args.exclude}')
  204. if self.args.num_nodes > 0:
  205. srun_cmd.append('--nodes')
  206. srun_cmd.append(f'{self.args.num_nodes}')
  207. if self.args.num_gpus > 0:
  208. srun_cmd.append('--gpus')
  209. srun_cmd.append(f'{self.args.num_gpus}')
  210. exports = '--export=ALL'
  211. for key, val in self.exports.items():
  212. exports += f",{key}={val}"
  213. python_exec = [sys.executable, "-u"]
  214. command = srun_cmd + [exports] + python_exec + [self.user_script
  215. ] + self.user_arguments
  216. return command
  217. class MVAPICHRunner(MultiNodeRunner):
  218. def __init__(self, args, world_info_base64, resource_pool):
  219. super().__init__(args, world_info_base64)
  220. self.resource_pool = resource_pool
  221. # Disable the CMA kernel module, not available on Ubuntu systems
  222. self.add_export('MV2_SMP_USE_CMA', '0')
  223. # If we fail this will output more verbose logging
  224. self.add_export('MV2_DEBUG_SHOW_BACKTRACE', '1')
  225. # Enabled cuda-aware communication
  226. if get_accelerator().device_name() == 'cuda':
  227. self.add_export('MV2_USE_CUDA', '1')
  228. # Support deep learning frameworks: http://hidl.cse.ohio-state.edu/userguide/horovod/
  229. self.add_export('MV2_SUPPORT_DL', '1')
  230. # Support MPI_THREAD_MULTIPLE
  231. self.add_export('MV2_ENABLE_AFFINITY', '0')
  232. # Performance tuning flags for allgather
  233. self.add_export('MV2_INTER_ALLGATHER_TUNING', '5')
  234. self.add_export('MV2_CUDA_USE_NAIVE', '0')
  235. def backend_exists(self):
  236. #TODO: if IB is available we should suggestion mvapich
  237. mpiname_exists = shutil.which('mpiname')
  238. exists = False
  239. if not mpiname_exists:
  240. warnings.warn("mpiname does not exist, mvapich is not installed properly")
  241. else:
  242. results = subprocess.check_output('mpiname', shell=True)
  243. mpiname_results = results.decode('utf-8').strip()
  244. if "MVAPICH2-GDR" in mpiname_results:
  245. exists = True
  246. else:
  247. warnings.warn(
  248. f"Expected MVAPICH2-GDR as return for mpiname but received {mpiname_results}"
  249. )
  250. return exists
  251. @property
  252. def name(self):
  253. return "mvapich"
  254. def validate_args(self):
  255. super().validate_args()
  256. #TODO: Allow for include/exclude at node-level but not gpu-level
  257. if self.args.include != "" or self.args.exclude != "":
  258. raise ValueError(
  259. f"{self.name} backend does not support worker include/exclusion")
  260. if self.args.num_nodes != -1 or self.args.num_gpus != -1:
  261. raise ValueError(
  262. f"{self.name} backend does not support limiting num nodes/gpus")
  263. def get_cmd(self, environment, active_resources):
  264. devices_per_node = self.resource_pool.values()
  265. total_process_count = sum(devices_per_node)
  266. process_per_node = list(devices_per_node)[0]
  267. if not all([n == process_per_node for n in devices_per_node]):
  268. raise ValueError("mvapich requires same number of devices per node")
  269. with open(MVAPICH_TMP_HOSTFILE, 'w') as fd:
  270. for host in self.resource_pool.keys():
  271. fd.write(f'{host}\n')
  272. mpirun_cmd = [
  273. 'mpirun',
  274. '-np',
  275. f'{total_process_count}',
  276. '-ppn',
  277. f'{process_per_node}',
  278. '--hostfile',
  279. f'{MVAPICH_TMP_HOSTFILE}',
  280. ] + split(self.args.launcher_args)
  281. export_cmd = []
  282. for k, v in self.exports.items():
  283. export_cmd += ['-env', "{}={}".format(k, v)]
  284. python_exec = []
  285. if not self.args.no_python:
  286. python_exec = [sys.executable, "-u"]
  287. if self.args.module:
  288. python_exec.append("-m")
  289. return mpirun_cmd + export_cmd + python_exec + [self.user_script
  290. ] + self.user_arguments