accelerated_dag_gpu_microbenchmark.py

# coding: utf-8
import logging
import torch
import ray.cloudpickle as pickle
import io
import cupy
import numpy as np
import time

import ray
from ray.air._internal import torch_utils
import ray.cluster_utils
from ray.dag import InputNode
from ray.util.collective.collective_group import nccl_util
from ray.experimental.channel.torch_tensor_type import TorchTensorType
from ray._private.ray_microbenchmark_helpers import timeit

logger = logging.getLogger(__name__)

# SHAPE is filled in from the --tensor-size-bytes argument in the __main__
# block below; DTYPE is float16, so each element occupies 2 bytes.
SHAPE = None
DTYPE = torch.float16

NUM_ITERS = 10
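
# Actor that moves a GPU tensor between processes via a raw CUDA IPC memory
# handle: send() exports the handle with cupy, recv() opens it and reads the
# memory back as a torch tensor on the local device.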
@ray.remote
class TorchIpcWorker:
    def __init__(self):
        self.device = torch_utils.get_devices()[0]

    def send(self, shape, dtype, value: int):
        t = torch.ones(shape, dtype=dtype, device=self.device) * value
        if self.device.type == "cuda":
            # NOTE(swang): This is needed because the IPC can get sent before
            # the value has been written to memory. But somehow the read value
            # is still the wrong one?
            torch.cuda.synchronize()
        h = cupy.cuda.runtime.ipcGetMemHandle(t.data_ptr())
        return h

    def recv(self, device_ptr, num_bytes, shape, dtype):
        h = cupy.cuda.runtime.ipcOpenMemHandle(device_ptr)
        m = cupy.cuda.UnownedMemory(h, num_bytes, None)
        m_ptr = cupy.cuda.MemoryPointer(m, 0)
        tensor = torch.tensor(cupy.ndarray(shape, dtype, m_ptr), device=self.device)
        assert tensor.device == self.device
        return (tensor[0].item(), tensor.shape, tensor.dtype)
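
# Actor that sends and receives torch.Tensor objects directly; the transfer
# itself is handled by Ray object transport or, in the compiled-DAG cases, by
# the channel selected through the TorchTensorType hint.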
@ray.remote
class TorchTensorWorker:
    def __init__(self):
        self.device = torch_utils.get_devices()[0]

    def send(self, shape, dtype, value: int):
        t = torch.ones(shape, dtype=dtype, device=self.device) * value
        return t

    def recv(self, tensor):
        assert tensor.device == self.device
        return (tensor[0].item(), tensor.shape, tensor.dtype)
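
# Actor used for the bare torch.distributed NCCL baseline: each rank sends or
# receives one tensor per iteration. Note that comm_id is currently unused;
# the process group is bootstrapped through the MASTER_ADDR/MASTER_PORT
# environment variables set in main().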
@ray.remote(num_gpus=1)
class NcclWorker:
    def __init__(self, world_size, rank, comm_id):
        from ray.air._internal import torch_utils

        self.device = torch_utils.get_devices()[0]
        self.world_size = world_size
        self.rank = rank

        torch.distributed.init_process_group(
            backend="nccl", world_size=world_size, rank=rank
        )

    def _send(self, buf, num_el, rank):
        torch.distributed.send(buf, rank)

    def _recv(self, buf, num_el, rank):
        torch.distributed.recv(buf, rank)

    def do_send_recv(self, shape, dtype):
        other_rank = (self.rank + 1) % self.world_size

        def _run():
            if self.rank == 0:
                i = np.random.randint(100)
                input_buffer = torch.ones(shape, dtype=dtype, device=self.device) * i
                self._send(input_buffer, input_buffer.numel(), other_rank)
            else:
                input_buffer = torch.zeros(shape, dtype=dtype, device=self.device)
                self._recv(input_buffer, input_buffer.numel(), other_rank)

            torch.cuda.synchronize()

        return timeit("exec_nccl_gpu", _run)
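
# Builds a sender -> receiver DAG and times one round trip per call. With
# use_adag=True the DAG is compiled (accelerated DAG) and the tensor edge is
# annotated with a TorchTensorType hint; use_nccl selects the NCCL transport
# and dynamic_shape switches the shape/dtype hints to "auto".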
def exec_ray_dag(
    label, sender, receiver, use_nccl=False, use_adag=True, dynamic_shape=False
):
    # Test torch.Tensor sent between actors.
    with InputNode() as inp:
        dag = sender.send.bind(SHAPE, DTYPE, inp)

        if use_adag:
            dag = dag.with_type_hint(
                TorchTensorType(
                    "auto" if dynamic_shape else SHAPE,
                    "auto" if dynamic_shape else DTYPE,
                    transport="nccl" if use_nccl else "auto",
                )
            )

        dag = receiver.recv.bind(dag)

    if use_adag:
        dag = dag.experimental_compile(_buffer_size_bytes=int(SHAPE[0] * 3))

        def _run():
            i = np.random.randint(100)
            ref = dag.execute(i)
            # TODO(swang): Replace with fake ObjectRef.
            result = ray.get(ref)
            assert result == (i, SHAPE, DTYPE)

    else:

        def _run():
            i = np.random.randint(100)
            result = ray.get(dag.execute(i))
            assert result == (i, SHAPE, DTYPE)

    results = timeit(label, _run)

    if use_adag:
        dag.teardown()

    # Workaround for Ray bug in reusing GPUs too quickly.
    # See https://github.com/ray-project/ray/issues/44821.
    ray.kill(sender)
    ray.kill(receiver)
    time.sleep(1)

    return results
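
# Same benchmark as exec_ray_dag, but the tensor is passed between
# TorchIpcWorker actors as a raw CUDA IPC handle; the receiver needs the byte
# count (SHAPE[0] * 2 for float16) and dtype to reinterpret the memory.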
def exec_ray_dag_ipc(label, sender, receiver, use_nccl=False):
    # Test torch.Tensor sent between actors.
    with InputNode() as inp:
        dag = sender.send.bind(SHAPE, DTYPE, inp)
        dag = receiver.recv.bind(
            dag,
            # torch.float16 has item size of 2 bytes.
            SHAPE[0] * 2,
            SHAPE,
            nccl_util.TORCH_NUMPY_DTYPE_MAP[DTYPE],
        )

    compiled_dag = dag.experimental_compile(_buffer_size_bytes=int(SHAPE[0] * 3))
    # Flag that each run can set if it sees incorrect results.
    ok = [True]

    def _run():
        i = np.random.randint(100)
        output_channel = compiled_dag.execute(i)
        # TODO(swang): Replace with fake ObjectRef.
        result = output_channel.read()
        if result != (i, SHAPE, DTYPE):
            ok[0] = False

    results = timeit(label, _run)
    if not ok[0]:
        logger.warning("IPC DAG returned incorrect result")

    compiled_dag.teardown()
    return results
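
# The _exec_* helpers below are single-iteration baseline bodies (plain torch
# copies, pickle round trips, ray.put/ray.get) meant to be timed via timeit.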
def _exec_torch_cpu_cpu():
    i = np.random.randint(100)
    t = torch.ones(SHAPE, dtype=DTYPE) * i
    t2 = t.to(copy=True)
    assert (t2[0].item(), t2.shape, t2.dtype) == (i, SHAPE, DTYPE)


def _exec_torch_gpu():
    i = np.random.randint(100)
    device_from = torch.device("cuda:1")
    device_to = torch.device("cuda:0")

    t = torch.ones(SHAPE, dtype=DTYPE, device=device_from) * i
    t2 = t.to(device_to)
    torch.cuda.synchronize(device_to)

    assert (t2[0].item(), t2.shape, t2.dtype) == (i, SHAPE, DTYPE)
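
# Launches two NcclWorker actors and times a bare NCCL send/recv between
# them, as a point of comparison for the NCCL-backed compiled DAG.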
def exec_nccl_gpu():
    import cupy.cuda.nccl

    comm_id = cupy.cuda.nccl.get_unique_id()
    workers = [NcclWorker.remote(2, i, comm_id) for i in range(2)]

    tasks = [worker.do_send_recv.remote(SHAPE, DTYPE) for worker in workers]
    done_refs, _ = ray.wait(tasks, num_returns=1)
    results = ray.get(done_refs[0])

    # Workaround for Ray bug in reusing GPUs too quickly.
    # See https://github.com/ray-project/ray/issues/44821.
    for worker in workers:
        ray.kill(worker)
    time.sleep(1)

    return results
def _exec_torch_gpu_cpu_gpu():
    i = np.random.randint(100)
    device_from = torch.device("cuda:0")
    device_to = torch.device("cuda:1")

    t = torch.ones(SHAPE, dtype=DTYPE, device=device_from) * i
    t = t.to("cpu")
    t2 = t.to(device_to)
    torch.cuda.synchronize(device_to)

    assert (t2[0].item(), t2.shape, t2.dtype) == (i, SHAPE, DTYPE)


def _exec_pickle_cpu():
    i = np.random.randint(100)
    t = torch.ones(SHAPE, dtype=DTYPE) * i
    byte_stream = io.BytesIO()
    pickle.dump(t, byte_stream)
    byte_stream.seek(0)
    pickle.load(byte_stream)


def _exec_pickle_gpu():
    i = np.random.randint(100)
    t = torch.ones(SHAPE, dtype=DTYPE, device="cuda") * i
    byte_stream = io.BytesIO()
    pickle.dump(t, byte_stream)
    byte_stream.seek(0)
    pickle.load(byte_stream)


def _exec_ray_put_cpu():
    i = np.random.randint(100)
    t = torch.ones(SHAPE, dtype=DTYPE) * i
    ray.get(ray.put(t))


def _exec_ray_put_np_zero_copy():
    i = np.random.randint(100)
    t = torch.ones(SHAPE, dtype=DTYPE) * i
    # Store as numpy so ray.get can return a zero-copy view of the buffer.
    torch.as_tensor(ray.get(ray.put(t.numpy())))


def _exec_ray_put_gpu():
    i = np.random.randint(100)
    t = torch.ones(SHAPE, dtype=DTYPE, device="cuda") * i
    ray.get(ray.put(t))
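
# The exec_ray_core_* and exec_ray_dag_* wrappers below create the actor
# pairs (CPU-only or one GPU each) and delegate to exec_ray_dag or
# exec_ray_dag_ipc with the matching transport settings.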
def exec_ray_dag_cpu():
    sender = TorchTensorWorker.options().remote()
    receiver = TorchTensorWorker.options().remote()
    return exec_ray_dag("exec_ray_dag_cpu", sender, receiver)


def exec_ray_core_cpu():
    time.sleep(1)
    sender = TorchTensorWorker.remote()
    receiver = TorchTensorWorker.remote()
    return exec_ray_dag("exec_ray_core_cpu", sender, receiver, use_adag=False)


def exec_ray_dag_gpu_ipc_gpu():
    time.sleep(1)
    sender = TorchIpcWorker.options(num_gpus=1).remote()
    receiver = TorchIpcWorker.options(num_gpus=1).remote()
    return exec_ray_dag_ipc("exec_ray_dag_gpu_ipc_gpu", sender, receiver)


def exec_ray_dag_gpu_cpu_gpu():
    time.sleep(1)
    sender = TorchTensorWorker.options(num_gpus=1).remote()
    receiver = TorchTensorWorker.options(num_gpus=1).remote()
    return exec_ray_dag("exec_ray_dag_gpu_cpu_gpu", sender, receiver)


def exec_ray_dag_gpu_nccl(dynamic_shape: bool = False):
    time.sleep(1)
    sender = TorchTensorWorker.options(num_gpus=1).remote()
    receiver = TorchTensorWorker.options(num_gpus=1).remote()
    return exec_ray_dag(
        "exec_ray_dag_gpu_nccl" + ("_dynamic" if dynamic_shape else ""),
        sender,
        receiver,
        use_nccl=True,
        dynamic_shape=dynamic_shape,
    )


def exec_ray_core_gpu():
    time.sleep(1)
    sender = TorchTensorWorker.options(num_gpus=1).remote()
    receiver = TorchTensorWorker.options(num_gpus=1).remote()
    return exec_ray_dag("exec_ray_core_gpu", sender, receiver, use_adag=False)
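
# Driver: initializes Ray with two visible GPUs and runs the benchmarks in
# sequence. MASTER_ADDR and MASTER_PORT are required by the torch.distributed
# process group used for the NCCL baseline.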
def main():
    results = []

    ray.init(
        runtime_env={
            "env_vars": {
                "CUDA_VISIBLE_DEVICES": "0,1",
                # Needed for torch distributed.
                "MASTER_ADDR": "localhost",
                "MASTER_PORT": "8888",
            }
        }
    )

    results += timeit("exec_torch_cpu_cpu", _exec_torch_cpu_cpu)
    results += timeit("exec_torch_gpu", _exec_torch_gpu)
    results += timeit("exec_torch_gpu_cpu_gpu", _exec_torch_gpu_cpu_gpu)
    results += exec_nccl_gpu()
    results += timeit("exec_ray_put_cpu", _exec_ray_put_cpu)
    results += timeit("exec_ray_put_np_zero_copy", _exec_ray_put_np_zero_copy)
    results += timeit("exec_ray_put_gpu", _exec_ray_put_gpu)
    results += exec_ray_core_cpu()
    results += exec_ray_dag_cpu()
    results += exec_ray_core_gpu()
    results += exec_ray_dag_gpu_cpu_gpu()
    results += exec_ray_dag_gpu_nccl(dynamic_shape=True)
    results += exec_ray_dag_gpu_nccl(dynamic_shape=False)

    return results
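
# Entry point: SHAPE is derived from --tensor-size-bytes (float16 elements
# are 2 bytes each) before the benchmarks run.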
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--tensor-size-bytes",
        type=int,
        # 100KB
        default=100_000,
    )
    args = parser.parse_args()

    # Divide by 2 because we're using torch.float16.
    SHAPE = (args.tensor_size_bytes // 2,)

    main()