123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- import subprocess
- import click
- import json
- import os
- import time
- from logger import logger
- WORKLOAD_SCRIPTS = [
- "test_core.py",
- ]
- def setup_cluster():
- from ray.cluster_utils import AutoscalingCluster
- cluster = AutoscalingCluster(
- head_resources={"CPU": 0},
- worker_node_types={
- "type-1": {
- "resources": {"CPU": 4},
- "node_config": {},
- "min_workers": 0,
- "max_workers": 10,
- },
- },
- idle_timeout_minutes=1 * 0.1,
- )
- cluster.start(_system_config={"enable_autoscaler_v2": True})
- return cluster
- def run_test():
- failed_workloads = []
- for workload in WORKLOAD_SCRIPTS:
- # Run the python script.
- logger.info(f"Running workload {workload}:")
- try:
- subprocess.check_call(["python", workload])
- except subprocess.CalledProcessError as e:
- failed_workloads.append((workload, e))
- if failed_workloads:
- for workload, e in failed_workloads:
- logger.error(f"Workload {workload} failed with {e}")
- else:
- logger.info("All workloads passed!")
- @click.command()
- @click.option("--local", is_flag=True, help="Run locally.", default=False)
- def run(local):
- start_time = time.time()
- cluster = None
- try:
- if local:
- cluster = setup_cluster()
- run_test()
- cluster.shutdown()
- else:
- run_test()
- success = "1"
- except Exception as e:
- logger.error(f"Test failed with {e}")
- success = "0"
- finally:
- if cluster:
- cluster.shutdown()
- results = {
- "time": time.time() - start_time,
- "success": success,
- }
- if "TEST_OUTPUT_JSON" in os.environ:
- out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
- json.dump(results, out_file)
- print(json.dumps(results, indent=2))
- if __name__ == "__main__":
- run()
|