serve_test_cluster_utils.py

#!/usr/bin/env python3
import logging

import ray
import requests
from ray._private.test_utils import monitor_memory_usage
from ray.cluster_utils import Cluster

from ray import serve
from ray.serve.config import DeploymentMode
from ray.serve.context import _get_global_client

logger = logging.getLogger(__file__)

# Cluster setup configs
NUM_CPU_PER_NODE = 10
NUM_CONNECTIONS = 10


def setup_local_single_node_cluster(
    num_nodes: int,
    num_cpu_per_node=NUM_CPU_PER_NODE,
    namespace="serve",
):
    """Set up a Ray cluster locally via ray.init() and Cluster().

    Each actor is simulated in a local process on a single node,
    thus smaller scale by default.
    """
    cluster = Cluster()
    for i in range(num_nodes):
        cluster.add_node(
            redis_port=6380 if i == 0 else None,
            num_cpus=num_cpu_per_node,
            num_gpus=0,
            resources={str(i): 2, "proxy": 1},
        )
    ray.init(address=cluster.address, dashboard_host="0.0.0.0", namespace=namespace)
    serve.start(proxy_location=DeploymentMode.EveryNode)

    return _get_global_client(), cluster


def setup_anyscale_cluster():
    """Set up a Ray cluster on Anyscale via ray.client().

    Note this is large scale by default and should be kicked off
    less frequently.
    """
    # TODO: Ray client doesn't work with the release script yet because
    # we cannot connect to the Anyscale cluster from its head node.
    # ray.client().env({}).connect()
    ray.init(
        address="auto",
        # This flag can be enabled to debug node autoscaler events.
        # But the cluster scaling has been stable for now, so we turn it off
        # to reduce spam.
        runtime_env={"env_vars": {"SERVE_ENABLE_SCALING_LOG": "0"}},
    )
    serve.start(proxy_location=DeploymentMode.EveryNode)

    # Print memory usage on the head node to help diagnose/debug memory leaks.
    monitor_memory_usage()

    return _get_global_client()


@ray.remote
def warm_up_one_cluster(
    num_warmup_iterations: int,
    http_host: str,
    http_port: str,
    endpoint: str,
    nonblocking: bool = False,
) -> str:
    # Specifying a very low timeout effectively makes requests.get() nonblocking.
    timeout = 0.0001 if nonblocking else None
    logger.info(f"Warming up {endpoint} ..")
    for _ in range(num_warmup_iterations):
        try:
            resp = requests.get(
                f"http://{http_host}:{http_port}/{endpoint}", timeout=timeout
            ).text
            logger.info(resp)
        except requests.exceptions.ReadTimeout:
            # This exception only gets raised if a timeout is specified in the
            # requests.get() call.
            logger.info("Issued nonblocking HTTP request.")
    return endpoint
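

# ---------------------------------------------------------------------------
# Usage sketch (assumption; not part of the original utilities).
# Shows one way a local release test might wire these helpers together:
# start a small local cluster, deploy a trivial endpoint, and warm it up.
# The deployment name "echo", route prefix "/echo", host 127.0.0.1, and
# port 8000 are illustrative assumptions, and the serve.run()/route_prefix
# call may need adjusting for the Ray Serve version in use.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    client, cluster = setup_local_single_node_cluster(num_nodes=1)

    @serve.deployment
    def echo(request) -> str:
        # Trivial handler used only to exercise the HTTP path.
        return "ok"

    serve.run(echo.bind(), name="echo", route_prefix="/echo")

    # Issue a handful of warm-up requests against the local proxy.
    ray.get(
        warm_up_one_cluster.remote(
            num_warmup_iterations=5,
            http_host="127.0.0.1",
            http_port="8000",
            endpoint="echo",
        )
    )

    serve.shutdown()
    ray.shutdown()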