#!/usr/bin/env python3
import json
import logging
import os
import random
import re
import subprocess
from collections import defaultdict
from subprocess import PIPE
from typing import Dict, List, Optional, Union

import ray
from serve_test_cluster_utils import NUM_CPU_PER_NODE

logger = logging.getLogger(__file__)

DEFAULT_RELEASE_OUTPUT_PATH = "/tmp/release_test_out.json"


def is_smoke_test():
    return os.environ.get("IS_SMOKE_TEST", "0") == "1"


def parse_time_to_ms(time_string: str) -> float:
    """Given a time string with various units, convert it to ms as a float.

    wrk time unit reference:
    https://github.com/wg/wrk/blob/master/src/units.c#L17-L21

    Example:
        "71.91ms" -> 71.91
        "50us" -> 0.05
        "1.5s" -> 1500
    """
    # Group 1 - digits with an optional decimal part: 71.91 / 50 / 1.5
    # Group 2 - the unit suffix: ms / us / s
    parsed = re.split(r"(\d+\.?\d*)(\w+)", time_string)
    values = [val for val in parsed if val]
    if values[1] == "ms":
        return float(values[0])
    elif values[1] == "us":
        return float(values[0]) / 1000
    elif values[1] == "s":
        return float(values[0]) * 1000
    # Should not be reached for common wrk benchmark output.
    raise ValueError(f"Unrecognized time unit in {time_string!r}")


def parse_size_to_KB(size_string: str) -> float:
    """Given a size string with various units, convert it to KB as a float.

    wrk binary unit reference:
    https://github.com/wg/wrk/blob/master/src/units.c#L29-L33

    Example:
        "200.56KB" -> 200.56
        "50MB" -> 51200
        "0.5GB" -> 524288
    """
    # Group 1 - digits with an optional decimal part: 200.56 / 50 / 0.5
    # Group 2 - the unit suffix: B / KB / MB / GB
    parsed = re.split(r"(\d+\.?\d*)(\w*)", size_string)
    values = [val for val in parsed if val]
    if values[1] == "KB":
        return float(values[0])
    elif values[1] == "MB":
        return float(values[0]) * 1024
    elif values[1] == "GB":
        return float(values[0]) * 1024 * 1024
    # Plain bytes; wrk sizes use binary (1024-based) units.
    return float(values[0]) / 1024


def parse_metric_to_base(metric_string: str) -> float:
    """Given a metric string with various units, convert it to its base value.

    wrk metric unit reference:
    https://github.com/wg/wrk/blob/master/src/units.c#L35-L39

    Example:
        "71.91" -> 71.91
        "1.32k" -> 1320
        "1.5M" -> 1500000
    """
    parsed = re.split(r"(\d+\.?\d*)(\w*)", metric_string)
    values = [val for val in parsed if val]
    if len(values) == 1:
        return float(values[0])
    if values[1] == "k":
        return float(values[0]) * 1000
    elif values[1] == "M":
        return float(values[0]) * 1000 * 1000
    # Should not be reached for common wrk benchmark output.
    raise ValueError(f"Unrecognized metric unit in {metric_string!r}")
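

# Sanity-check sketch for the three parsers above. The sample strings follow
# the formats shown in the docstrings; values are chosen to be exactly
# representable as floats, so plain equality is safe here. Called from the
# __main__ guard at the bottom of this file.
def _check_wrk_parsers() -> None:
    assert parse_time_to_ms("71.91ms") == 71.91
    assert parse_time_to_ms("50us") == 0.05
    assert parse_time_to_ms("1.5s") == 1500.0
    assert parse_size_to_KB("200.56KB") == 200.56
    assert parse_size_to_KB("50MB") == 51200.0
    assert parse_size_to_KB("0.5GB") == 524288.0
    assert parse_metric_to_base("1.5M") == 1500000.0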


def parse_wrk_decoded_stdout(decoded_out):
    """Parse decoded wrk stdout into a dictionary.

    Sample wrk stdout:

        Running 10s test @ http://127.0.0.1:8000/echo
          8 threads and 96 connections
          Thread Stats   Avg      Stdev     Max   +/- Stdev
            Latency    72.32ms    6.00ms 139.00ms   91.60%
            Req/Sec   165.99     34.84   242.00     57.20%
          Latency Distribution
             50%   70.78ms
             75%   72.59ms
             90%   75.67ms
             99%   98.71ms
          13306 requests in 10.10s, 1.95MB read
        Requests/sec:   1317.73
        Transfer/sec:    198.19KB

    Returns:
        {'per_thread_latency_avg_ms': 72.32, 'per_thread_latency_max_ms': 139.0,
         'per_thread_tps': 165.99, 'per_thread_max_tps': 242.0,
         'P50_latency_ms': 70.78, 'P75_latency_ms': 72.59,
         'P90_latency_ms': 75.67, 'P99_latency_ms': 98.71,
         'per_node_total_throughput': 13306, 'per_node_total_transfer_KB': 1996.8,
         'per_node_tps': 1317.73, 'per_node_transfer_per_sec_KB': 198.19}
    """
    metrics_dict = {}
    for line in decoded_out.splitlines():
        parsed = re.split(r"\s+", line.strip())
        # Statistics section
        #   Thread Stats   Avg      Stdev     Max   +/- Stdev
        #     Latency    72.32ms    6.00ms 139.00ms   91.60%
        #     Req/Sec   165.99     34.84   242.00     57.20%
        if parsed[0] == "Latency" and len(parsed) == 5:
            metrics_dict["per_thread_latency_avg_ms"] = parse_time_to_ms(parsed[1])
            metrics_dict["per_thread_latency_max_ms"] = parse_time_to_ms(parsed[3])
        elif parsed[0] == "Req/Sec" and len(parsed) == 5:
            metrics_dict["per_thread_tps"] = parse_metric_to_base(parsed[1])
            metrics_dict["per_thread_max_tps"] = parse_metric_to_base(parsed[3])
        # "Latency Distribution" header, ignored
        elif parsed[:2] == ["Latency", "Distribution"]:
            continue
        # Percentile section
        #   50%   70.78ms
        #   75%   72.59ms
        #   90%   75.67ms
        #   99%   98.71ms
        elif parsed[0] == "50%":
            metrics_dict["P50_latency_ms"] = parse_time_to_ms(parsed[1])
        elif parsed[0] == "75%":
            metrics_dict["P75_latency_ms"] = parse_time_to_ms(parsed[1])
        elif parsed[0] == "90%":
            metrics_dict["P90_latency_ms"] = parse_time_to_ms(parsed[1])
        elif parsed[0] == "99%":
            metrics_dict["P99_latency_ms"] = parse_time_to_ms(parsed[1])
        # Total requests and transfer (might include timeouts too)
        #   13306 requests in 10.10s, 1.95MB read
        elif len(parsed) >= 6 and parsed[1] == "requests":
            metrics_dict["per_node_total_throughput"] = int(parsed[0])
            metrics_dict["per_node_total_transfer_KB"] = parse_size_to_KB(parsed[4])
        # Socket errors: connect 0, read 0, write 0, timeout 100
        elif parsed[0] == "Socket" and parsed[1] == "errors:":
            metrics_dict["per_node_total_timeout_requests"] = parse_metric_to_base(
                parsed[-1]
            )
        # Summary section
        #   Requests/sec:   1317.73
        #   Transfer/sec:    198.19KB
        elif parsed[0] == "Requests/sec:":
            metrics_dict["per_node_tps"] = parse_metric_to_base(parsed[1])
        elif parsed[0] == "Transfer/sec:":
            metrics_dict["per_node_transfer_per_sec_KB"] = parse_size_to_KB(parsed[1])
    return metrics_dict
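

# Demo sketch: feed a trimmed, hypothetical wrk transcript (numbers mirror the
# sample in the docstring above) through parse_wrk_decoded_stdout and print
# the resulting metrics dict. Called from the __main__ guard below.
def _example_parse_wrk_stdout() -> None:
    sample = "\n".join(
        [
            "  Latency Distribution",
            "     50%   70.78ms",
            "     99%   98.71ms",
            "  13306 requests in 10.10s, 1.95MB read",
            "Requests/sec:   1317.73",
            "Transfer/sec:    198.19KB",
        ]
    )
    print(parse_wrk_decoded_stdout(sample))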


@ray.remote
def run_one_wrk_trial(
    trial_length: str,
    num_connections: int,
    http_host: str,
    http_port: str,
    endpoint: str = "",
) -> str:
    proc = subprocess.Popen(
        [
            "wrk",
            "-c",
            str(num_connections),
            "-t",
            str(NUM_CPU_PER_NODE),
            "-d",
            trial_length,
            "--latency",
            f"http://{http_host}:{http_port}/{endpoint}",
        ],
        stdout=PIPE,
        stderr=PIPE,
    )
    # communicate() consumes both pipes and waits for the process to exit;
    # calling wait() first can deadlock once a pipe buffer fills up.
    out, err = proc.communicate()
    if err.decode() != "":
        logger.error(err.decode())
    return out.decode()
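

# For reference, the subprocess call above amounts to running, e.g. (with a
# hypothetical connection count, duration, and URL):
#   wrk -c 96 -t <NUM_CPU_PER_NODE> -d 10s --latency http://127.0.0.1:8000/echo
# on whichever node Ray schedules the task.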


def aggregate_all_metrics(metrics_from_all_nodes: Dict[str, List[Union[float, int]]]):
    num_nodes = len(metrics_from_all_nodes["per_node_tps"])
    return {
        # Per thread metrics
        "per_thread_latency_avg_ms": round(
            sum(metrics_from_all_nodes["per_thread_latency_avg_ms"]) / num_nodes, 2
        ),
        "per_thread_latency_max_ms": max(
            metrics_from_all_nodes["per_thread_latency_max_ms"]
        ),
        "per_thread_avg_tps": round(
            sum(metrics_from_all_nodes["per_thread_tps"]) / num_nodes, 2
        ),
        "per_thread_max_tps": max(metrics_from_all_nodes["per_thread_max_tps"]),
        # Per wrk node metrics
        "per_node_avg_tps": round(
            sum(metrics_from_all_nodes["per_node_tps"]) / num_nodes, 2
        ),
        "per_node_avg_transfer_per_sec_KB": round(
            sum(metrics_from_all_nodes["per_node_transfer_per_sec_KB"]) / num_nodes, 2
        ),
        # Cluster metrics
        "cluster_total_throughput": sum(
            metrics_from_all_nodes["per_node_total_throughput"]
        ),
        "cluster_total_transfer_KB": sum(
            metrics_from_all_nodes["per_node_total_transfer_KB"]
        ),
        "cluster_total_timeout_requests": sum(
            metrics_from_all_nodes["per_node_total_timeout_requests"]
        ),
        "cluster_max_P50_latency_ms": max(metrics_from_all_nodes["P50_latency_ms"]),
        "cluster_max_P75_latency_ms": max(metrics_from_all_nodes["P75_latency_ms"]),
        "cluster_max_P90_latency_ms": max(metrics_from_all_nodes["P90_latency_ms"]),
        "cluster_max_P99_latency_ms": max(metrics_from_all_nodes["P99_latency_ms"]),
    }
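

# Aggregation sketch over hypothetical two-node metrics: every key consumed by
# aggregate_all_metrics carries one entry per wrk node. Rates are averaged,
# cluster totals summed, and latency percentiles reduced with max. Called from
# the __main__ guard below.
def _example_aggregate() -> None:
    metrics = {
        "per_thread_latency_avg_ms": [72.32, 69.10],
        "per_thread_latency_max_ms": [139.0, 121.4],
        "per_thread_tps": [165.99, 170.22],
        "per_thread_max_tps": [242.0, 251.0],
        "per_node_tps": [1317.73, 1290.10],
        "per_node_transfer_per_sec_KB": [198.19, 190.02],
        "per_node_total_throughput": [13306, 13021],
        "per_node_total_transfer_KB": [1996.8, 1945.6],
        "per_node_total_timeout_requests": [0, 0],
        "P50_latency_ms": [70.78, 69.02],
        "P75_latency_ms": [72.59, 71.11],
        "P90_latency_ms": [75.67, 74.33],
        "P99_latency_ms": [98.71, 101.20],
    }
    print(aggregate_all_metrics(metrics))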


def run_wrk_on_all_nodes(
    trial_length: str,
    num_connections: int,
    http_host: str,
    http_port: str,
    all_endpoints: Optional[List[str]] = None,
    ignore_output: bool = False,
    exclude_head: bool = False,
    debug: bool = False,
):
    """Use a Ray task to run one wrk trial on each live node, each hitting an
    endpoint picked randomly from all available deployments.

    Returns:
        all_metrics: (Dict[str, List[Union[float, int]]]) Parsed wrk metrics
            from the wrk run on each live node.
        all_wrk_stdout: (List[str]) Decoded stdout of each wrk trial, for
            per-node checks at the end of the experiment.
    """
    all_metrics = defaultdict(list)
    all_wrk_stdout = []
    rst_ray_refs = []
    for node in ray.nodes():
        if exclude_head and node["Resources"].get("node:__internal_head__") == 1.0:
            continue
        if node["Alive"]:
            node_resource = f"node:{node['NodeManagerAddress']}"
            # Randomly pick one from all available endpoints in the Ray cluster
            endpoint = random.choice(all_endpoints)
            rst_ray_refs.append(
                run_one_wrk_trial.options(
                    num_cpus=0, resources={node_resource: 0.01}
                ).remote(trial_length, num_connections, http_host, http_port, endpoint)
            )

    print("Waiting for wrk trials to finish...")
    ray.wait(rst_ray_refs, num_returns=len(rst_ray_refs))
    print("Trials finished!")

    if ignore_output:
        return

    for i, decoded_output in enumerate(ray.get(rst_ray_refs)):
        if debug:
            print(f"decoded_output {i}: {decoded_output}")

        all_wrk_stdout.append(decoded_output)
        parsed_metrics = parse_wrk_decoded_stdout(decoded_output)

        # Per thread metrics
        all_metrics["per_thread_latency_avg_ms"].append(
            parsed_metrics["per_thread_latency_avg_ms"]
        )
        all_metrics["per_thread_latency_max_ms"].append(
            parsed_metrics["per_thread_latency_max_ms"]
        )
        all_metrics["per_thread_tps"].append(parsed_metrics["per_thread_tps"])
        all_metrics["per_thread_max_tps"].append(parsed_metrics["per_thread_max_tps"])

        # Per node metrics
        all_metrics["P50_latency_ms"].append(parsed_metrics["P50_latency_ms"])
        all_metrics["P75_latency_ms"].append(parsed_metrics["P75_latency_ms"])
        all_metrics["P90_latency_ms"].append(parsed_metrics["P90_latency_ms"])
        all_metrics["P99_latency_ms"].append(parsed_metrics["P99_latency_ms"])
        all_metrics["per_node_total_throughput"].append(
            parsed_metrics["per_node_total_throughput"]
        )
        all_metrics["per_node_total_transfer_KB"].append(
            parsed_metrics["per_node_total_transfer_KB"]
        )
        all_metrics["per_node_tps"].append(parsed_metrics["per_node_tps"])
        all_metrics["per_node_transfer_per_sec_KB"].append(
            parsed_metrics["per_node_transfer_per_sec_KB"]
        )
        all_metrics["per_node_total_timeout_requests"].append(
            parsed_metrics.get("per_node_total_timeout_requests", 0)
        )

    return all_metrics, all_wrk_stdout


def save_test_results(
    test_results: Dict,
    output_path: Optional[str] = None,
):
    results_file_path = output_path or os.environ.get(
        "TEST_OUTPUT_JSON", DEFAULT_RELEASE_OUTPUT_PATH
    )
    with open(results_file_path, "wt") as f:
        json.dump(test_results, f)
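

# End-to-end usage sketch (hypothetical host/port/endpoint; assumes a Serve
# deployment is already serving traffic and ray.init() has connected):
#
#   all_metrics, all_wrk_stdout = run_wrk_on_all_nodes(
#       trial_length="10s",
#       num_connections=96,
#       http_host="127.0.0.1",
#       http_port="8000",
#       all_endpoints=["echo"],
#   )
#   save_test_results(aggregate_all_metrics(all_metrics))

# Run the offline sketches when this module is executed directly; importing it
# stays side-effect free.
if __name__ == "__main__":
    _check_wrk_parsers()
    _example_parse_wrk_stdout()
    _example_aggregate()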