many_drivers.py

# This workload tests many drivers using the same cluster.
import time
import argparse

import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import run_string_as_driver, safe_write_to_results_json


def update_progress(result):
    # Record when progress was last made and persist it to the results JSON.
    result["last_update"] = time.time()
    safe_write_to_results_json(result)

num_redis_shards = 5
redis_max_memory = 10**8
object_store_memory = 10**8
num_nodes = 4

message = (
    "Make sure there is enough memory on this machine to run this "
    "workload. We divide the system memory by 2 to provide a buffer."
)
assert (
    num_nodes * object_store_memory + num_redis_shards * redis_max_memory
    < ray._private.utils.get_system_memory() / 2
), message

# Simulate a cluster on one machine.
cluster = Cluster()
for i in range(num_nodes):
    # Only the first node is the head node and hosts Redis and its shards.
    cluster.add_node(
        redis_port=6379 if i == 0 else None,
        num_redis_shards=num_redis_shards if i == 0 else None,
        num_cpus=4,
        num_gpus=0,
        resources={str(i): 5},  # Custom resource used to pin work to this node.
        object_store_memory=object_store_memory,
        redis_max_memory=redis_max_memory,
        dashboard_host="0.0.0.0",
    )
ray.init(address=cluster.address)

# Run the workload.

# Define a driver script that runs a few tasks and actors on each node in the
# cluster. Double braces are escaped so that str.format leaves literal braces
# in the generated script.
driver_script = """
import ray

ray.init(address="{}")

num_nodes = {}


@ray.remote
def f():
    return 1


@ray.remote
class Actor(object):
    def method(self):
        return 1


for _ in range(5):
    for i in range(num_nodes):
        assert (
            ray.get(f._remote(args=[], kwargs={{}}, resources={{str(i): 1}})) == 1
        )
        actor = Actor._remote(args=[], kwargs={{}}, resources={{str(i): 1}})
        assert ray.get(actor.method.remote()) == 1

# Test that Ray Datasets doesn't leak workers.
ray.data.range(100).map(lambda x: x).take()

print("success")
""".format(
    cluster.address, num_nodes
)
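
# Each run_driver task starts the script above as a separate external driver
# process (via run_string_as_driver) and checks that it reported success.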
@ray.remote
def run_driver():
    output = run_string_as_driver(driver_script, encode="utf-8")
    assert "success" in output

iteration = 0
# Start with one driver in flight per node, spread across the cluster via the
# per-node custom resources.
running_ids = [
    run_driver._remote(args=[], kwargs={}, num_cpus=0, resources={str(i): 0.01})
    for i in range(num_nodes)
]
start_time = time.time()
previous_time = start_time

parser = argparse.ArgumentParser(prog="Many Drivers long running tests")
parser.add_argument(
    "--iteration-num", type=int, help="How many iterations to run", required=False
)
parser.add_argument(
    "--smoke-test",
    action="store_true",
    help="Whether or not the test is a smoke test.",
    default=False,
)
args = parser.parse_args()

iteration_num = args.iteration_num
if args.smoke_test:
    iteration_num = 400
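
# Example invocations (assuming the file is run directly as a script; both
# flags are defined by the parser above):
#   python many_drivers.py --smoke-test
#   python many_drivers.py --iteration-num 100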

while True:
    if iteration_num is not None and iteration_num < iteration:
        break
    # Wait for a driver to finish, then start a new driver in its place,
    # cycling through the nodes round-robin.
    [ready_id], running_ids = ray.wait(running_ids, num_returns=1)
    ray.get(ready_id)
    running_ids.append(
        run_driver._remote(
            args=[], kwargs={}, num_cpus=0, resources={str(iteration % num_nodes): 0.01}
        )
    )

    new_time = time.time()
    print(
        "Iteration {}:\n"
        " - Iteration time: {}.\n"
        " - Absolute time: {}.\n"
        " - Total elapsed time: {}.".format(
            iteration, new_time - previous_time, new_time, new_time - start_time
        )
    )
    update_progress(
        {
            "iteration": iteration,
            "iteration_time": new_time - previous_time,
            "absolute_time": new_time,
            "elapsed_time": new_time - start_time,
        }
    )

    previous_time = new_time
    iteration += 1