#!/usr/bin/env python3
"""
Benchmark test for multi-deployment Serve at 1k no-op replica scale.

1) Start with a single head node.
2) Start 10 deployments with 100 no-op replicas each (1k replicas total).
3) Launch wrk on each running node to simulate load-balanced requests.
4) Recursively send queries to random deployments, up to depth=5.
5) Run a 10-minute wrk trial on each node, then aggregate results.

Report:
    per_thread_latency_avg_ms
    per_thread_latency_max_ms
    per_thread_avg_tps
    per_thread_max_tps
    per_node_avg_tps
    per_node_avg_transfer_per_sec_KB
    cluster_total_throughput
    cluster_total_transfer_KB
    cluster_max_P50_latency_ms
    cluster_max_P75_latency_ms
    cluster_max_P90_latency_ms
    cluster_max_P99_latency_ms
"""
import logging
import math
import random
from typing import List, Optional

import click
from starlette.requests import Request

from ray import serve
from serve_test_utils import (
    aggregate_all_metrics,
    run_wrk_on_all_nodes,
    save_test_results,
    is_smoke_test,
)
from serve_test_cluster_utils import (
    setup_local_single_node_cluster,
    setup_anyscale_cluster,
    NUM_CPU_PER_NODE,
    NUM_CONNECTIONS,
)

logger = logging.getLogger(__file__)
logging.basicConfig(level=logging.INFO)

# Experiment configs
DEFAULT_SMOKE_TEST_NUM_REPLICA = 4
DEFAULT_SMOKE_TEST_NUM_DEPLOYMENTS = 4  # 1 replica each

# TODO(jiaodong): We should investigate and change this back to 1k.
# For now we won't get valid latency numbers from wrk at 1k replicas,
# likely due to request timeouts.
DEFAULT_FULL_TEST_NUM_REPLICA = 1000
# TODO(simon): We should change this back to 100, but due to a long poll
# issue we temporarily downscoped this test.
# https://github.com/ray-project/ray/pull/20270
DEFAULT_FULL_TEST_NUM_DEPLOYMENTS = 10  # 100 replicas each

# Experiment configs - wrk specific
DEFAULT_SMOKE_TEST_TRIAL_LENGTH = "5s"
DEFAULT_FULL_TEST_TRIAL_LENGTH = "10m"


def setup_multi_deployment_replicas(num_replicas, num_deployments) -> List[str]:
    num_replica_per_deployment = num_replicas // num_deployments
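    # With the full-test defaults this is 1000 // 10 = 100 replicas per
    # deployment; with the smoke-test defaults, 4 // 4 = 1.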
    all_deployment_names = [f"Echo_{i+1}" for i in range(num_deployments)]
    ray_actor_options = {"num_cpus": 1}
    if not is_smoke_test():
        ray_actor_options["resources"] = {"worker": 0.01}
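        # Each replica then requires 0.01 of the custom "worker" resource,
        # which is (presumably) advertised only on worker nodes by the
        # cluster config, keeping replicas off the head node in the full test.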

    @serve.deployment(
        num_replicas=num_replica_per_deployment, ray_actor_options=ray_actor_options
    )
    class Echo:
        def __init__(self):
            self.all_app_async_handles = []

        async def get_random_async_handle(self):
            # Handle resolution is expected to run only a few times, during
            # deployment warmup, after which each replica holds a reference
            # to every application's handle for the recursive inference call.
            if len(self.all_app_async_handles) < len(all_deployment_names):
                applications = list(serve.status().applications.keys())
                self.all_app_async_handles = [
                    serve.get_app_handle(app) for app in applications
                ]
            return random.choice(self.all_app_async_handles)

        async def handle_request(self, body: bytes, depth: int):
            # Max recursive call depth reached
            if depth > 4:
                return "hi"
            next_async_handle = await self.get_random_async_handle()
            fut = next_async_handle.handle_request.remote(body, depth + 1)
            return await fut
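
        # Note: each HTTP request therefore fans out into a chain of five
        # handle_request hops (depths 1 through 5) across randomly chosen
        # deployments before the terminal "hi" propagates back to the caller.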

        async def __call__(self, request: Request):
            return await self.handle_request(await request.body(), 0)

    for name in all_deployment_names:
        serve.run(Echo.bind(), name=name, route_prefix=f"/{name}")

    return all_deployment_names
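

# Sketch (not called by the benchmark): how a single endpoint could be
# queried once the deployments are live, assuming Serve's default HTTP
# address of 127.0.0.1:8000. `requests` is an assumed extra dependency
# here, not one this test uses.
def _example_query(endpoint: str = "Echo_1") -> str:
    import requests

    # One GET triggers the depth-5 recursive fan-out and returns "hi".
    return requests.get(f"http://127.0.0.1:8000/{endpoint}").text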


@click.command()
@click.option("--num-replicas", type=int)
@click.option("--num-deployments", type=int)
@click.option("--trial-length", type=str)
def main(
    num_replicas: Optional[int],
    num_deployments: Optional[int],
    trial_length: Optional[str],
):
    # Default the cluster parameters based on the smoke_test config; if the
    # user provided values explicitly, use those instead.
    # IS_SMOKE_TEST is set by the args of releaser's e2e.py.
    if is_smoke_test():
        num_replicas = num_replicas or DEFAULT_SMOKE_TEST_NUM_REPLICA
        num_deployments = num_deployments or DEFAULT_SMOKE_TEST_NUM_DEPLOYMENTS
        trial_length = trial_length or DEFAULT_SMOKE_TEST_TRIAL_LENGTH
        logger.info(
            f"Running smoke test with {num_replicas} replicas, "
            f"{num_deployments} deployments .. \n"
        )

        # Choose the cluster setup based on user config. The local test uses
        # Cluster() to mock a multi-node cluster, which requires the number
        # of nodes to be specified up front; the Ray Client path does not.
        num_nodes = int(math.ceil(num_replicas / NUM_CPU_PER_NODE))
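        # One node per NUM_CPU_PER_NODE replicas, rounded up; e.g. the
        # smoke-test default of 4 replicas fits on a single node, assuming
        # NUM_CPU_PER_NODE >= 4.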
        logger.info(f"Setting up local ray cluster with {num_nodes} nodes .. \n")
        serve_client = setup_local_single_node_cluster(num_nodes)[0]
    else:
        num_replicas = num_replicas or DEFAULT_FULL_TEST_NUM_REPLICA
        num_deployments = num_deployments or DEFAULT_FULL_TEST_NUM_DEPLOYMENTS
        trial_length = trial_length or DEFAULT_FULL_TEST_TRIAL_LENGTH
        logger.info(
            f"Running full test with {num_replicas} replicas, "
            f"{num_deployments} deployments .. \n"
        )
        logger.info("Setting up anyscale ray cluster .. \n")
        serve_client = setup_anyscale_cluster()

    http_host = str(serve_client._http_config.host)
    http_port = str(serve_client._http_config.port)
    logger.info(f"Ray serve http_host: {http_host}, http_port: {http_port}")

    logger.info(f"Deploying with {num_replicas} target replicas ....\n")
    all_endpoints = setup_multi_deployment_replicas(num_replicas, num_deployments)

    logger.info("Warming up cluster...\n")
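    # The warmup pass reuses the short smoke-test duration ("5s") and throws
    # its output away (ignore_output=True); it only exists to warm up the
    # serving path before the measured trial below.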
    run_wrk_on_all_nodes(
        DEFAULT_SMOKE_TEST_TRIAL_LENGTH,
        NUM_CONNECTIONS,
        http_host,
        http_port,
        all_endpoints=all_endpoints,
        ignore_output=True,
        exclude_head=not is_smoke_test(),
        debug=True,
    )

    logger.info(f"Starting wrk trial on all nodes for {trial_length} ....\n")
    # For detailed discussion, see https://github.com/wg/wrk/issues/205
    # TODO(jiaodong): What's the best number to use here?
    all_metrics, all_wrk_stdout = run_wrk_on_all_nodes(
        trial_length,
        NUM_CONNECTIONS,
        http_host,
        http_port,
        all_endpoints=all_endpoints,
        exclude_head=not is_smoke_test(),
        debug=True,
    )
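    # Each node's invocation is roughly `wrk -c <NUM_CONNECTIONS>
    # -d <trial_length> --latency http://<http_host>:<http_port>/<endpoint>`;
    # the exact flags live in serve_test_utils.run_wrk_on_all_nodes, so treat
    # this as a sketch rather than the literal command.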

    aggregated_metrics = aggregate_all_metrics(all_metrics)
    logger.info("Wrk stdout on each node: ")
    for wrk_stdout in all_wrk_stdout:
        logger.info(wrk_stdout)
    logger.info("Final aggregated metrics: ")
    for key, val in aggregated_metrics.items():
        logger.info(f"{key}: {val}")
    save_test_results(aggregated_metrics)
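    # save_test_results presumably persists the aggregated metrics (the
    # fields listed in the module docstring) for the release-test tooling
    # to pick up.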


if __name__ == "__main__":
    main()

    import pytest
    import sys

    sys.exit(pytest.main(["-v", "-s", __file__]))