# test_scheduling.py

import argparse
from math import floor
from time import perf_counter, sleep, time

import ray
import ray._private.test_utils as test_utils
from ray._private.test_utils import safe_write_to_results_json
  7. @ray.remote
  8. def simple_task(t):
  9. sleep(t)
  10. @ray.remote
  11. class SimpleActor:
  12. def __init__(self, job=None):
  13. self._job = job
  14. def ready(self):
  15. return
  16. def do_job(self):
  17. if self._job is not None:
  18. self._job()
  19. def start_tasks(num_task, num_cpu_per_task, task_duration):
  20. ray.get(
  21. [
  22. simple_task.options(num_cpus=num_cpu_per_task).remote(task_duration)
  23. for _ in range(num_task)
  24. ]
  25. )
  26. def measure(f):
  27. start = time()
  28. ret = f()
  29. end = time()
  30. return (end - start, ret)
  31. def start_actor(num_actors, num_actors_per_nodes, job):
  32. resources = {"node": floor(1.0 / num_actors_per_nodes)}
  33. submission_cost, actors = measure(
  34. lambda: [
  35. SimpleActor.options(resources=resources, num_cpus=0).remote(job)
  36. for _ in range(num_actors)
  37. ]
  38. )
  39. ready_cost, _ = measure(lambda: ray.get([actor.ready.remote() for actor in actors]))
  40. actor_job_cost, _ = measure(
  41. lambda: ray.get([actor.do_job.remote() for actor in actors])
  42. )
  43. return (submission_cost, ready_cost, actor_job_cost)
  44. if __name__ == "__main__":
  45. parser = argparse.ArgumentParser(prog="Test Scheduling")
  46. # Task workloads
  47. parser.add_argument(
  48. "--total-num-task", type=int, help="Total number of tasks.", required=False
  49. )
  50. parser.add_argument(
  51. "--num-cpu-per-task",
  52. type=int,
  53. help="Resources needed for tasks.",
  54. required=False,
  55. )
  56. parser.add_argument(
  57. "--task-duration-s",
  58. type=int,
  59. help="How long does each task execute.",
  60. required=False,
  61. default=1,
  62. )
  63. # Actor workloads
  64. parser.add_argument(
  65. "--total-num-actors", type=int, help="Total number of actors.", required=True
  66. )
  67. parser.add_argument(
  68. "--num-actors-per-nodes",
  69. type=int,
  70. help="How many actors to allocate for each nodes.",
  71. required=True,
  72. )
  73. ray.init(address="auto")
  74. monitor_actor = test_utils.monitor_memory_usage()
  75. total_cpus_per_node = [node["Resources"].get("CPU", 0) for node in ray.nodes()]
  76. num_nodes = len(total_cpus_per_node)
  77. total_cpus = sum(total_cpus_per_node)
  78. args = parser.parse_args()
  79. job = None
  80. if args.total_num_task is not None:
  81. if args.num_cpu_per_task is None:
  82. args.num_cpu_per_task = floor(1.0 * total_cpus / args.total_num_task)
  83. job = lambda: start_tasks( # noqa: E731
  84. args.total_num_task, args.num_cpu_per_task, args.task_duration_s
  85. )
  86. submission_cost, ready_cost, actor_job_cost = start_actor(
  87. args.total_num_actors, args.num_actors_per_nodes, job
  88. )
  89. ray.get(monitor_actor.stop_run.remote())
  90. used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
  91. print(f"Peak memory usage: {round(used_gb, 2)}GB")
  92. print(f"Peak memory usage per processes:\n {usage}")
  93. del monitor_actor
  94. result = {
  95. "total_num_task": args.total_num_task,
  96. "num_cpu_per_task": args.num_cpu_per_task,
  97. "task_duration_s": args.task_duration_s,
  98. "total_num_actors": args.total_num_actors,
  99. "num_actors_per_nodes": args.num_actors_per_nodes,
  100. "num_nodes": num_nodes,
  101. "total_cpus": total_cpus,
  102. "submission_cost": submission_cost,
  103. "ready_cost": ready_cost,
  104. "actor_job_cost": actor_job_cost,
  105. "_peak_memory": round(used_gb, 2),
  106. "_peak_process_memory": usage,
  107. "_runtime": submission_cost + ready_cost + actor_job_cost,
  108. }
  109. safe_write_to_results_json(result)
  110. print(result)