test_many_tasks.py

import click
import ray
import ray._private.test_utils as test_utils
import time
import tqdm

from ray.util.state import summarize_tasks

from many_nodes_tests.dashboard_test import DashboardTestAtScale
from ray._private.state_api_test_utils import (
    StateAPICallSpec,
    periodic_invoke_state_apis_with_actor,
    summarize_worker_startup_time,
)

sleep_time = 300


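# Launch `num_tasks` sleeping tasks, each holding `cpus_per_task` CPUs, and
# sample the cluster's available CPUs while they run. The check asserts that,
# at some point, at least 60% of the requested CPUs were actually in use.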
def test_max_running_tasks(num_tasks):
    cpus_per_task = 0.25

    @ray.remote(num_cpus=cpus_per_task)
    def task():
        time.sleep(sleep_time)

    def time_up(start_time):
        return time.time() - start_time >= sleep_time

    refs = [task.remote() for _ in tqdm.trange(num_tasks, desc="Launching tasks")]

    max_cpus = ray.cluster_resources()["CPU"]
    min_cpus_available = max_cpus
    start_time = time.time()
    for _ in tqdm.trange(int(sleep_time / 0.1), desc="Waiting"):
        try:
            cur_cpus = ray.available_resources().get("CPU", 0)
            min_cpus_available = min(min_cpus_available, cur_cpus)
        except Exception:
            # There is a race condition: `.get` can fail if a new heartbeat
            # arrives at the same time, so ignore transient errors.
            pass

        if time_up(start_time):
            print(f"Time up for sleeping {sleep_time} seconds")
            break

        time.sleep(0.1)

    # There are some relevant magic numbers in this check: 10k tasks each
    # require 1/4 CPU, so ideally 2.5k CPUs will be in use.
    used_cpus = max_cpus - min_cpus_available
    err_str = f"Only {used_cpus}/{max_cpus} cpus used."
    # For 10k tasks this threshold is 1.5k CPUs. Note that it is a pretty low
    # threshold, and performance should be tracked via the perf dashboard.
    threshold = num_tasks * cpus_per_task * 0.60
    print(f"{used_cpus}/{max_cpus} used.")
    assert used_cpus > threshold, err_str

    for _ in tqdm.trange(num_tasks, desc="Ensuring all tasks have finished"):
        done, refs = ray.wait(refs)
        assert ray.get(done[0]) is None

    return used_cpus


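# Predicate for test_utils.wait_for_condition: the driver waits until the
# cluster reports no leaked resources (node-local resources excluded).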
def no_resource_leaks():
    return test_utils.no_resource_leaks_excluding_node_resources()


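# Driver: connect to the running cluster, start the memory and state-API
# monitors, run the task-launching check above, then report throughput and
# peak-memory results. Typical invocation (the comments above assume 10k
# tasks):
#     python test_many_tasks.py --num-tasks 10000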
@click.command()
@click.option("--num-tasks", required=True, type=int, help="Number of tasks to launch.")
def test(num_tasks):
    addr = ray.init(address="auto")
    test_utils.wait_for_condition(no_resource_leaks)
    monitor_actor = test_utils.monitor_memory_usage()
    dashboard_test = DashboardTestAtScale(addr)

    def not_none(res):
        return res is not None

    api_caller = periodic_invoke_state_apis_with_actor(
        apis=[StateAPICallSpec(summarize_tasks, not_none)],
        call_interval_s=4,
        print_result=True,
    )

    start_time = time.time()
    used_cpus = test_max_running_tasks(num_tasks)
    end_time = time.time()
    ray.get(monitor_actor.stop_run.remote())
    used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
    print(f"Peak memory usage: {round(used_gb, 2)}GB")
    print(f"Peak memory usage per process:\n {usage}")
    ray.get(api_caller.stop.remote())

    del api_caller
    del monitor_actor
    test_utils.wait_for_condition(no_resource_leaks)

    try:
        summarize_worker_startup_time()
    except Exception as e:
        print("Failed to summarize worker startup time.")
        print(e)

    # Exclude the sleep period itself so the rate reflects launch throughput.
    rate = num_tasks / (end_time - start_time - sleep_time)
    print(
        f"Success! Started {num_tasks} tasks in {end_time - start_time}s. "
        f"({rate} tasks/s)"
    )
    results = {
        "tasks_per_second": rate,
        "num_tasks": num_tasks,
        "time": end_time - start_time,
        "used_cpus": used_cpus,
        "success": "1",
        "_peak_memory": round(used_gb, 2),
        "_peak_process_memory": usage,
        "perf_metrics": [
            {
                "perf_metric_name": "tasks_per_second",
                "perf_metric_value": rate,
                "perf_metric_type": "THROUGHPUT",
            },
            {
                "perf_metric_name": "used_cpus_by_deadline",
                "perf_metric_value": used_cpus,
                "perf_metric_type": "THROUGHPUT",
            },
        ],
    }
    dashboard_test.update_release_test_result(results)
    test_utils.safe_write_to_results_json(results)


if __name__ == "__main__":
    test()