test_core.py

import time

import ray
from ray._private.test_utils import wait_for_condition
from ray.autoscaler.v2.tests.util import (
    NodeCountCheck,
    TotalResourceCheck,
    check_cluster,
)

from logger import logger
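
# Assumption: `logger` comes from a small sibling helper module (logger.py)
# that configures and exposes a module-level logger. Note that
# wait_for_condition forwards its extra keyword arguments (e.g. `targets=...`
# below) to the predicate it polls.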

ray.init("auto")

# Sync with the compute config.
HEAD_NODE_CPU = 0
WORKER_NODE_CPU = 4

IDLE_TERMINATION_S = 60 * 5  # 5 min
DEFAULT_RETRY_INTERVAL_MS = 15 * 1000  # 15 sec
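
# The constants above must stay in sync with the cluster's compute/launch
# config. A minimal, hypothetical cluster-launcher YAML that would match them
# (illustrative only, not a complete config):
#
#   idle_timeout_minutes: 5            # -> IDLE_TERMINATION_S
#   head_node_type: head
#   available_node_types:
#     head:
#       resources: {"CPU": 0}          # -> HEAD_NODE_CPU
#       max_workers: 0
#     worker:
#       resources: {"CPU": 4}          # -> WORKER_NODE_CPU
#       min_workers: 0
#       max_workers: 10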

ctx = {
    "num_cpus": 0,
    "num_nodes": 1,
}

logger.info(f"Starting cluster with {ctx['num_nodes']} nodes, {ctx['num_cpus']} cpus")
check_cluster(
    [
        NodeCountCheck(ctx["num_nodes"]),
        TotalResourceCheck({"CPU": ctx["num_cpus"]}),
    ]
)
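
# ---------------------------------------------------------------------------
# Reference sketch (hypothetical, not used by this test): roughly what the
# check helpers imported from ray.autoscaler.v2.tests.util are assumed to do,
# i.e. compare the live cluster state against the expected node count and
# total resources. The sketch returns a bool; the real helpers may assert
# instead, which also works with wait_for_condition since it retries on falsy
# returns and on exceptions until the timeout expires.
class _NodeCountCheckSketch:
    def __init__(self, expected_nodes: int):
        self.expected_nodes = expected_nodes

    def check(self) -> bool:
        # Count only alive nodes reported by the GCS.
        return len([n for n in ray.nodes() if n["Alive"]]) == self.expected_nodes


class _TotalResourceCheckSketch:
    def __init__(self, expected_resources: dict):
        self.expected_resources = expected_resources

    def check(self) -> bool:
        # Compare against the cluster-wide resource totals.
        total = ray.cluster_resources()
        return all(
            total.get(name, 0) == amount
            for name, amount in self.expected_resources.items()
        )


def _check_cluster_sketch(targets) -> bool:
    # Mirrors the check_cluster(targets=...) signature used in this file.
    return all(target.check() for target in targets)
# ---------------------------------------------------------------------------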


# Requesting cluster resources (a cluster constraint) should upscale the
# cluster; clearing the constraint should let it scale back down when idle.
def test_request_cluster_resources(ctx: dict):
    from ray.autoscaler._private.commands import request_resources

    request_resources(num_cpus=8)
    ctx["num_cpus"] += 8
    ctx["num_nodes"] += 8 // WORKER_NODE_CPU

    # Assert on the number of worker nodes.
    logger.info(
        f"Requesting cluster constraints: {ctx['num_nodes']} nodes, "
        f"{ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 * 5,  # 5 min
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )

    # Reset the cluster constraints.
    request_resources(num_cpus=0)
    ctx["num_cpus"] -= 8
    ctx["num_nodes"] -= 8 // WORKER_NODE_CPU

    logger.info(
        f"Waiting for cluster to go idle after constraint cleared: "
        f"{ctx['num_nodes']} nodes, {ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 + IDLE_TERMINATION_S,  # 1 min + idle timeout
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )
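
# Note: request_resources is also exposed as the public API
# ray.autoscaler.sdk.request_resources. Besides num_cpus it accepts resource
# bundles; an equivalent way to ask for two 4-CPU workers would be
# (illustrative only, not executed here):
#
#     from ray.autoscaler.sdk import request_resources
#     request_resources(bundles=[{"CPU": WORKER_NODE_CPU}] * 2)
#
# Passing num_cpus=0, as above, clears the constraint so that the now-idle
# workers are reclaimed once IDLE_TERMINATION_S elapses.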


# Running actors/tasks that exceed the current cluster resources should
# upscale the cluster.
def test_run_tasks_concurrent(ctx: dict):
    num_tasks = 2
    num_actors = 2

    @ray.remote(num_cpus=WORKER_NODE_CPU)
    def f():
        while True:
            time.sleep(1)

    @ray.remote(num_cpus=WORKER_NODE_CPU)
    class Actor:
        def __init__(self):
            pass

    tasks = [f.remote() for _ in range(num_tasks)]
    actors = [Actor.remote() for _ in range(num_actors)]

    ctx["num_cpus"] += (num_tasks + num_actors) * WORKER_NODE_CPU
    ctx["num_nodes"] += num_tasks + num_actors
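
    # Each task and actor above requests WORKER_NODE_CPU CPUs, so none of them
    # can share a worker node: the autoscaler is expected to add one worker per
    # task/actor, i.e. num_tasks + num_actors = 4 extra nodes and
    # (2 + 2) * WORKER_NODE_CPU = 16 extra CPUs.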

    logger.info(f"Waiting for {ctx['num_nodes']} nodes, {ctx['num_cpus']} cpus")
    wait_for_condition(
        check_cluster,
        timeout=60 * 5,  # 5 min
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )

    # Tear down: cancel the tasks and kill the actors so the workers go idle.
    for task in tasks:
        ray.cancel(task)
    for actor in actors:
        ray.kill(actor)
    ctx["num_cpus"] -= (num_actors + num_tasks) * WORKER_NODE_CPU
    ctx["num_nodes"] -= num_actors + num_tasks

    logger.info(
        f"Waiting for cluster to scale down to {ctx['num_nodes']} nodes, "
        f"{ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 + IDLE_TERMINATION_S,
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )


test_request_cluster_resources(ctx)
test_run_tasks_concurrent(ctx)