test_single_configuration.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. #!/usr/bin/env python3
  2. """
  3. Helper file for benchmark_worker_startup.py. This file runs a particular test
  4. configuration.
  5. """
  6. import argparse
  7. import ray
  8. import sys
  9. import time
  10. @ray.remote
  11. class Actor:
  12. def run_code(self, should_import_torch: bool):
  13. if should_import_torch:
  14. import torch # noqa: F401
  15. @ray.remote
  16. def task(should_import_torch: bool):
  17. if should_import_torch:
  18. import torch # noqa: F401
  19. def main(
  20. metrics_actor,
  21. test_name: str,
  22. num_runs: int,
  23. num_tasks_or_actors_per_run: int,
  24. num_cpus_in_cluster: int,
  25. num_gpus_in_cluster: int,
  26. library_to_import: str,
  27. use_actors: bool,
  28. with_gpu: bool,
  29. with_runtime_env: bool,
  30. ):
  31. num_gpus = (num_gpus_in_cluster / num_tasks_or_actors_per_run) if with_gpu else 0
  32. num_cpus = num_cpus_in_cluster / num_tasks_or_actors_per_run
  33. print(f"Assigning each task/actor {num_cpus} num_cpus and {num_gpus} num_gpus")
  34. actor_with_resources = Actor.options(num_gpus=num_gpus, num_cpus=num_cpus)
  35. task_with_resources = task.options(num_gpus=num_gpus, num_cpus=num_cpus)
  36. should_import_torch = library_to_import == "torch"
  37. print(f"should_import_torch: {should_import_torch}")
  38. fail_if_incorrect_runtime_env(expect_runtime_env=with_runtime_env)
  39. def with_actors():
  40. actors = [
  41. actor_with_resources.remote() for _ in range(num_tasks_or_actors_per_run)
  42. ]
  43. ray.get([actor.run_code.remote(should_import_torch) for actor in actors])
  44. def with_tasks():
  45. ray.get(
  46. [
  47. task_with_resources.remote(should_import_torch)
  48. for _ in range(num_tasks_or_actors_per_run)
  49. ]
  50. )
  51. func_to_measure = with_actors if use_actors else with_tasks
  52. for run in range(num_runs):
  53. print(f"Starting measurement for run {run}")
  54. start = time.time()
  55. func_to_measure()
  56. dur_s = time.time() - start
  57. ray.get(metrics_actor.submit.remote(test_name, dur_s))
  58. def fail_if_incorrect_runtime_env(expect_runtime_env: bool):
  59. ctx = ray.runtime_context.get_runtime_context()
  60. print(f"Found runtime_env={ctx.runtime_env}")
  61. if expect_runtime_env and ctx.runtime_env == {}:
  62. raise AssertionError(
  63. f"Expected a runtime environment but found runtime_env={ctx.runtime_env}"
  64. )
  65. if not expect_runtime_env and ctx.runtime_env != {}:
  66. raise AssertionError(
  67. f"Expected no runtime environment but found runtime_env={ctx.runtime_env}"
  68. )
  69. def parse_args():
  70. parser = argparse.ArgumentParser()
  71. parser.add_argument("--metrics_actor_name", type=str, required=True)
  72. parser.add_argument("--metrics_actor_namespace", type=str, required=True)
  73. parser.add_argument("--test_name", type=str, required=True)
  74. parser.add_argument("--num_runs", type=int, required=True)
  75. parser.add_argument("--num_tasks_or_actors_per_run", type=int, required=True)
  76. parser.add_argument("--num_cpus_in_cluster", type=int, required=True)
  77. parser.add_argument("--num_gpus_in_cluster", type=int, required=True)
  78. parser.add_argument(
  79. "--library_to_import", type=str, required=True, choices=["torch", "none"]
  80. )
  81. group = parser.add_mutually_exclusive_group(required=True)
  82. group.add_argument("--with_actors", action="store_true")
  83. group.add_argument("--with_tasks", action="store_true")
  84. group = parser.add_mutually_exclusive_group(required=True)
  85. group.add_argument("--with_gpu", action="store_true")
  86. group.add_argument("--without_gpu", action="store_true")
  87. group = parser.add_mutually_exclusive_group(required=True)
  88. group.add_argument("--with_runtime_env", action="store_true")
  89. group.add_argument("--without_runtime_env", action="store_true")
  90. return parser.parse_args()
  91. if __name__ == "__main__":
  92. args = parse_args()
  93. metrics_actor = ray.get_actor(
  94. args.metrics_actor_name,
  95. args.metrics_actor_namespace,
  96. )
  97. sys.exit(
  98. main(
  99. metrics_actor,
  100. args.test_name,
  101. args.num_runs,
  102. args.num_tasks_or_actors_per_run,
  103. args.num_cpus_in_cluster,
  104. args.num_gpus_in_cluster,
  105. args.library_to_import,
  106. args.with_actors,
  107. args.with_gpu,
  108. args.with_runtime_env,
  109. )
  110. )