dashboard_test.py

import asyncio
import logging
import time
import urllib.parse
from collections import defaultdict
from pprint import pprint
from typing import Dict, List, Optional

import requests
from pydantic import BaseModel

import ray
from ray._private.test_utils import fetch_prometheus_metrics
from ray.dashboard.consts import DASHBOARD_METRIC_PORT
from ray.dashboard.utils import get_address_for_submission_client
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray.util.state import list_nodes

logger = logging.getLogger(__name__)


def calc_p(latencies, percent):
    """Return the given percentile of the latencies, in milliseconds."""
    if len(latencies) == 0:
        return 0
    return round(sorted(latencies)[int(len(latencies) / 100 * percent)] * 1000, 3)
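
# For example, with samples [0.1, 0.2, 0.3, 0.4] (seconds), calc_p(samples, 50)
# picks index int(4 / 100 * 50) == 2, i.e. 0.3 s, and reports it as 300.0 ms.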


class Result(BaseModel):
    success: bool
    # endpoints -> list of latencies
    # (the defaults let a bare failure Result(success=False) be constructed).
    result: Dict[str, List[float]] = {}
    # Dashboard memory usage in MB.
    memory_mb: Optional[float] = None


# Currently every endpoint is a GET endpoint.
endpoints = [
    "/logical/actors",
    "/nodes?view=summary",
    "/",
    "/api/cluster_status",
    "/events",
    "/api/jobs/",
    "/api/v0/logs",
    "/api/prometheus_health",
]


@ray.remote(num_cpus=0)
class DashboardTester:
    def __init__(self, interval_s: int = 1):
        self.dashboard_url = get_address_for_submission_client(None)
        # Ping interval for all endpoints.
        self.interval_s = interval_s
        # endpoint -> a list of latencies
        self.result = defaultdict(list)

    async def run(self):
        await asyncio.gather(*[self.ping(endpoint) for endpoint in endpoints])

    async def ping(self, endpoint):
        """Repeatedly call an endpoint and record its latency."""
        node_id = ray.get_runtime_context().get_node_id()

        while True:
            start = time.monotonic()
            # For the logs API, we should append the node ID and a glob filter.
            if "/api/v0/logs" in endpoint:
                glob_filter = "*"
                options_dict = {"node_id": node_id, "glob": glob_filter}
                url = (
                    f"{self.dashboard_url}{endpoint}?"
                    f"{urllib.parse.urlencode(options_dict)}"
                )
            else:
                url = f"{self.dashboard_url}{endpoint}"
            resp = requests.get(url, timeout=30)
            elapsed = time.monotonic() - start

            if resp.status_code == 200:
                self.result[endpoint].append(elapsed)
            else:
                try:
                    resp.raise_for_status()
                except Exception as e:
                    logger.exception(e)
            # Sleep for whatever is left of the ping interval.
            await asyncio.sleep(max(0, self.interval_s - elapsed))

    def get_result(self):
        return self.result


class DashboardTestAtScale:
    """This is piggybacked into existing scalability tests."""

    def __init__(self, addr: ray._private.worker.RayContext):
        self.addr = addr

        # Schedule the actor on the current node (which is a head node).
        current_node_ip = ray._private.worker.global_worker.node_ip_address
        nodes = list_nodes(filters=[("node_ip", "=", current_node_ip)])
        assert len(nodes) > 0, f"{current_node_ip} not found in the cluster"
        node = nodes[0]
        # Schedule on a head node.
        self.tester = DashboardTester.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                node_id=node["node_id"], soft=False
            )
        ).remote()
        self.tester.run.remote()

    def get_result(self):
        """Get the result from the test.

        Returns:
            A Result object with the per-endpoint latencies and the dashboard
            memory usage (if available).
        """
        try:
            result = ray.get(self.tester.get_result.remote(), timeout=60)
        except ray.exceptions.GetTimeoutError:
            return Result(success=False)

        # Get the memory usage.
        dashboard_export_addr = "{}:{}".format(
            self.addr["raylet_ip_address"], DASHBOARD_METRIC_PORT
        )
        metrics = fetch_prometheus_metrics([dashboard_export_addr])
        memories = []
        for name, samples in metrics.items():
            if name == "ray_component_uss_mb":
                for sample in samples:
                    if sample.labels["Component"] == "dashboard":
                        memories.append(sample.value)

        return Result(
            success=True, result=result, memory_mb=max(memories) if memories else None
        )
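
    # For illustration (hypothetical numbers), a populated result might look like
    #   Result(success=True,
    #          result={"/api/jobs/": [0.012, 0.015], "/events": [0.004]},
    #          memory_mb=350.0)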

    def update_release_test_result(self, release_result: dict):
        test_result = self.get_result()

        def calc_endpoints_p(result, percent):
            return {
                # sort -> take the PX sample -> convert seconds to ms -> round.
                endpoint: calc_p(latencies, percent)
                for endpoint, latencies in result.items()
            }

        print("======Print per dashboard endpoint latencies======")
        print("=====================P50==========================")
        pprint(calc_endpoints_p(test_result.result, 50))
        print("=====================P95==========================")
        pprint(calc_endpoints_p(test_result.result, 95))
        print("=====================P99==========================")
        pprint(calc_endpoints_p(test_result.result, 99))

        latencies = []
        for per_endpoint_latencies in test_result.result.values():
            latencies.extend(per_endpoint_latencies)
        aggregated_metrics = {
            "p50": calc_p(latencies, 50),
            "p95": calc_p(latencies, 95),
            "p99": calc_p(latencies, 99),
        }
        print("=====================Aggregated====================")
        pprint(aggregated_metrics)

        release_result["_dashboard_test_success"] = test_result.success
        if test_result.success:
            if "perf_metrics" not in release_result:
                release_result["perf_metrics"] = []
            release_result["perf_metrics"].extend(
                [
                    {
                        "perf_metric_name": f"dashboard_{p}_latency_ms",
                        "perf_metric_value": value,
                        "perf_metric_type": "LATENCY",
                    }
                    for p, value in aggregated_metrics.items()
                ]
            )
        release_result["_dashboard_memory_usage_mb"] = test_result.memory_mb
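

# A minimal, hypothetical smoke-run sketch (not part of the release-test
# harness): it assumes a local Ray cluster with the dashboard enabled and
# simply exercises the helpers above for a short while.
if __name__ == "__main__":
    import json

    addr = ray.init()
    test = DashboardTestAtScale(addr)
    # Give the tester actor some time to ping the endpoints.
    time.sleep(60)
    release_result = {}
    test.update_release_test_result(release_result)
    print(json.dumps(release_result, indent=2))
    ray.shutdown()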