123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- #!/usr/bin/env python3
- """
- Helper file for benchmark_worker_startup.py. This file runs a particular test
- configuration.
- """
- import argparse
- import ray
- import sys
- import time
- @ray.remote
- class Actor:
- def run_code(self, should_import_torch: bool):
- if should_import_torch:
- import torch # noqa: F401
- @ray.remote
- def task(should_import_torch: bool):
- if should_import_torch:
- import torch # noqa: F401
- def main(
- metrics_actor,
- test_name: str,
- num_runs: int,
- num_tasks_or_actors_per_run: int,
- num_cpus_in_cluster: int,
- num_gpus_in_cluster: int,
- library_to_import: str,
- use_actors: bool,
- with_gpu: bool,
- with_runtime_env: bool,
- ):
- num_gpus = (num_gpus_in_cluster / num_tasks_or_actors_per_run) if with_gpu else 0
- num_cpus = num_cpus_in_cluster / num_tasks_or_actors_per_run
- print(f"Assigning each task/actor {num_cpus} num_cpus and {num_gpus} num_gpus")
- actor_with_resources = Actor.options(num_gpus=num_gpus, num_cpus=num_cpus)
- task_with_resources = task.options(num_gpus=num_gpus, num_cpus=num_cpus)
- should_import_torch = library_to_import == "torch"
- print(f"should_import_torch: {should_import_torch}")
- fail_if_incorrect_runtime_env(expect_runtime_env=with_runtime_env)
- def with_actors():
- actors = [
- actor_with_resources.remote() for _ in range(num_tasks_or_actors_per_run)
- ]
- ray.get([actor.run_code.remote(should_import_torch) for actor in actors])
- def with_tasks():
- ray.get(
- [
- task_with_resources.remote(should_import_torch)
- for _ in range(num_tasks_or_actors_per_run)
- ]
- )
- func_to_measure = with_actors if use_actors else with_tasks
- for run in range(num_runs):
- print(f"Starting measurement for run {run}")
- start = time.time()
- func_to_measure()
- dur_s = time.time() - start
- ray.get(metrics_actor.submit.remote(test_name, dur_s))
- def fail_if_incorrect_runtime_env(expect_runtime_env: bool):
- ctx = ray.runtime_context.get_runtime_context()
- print(f"Found runtime_env={ctx.runtime_env}")
- if expect_runtime_env and ctx.runtime_env == {}:
- raise AssertionError(
- f"Expected a runtime environment but found runtime_env={ctx.runtime_env}"
- )
- if not expect_runtime_env and ctx.runtime_env != {}:
- raise AssertionError(
- f"Expected no runtime environment but found runtime_env={ctx.runtime_env}"
- )
- def parse_args():
- parser = argparse.ArgumentParser()
- parser.add_argument("--metrics_actor_name", type=str, required=True)
- parser.add_argument("--metrics_actor_namespace", type=str, required=True)
- parser.add_argument("--test_name", type=str, required=True)
- parser.add_argument("--num_runs", type=int, required=True)
- parser.add_argument("--num_tasks_or_actors_per_run", type=int, required=True)
- parser.add_argument("--num_cpus_in_cluster", type=int, required=True)
- parser.add_argument("--num_gpus_in_cluster", type=int, required=True)
- parser.add_argument(
- "--library_to_import", type=str, required=True, choices=["torch", "none"]
- )
- group = parser.add_mutually_exclusive_group(required=True)
- group.add_argument("--with_actors", action="store_true")
- group.add_argument("--with_tasks", action="store_true")
- group = parser.add_mutually_exclusive_group(required=True)
- group.add_argument("--with_gpu", action="store_true")
- group.add_argument("--without_gpu", action="store_true")
- group = parser.add_mutually_exclusive_group(required=True)
- group.add_argument("--with_runtime_env", action="store_true")
- group.add_argument("--without_runtime_env", action="store_true")
- return parser.parse_args()
- if __name__ == "__main__":
- args = parse_args()
- metrics_actor = ray.get_actor(
- args.metrics_actor_name,
- args.metrics_actor_namespace,
- )
- sys.exit(
- main(
- metrics_actor,
- args.test_name,
- args.num_runs,
- args.num_tasks_or_actors_per_run,
- args.num_cpus_in_cluster,
- args.num_gpus_in_cluster,
- args.library_to_import,
- args.with_actors,
- args.with_gpu,
- args.with_runtime_env,
- )
- )
|