# example.py (1.1 KB)

  1. import sys
  2. import time
  3. from collections import Counter
  4. import ray
  5. @ray.remote
  6. def get_host_name(x):
  7. import platform
  8. import time
  9. time.sleep(0.01)
  10. return x + (platform.node(),)
  11. def wait_for_nodes(expected):
  12. # Wait for all nodes to join the cluster.
  13. while True:
  14. num_nodes = len(ray.nodes())
  15. if num_nodes < expected:
  16. print(
  17. "{} nodes have joined so far, waiting for {} more.".format(
  18. num_nodes, expected - num_nodes
  19. )
  20. )
  21. sys.stdout.flush()
  22. time.sleep(1)
  23. else:
  24. break
  25. def main():
  26. wait_for_nodes(4)
  27. # Check that objects can be transferred from each node to each other node.
  28. for i in range(10):
  29. print("Iteration {}".format(i))
  30. results = [get_host_name.remote(get_host_name.remote(())) for _ in range(100)]
  31. print(Counter(ray.get(results)))
  32. sys.stdout.flush()
  33. print("Success!")
  34. sys.stdout.flush()
  35. time.sleep(20)
  36. if __name__ == "__main__":
  37. ray.init(address="localhost:6379")
  38. main()