test_single_node.py

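"""Single-node Ray scalability benchmark.

Stresses one node along five axes: a task taking MAX_ARGS arguments, a task
producing MAX_RETURNS return values, ray.get on MAX_RAY_GET_ARGS object refs,
MAX_QUEUED_TASKS queued tasks, and a single object of MAX_RAY_GET_SIZE bytes.
Timings are printed and, if TEST_OUTPUT_JSON is set, written out as JSON
perf metrics.
"""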
import numpy as np
import time

import ray
import ray.autoscaler.sdk
from ray._private.test_utils import Semaphore

import json
import os
from time import perf_counter
from tqdm import trange, tqdm

MAX_ARGS = 10000
MAX_RETURNS = 3000
MAX_RAY_GET_ARGS = 10000
MAX_QUEUED_TASKS = 1_000_000
MAX_RAY_GET_SIZE = 100 * 2**30

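# Compare the cluster's total resources against the currently available ones
# so leaked tasks that are still holding resources fail the run. The memory
# and object_store_memory counters are ignored since they need not match.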
def assert_no_leaks():
    total = ray.cluster_resources()
    current = ray.available_resources()
    total.pop("memory")
    total.pop("object_store_memory")
    current.pop("memory")
    current.pop("object_store_memory")
    assert total == current, (total, current)

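# Benchmark 1: submit a single task that takes MAX_ARGS list arguments and
# verify the summed result.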
def test_many_args():
    @ray.remote
    def sum_args(*args):
        return sum(sum(arg) for arg in args)

    args = [[1 for _ in range(10000)] for _ in range(MAX_ARGS)]
    result = ray.get(sum_args.remote(*args))
    assert result == MAX_ARGS * 10000

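# Benchmark 2: a single task that produces MAX_RETURNS return values; each
# returned ref is fetched and checked individually.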
def test_many_returns():
    @ray.remote(num_returns=MAX_RETURNS)
    def f():
        to_return = []
        for _ in range(MAX_RETURNS):
            obj = list(range(10000))
            to_return.append(obj)
        return tuple(to_return)

    returned_refs = f.remote()
    assert len(returned_refs) == MAX_RETURNS

    for ref in returned_refs:
        expected = list(range(10000))
        obj = ray.get(ref)
        assert obj == expected

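# Benchmark 3: ray.get on MAX_RAY_GET_ARGS object refs, once with plain
# Python lists (which must be deserialized) and once with numpy arrays
# (which the object store can return zero-copy).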
def test_ray_get_args():
    def with_dese():
        print("Putting test objects:")
        refs = []
        for _ in trange(MAX_RAY_GET_ARGS):
            obj = list(range(10000))
            refs.append(ray.put(obj))

        print("Getting objects")
        results = ray.get(refs)
        assert len(results) == MAX_RAY_GET_ARGS

        print("Asserting correctness")
        for obj in tqdm(results):
            expected = list(range(10000))
            assert obj == expected

    def with_zero_copy():
        print("Putting test objects:")
        refs = []
        for _ in trange(MAX_RAY_GET_ARGS):
            obj = np.arange(10000)
            refs.append(ray.put(obj))

        print("Getting objects")
        results = ray.get(refs)
        assert len(results) == MAX_RAY_GET_ARGS

        print("Asserting correctness")
        for obj in tqdm(results):
            expected = np.arange(10000)
            assert (obj == expected).all()

    with_dese()
    print("Done with dese")
    with_zero_copy()
    print("Done with zero copy")

def test_many_queued_tasks():
    sema = Semaphore.remote(0)

    @ray.remote(num_cpus=1)
    def block():
        ray.get(sema.acquire.remote())

    @ray.remote(num_cpus=1)
    def f():
        pass

    num_cpus = int(ray.cluster_resources()["CPU"])
    blocked_tasks = []
    for _ in range(num_cpus):
        blocked_tasks.append(block.remote())

    print("Submitting many tasks")
    pending_tasks = []
    for _ in trange(MAX_QUEUED_TASKS):
        pending_tasks.append(f.remote())

    # Make sure all the tasks can actually run.
    for _ in range(num_cpus):
        sema.release.remote()

    print("Unblocking tasks")
    for ref in tqdm(pending_tasks):
        assert ray.get(ref) is None

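# Benchmark 5: put and get a single MAX_RAY_GET_SIZE-byte numpy array through
# the object store.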
def test_large_object():
    print("Generating object")
    obj = np.zeros(MAX_RAY_GET_SIZE, dtype=np.int8)
    print("Putting object")
    ref = ray.put(obj)
    del obj
    print("Getting object")
    big_obj = ray.get(ref)
    assert big_obj[0] == 0
    assert big_obj[-1] == 0

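# Driver: attach to the already-running cluster, run each benchmark in turn,
# and check for leaked resources after each one.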
ray.init(address="auto")

args_start = perf_counter()
test_many_args()
args_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished many args")

returns_start = perf_counter()
test_many_returns()
returns_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished many returns")

get_start = perf_counter()
test_ray_get_args()
get_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished ray.get on many objects")

queued_start = perf_counter()
test_many_queued_tasks()
queued_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished queueing many tasks")

large_object_start = perf_counter()
test_large_object()
large_object_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Done")

args_time = args_end - args_start
returns_time = returns_end - returns_start
get_time = get_end - get_start
queued_time = queued_end - queued_start
large_object_time = large_object_end - large_object_start

print(f"Many args time: {args_time} ({MAX_ARGS} args)")
print(f"Many returns time: {returns_time} ({MAX_RETURNS} returns)")
print(f"Ray.get time: {get_time} ({MAX_RAY_GET_ARGS} args)")
print(f"Queued task time: {queued_time} ({MAX_QUEUED_TASKS} tasks)")
print(
    f"Ray.get large object time: {large_object_time} "
    f"({MAX_RAY_GET_SIZE} bytes)"
)

  148. if "TEST_OUTPUT_JSON" in os.environ:
  149. out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
  150. results = {
  151. "args_time": args_time,
  152. "num_args": MAX_ARGS,
  153. "returns_time": returns_time,
  154. "num_returns": MAX_RETURNS,
  155. "get_time": get_time,
  156. "num_get_args": MAX_RAY_GET_ARGS,
  157. "queued_time": queued_time,
  158. "num_queued": MAX_QUEUED_TASKS,
  159. "large_object_time": large_object_time,
  160. "large_object_size": MAX_RAY_GET_SIZE,
  161. "success": "1",
  162. }
  163. results["perf_metrics"] = [
  164. {
  165. "perf_metric_name": f"{MAX_ARGS}_args_time",
  166. "perf_metric_value": args_time,
  167. "perf_metric_type": "LATENCY",
  168. },
  169. {
  170. "perf_metric_name": f"{MAX_RETURNS}_returns_time",
  171. "perf_metric_value": returns_time,
  172. "perf_metric_type": "LATENCY",
  173. },
  174. {
  175. "perf_metric_name": f"{MAX_RAY_GET_ARGS}_get_time",
  176. "perf_metric_value": get_time,
  177. "perf_metric_type": "LATENCY",
  178. },
  179. {
  180. "perf_metric_name": f"{MAX_QUEUED_TASKS}_queued_time",
  181. "perf_metric_value": queued_time,
  182. "perf_metric_type": "LATENCY",
  183. },
  184. {
  185. "perf_metric_name": f"{MAX_RAY_GET_SIZE}_large_object_time",
  186. "perf_metric_value": large_object_time,
  187. "perf_metric_type": "LATENCY",
  188. },
  189. ]
  190. json.dump(results, out_file)