# Single-node stress test for Ray core: many task args, many return values,
# large ray.get fan-in, a huge queued-task backlog, and one very large object.
import json
import os
import time
from time import perf_counter

import numpy as np
import ray
import ray.autoscaler.sdk
from ray._private.test_utils import Semaphore
from tqdm import tqdm, trange
# Stress-test sizing knobs.
MAX_ARGS = 10000  # arguments passed to a single task
MAX_RETURNS = 3000  # return values produced by a single task
MAX_RAY_GET_ARGS = 10000  # object refs fetched in one ray.get call
MAX_QUEUED_TASKS = 1_000_000  # tasks queued behind blocked CPUs
MAX_RAY_GET_SIZE = 100 * 2 ** 30  # single-object payload: 100 GiB
def assert_no_leaks():
    """Assert that all non-memory cluster resources are fully available.

    Compares ``ray.cluster_resources()`` against ``ray.available_resources()``
    after dropping the memory counters, which fluctuate and are not a reliable
    leak signal.
    """
    total = ray.cluster_resources()
    current = ray.available_resources()
    total.pop("memory")
    total.pop("object_store_memory")
    current.pop("memory")
    current.pop("object_store_memory")
    assert total == current, (total, current)
def test_many_args():
    """Pass MAX_ARGS separate list arguments to a single remote task."""

    @ray.remote
    def sum_args(*args):
        # Sum of sums: each arg is a list of 10000 ones.
        return sum(sum(arg) for arg in args)

    args = [[1 for _ in range(10000)] for _ in range(MAX_ARGS)]
    result = ray.get(sum_args.remote(*args))
    assert result == MAX_ARGS * 10000
def test_many_returns():
    """Run a task that fans out MAX_RETURNS separate return values."""

    @ray.remote(num_returns=MAX_RETURNS)
    def f():
        # Each return value is an identical 10k-element list.
        return tuple(list(range(10000)) for _ in range(MAX_RETURNS))

    returned_refs = f.remote()
    assert len(returned_refs) == MAX_RETURNS

    expected = list(range(10000))
    for ref in returned_refs:
        obj = ray.get(ref)
        assert obj == expected
def test_ray_get_args():
    """Fetch MAX_RAY_GET_ARGS objects in a single ray.get call.

    Exercises both the deserialization path (Python lists) and the zero-copy
    path (numpy arrays are memory-mapped out of the object store).
    """

    def with_dese():
        # Python lists must be deserialized on every get.
        print("Putting test objects:")
        refs = []
        for _ in trange(MAX_RAY_GET_ARGS):
            obj = list(range(10000))
            refs.append(ray.put(obj))

        print("Getting objects")
        results = ray.get(refs)
        assert len(results) == MAX_RAY_GET_ARGS

        print("Asserting correctness")
        for obj in tqdm(results):
            expected = list(range(10000))
            assert obj == expected

    def with_zero_copy():
        # Numpy arrays are read zero-copy from the object store.
        print("Putting test objects:")
        refs = []
        for _ in trange(MAX_RAY_GET_ARGS):
            obj = np.arange(10000)
            refs.append(ray.put(obj))

        print("Getting objects")
        results = ray.get(refs)
        assert len(results) == MAX_RAY_GET_ARGS

        print("Asserting correctness")
        for obj in tqdm(results):
            expected = np.arange(10000)
            assert (obj == expected).all()

    with_dese()
    print("Done with dese")
    with_zero_copy()
    print("Done with zero copy")
def test_many_queued_tasks():
    """Queue MAX_QUEUED_TASKS no-op tasks behind tasks holding every CPU."""
    sema = Semaphore.remote(0)

    # One blocker per CPU keeps the cluster saturated until released.
    @ray.remote(num_cpus=1)
    def block():
        ray.get(sema.acquire.remote())

    @ray.remote(num_cpus=1)
    def f():
        pass

    num_cpus = int(ray.cluster_resources()["CPU"])
    blocked_tasks = []
    for _ in range(num_cpus):
        blocked_tasks.append(block.remote())

    print("Submitting many tasks")
    pending_tasks = []
    for _ in trange(MAX_QUEUED_TASKS):
        pending_tasks.append(f.remote())

    # Make sure all the tasks can actually run.
    for _ in range(num_cpus):
        sema.release.remote()

    print("Unblocking tasks")
    for ref in tqdm(pending_tasks):
        assert ray.get(ref) is None
def test_large_object():
    """Round-trip a single MAX_RAY_GET_SIZE-byte object through the store."""
    print("Generating object")
    obj = np.zeros(MAX_RAY_GET_SIZE, dtype=np.int8)

    print("Putting object")
    ref = ray.put(obj)
    # Drop the local copy so peak memory is bounded by the object store copy.
    del obj

    print("Getting object")
    big_obj = ray.get(ref)
    assert big_obj[0] == 0
    assert big_obj[-1] == 0
ray.init(address="auto")

# Run each stress test, timing it, then sleep briefly and verify that no
# CPU/GPU/custom resources leaked before moving on to the next test.
args_start = perf_counter()
test_many_args()
args_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished many args")

returns_start = perf_counter()
test_many_returns()
returns_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished many returns")

get_start = perf_counter()
test_ray_get_args()
get_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished ray.get on many objects")

queued_start = perf_counter()
test_many_queued_tasks()
queued_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished queueing many tasks")

large_object_start = perf_counter()
test_large_object()
large_object_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Done")

args_time = args_end - args_start
returns_time = returns_end - returns_start
get_time = get_end - get_start
queued_time = queued_end - queued_start
large_object_time = large_object_end - large_object_start

print(f"Many args time: {args_time} ({MAX_ARGS} args)")
print(f"Many returns time: {returns_time} ({MAX_RETURNS} returns)")
print(f"Ray.get time: {get_time} ({MAX_RAY_GET_ARGS} args)")
print(f"Queued task time: {queued_time} ({MAX_QUEUED_TASKS} tasks)")
print(f"Ray.get large object time: {large_object_time} " f"({MAX_RAY_GET_SIZE} bytes)")

if "TEST_OUTPUT_JSON" in os.environ:
    results = {
        "args_time": args_time,
        "num_args": MAX_ARGS,
        "returns_time": returns_time,
        "num_returns": MAX_RETURNS,
        "get_time": get_time,
        "num_get_args": MAX_RAY_GET_ARGS,
        "queued_time": queued_time,
        "num_queued": MAX_QUEUED_TASKS,
        "large_object_time": large_object_time,
        "large_object_size": MAX_RAY_GET_SIZE,
        "success": "1",
    }
    results["perf_metrics"] = [
        {
            "perf_metric_name": f"{MAX_ARGS}_args_time",
            "perf_metric_value": args_time,
            "perf_metric_type": "LATENCY",
        },
        {
            "perf_metric_name": f"{MAX_RETURNS}_returns_time",
            "perf_metric_value": returns_time,
            "perf_metric_type": "LATENCY",
        },
        {
            "perf_metric_name": f"{MAX_RAY_GET_ARGS}_get_time",
            "perf_metric_value": get_time,
            "perf_metric_type": "LATENCY",
        },
        {
            "perf_metric_name": f"{MAX_QUEUED_TASKS}_queued_time",
            "perf_metric_value": queued_time,
            "perf_metric_type": "LATENCY",
        },
        {
            "perf_metric_name": f"{MAX_RAY_GET_SIZE}_large_object_time",
            "perf_metric_value": large_object_time,
            "perf_metric_type": "LATENCY",
        },
    ]
    # Fix: the original opened the file without closing it; a context
    # manager guarantees the JSON report is flushed and the handle released.
    with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_file:
        json.dump(results, out_file)