123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import ray
- import argparse
- from time import time, sleep
- from math import floor
- from ray._private.test_utils import safe_write_to_results_json
- import ray._private.test_utils as test_utils
@ray.remote
def simple_task(t):
    """Remote task that keeps its worker busy for ``t`` seconds.

    This is the unit of work submitted by ``start_tasks``; it produces no
    result.

    Args:
        t: Number of seconds to sleep.
    """
    sleep(t)
    return None
@ray.remote
class SimpleActor:
    """Remote actor that can run a caller-supplied job on request.

    Args:
        job: Zero-argument callable executed by ``do_job``, or ``None`` to make
            ``do_job`` a no-op.
    """

    def __init__(self, job=None):
        self._job = job

    def ready(self):
        """No-op; awaiting it tells the caller the actor has started."""
        return None

    def do_job(self):
        """Run the stored job if there is one; the job's result is discarded."""
        job = self._job
        if job is None:
            return None
        job()
        return None
def start_tasks(num_task, num_cpu_per_task, task_duration):
    """Submit ``num_task`` copies of ``simple_task`` and wait for all of them.

    Args:
        num_task: Number of tasks to submit.
        num_cpu_per_task: ``num_cpus`` requested by every task.
        task_duration: Seconds each task sleeps.
    """
    pending_refs = [
        simple_task.options(num_cpus=num_cpu_per_task).remote(task_duration)
        for _ in range(num_task)
    ]
    # Block until every task has finished; their (None) results are unused.
    ray.get(pending_refs)
def measure(f):
    """Call ``f()`` and report how long it took.

    Timing uses ``time.perf_counter``, a monotonic high-resolution clock,
    instead of wall-clock ``time.time``. A system clock adjustment during the
    benchmark therefore cannot skew the duration or make it negative.

    Args:
        f: Zero-argument callable to run.

    Returns:
        Tuple ``(elapsed_seconds, result_of_f)``.
    """
    from time import perf_counter

    start = perf_counter()
    ret = f()
    elapsed = perf_counter() - start
    return (elapsed, ret)
def actor_node_share(num_actors_per_nodes):
    """Return how much of the custom ``node`` resource each actor requests.

    Each actor asks for roughly ``1 / num_actors_per_nodes`` of a ``node``
    resource so that at most ``num_actors_per_nodes`` actors fit on a node.
    This assumes every node exposes ``{"node": 1}``; TODO confirm against the
    cluster config. A plain ``floor(1.0 / n)`` would be 0 for every ``n >= 2``
    and would drop the placement constraint entirely.

    Ray tracks fractional resources with a precision of 0.0001. The share is
    therefore truncated (never rounded up) to four decimals, so ``n`` shares
    always fit in a single unit.

    Raises:
        ValueError: If ``num_actors_per_nodes`` is below 1, or is too large to
            be expressed at Ray's resource precision.
    """
    precision = 10000
    if num_actors_per_nodes < 1:
        raise ValueError(
            f"num_actors_per_nodes must be >= 1, got {num_actors_per_nodes}"
        )
    share = floor(precision / num_actors_per_nodes) / precision
    if share == 0:
        raise ValueError(
            f"num_actors_per_nodes={num_actors_per_nodes} is too large for "
            f"Ray's fractional resource precision (1/{precision})"
        )
    return share


def start_actor(num_actors, num_actors_per_nodes, job):
    """Create actors, wait for them to start, then run ``job`` on each one.

    Args:
        num_actors: Total number of ``SimpleActor`` instances to create.
        num_actors_per_nodes: Target number of actors per node. It is enforced
            through the custom ``node`` resource (see ``actor_node_share``).
        job: Zero-argument callable handed to every actor, or ``None``.

    Returns:
        ``(submission_cost, ready_cost, actor_job_cost)``, all from ``measure``:
        - submission_cost: time to submit the actor creations.
        - ready_cost: time until every actor answers ``ready``.
        - actor_job_cost: time until every ``do_job`` call completes.
    """
    resources = {"node": actor_node_share(num_actors_per_nodes)}
    # Actors request no CPU; only the "node" resource limits their placement.
    submission_cost, actors = measure(
        lambda: [
            SimpleActor.options(resources=resources, num_cpus=0).remote(job)
            for _ in range(num_actors)
        ]
    )
    ready_cost, _ = measure(lambda: ray.get([actor.ready.remote() for actor in actors]))
    actor_job_cost, _ = measure(
        lambda: ray.get([actor.do_job.remote() for actor in actors])
    )
    return (submission_cost, ready_cost, actor_job_cost)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog="Test Scheduling")
    # Task workloads
    parser.add_argument(
        "--total-num-task", type=int, help="Total number of tasks.", required=False
    )
    parser.add_argument(
        "--num-cpu-per-task",
        type=int,
        help="Resources needed for tasks.",
        required=False,
    )
    parser.add_argument(
        "--task-duration-s",
        type=int,
        help="How long does each task execute.",
        required=False,
        default=1,
    )
    # Actor workloads
    parser.add_argument(
        "--total-num-actors", type=int, help="Total number of actors.", required=True
    )
    parser.add_argument(
        "--num-actors-per-nodes",
        type=int,
        help="How many actors to allocate for each nodes.",
        required=True,
    )

    # Parse and validate the CLI before connecting to the cluster. `--help` or
    # a bad value then exits cleanly, without starting the memory monitor.
    args = parser.parse_args()
    if args.total_num_task is not None and args.total_num_task <= 0:
        # This also guards the CPU split below against division by zero.
        parser.error("--total-num-task must be a positive integer")
    if args.task_duration_s < 0:
        parser.error("--task-duration-s must be non-negative")
    if args.num_actors_per_nodes < 1:
        parser.error("--num-actors-per-nodes must be >= 1")

    ray.init(address="auto")
    monitor_actor = test_utils.monitor_memory_usage()

    # ray.nodes() also reports nodes that have died. Count only live ones so
    # num_nodes / total_cpus describe the cluster actually running the test.
    total_cpus_per_node = [
        node["Resources"].get("CPU", 0) for node in ray.nodes() if node["Alive"]
    ]
    num_nodes = len(total_cpus_per_node)
    total_cpus = sum(total_cpus_per_node)

    job = None
    if args.total_num_task is not None:
        if args.num_cpu_per_task is None:
            # Split the cluster's CPUs evenly across all tasks.
            # NOTE(review): this floors to 0 when there are more tasks than
            # CPUs, which makes the tasks request no CPU. Confirm intended.
            args.num_cpu_per_task = floor(1.0 * total_cpus / args.total_num_task)
        # Every actor runs this job from do_job(), so each actor submits its
        # own batch of tasks.
        job = lambda: start_tasks(  # noqa: E731
            args.total_num_task, args.num_cpu_per_task, args.task_duration_s
        )

    submission_cost, ready_cost, actor_job_cost = start_actor(
        args.total_num_actors, args.num_actors_per_nodes, job
    )

    ray.get(monitor_actor.stop_run.remote())
    used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
    peak_gb = round(used_gb, 2)
    print(f"Peak memory usage: {peak_gb}GB")
    print(f"Peak memory usage per processes:\n {usage}")
    del monitor_actor

    result = {
        "total_num_task": args.total_num_task,
        "num_cpu_per_task": args.num_cpu_per_task,
        "task_duration_s": args.task_duration_s,
        "total_num_actors": args.total_num_actors,
        "num_actors_per_nodes": args.num_actors_per_nodes,
        "num_nodes": num_nodes,
        "total_cpus": total_cpus,
        "submission_cost": submission_cost,
        "ready_cost": ready_cost,
        "actor_job_cost": actor_job_cost,
        "_peak_memory": peak_gb,
        "_peak_process_memory": usage,
        "_runtime": submission_cost + ready_cost + actor_job_cost,
    }
    safe_write_to_results_json(result)
    print(result)
|