test_placement_group.py

# This is a stress test for placement groups.
# Please don't run it against a real cluster setup yet; this test uses the
# cluster util to simulate the cluster environment.
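#
# NOTE: This script assumes it is launched against an already-running Ray
# cluster (it connects with ray.init(address="auto")) whose nodes each
# advertise the "pg_custom" custom resource; how the nodes are started is
# left to the surrounding test harness.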
import time
from time import perf_counter
from random import random
import json
import logging
import os

import ray
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# TODO(sang): Increase the number in the actual stress test.
# This number should be divisible by 3.
RESOURCE_QUANTITY = 666
NUM_NODES = 5
CUSTOM_RESOURCES = {"pg_custom": RESOURCE_QUANTITY}
# Create placement groups whose bundles each use 1 unit of the custom resource.
NUM_PG = RESOURCE_QUANTITY


@ray.remote(num_cpus=0, resources={"pg_custom": 1}, max_calls=0)
def mock_task():
    time.sleep(0.1)
    return True


@ray.remote(num_cpus=0, resources={"pg_custom": 1}, max_restarts=0)
class MockActor:
    def __init__(self):
        pass

    def ping(self):
        pass
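

# Note: pg_launcher references the module-level BUNDLES, which is only
# assigned in the __main__ block below. This relies on Ray shipping the
# function (and the globals it references) to workers at first invocation,
# which happens after BUNDLES has been set.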
@ray.remote(num_cpus=0)
def pg_launcher(pre_created_pgs, num_pgs_to_create):
    pgs = []
    pgs += pre_created_pgs
    for _ in range(num_pgs_to_create):
        pgs.append(placement_group(BUNDLES, strategy="STRICT_SPREAD"))

    pgs_removed = []
    pgs_unremoved = []
    # Randomly choose placement groups to remove.
    for pg in pgs:
        if random() < 0.5:
            pgs_removed.append(pg)
        else:
            pgs_unremoved.append(pg)

    tasks = []
    max_actor_cnt = 5
    actor_cnt = 0
    actors = []
    # Randomly schedule tasks or actors on placement groups that
    # are not removed.
    for pg in pgs_unremoved:
        # TODO(sang): Comment in this line causes GCS actor management
        # failure. We need to fix it.
        if random() < 0.5:
            tasks.append(
                mock_task.options(
                    scheduling_strategy=PlacementGroupSchedulingStrategy(
                        placement_group=pg
                    )
                ).remote()
            )
        else:
            if actor_cnt < max_actor_cnt:
                actors.append(
                    MockActor.options(
                        scheduling_strategy=PlacementGroupSchedulingStrategy(
                            placement_group=pg
                        )
                    ).remote()
                )
                actor_cnt += 1

    # Remove the rest of the placement groups.
    for pg in pgs_removed:
        remove_placement_group(pg)

    ray.get([pg.ready() for pg in pgs_unremoved])
    ray.get(tasks)
    ray.get([actor.ping.remote() for actor in actors])

    # Since the placement groups have been scheduled, remove them.
    for pg in pgs_unremoved:
        remove_placement_group(pg)


if __name__ == "__main__":
    result = {"success": 0}
    # Wait until the expected number of nodes have joined the cluster.
    ray.init(address="auto")
    while True:
        num_nodes = len(ray.nodes())
        logger.info("Waiting for nodes {}/{}".format(num_nodes, NUM_NODES + 1))
        if num_nodes >= NUM_NODES + 1:
            break
        time.sleep(5)
    logger.info(
        "Nodes have all joined. There are %s resources.", ray.cluster_resources()
    )
    # Scenario 1: Create a bunch of placement groups and measure how long
    # it takes.
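    # Note: placement_group() is asynchronous and returns a handle immediately,
    # so the timings below measure only the synchronous creation/removal calls;
    # pg.ready() waits until the groups are actually scheduled.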
    total_creating_time = 0
    total_removing_time = 0
    repeat = 1
    total_trial = repeat * NUM_PG
    BUNDLES = [{"pg_custom": 1}] * NUM_NODES
    # Create and remove placement groups.
    for _ in range(repeat):
        pgs = []
        for i in range(NUM_PG):
            start = perf_counter()
            pgs.append(placement_group(BUNDLES, strategy="PACK"))
            end = perf_counter()
            logger.info(f"append_group iteration {i}")
            total_creating_time += end - start

        ray.get([pg.ready() for pg in pgs])

        for i, pg in enumerate(pgs):
            start = perf_counter()
            remove_placement_group(pg)
            end = perf_counter()
            logger.info(f"remove_group iteration {i}")
            total_removing_time += end - start

    # Validate the correctness.
    assert ray.cluster_resources()["pg_custom"] == NUM_NODES * RESOURCE_QUANTITY
    # Scenario 2:
    # - Launch 30% of the placement groups in the driver and pass them in.
    # - Launch 70% of the placement groups from the remote tasks.
    # - Randomly remove placement groups and schedule tasks and actors.
    #
    # Goal:
    # - Make sure jobs are done without breaking the GCS server.
    # - Make sure all the resources are recovered after the job is done.
    # - Measure the creation latency in the stressful environment.
    pre_created_num_pgs = round(NUM_PG * 0.3)
    num_pgs_to_create = NUM_PG - pre_created_num_pgs
    pg_launchers = []
    for _ in range(3):
        pre_created_pgs = [
            placement_group(BUNDLES, strategy="STRICT_SPREAD")
            for _ in range(pre_created_num_pgs // 3)
        ]
        pg_launchers.append(
            pg_launcher.remote(pre_created_pgs, num_pgs_to_create // 3)
        )

    ray.get(pg_launchers)
    assert ray.cluster_resources()["pg_custom"] == NUM_NODES * RESOURCE_QUANTITY
    result["avg_pg_create_time_ms"] = total_creating_time / total_trial * 1000
    result["avg_pg_remove_time_ms"] = total_removing_time / total_trial * 1000
    result["success"] = 1
    result["perf_metrics"] = [
        {
            "perf_metric_name": "avg_pg_create_time_ms",
            "perf_metric_value": result["avg_pg_create_time_ms"],
            "perf_metric_type": "LATENCY",
        },
        {
            "perf_metric_name": "avg_pg_remove_time_ms",
            "perf_metric_value": result["avg_pg_remove_time_ms"],
            "perf_metric_type": "LATENCY",
        },
    ]
    print(
        "Avg placement group creating time: "
        f"{total_creating_time / total_trial * 1000} ms"
    )
    print(
        "Avg placement group removing time: "
        f"{total_removing_time / total_trial * 1000} ms"
    )
    print("PASSED.")
    with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_file:
        out_file.write(json.dumps(result))
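
# Example invocation (values are illustrative, not from the test harness):
# every node must advertise the custom resource, e.g.
#   ray start --head --resources='{"pg_custom": 666}'
# and the driver expects TEST_OUTPUT_JSON to point at a writable path:
#   TEST_OUTPUT_JSON=/tmp/pg_test_result.json python test_placement_group.py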