# many_tasks.py (2.1 KB)

  1. # This workload tests submitting and getting many tasks over and over.
  2. import time
  3. import numpy as np
  4. import ray
  5. from ray.cluster_utils import Cluster
  6. from ray._private.test_utils import safe_write_to_results_json
  7. def update_progress(result):
  8. result["last_update"] = time.time()
  9. safe_write_to_results_json(result)
  10. num_redis_shards = 5
  11. redis_max_memory = 10**8
  12. object_store_memory = 10**8
  13. num_nodes = 10
  14. message = (
  15. "Make sure there is enough memory on this machine to run this "
  16. "workload. We divide the system memory by 2 to provide a buffer."
  17. )
  18. assert (
  19. num_nodes * object_store_memory + num_redis_shards * redis_max_memory
  20. < ray._private.utils.get_system_memory() / 2
  21. ), message
  22. # Simulate a cluster on one machine.
  23. cluster = Cluster()
  24. for i in range(num_nodes):
  25. cluster.add_node(
  26. redis_port=6379 if i == 0 else None,
  27. num_redis_shards=num_redis_shards if i == 0 else None,
  28. num_cpus=2,
  29. num_gpus=0,
  30. resources={str(i): 2},
  31. object_store_memory=object_store_memory,
  32. redis_max_memory=redis_max_memory,
  33. dashboard_host="0.0.0.0",
  34. )
  35. ray.init(address=cluster.address)
  36. # Run the workload.
  37. @ray.remote
  38. def f(*xs):
  39. return np.zeros(1024, dtype=np.uint8)
  40. iteration = 0
  41. ids = []
  42. start_time = time.time()
  43. previous_time = start_time
  44. while True:
  45. for _ in range(50):
  46. new_constrained_ids = [
  47. f._remote(args=[*ids], resources={str(i % num_nodes): 1}) for i in range(25)
  48. ]
  49. new_unconstrained_ids = [f.remote(*ids) for _ in range(25)]
  50. ids = new_constrained_ids + new_unconstrained_ids
  51. ray.get(ids)
  52. new_time = time.time()
  53. print(
  54. "Iteration {}:\n"
  55. " - Iteration time: {}.\n"
  56. " - Absolute time: {}.\n"
  57. " - Total elapsed time: {}.".format(
  58. iteration, new_time - previous_time, new_time, new_time - start_time
  59. )
  60. )
  61. update_progress(
  62. {
  63. "iteration": iteration,
  64. "iteration_time": new_time - previous_time,
  65. "absolute_time": new_time,
  66. "elapsed_time": new_time - start_time,
  67. }
  68. )
  69. previous_time = new_time
  70. iteration += 1