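"""Autoscaler v2 release test.

Connects to a running Ray cluster and verifies that the autoscaler:
  1. upscales on an explicit resource request and scales back down once the
     request is cleared, and
  2. upscales for queued tasks/actors and scales back down after they are
     cancelled/killed and the nodes go idle.
"""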
import time

import ray
from ray._private.test_utils import wait_for_condition
from ray.autoscaler.v2.tests.util import (
    NodeCountCheck,
    TotalResourceCheck,
    check_cluster,
)

# NOTE: `logger` is assumed to be provided by the release-test environment.
from logger import logger

ray.init("auto")

# Sync with the compute config.
HEAD_NODE_CPU = 0
WORKER_NODE_CPU = 4

IDLE_TERMINATION_S = 60 * 5  # 5 min
DEFAULT_RETRY_INTERVAL_MS = 15 * 1000  # 15 sec
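
# Expected cluster state; each test updates this as it adds or removes load.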
ctx = {
    "num_cpus": 0,
    "num_nodes": 1,
}
- logger.info(f"Starting cluster with {ctx['num_nodes']} nodes, {ctx['num_cpus']} cpus")
- check_cluster(
- [
- NodeCountCheck(ctx["num_nodes"]),
- TotalResourceCheck({"CPU": ctx["num_cpus"]}),
- ]
- )


# Requesting cluster resources should upscale the cluster.
def test_request_cluster_resources(ctx: dict):
    from ray.autoscaler._private.commands import request_resources

    # Request 8 CPUs; with 4-CPU workers this should add
    # 8 // WORKER_NODE_CPU (= 2) worker nodes.
    request_resources(num_cpus=8)
    ctx["num_cpus"] += 8
    ctx["num_nodes"] += 8 // WORKER_NODE_CPU

    # Assert on the number of worker nodes.
    logger.info(
        f"Requesting cluster constraints: {ctx['num_nodes']} nodes, "
        f"{ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 * 5,  # 5 min
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )

    # Reset the cluster constraints.
    request_resources(num_cpus=0)
    ctx["num_cpus"] -= 8
    ctx["num_nodes"] -= 8 // WORKER_NODE_CPU
    logger.info(
        f"Waiting for cluster to go idle after constraint cleared: "
        f"{ctx['num_nodes']} nodes, {ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 + IDLE_TERMINATION_S,  # 1 min + idle timeout
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )


# Running tasks/actors that exceed the cluster's resources should upscale it.
def test_run_tasks_concurrent(ctx: dict):
    num_tasks = 2
    num_actors = 2
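
    # Each task/actor requests a full worker node's worth of CPUs, so each one
    # should bring up its own worker node.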
    @ray.remote(num_cpus=WORKER_NODE_CPU)
    def f():
        while True:
            time.sleep(1)

    @ray.remote(num_cpus=WORKER_NODE_CPU)
    class Actor:
        def __init__(self):
            pass

    tasks = [f.remote() for _ in range(num_tasks)]
    actors = [Actor.remote() for _ in range(num_actors)]
    ctx["num_cpus"] += (num_tasks + num_actors) * WORKER_NODE_CPU
    ctx["num_nodes"] += num_tasks + num_actors

    logger.info(f"Waiting for {ctx['num_nodes']} nodes, {ctx['num_cpus']} cpus")
    wait_for_condition(
        check_cluster,
        timeout=60 * 5,  # 5 min
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )
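
    # Release the resources; the workers should go idle and scale back down.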
    for task in tasks:
        ray.cancel(task)
    for actor in actors:
        ray.kill(actor)
    ctx["num_cpus"] -= (num_actors + num_tasks) * WORKER_NODE_CPU
    ctx["num_nodes"] -= num_actors + num_tasks

    logger.info(
        f"Waiting for cluster to scale down to {ctx['num_nodes']} nodes, "
        f"{ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 + IDLE_TERMINATION_S,
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )
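

# Run the scenarios in order; both share the same expected-state dict.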
test_request_cluster_resources(ctx)
test_run_tasks_concurrent(ctx)