benchmark_worker_startup.py

#!/usr/bin/env python3
"""
$ ./benchmark_worker_startup.py --help
usage: benchmark_worker_startup.py [-h] --num_gpus_in_cluster
                                   NUM_GPUS_IN_CLUSTER
                                   --num_cpus_in_cluster
                                   NUM_CPUS_IN_CLUSTER
                                   --num_tasks_or_actors_per_run
                                   NUM_TASKS_OR_ACTORS_PER_RUN
                                   --num_measurements_per_configuration
                                   NUM_MEASUREMENTS_PER_CONFIGURATION

This release test measures Ray worker startup time. Specifically, it measures
the time to start N different tasks or actors, where each task or actor
imports a large library (currently PyTorch). N is configurable. The test runs
under a few different configurations: {task, actor} x {runtime env, no
runtime env} x {GPU, no GPU} x {cold start, warm start} x {import torch, no
imports}.

options:
  -h, --help            show this help message and exit
  --num_gpus_in_cluster NUM_GPUS_IN_CLUSTER
                        The number of GPUs in the cluster. This determines
                        how many GPU resources each actor/task requests.
  --num_cpus_in_cluster NUM_CPUS_IN_CLUSTER
                        The number of CPUs in the cluster. This determines
                        how many CPU resources each actor/task requests.
  --num_tasks_or_actors_per_run NUM_TASKS_OR_ACTORS_PER_RUN
                        The number of tasks or actors per 'run'. A run starts
                        this many tasks/actors and constitutes a single
                        measurement. Several runs can be composed within a
                        single job to measure warm start, or spread across
                        different jobs to measure cold start.
  --num_measurements_per_configuration NUM_MEASUREMENTS_PER_CONFIGURATION
                        The number of measurements to record per
                        configuration.

This script uses test_single_configuration.py to run the actual measurements.
"""

from collections import defaultdict
from dataclasses import dataclass
from ray._private.test_utils import safe_write_to_results_json
from ray.job_submission import JobSubmissionClient, JobStatus
import argparse
import asyncio
import random
import ray
import statistics
import subprocess
import sys


def main(
    num_cpus_in_cluster: int,
    num_gpus_in_cluster: int,
    num_tasks_or_actors_per_run: int,
    num_measurements_per_configuration: int,
):
    """
    Generate test cases, then run them in random order via run_and_stream_logs.
    """
    metrics_actor_name = "metrics_actor"
    metrics_actor_namespace = "metrics_actor_namespace"
    metrics_actor = MetricsActor.options(  # noqa: F841
        name=metrics_actor_name,
        namespace=metrics_actor_namespace,
    ).remote(
        expected_measurements_per_test=num_measurements_per_configuration,
    )

    print_disk_config()

    run_matrix = generate_test_matrix(
        num_cpus_in_cluster,
        num_gpus_in_cluster,
        num_tasks_or_actors_per_run,
        num_measurements_per_configuration,
    )
    print(f"List of tests: {run_matrix}")

    for test in random.sample(list(run_matrix), k=len(run_matrix)):
        print(f"Running test {test}")
        asyncio.run(
            run_and_stream_logs(
                metrics_actor_name,
                metrics_actor_namespace,
                test,
            )
        )


@ray.remote(num_cpus=0)
class MetricsActor:
    """
    Actor which tests will report metrics to.
    """

    def __init__(self, expected_measurements_per_test: int):
        self.measurements = defaultdict(list)
        self.expected_measurements_per_test = expected_measurements_per_test

    def submit(self, test_name: str, latency: float):
        print(f"got latency {latency} s for test {test_name}")
        self.measurements[test_name].append(latency)

        results = self.create_results_dict_from_measurements(
            self.measurements, self.expected_measurements_per_test
        )
        safe_write_to_results_json(results)

        assert (
            len(self.measurements[test_name]) <= self.expected_measurements_per_test
        ), (
            f"Expected {self.measurements[test_name]} to not have more elements than "
            f"{self.expected_measurements_per_test}"
        )
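
    # A minimal sketch (illustrative, not confirmed against
    # test_single_configuration.py) of how a submitted job might report a
    # measurement back to this actor, assuming it attaches to the same Ray
    # cluster and looks the actor up by the name/namespace passed on its
    # command line; the variable names below are hypothetical:
    #
    #     metrics_actor = ray.get_actor(
    #         args.metrics_actor_name, namespace=args.metrics_actor_namespace
    #     )
    #     ray.get(metrics_actor.submit.remote(args.test_name, latency_seconds))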

    @staticmethod
    def create_results_dict_from_measurements(
        all_measurements, expected_measurements_per_test
    ):
        results = {}
        perf_metrics = []

        for test_name, measurements in all_measurements.items():
            test_summary = {
                "measurements": measurements,
            }

            if len(measurements) == expected_measurements_per_test:
                median = statistics.median(measurements)
                test_summary["p50"] = median
                perf_metrics.append(
                    {
                        "perf_metric_name": f"p50.{test_name}",
                        "perf_metric_value": median,
                        "perf_metric_type": "LATENCY",
                    }
                )

            results[test_name] = test_summary

        results["perf_metrics"] = perf_metrics
        return results
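
# Illustrative shape of the results dict written via safe_write_to_results_json
# once a test has collected all of its measurements (values are made up):
#
#     {
#         "seconds_to_cold_start_16_tasks-import_torch-...": {
#             "measurements": [12.3, 11.8, 12.9],
#             "p50": 12.3,
#         },
#         "perf_metrics": [
#             {
#                 "perf_metric_name": "p50.seconds_to_cold_start_16_tasks-...",
#                 "perf_metric_value": 12.3,
#                 "perf_metric_type": "LATENCY",
#             },
#         ],
#     }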


def print_disk_config():
    print("Getting disk sizes via df -h")
    subprocess.check_call("df -h", shell=True)


def generate_test_matrix(
    num_cpus_in_cluster: int,
    num_gpus_in_cluster: int,
    num_tasks_or_actors_per_run: int,
    num_measurements_per_test: int,
):
    num_repeated_jobs_or_runs = num_measurements_per_test
    total_num_tasks_or_actors = num_tasks_or_actors_per_run * num_repeated_jobs_or_runs

    num_jobs_per_type = {
        "cold_start": num_repeated_jobs_or_runs,
        "warm_start": 1,
    }
    imports_to_try = ["torch", "none"]

    tests = set()
    for with_tasks in [True, False]:
        for with_gpu in [True, False]:
            # Do not run without runtime env. TODO(cade) Infra team added cgroups to
            # default runtime env, need to find some way around that if we want
            # "pure" (non-runtime-env) measurements.
            for with_runtime_env in [True]:
                for import_to_try in imports_to_try:
                    for num_jobs in num_jobs_per_type.values():
                        num_tasks_or_actors_per_job = (
                            total_num_tasks_or_actors // num_jobs
                        )
                        num_runs_per_job = (
                            num_tasks_or_actors_per_job // num_tasks_or_actors_per_run
                        )

                        test = TestConfiguration(
                            num_jobs=num_jobs,
                            num_runs_per_job=num_runs_per_job,
                            num_tasks_or_actors_per_run=num_tasks_or_actors_per_run,
                            with_tasks=with_tasks,
                            with_gpu=with_gpu,
                            with_runtime_env=with_runtime_env,
                            import_to_try=import_to_try,
                            num_cpus_in_cluster=num_cpus_in_cluster,
                            num_gpus_in_cluster=num_gpus_in_cluster,
                            num_nodes_in_cluster=1,
                        )
                        tests.add(test)

    return tests
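
# Worked example (illustrative numbers): with num_tasks_or_actors_per_run=16 and
# num_measurements_per_configuration=5, the "cold_start" configuration produces
# 5 jobs x 1 run x 16 tasks/actors (each measurement starts in a fresh job),
# while "warm_start" produces 1 job x 5 runs x 16 tasks/actors (measurements
# after the first happen inside an already-warm job).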


@dataclass(eq=True, frozen=True)
class TestConfiguration:
    num_jobs: int
    num_runs_per_job: int
    num_tasks_or_actors_per_run: int

    with_gpu: bool
    with_tasks: bool
    with_runtime_env: bool
    import_to_try: str

    num_cpus_in_cluster: int
    num_gpus_in_cluster: int
    num_nodes_in_cluster: int

    def __repr__(self):
        with_gpu_str = "with_gpu" if self.with_gpu else "without_gpu"
        executable_unit = "tasks" if self.with_tasks else "actors"
        cold_or_warm_start = "cold" if self.num_jobs > 1 else "warm"
        with_runtime_env_str = (
            "with_runtime_env" if self.with_runtime_env else "without_runtime_env"
        )
        single_node_or_multi_node = (
            "single_node" if self.num_nodes_in_cluster == 1 else "multi_node"
        )
        import_torch_or_none = (
            "import_torch" if self.import_to_try == "torch" else "no_import"
        )

        return "-".join(
            [
                f"seconds_to_{cold_or_warm_start}_start_"
                f"{self.num_tasks_or_actors_per_run}_{executable_unit}",
                import_torch_or_none,
                with_gpu_str,
                single_node_or_multi_node,
                with_runtime_env_str,
                f"{self.num_cpus_in_cluster}_CPU_{self.num_gpus_in_cluster}"
                "_GPU_cluster",
            ]
        )
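
# Example test name produced by TestConfiguration.__repr__ (illustrative values):
#   seconds_to_cold_start_16_tasks-import_torch-with_gpu-single_node-with_runtime_env-16_CPU_4_GPU_cluster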


async def run_and_stream_logs(
    metrics_actor_name, metrics_actor_namespace, test: TestConfiguration
):
    """
    Run a particular test configuration by invoking ./test_single_configuration.py.
    """
    client = JobSubmissionClient("http://127.0.0.1:8265")
    entrypoint = generate_entrypoint(metrics_actor_name, metrics_actor_namespace, test)

    for _ in range(test.num_jobs):
        print(f"Running {entrypoint}")
        if not test.with_runtime_env:
            # On non-workspaces, this will run as a job but without a runtime env.
            subprocess.check_call(entrypoint, shell=True)
        else:
            job_id = client.submit_job(
                entrypoint=entrypoint,
                runtime_env={"working_dir": "./"},
            )
            try:
                async for lines in client.tail_job_logs(job_id):
                    print(lines, end="")
            except KeyboardInterrupt:
                print(f"Stopping job {job_id}")
                client.stop_job(job_id)
                raise

            job_status = client.get_job_status(job_id)
            if job_status != JobStatus.SUCCEEDED:
                raise ValueError(
                    f"Job {job_id} was not successful; status is {job_status}"
                )


def generate_entrypoint(
    metrics_actor_name: str, metrics_actor_namespace: str, test: TestConfiguration
):
    task_or_actor_arg = "--with_tasks" if test.with_tasks else "--with_actors"
    with_gpu_arg = "--with_gpu" if test.with_gpu else "--without_gpu"
    with_runtime_env_arg = (
        "--with_runtime_env" if test.with_runtime_env else "--without_runtime_env"
    )

    return " ".join(
        [
            "python ./test_single_configuration.py",
            f"--metrics_actor_name {metrics_actor_name}",
            f"--metrics_actor_namespace {metrics_actor_namespace}",
            f"--test_name {test}",
            f"--num_runs {test.num_runs_per_job} ",
            f"--num_tasks_or_actors_per_run {test.num_tasks_or_actors_per_run}",
            f"--num_cpus_in_cluster {test.num_cpus_in_cluster}",
            f"--num_gpus_in_cluster {test.num_gpus_in_cluster}",
            task_or_actor_arg,
            with_gpu_arg,
            with_runtime_env_arg,
            f"--library_to_import {test.import_to_try}",
        ]
    )
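
# Illustrative entrypoint generated above (values made up; wrapped here for
# readability, the real string is a single line):
#
#   python ./test_single_configuration.py \
#       --metrics_actor_name metrics_actor \
#       --metrics_actor_namespace metrics_actor_namespace \
#       --test_name seconds_to_warm_start_16_tasks-import_torch-with_gpu-... \
#       --num_runs 5 --num_tasks_or_actors_per_run 16 \
#       --num_cpus_in_cluster 16 --num_gpus_in_cluster 4 \
#       --with_tasks --with_gpu --with_runtime_env --library_to_import torch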


def parse_args():
    parser = argparse.ArgumentParser(
        description="This release test measures Ray worker startup time. "
        "Specifically, it measures the time to start N different tasks or"
        " actors, where each task or actor imports a large library ("
        "currently PyTorch). N is configurable.\nThe test runs under a "
        "few different configurations: {task, actor} x {runtime env, "
        "no runtime env} x {GPU, no GPU} x {cold start, warm start} x "
        "{import torch, no imports}.",
        epilog="This script uses test_single_configuration.py to run the "
        "actual measurements.",
    )

    parser.add_argument(
        "--num_gpus_in_cluster",
        type=int,
        required=True,
        help="The number of GPUs in the cluster. This determines how many "
        "GPU resources each actor/task requests.",
    )
    parser.add_argument(
        "--num_cpus_in_cluster",
        type=int,
        required=True,
        help="The number of CPUs in the cluster. This determines how many "
        "CPU resources each actor/task requests.",
    )
    parser.add_argument(
        "--num_tasks_or_actors_per_run",
        type=int,
        required=True,
        help="The number of tasks or actors per 'run'. A run starts this "
        "many tasks/actors and constitutes a single measurement. Several "
        "runs can be composed within a single job to measure warm start, "
        "or spread across different jobs to measure cold start.",
    )
    parser.add_argument(
        "--num_measurements_per_configuration",
        type=int,
        required=True,
        help="The number of measurements to record per configuration.",
    )

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    sys.exit(
        main(
            args.num_cpus_in_cluster,
            args.num_gpus_in_cluster,
            args.num_tasks_or_actors_per_run,
            args.num_measurements_per_configuration,
        )
    )