123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- # This workload stresses distributed reference counting by passing and
- # returning serialized ObjectRefs.
- import time
- import random
- import numpy as np
- import ray
- from ray.cluster_utils import Cluster
- from ray._private.test_utils import safe_write_to_results_json
- def update_progress(result):
- result["last_update"] = time.time()
- safe_write_to_results_json(result)
- num_redis_shards = 5
- redis_max_memory = 10**8
- object_store_memory = 10**8
- num_nodes = 10
- message = (
- "Make sure there is enough memory on this machine to run this "
- "workload. We divide the system memory by 2 to provide a buffer."
- )
- assert (
- num_nodes * object_store_memory + num_redis_shards * redis_max_memory
- < ray._private.utils.get_system_memory() / 2
- ), message
- # Simulate a cluster on one machine.
- cluster = Cluster()
- for i in range(num_nodes):
- cluster.add_node(
- redis_port=6379 if i == 0 else None,
- num_redis_shards=num_redis_shards if i == 0 else None,
- num_cpus=2,
- num_gpus=0,
- resources={str(i): 2},
- object_store_memory=object_store_memory,
- redis_max_memory=redis_max_memory,
- dashboard_host="0.0.0.0",
- )
- ray.init(address=cluster.address)
- # Run the workload.
- @ray.remote(max_retries=0)
- def churn():
- return ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
- @ray.remote(max_retries=0)
- def child(*xs):
- obj_ref = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
- return obj_ref
- @ray.remote(max_retries=0)
- def f(*xs):
- if xs:
- return random.choice(xs)
- else:
- return child.remote(*xs)
- iteration = 0
- ids = []
- start_time = time.time()
- previous_time = start_time
- while True:
- for _ in range(50):
- new_constrained_ids = [
- f._remote(args=ids, resources={str(i % num_nodes): 1}) for i in range(25)
- ]
- new_unconstrained_ids = [f.remote(*ids) for _ in range(25)]
- ids = new_constrained_ids + new_unconstrained_ids
- # Fill the object store while the tasks are running.
- for i in range(num_nodes):
- for _ in range(10):
- [
- churn._remote(args=[], resources={str(i % num_nodes): 1})
- for _ in range(10)
- ]
- # Make sure that the objects are still available.
- child_ids = ray.get(ids)
- for child_id in child_ids:
- ray.get(child_id)
- new_time = time.time()
- print(
- "Iteration {}:\n"
- " - Iteration time: {}.\n"
- " - Absolute time: {}.\n"
- " - Total elapsed time: {}.".format(
- iteration, new_time - previous_time, new_time, new_time - start_time
- )
- )
- previous_time = new_time
- iteration += 1
- update_progress(
- {
- "iteration": iteration,
- "iteration_time": new_time - previous_time,
- "absolute_time": new_time,
- "elapsed_time": new_time - start_time,
- }
- )
|