# numa.py
  1. # Copyright (c) Microsoft Corporation.
  2. # SPDX-License-Identifier: Apache-2.0
  3. # DeepSpeed Team
  4. # return a list of list for cores to numa mapping
  5. # [
  6. # [ cores for numa 0 ]
  7. # [ cores belong to numa 1 ]
  8. # ...
  9. # ]
  10. import distutils
  11. import os
  12. import psutil
  13. import subprocess
# Return a list of lists mapping each NUMA node to its cores:
# [
#     [ cores belonging to numa 0 ]
#     [ cores belonging to numa 1 ]
#     ...
# ]
  20. def get_numa_cores():
  21. ret = []
  22. output = subprocess.check_output(['numactl', '--hardware']).decode("utf-8")
  23. lines = output.split('\n')
  24. for line in lines:
  25. if line.startswith('available:'):
  26. num_numas = int(line.split(' ')[1])
  27. break
  28. for numa in range(num_numas):
  29. for line in lines:
  30. if line.startswith(f'node {numa} cpus:'):
  31. cores = line.split(' ')[3:]
  32. ret.append([int(core) for core in cores])
  33. return ret
  34. def check_for_numactl_pkg():
  35. libs = dict(
  36. dpkg=["-l", "numactl", "apt"],
  37. pacman=["-Q", "numactl", "pacman"],
  38. rpm=["-q", "numactl", "yum"],
  39. )
  40. found = False
  41. for pkgmgr, data in libs.items():
  42. flag, lib, tool = data
  43. path = distutils.spawn.find_executable(pkgmgr)
  44. if path is not None:
  45. cmd = f"{pkgmgr} {flag} {lib}"
  46. result = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
  47. if result.wait() == 0:
  48. found = True
  49. else:
  50. print(f"please install the {lib} package with {tool}")
  51. break
  52. return found
  53. def parse_range(rng):
  54. try:
  55. value = int(rng)
  56. return range(value, value + 1)
  57. except ValueError:
  58. # value is not a single number
  59. parts = rng.split('-')
  60. if len(parts) != 2:
  61. raise ValueError("Bad range: '%s', range must be either a number or two number separated by dash" %
  62. (rng, ))
  63. start = int(parts[0])
  64. end = int(parts[1])
  65. if start > end:
  66. raise ValueError("Bad range: '%s', range end must larger than or equal to start" % (rng, ))
  67. return range(start, end + 1)
# Parse a comma- and dash-separated range list into a list of numbers,
# e.g. "0,2-4,6" --> [0, 2, 3, 4, 6]
# Rules:
# 1. Items in the range list must be comma separated; each item is either a single number
#    or a range marked by two numbers (both numbers are included in the range)
# 2. Sub-ranges must be in ascending order and must not overlap with each other
# 3. No spaces in the range expression
  75. def parse_range_list(range_str):
  76. number_list = []
  77. last = -1
  78. range_list = range_str.split(',')
  79. for sub_range in range_list:
  80. sub_number_list = parse_range(sub_range)
  81. if sub_number_list[0] <= last:
  82. raise ValueError(
  83. "Bad range: '%s', sub ranges must not overlap with each other and should be in ascend order" %
  84. (range_str, ))
  85. last = sub_number_list[-1]
  86. number_list.extend(sub_number_list)
  87. return number_list
  88. def get_numactl_cmd(bind_core_list, num_local_procs, local_rank):
  89. numactl_cmd = []
  90. check_for_numactl_pkg()
  91. if 'KMP_AFFINITY' in os.environ.keys():
  92. raise ValueError("Environment variable KMP_AFFINITY conflicts with numactl "
  93. "because it interfere with how many CPU cores numactl can set. "
  94. "Unset KMP_AFFINITY before launching deepspeed.\n\n"
  95. "\t$ unset KMP_AFFINITY\n"
  96. "\t$ deepspeed <deepspeed command parameters>")
  97. if bind_core_list is not None:
  98. core_list = parse_range_list(bind_core_list)
  99. total_cores = len(core_list)
  100. else:
  101. total_cores = psutil.cpu_count(logical=False)
  102. core_list = range(total_cores)
  103. cores_per_rank = total_cores // num_local_procs
  104. assert cores_per_rank >= 1, "At least one core needs to be assigned to each rank"
  105. core_list_for_rank = core_list[cores_per_rank * local_rank:cores_per_rank * (local_rank + 1)]
  106. numactl_cmd.append("numactl")
  107. # check if all cores belong to same numa, if true, bind process to that numa domain with -m parameter
  108. numa_cores = get_numa_cores()
  109. num_numas = len(numa_cores)
  110. numa_mode = "normal"
  111. non_empty_numa_list = []
  112. empty_numa_list = []
  113. previous_numa_cores = []
  114. numa_node_list = []
  115. numa_node_list_list = []
  116. for i in range(num_numas):
  117. # look for empty numa which is HBM numa
  118. if numa_cores[i] == []:
  119. empty_numa_list.append(i)
  120. else:
  121. non_empty_numa_list.append(i)
  122. # check for fakenuma
  123. if numa_cores[i] == previous_numa_cores:
  124. if numa_node_list == []:
  125. #first duplication, add previous node into list
  126. numa_node_list.append(i - 1)
  127. numa_node_list.append(i)
  128. else:
  129. if numa_node_list != []:
  130. numa_node_list_list.append(numa_node_list)
  131. numa_node_list = []
  132. previous_numa_cores = numa_cores[i]
  133. if numa_node_list != []:
  134. numa_node_list_list.append(numa_node_list)
  135. if empty_numa_list != [] and len(empty_numa_list) == len(non_empty_numa_list):
  136. numa_mode = "flat_hbm"
  137. numa_dict = dict(zip(non_empty_numa_list, empty_numa_list))
  138. elif numa_node_list_list != []:
  139. numa_mode = "fake"
  140. if numa_mode == "normal":
  141. for i in range(num_numas):
  142. if set(core_list_for_rank) <= set(numa_cores[i]):
  143. numactl_cmd.append("-m")
  144. numactl_cmd.append(f"{i}")
  145. break
  146. elif numa_mode == "flat_hbm":
  147. for i in range(num_numas):
  148. if set(core_list_for_rank) <= set(numa_cores[i]):
  149. numactl_cmd.append("-p")
  150. numactl_cmd.append(f"{numa_dict[i]}")
  151. break
  152. elif numa_mode == "fake":
  153. for i in range(num_numas):
  154. if set(core_list_for_rank) <= set(numa_cores[i]):
  155. for nodes in numa_node_list_list:
  156. if i in nodes:
  157. numactl_cmd.append("-m")
  158. numactl_cmd.append(f"{','.join(map(str, nodes))}")
  159. break
  160. # the following construct break the outer loop if inner loop breaks
  161. else:
  162. continue
  163. break
  164. numactl_cmd.append("-C")
  165. last_core = core_list_for_rank[0]
  166. first_core = last_core
  167. core_list_str = f"{last_core}"
  168. for core_id in core_list_for_rank[1:]:
  169. if core_id == last_core + 1:
  170. last_core = core_id
  171. continue
  172. else:
  173. if first_core == last_core:
  174. core_list_str = f"{core_list_str},{core_id}"
  175. else:
  176. core_list_str = f"{core_list_str}-{last_core},{core_id}"
  177. first_core = core_id
  178. last_core = core_id
  179. if first_core != last_core:
  180. core_list_str = f"{core_list_str}-{last_core}"
  181. numactl_cmd.append(f"{core_list_str}")
  182. return cores_per_rank, numactl_cmd