multinode_runner.py

import os
import sys
import shutil
import subprocess
import warnings
from abc import ABC, abstractmethod

from ..utils import logger
from .constants import PDSH_MAX_FAN_OUT, MVAPICH_TMP_HOSTFILE


class MultiNodeRunner(ABC):
    def __init__(self, args, world_info_base64):
        self.args = args
        self.validate_args()
        self.user_arguments = self.parse_user_args()
        self.user_script = args.user_script
        self.world_info_base64 = world_info_base64
        self.exports = {}

    @abstractmethod
    def backend_exists(self):
        """Return whether the corresponding backend exists"""

    @abstractmethod
    def get_cmd(self, environment, active_resources):
        """Return the command to execute on node"""

    def add_export(self, key, var):
        self.exports[key.strip()] = var.strip()

    def parse_user_args(self):
        return self.args.user_args

    @property
    def name(self):
        """Return the name of the backend"""
        return self.__class__.__name__

    def validate_args(self):
        """Validate self.args"""


class PDSHRunner(MultiNodeRunner):
    def __init__(self, args, world_info_base64):
        super().__init__(args, world_info_base64)

    def backend_exists(self):
        return shutil.which('pdsh')

    @property
    def name(self):
        return "pdsh"

    def parse_user_args(self):
        return list(
            map(lambda x: x if x.startswith("-") else f"'{x}'",
                self.args.user_args))

    def get_cmd(self, environment, active_resources):
        environment['PDSH_RCMD_TYPE'] = 'ssh'

        active_workers = ",".join(active_resources.keys())
        logger.info("Running on the following workers: %s" % active_workers)

        # PDSH flags for max node fan out and specific hosts to launch on
        # See https://linux.die.net/man/1/pdsh for flag details
        pdsh_cmd_args = ['pdsh', '-f', str(PDSH_MAX_FAN_OUT), '-w', active_workers]

        exports = ""
        for key, val in self.exports.items():
            exports += f"export {key}={val}; "

        # https://linux.die.net/man/1/pdsh
        # %n will be replaced by pdsh command
        deepspeed_launch = [
            exports,
            f"cd {os.path.abspath('.')};",
            sys.executable,
            "-u",
            "-m",
            "deepspeed.launcher.launch",
            f'--world_info={self.world_info_base64}',
            "--node_rank=%n",
            f"--master_addr={self.args.master_addr}",
            f"--master_port={self.args.master_port}"
        ]

        return pdsh_cmd_args + deepspeed_launch + [self.user_script] + self.user_arguments
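
# Illustrative only (not produced by this module): assuming hypothetical hosts
# worker-0/worker-1, an exported NCCL_DEBUG=INFO, and a user script train.py,
# the list returned by PDSHRunner.get_cmd flattens to roughly:
#   pdsh -f <PDSH_MAX_FAN_OUT> -w worker-0,worker-1 \
#     export NCCL_DEBUG=INFO; cd <current working dir>; <python> -u \
#     -m deepspeed.launcher.launch --world_info=<base64 world info> \
#     --node_rank=%n --master_addr=worker-0 --master_port=29500 \
#     train.py '--some-arg' 'value'
# Non-flag user arguments are single-quoted by parse_user_args above; the
# host names, export, and master address/port are placeholder values.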


class OpenMPIRunner(MultiNodeRunner):
    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool
        self.add_export('UCX_TLS', 'tcp')

    def backend_exists(self):
        #TODO: if IB is available we should suggest mvapich
        return shutil.which('ompi_info')

    @property
    def name(self):
        return "openmpi"

    def validate_args(self):
        super().validate_args()
        #TODO: Allow for include/exclude at node-level but not gpu-level
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(
                f"{self.name} backend does not support worker include/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(
                f"{self.name} backend does not support limiting num nodes/gpus")

    def get_cmd(self, environment, active_resources):
        total_process_count = sum(self.resource_pool.values())

        mpirun_cmd = [
            'mpirun',
            '-n',
            f'{total_process_count}',
            '-hostfile',
            f'{self.args.hostfile}',
            '--mca',
            'btl',
            '^openib',
            '--mca',
            'btl_tcp_if_include',
            'eth0',
        ]

        export_cmd = []
        for k, v in self.exports.items():
            export_cmd += ['-x', f'{k}={v}']

        python_exec = [sys.executable, "-u"]

        return mpirun_cmd + export_cmd + python_exec + [self.user_script] + self.user_arguments
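
# Illustrative only (not produced by this module): for a hypothetical hostfile
# /job/hostfile listing two 8-GPU nodes, OpenMPIRunner.get_cmd assembles roughly:
#   mpirun -n 16 -hostfile /job/hostfile --mca btl ^openib \
#     --mca btl_tcp_if_include eth0 -x UCX_TLS=tcp <python> -u train.py --some-arg value
# UCX_TLS=tcp comes from the export added in __init__; everything else here is
# a placeholder value.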


class MVAPICHRunner(MultiNodeRunner):
    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

        # Disable the CMA kernel module, which is not available on Ubuntu systems
        self.add_export('MV2_SMP_USE_CMA', '0')

        # If we fail, this will output more verbose logging
        self.add_export('MV2_DEBUG_SHOW_BACKTRACE', '1')

        # Enable CUDA-aware communication
        self.add_export('MV2_USE_CUDA', '1')

        # Support deep learning frameworks: http://hidl.cse.ohio-state.edu/userguide/horovod/
        self.add_export('MV2_SUPPORT_DL', '1')

        # Support MPI_THREAD_MULTIPLE
        self.add_export('MV2_ENABLE_AFFINITY', '0')

        # Performance tuning flags for allgather
        self.add_export('MV2_INTER_ALLGATHER_TUNING', '5')
        self.add_export('MV2_CUDA_USE_NAIVE', '0')

    def backend_exists(self):
        #TODO: if IB is available we should suggest mvapich
        mpiname_exists = shutil.which('mpiname')
        exists = False
        if not mpiname_exists:
            warnings.warn("mpiname does not exist, mvapich is not installed properly")
        else:
            results = subprocess.check_output('mpiname', shell=True)
            mpiname_results = results.decode('utf-8').strip()
            if "MVAPICH2-GDR" in mpiname_results:
                exists = True
            else:
                warnings.warn(
                    f"Expected MVAPICH2-GDR as return for mpiname but received {mpiname_results}")
        return exists

    @property
    def name(self):
        return "mvapich"

    def validate_args(self):
        super().validate_args()
        #TODO: Allow for include/exclude at node-level but not gpu-level
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(
                f"{self.name} backend does not support worker include/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(
                f"{self.name} backend does not support limiting num nodes/gpus")

    def get_cmd(self, environment, active_resources):
        devices_per_node = self.resource_pool.values()
        total_process_count = sum(devices_per_node)
        process_per_node = list(devices_per_node)[0]
        if not all([n == process_per_node for n in devices_per_node]):
            raise ValueError("mvapich requires same number of devices per node")

        with open(MVAPICH_TMP_HOSTFILE, 'w') as fd:
            for host in self.resource_pool.keys():
                fd.write(f'{host}\n')

        mpirun_cmd = [
            'mpirun',
            '-np',
            f'{total_process_count}',
            '-ppn',
            f'{process_per_node}',
            '--hostfile',
            f'{MVAPICH_TMP_HOSTFILE}',
        ]

        export_cmd = []
        for k, v in self.exports.items():
            export_cmd += ['-env', f'{k}={v}']

        python_exec = [sys.executable, "-u"]

        return mpirun_cmd + export_cmd + python_exec + [self.user_script] + self.user_arguments
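
# Illustrative only: MVAPICHRunner.get_cmd produces an analogous command, e.g.
# for two 8-GPU nodes (placeholder values):
#   mpirun -np 16 -ppn 8 --hostfile <MVAPICH_TMP_HOSTFILE> \
#     -env MV2_SMP_USE_CMA=0 -env MV2_USE_CUDA=1 <remaining -env exports> \
#     <python> -u train.py --some-arg value
#
# Minimal usage sketch (an assumption about the calling side, not part of this
# module): a launcher such as deepspeed.launcher.runner is expected to pick a
# runner, check that its backend is installed, and execute the assembled command.
#
#     runner = OpenMPIRunner(args, world_info_base64, resource_pool)
#     if not runner.backend_exists():
#         raise RuntimeError(f"{runner.name} backend is not installed")
#     env = os.environ.copy()
#     cmd = runner.get_cmd(env, active_resources=resource_pool)
#     subprocess.Popen(cmd, env=env).wait()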