#!/usr/bin/env python3
"""
$ ./benchmark_worker_startup.py --help
usage: benchmark_worker_startup.py [-h] --num_gpus_in_cluster
                                   NUM_GPUS_IN_CLUSTER
                                   --num_cpus_in_cluster
                                   NUM_CPUS_IN_CLUSTER
                                   --num_tasks_or_actors_per_run
                                   NUM_TASKS_OR_ACTORS_PER_RUN
                                   --num_measurements_per_configuration
                                   NUM_MEASUREMENTS_PER_CONFIGURATION

This release test measures Ray worker startup time. Specifically, it measures
the time to start N different tasks or actors, where each task or actor
imports a large library (currently PyTorch). N is configurable. The test runs
under a few different configurations: {task, actor} x {runtime env, no runtime
env} x {GPU, no GPU} x {cold start, warm start} x {import torch, no imports}.

options:
  -h, --help            show this help message and exit
  --num_gpus_in_cluster NUM_GPUS_IN_CLUSTER
                        The number of GPUs in the cluster. This determines
                        how many GPU resources each actor/task requests.
  --num_cpus_in_cluster NUM_CPUS_IN_CLUSTER
                        The number of CPUs in the cluster. This determines
                        how many CPU resources each actor/task requests.
  --num_tasks_or_actors_per_run NUM_TASKS_OR_ACTORS_PER_RUN
                        The number of tasks or actors per 'run'. A run starts
                        this many tasks/actors and constitutes a single
                        measurement. Several runs can be composed within a
                        single job to measure warm start, or spread across
                        different jobs to measure cold start.
  --num_measurements_per_configuration NUM_MEASUREMENTS_PER_CONFIGURATION
                        The number of measurements to record per
                        configuration.

This script uses test_single_configuration.py to run the actual measurements.
"""

from collections import defaultdict
from dataclasses import dataclass

from ray._private.test_utils import safe_write_to_results_json
from ray.job_submission import JobSubmissionClient, JobStatus
import argparse
import asyncio
import random
import ray
import statistics
import subprocess
import sys


def main(
    num_cpus_in_cluster: int,
    num_gpus_in_cluster: int,
    num_tasks_or_actors_per_run: int,
    num_measurements_per_configuration: int,
):
    """
    Generate test cases, then run them in random order via run_and_stream_logs.
    """
    metrics_actor_name = "metrics_actor"
    metrics_actor_namespace = "metrics_actor_namespace"
    metrics_actor = MetricsActor.options(  # noqa: F841
        name=metrics_actor_name,
        namespace=metrics_actor_namespace,
    ).remote(
        expected_measurements_per_test=num_measurements_per_configuration,
    )

    print_disk_config()

    run_matrix = generate_test_matrix(
        num_cpus_in_cluster,
        num_gpus_in_cluster,
        num_tasks_or_actors_per_run,
        num_measurements_per_configuration,
    )

    print(f"List of tests: {run_matrix}")

    for test in random.sample(list(run_matrix), k=len(run_matrix)):
        print(f"Running test {test}")
        asyncio.run(
            run_and_stream_logs(
                metrics_actor_name,
                metrics_actor_namespace,
                test,
            )
        )


@ray.remote(num_cpus=0)
class MetricsActor:
    """
    Actor to which tests report their measurements.
    """

    def __init__(self, expected_measurements_per_test: int):
        self.measurements = defaultdict(list)
        self.expected_measurements_per_test = expected_measurements_per_test

    def submit(self, test_name: str, latency: float):
        print(f"got latency {latency} s for test {test_name}")
        self.measurements[test_name].append(latency)
        results = self.create_results_dict_from_measurements(
            self.measurements, self.expected_measurements_per_test
        )
        safe_write_to_results_json(results)

        assert (
            len(self.measurements[test_name]) <= self.expected_measurements_per_test
        ), (
            f"Expected at most {self.expected_measurements_per_test} measurements "
            f"for test {test_name}, but got {len(self.measurements[test_name])}"
        )

    @staticmethod
    def create_results_dict_from_measurements(
        all_measurements, expected_measurements_per_test
    ):
        results = {}
        perf_metrics = []

        for test_name, measurements in all_measurements.items():
            test_summary = {
                "measurements": measurements,
            }

            # Only publish a p50 once the test has all of its measurements.
            if len(measurements) == expected_measurements_per_test:
                median = statistics.median(measurements)
                test_summary["p50"] = median
                perf_metrics.append(
                    {
                        "perf_metric_name": f"p50.{test_name}",
                        "perf_metric_value": median,
                        "perf_metric_type": "LATENCY",
                    }
                )

            results[test_name] = test_summary

        results["perf_metrics"] = perf_metrics
        return results
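

# For reference, once a test has all of its measurements, the dict written to
# results.json looks roughly like the sketch below (the test name and latency
# values are hypothetical):
#
#   {
#       "seconds_to_warm_start_100_tasks-import_torch-...": {
#           "measurements": [12.3, 11.9, 12.1],
#           "p50": 12.1,
#       },
#       "perf_metrics": [
#           {
#               "perf_metric_name": "p50.seconds_to_warm_start_100_tasks-...",
#               "perf_metric_value": 12.1,
#               "perf_metric_type": "LATENCY",
#           },
#       ],
#   }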


def print_disk_config():
    print("Getting disk sizes via df -h")
    subprocess.check_call("df -h", shell=True)


def generate_test_matrix(
    num_cpus_in_cluster: int,
    num_gpus_in_cluster: int,
    num_tasks_or_actors_per_run: int,
    num_measurements_per_test: int,
):
    num_repeated_jobs_or_runs = num_measurements_per_test
    total_num_tasks_or_actors = num_tasks_or_actors_per_run * num_repeated_jobs_or_runs

    # Cold start spreads the measurements across many fresh jobs; warm start
    # takes all of its measurements within a single job.
    num_jobs_per_type = {
        "cold_start": num_repeated_jobs_or_runs,
        "warm_start": 1,
    }

    imports_to_try = ["torch", "none"]

    tests = set()
    for with_tasks in [True, False]:
        for with_gpu in [True, False]:
            # Do not run without a runtime env. TODO(cade) The infra team added
            # cgroups to the default runtime env; we need to find some way
            # around that if we want "pure" (non-runtime-env) measurements.
            for with_runtime_env in [True]:
                for import_to_try in imports_to_try:
                    for num_jobs in num_jobs_per_type.values():
                        num_tasks_or_actors_per_job = (
                            total_num_tasks_or_actors // num_jobs
                        )
                        num_runs_per_job = (
                            num_tasks_or_actors_per_job // num_tasks_or_actors_per_run
                        )
                        test = TestConfiguration(
                            num_jobs=num_jobs,
                            num_runs_per_job=num_runs_per_job,
                            num_tasks_or_actors_per_run=num_tasks_or_actors_per_run,
                            with_tasks=with_tasks,
                            with_gpu=with_gpu,
                            with_runtime_env=with_runtime_env,
                            import_to_try=import_to_try,
                            num_cpus_in_cluster=num_cpus_in_cluster,
                            num_gpus_in_cluster=num_gpus_in_cluster,
                            num_nodes_in_cluster=1,
                        )
                        tests.add(test)

    return tests
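

# With the loops above, each invocation yields {tasks, actors} x {GPU, no GPU}
# x {runtime env only} x {torch, none} x {cold, warm} = 16 distinct
# TestConfigurations.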


@dataclass(eq=True, frozen=True)
class TestConfiguration:
    num_jobs: int
    num_runs_per_job: int
    num_tasks_or_actors_per_run: int

    with_gpu: bool
    with_tasks: bool
    with_runtime_env: bool
    import_to_try: str

    num_cpus_in_cluster: int
    num_gpus_in_cluster: int
    num_nodes_in_cluster: int

    def __repr__(self):
        with_gpu_str = "with_gpu" if self.with_gpu else "without_gpu"
        executable_unit = "tasks" if self.with_tasks else "actors"
        cold_or_warm_start = "cold" if self.num_jobs > 1 else "warm"
        with_runtime_env_str = (
            "with_runtime_env" if self.with_runtime_env else "without_runtime_env"
        )
        single_node_or_multi_node = (
            "single_node" if self.num_nodes_in_cluster == 1 else "multi_node"
        )
        import_torch_or_none = (
            "import_torch" if self.import_to_try == "torch" else "no_import"
        )
        return "-".join(
            [
                f"seconds_to_{cold_or_warm_start}_start_"
                f"{self.num_tasks_or_actors_per_run}_{executable_unit}",
                import_torch_or_none,
                with_gpu_str,
                single_node_or_multi_node,
                with_runtime_env_str,
                f"{self.num_cpus_in_cluster}_CPU_{self.num_gpus_in_cluster}"
                "_GPU_cluster",
            ]
        )
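

# __repr__ doubles as the test name reported to the metrics actor. For a
# hypothetical configuration (100 tasks, cold start, importing torch, on a
# 64-CPU/4-GPU single node), it produces:
#   seconds_to_cold_start_100_tasks-import_torch-with_gpu-single_node-with_runtime_env-64_CPU_4_GPU_cluster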


async def run_and_stream_logs(
    metrics_actor_name, metrics_actor_namespace, test: TestConfiguration
):
    """
    Run a particular test configuration by invoking ./test_single_configuration.py.
    """
    client = JobSubmissionClient("http://127.0.0.1:8265")
    entrypoint = generate_entrypoint(metrics_actor_name, metrics_actor_namespace, test)

    for _ in range(test.num_jobs):
        print(f"Running {entrypoint}")
        if not test.with_runtime_env:
            # On non-workspaces, this will run as a job but without a runtime env.
            subprocess.check_call(entrypoint, shell=True)
        else:
            job_id = client.submit_job(
                entrypoint=entrypoint,
                runtime_env={"working_dir": "./"},
            )

            try:
                async for lines in client.tail_job_logs(job_id):
                    print(lines, end="")
            except KeyboardInterrupt:
                print(f"Stopping job {job_id}")
                client.stop_job(job_id)
                raise

            job_status = client.get_job_status(job_id)
            if job_status != JobStatus.SUCCEEDED:
                raise ValueError(
                    f"Job {job_id} was not successful; status is {job_status}"
                )


def generate_entrypoint(
    metrics_actor_name: str, metrics_actor_namespace: str, test: TestConfiguration
):
    task_or_actor_arg = "--with_tasks" if test.with_tasks else "--with_actors"
    with_gpu_arg = "--with_gpu" if test.with_gpu else "--without_gpu"
    with_runtime_env_arg = (
        "--with_runtime_env" if test.with_runtime_env else "--without_runtime_env"
    )
    return " ".join(
        [
            "python ./test_single_configuration.py",
            f"--metrics_actor_name {metrics_actor_name}",
            f"--metrics_actor_namespace {metrics_actor_namespace}",
            f"--test_name {test}",
            f"--num_runs {test.num_runs_per_job}",
            f"--num_tasks_or_actors_per_run {test.num_tasks_or_actors_per_run}",
            f"--num_cpus_in_cluster {test.num_cpus_in_cluster}",
            f"--num_gpus_in_cluster {test.num_gpus_in_cluster}",
            task_or_actor_arg,
            with_gpu_arg,
            with_runtime_env_arg,
            f"--library_to_import {test.import_to_try}",
        ]
    )
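

# A generated entrypoint is a single shell command along these lines (wrapped
# here for readability; the argument values are hypothetical):
#
#   python ./test_single_configuration.py --metrics_actor_name metrics_actor
#       --metrics_actor_namespace metrics_actor_namespace
#       --test_name seconds_to_warm_start_100_actors-... --num_runs 5
#       --num_tasks_or_actors_per_run 100 --num_cpus_in_cluster 64
#       --num_gpus_in_cluster 4 --with_actors --with_gpu --with_runtime_env
#       --library_to_import torch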


def parse_args():
    parser = argparse.ArgumentParser(
        description="This release test measures Ray worker startup time. "
        "Specifically, it measures the time to start N different tasks or"
        " actors, where each task or actor imports a large library ("
        "currently PyTorch). N is configurable.\nThe test runs under a "
        "few different configurations: {task, actor} x {runtime env, "
        "no runtime env} x {GPU, no GPU} x {cold start, warm start} x "
        "{import torch, no imports}.",
        epilog="This script uses test_single_configuration.py to run the "
        "actual measurements.",
    )
    parser.add_argument(
        "--num_gpus_in_cluster",
        type=int,
        required=True,
        help="The number of GPUs in the cluster. This determines how many "
        "GPU resources each actor/task requests.",
    )
    parser.add_argument(
        "--num_cpus_in_cluster",
        type=int,
        required=True,
        help="The number of CPUs in the cluster. This determines how many "
        "CPU resources each actor/task requests.",
    )
    parser.add_argument(
        "--num_tasks_or_actors_per_run",
        type=int,
        required=True,
        help="The number of tasks or actors per 'run'. A run starts this "
        "many tasks/actors and constitutes a single measurement. Several "
        "runs can be composed within a single job to measure warm start, "
        "or spread across different jobs to measure cold start.",
    )
    parser.add_argument(
        "--num_measurements_per_configuration",
        type=int,
        required=True,
        help="The number of measurements to record per configuration.",
    )
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    sys.exit(
        main(
            args.num_cpus_in_cluster,
            args.num_gpus_in_cluster,
            args.num_tasks_or_actors_per_run,
            args.num_measurements_per_configuration,
        )
    )