test_many_actors.py

import os
import ray
import ray._private.test_utils as test_utils
import time
import tqdm

from many_nodes_tests.dashboard_test import DashboardTestAtScale
from ray._private.state_api_test_utils import summarize_worker_startup_time

is_smoke_test = True
if "SMOKE_TEST" in os.environ:
    MAX_ACTORS_IN_CLUSTER = 100
else:
    MAX_ACTORS_IN_CLUSTER = 10000
    is_smoke_test = False


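# Launch MAX_ACTORS_IN_CLUSTER lightweight actors (0.25 CPU each) and verify
# that every actor responds to a trivial remote call.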
def test_max_actors():
    # TODO (Alex): Dynamically set this based on number of cores
    cpus_per_actor = 0.25

    @ray.remote(num_cpus=cpus_per_actor)
    class Actor:
        def foo(self):
            pass

    actors = [
        Actor.remote()
        for _ in tqdm.trange(MAX_ACTORS_IN_CLUSTER, desc="Launching actors")
    ]

    done = ray.get([actor.foo.remote() for actor in actors])
    for result in done:
        assert result is None


def no_resource_leaks():
    return test_utils.no_resource_leaks_excluding_node_resources()


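# Driver: connect to the running cluster, confirm there are no leaked
# resources, then time the actor launch while tracking peak memory usage.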
  30. addr = ray.init(address="auto")
  31. test_utils.wait_for_condition(no_resource_leaks)
  32. monitor_actor = test_utils.monitor_memory_usage()
  33. dashboard_test = DashboardTestAtScale(addr)
  34. start_time = time.time()
  35. test_max_actors()
  36. end_time = time.time()
  37. ray.get(monitor_actor.stop_run.remote())
  38. used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
  39. print(f"Peak memory usage: {round(used_gb, 2)}GB")
  40. print(f"Peak memory usage per processes:\n {usage}")
  41. del monitor_actor
  42. # Get the dashboard result
  43. test_utils.wait_for_condition(no_resource_leaks)
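# Report throughput and peak memory; non-smoke runs also attach perf_metrics
# for the release-test dashboard.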
rate = MAX_ACTORS_IN_CLUSTER / (end_time - start_time)

try:
    summarize_worker_startup_time()
except Exception as e:
    print("Failed to summarize worker startup time.")
    print(e)

print(
    f"Success! Started {MAX_ACTORS_IN_CLUSTER} actors in "
    f"{end_time - start_time}s. ({rate} actors/s)"
)

results = {
    "actors_per_second": rate,
    "num_actors": MAX_ACTORS_IN_CLUSTER,
    "time": end_time - start_time,
    "success": "1",
    "_peak_memory": round(used_gb, 2),
    "_peak_process_memory": usage,
}
if not is_smoke_test:
    results["perf_metrics"] = [
        {
            "perf_metric_name": "actors_per_second",
            "perf_metric_value": rate,
            "perf_metric_type": "THROUGHPUT",
        }
    ]
dashboard_test.update_release_test_result(results)
test_utils.safe_write_to_results_json(results)