many_tasks_serialized_ids.py

# This workload stresses distributed reference counting by passing and
# returning serialized ObjectRefs.
import time
import random

import numpy as np

import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import safe_write_to_results_json
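

# Heartbeat helper: record the time of the last update in the results JSON so
# that whatever monitors the results file can tell the workload is still alive.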
def update_progress(result):
    result["last_update"] = time.time()
    safe_write_to_results_json(result)
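

# Small memory limits (10**8 bytes = 100 MB per object store and Redis shard)
# keep the stores easy to fill, so eviction and reference counting are
# exercised quickly.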
num_redis_shards = 5
redis_max_memory = 10**8
object_store_memory = 10**8
num_nodes = 10

message = (
    "Make sure there is enough memory on this machine to run this "
    "workload. We divide the system memory by 2 to provide a buffer."
)
assert (
    num_nodes * object_store_memory + num_redis_shards * redis_max_memory
    < ray._private.utils.get_system_memory() / 2
), message

# Simulate a cluster on one machine.
cluster = Cluster()
for i in range(num_nodes):
    cluster.add_node(
        redis_port=6379 if i == 0 else None,
        num_redis_shards=num_redis_shards if i == 0 else None,
        num_cpus=2,
        num_gpus=0,
        resources={str(i): 2},
        object_store_memory=object_store_memory,
        redis_max_memory=redis_max_memory,
        dashboard_host="0.0.0.0",
    )
ray.init(address=cluster.address)

# Run the workload.
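

# churn() allocates a 1 MiB object per call; it is submitted below purely to
# keep the object stores full so that unreferenced objects get evicted.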
@ray.remote(max_retries=0)
def churn():
    return ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
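

# child() puts a fresh 1 MiB object and returns its ObjectRef, so the caller
# receives a reference serialized inside the task's return value.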
@ray.remote(max_retries=0)
def child(*xs):
    obj_ref = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
    return obj_ref
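

# f() receives previously created ObjectRefs as its arguments and either
# forwards a random one (becoming a borrower) or, when called with no
# arguments, calls child() to mint a new one. Either way, an ObjectRef
# travels serialized inside the return value.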
@ray.remote(max_retries=0)
def f(*xs):
    if xs:
        return random.choice(xs)
    else:
        return child.remote(*xs)


iteration = 0
ids = []
start_time = time.time()
previous_time = start_time
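
# Main loop: each iteration submits 50 generations of f() tasks (half pinned
# to specific nodes via the custom resources), churns the object stores, and
# then checks that every object is still reachable.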
while True:
    for _ in range(50):
        new_constrained_ids = [
            f._remote(args=ids, resources={str(i % num_nodes): 1}) for i in range(25)
        ]
        new_unconstrained_ids = [f.remote(*ids) for _ in range(25)]
        ids = new_constrained_ids + new_unconstrained_ids

    # Fill the object store while the tasks are running.
    for i in range(num_nodes):
        for _ in range(10):
            [
                churn._remote(args=[], resources={str(i % num_nodes): 1})
                for _ in range(10)
            ]
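    # The churn refs are dropped immediately, so their objects become
    # eligible for eviction as soon as the tasks finish.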

    # Make sure that the objects are still available. Each f() task returns
    # an ObjectRef (either borrowed from its arguments or freshly created by
    # child.remote()), so a second ray.get() is needed to resolve the value.
    child_ids = ray.get(ids)
    for child_id in child_ids:
        ray.get(child_id)

    new_time = time.time()
    print(
        "Iteration {}:\n"
        " - Iteration time: {}.\n"
        " - Absolute time: {}.\n"
        " - Total elapsed time: {}.".format(
            iteration, new_time - previous_time, new_time, new_time - start_time
        )
    )
    iteration += 1

    update_progress(
        {
            "iteration": iteration,
            "iteration_time": new_time - previous_time,
            "absolute_time": new_time,
            "elapsed_time": new_time - start_time,
        }
    )
    previous_time = new_time