reporter_agent.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. import asyncio
  2. import datetime
  3. import json
  4. import logging
  5. import os
  6. import socket
  7. import subprocess
  8. import sys
  9. import traceback
  10. import warnings
  11. import aioredis
  12. import ray
  13. import ray.dashboard.modules.reporter.reporter_consts as reporter_consts
  14. from ray.dashboard import k8s_utils
  15. import ray.dashboard.utils as dashboard_utils
  16. import ray.experimental.internal_kv as internal_kv
  17. from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsAioPublisher
  18. import ray._private.services
  19. import ray._private.utils
  20. from ray._private.gcs_utils import use_gcs_for_bootstrap
  21. from ray.core.generated import reporter_pb2
  22. from ray.core.generated import reporter_pb2_grpc
  23. from ray.ray_constants import DEBUG_AUTOSCALING_STATUS
  24. from ray._private.metrics_agent import MetricsAgent, Gauge, Record
  25. from ray.util.debug import log_once
  26. import psutil
  27. logger = logging.getLogger(__name__)
  28. enable_gpu_usage_check = True
  29. # Are we in a K8s pod?
  30. IN_KUBERNETES_POD = "KUBERNETES_SERVICE_HOST" in os.environ
  31. try:
  32. import gpustat.core as gpustat
  33. except (ModuleNotFoundError, ImportError):
  34. gpustat = None
  35. if log_once("gpustat_import_warning"):
  36. warnings.warn(
  37. "`gpustat` package is not installed. GPU monitoring is "
  38. "not available. To have full functionality of the "
  39. "dashboard please install `pip install ray["
  40. "default]`.)"
  41. )
  42. def recursive_asdict(o):
  43. if isinstance(o, tuple) and hasattr(o, "_asdict"):
  44. return recursive_asdict(o._asdict())
  45. if isinstance(o, (tuple, list)):
  46. L = []
  47. for k in o:
  48. L.append(recursive_asdict(k))
  49. return L
  50. if isinstance(o, dict):
  51. D = {k: recursive_asdict(v) for k, v in o.items()}
  52. return D
  53. return o
  54. def jsonify_asdict(o) -> str:
  55. return json.dumps(dashboard_utils.to_google_style(recursive_asdict(o)))
  56. # A list of gauges to record and export metrics.
  57. METRICS_GAUGES = {
  58. "node_cpu_utilization": Gauge(
  59. "node_cpu_utilization", "Total CPU usage on a ray node", "percentage", ["ip"]
  60. ),
  61. "node_cpu_count": Gauge(
  62. "node_cpu_count", "Total CPUs available on a ray node", "cores", ["ip"]
  63. ),
  64. "node_mem_used": Gauge(
  65. "node_mem_used", "Memory usage on a ray node", "bytes", ["ip"]
  66. ),
  67. "node_mem_available": Gauge(
  68. "node_mem_available", "Memory available on a ray node", "bytes", ["ip"]
  69. ),
  70. "node_mem_total": Gauge(
  71. "node_mem_total", "Total memory on a ray node", "bytes", ["ip"]
  72. ),
  73. "node_gpus_available": Gauge(
  74. "node_gpus_available",
  75. "Total GPUs available on a ray node",
  76. "percentage",
  77. ["ip"],
  78. ),
  79. "node_gpus_utilization": Gauge(
  80. "node_gpus_utilization", "Total GPUs usage on a ray node", "percentage", ["ip"]
  81. ),
  82. "node_gram_used": Gauge(
  83. "node_gram_used", "Total GPU RAM usage on a ray node", "bytes", ["ip"]
  84. ),
  85. "node_gram_available": Gauge(
  86. "node_gram_available", "Total GPU RAM available on a ray node", "bytes", ["ip"]
  87. ),
  88. "node_disk_usage": Gauge(
  89. "node_disk_usage", "Total disk usage (bytes) on a ray node", "bytes", ["ip"]
  90. ),
  91. "node_disk_free": Gauge(
  92. "node_disk_free", "Total disk free (bytes) on a ray node", "bytes", ["ip"]
  93. ),
  94. "node_disk_utilization_percentage": Gauge(
  95. "node_disk_utilization_percentage",
  96. "Total disk utilization (percentage) on a ray node",
  97. "percentage",
  98. ["ip"],
  99. ),
  100. "node_network_sent": Gauge(
  101. "node_network_sent", "Total network sent", "bytes", ["ip"]
  102. ),
  103. "node_network_received": Gauge(
  104. "node_network_received", "Total network received", "bytes", ["ip"]
  105. ),
  106. "node_network_send_speed": Gauge(
  107. "node_network_send_speed", "Network send speed", "bytes/sec", ["ip"]
  108. ),
  109. "node_network_receive_speed": Gauge(
  110. "node_network_receive_speed", "Network receive speed", "bytes/sec", ["ip"]
  111. ),
  112. "raylet_cpu": Gauge(
  113. "raylet_cpu", "CPU usage of the raylet on a node.", "percentage", ["ip", "pid"]
  114. ),
  115. "raylet_mem": Gauge(
  116. "raylet_mem", "Memory usage of the raylet on a node", "mb", ["ip", "pid"]
  117. ),
  118. "cluster_active_nodes": Gauge(
  119. "cluster_active_nodes", "Active nodes on the cluster", "count", ["node_type"]
  120. ),
  121. "cluster_failed_nodes": Gauge(
  122. "cluster_failed_nodes", "Failed nodes on the cluster", "count", ["node_type"]
  123. ),
  124. "cluster_pending_nodes": Gauge(
  125. "cluster_pending_nodes", "Pending nodes on the cluster", "count", ["node_type"]
  126. ),
  127. }
  128. class ReporterAgent(
  129. dashboard_utils.DashboardAgentModule, reporter_pb2_grpc.ReporterServiceServicer
  130. ):
  131. """A monitor process for monitoring Ray nodes.
  132. Attributes:
  133. dashboard_agent: The DashboardAgent object contains global config
  134. """
  135. def __init__(self, dashboard_agent):
  136. """Initialize the reporter object."""
  137. super().__init__(dashboard_agent)
  138. if IN_KUBERNETES_POD:
  139. # psutil does not compute this correctly when in a K8s pod.
  140. # Use ray._private.utils instead.
  141. cpu_count = ray._private.utils.get_num_cpus()
  142. self._cpu_counts = (cpu_count, cpu_count)
  143. else:
  144. self._cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False))
  145. self._ip = dashboard_agent.ip
  146. if not use_gcs_for_bootstrap():
  147. self._redis_address, _ = dashboard_agent.redis_address
  148. self._is_head_node = self._ip == self._redis_address
  149. else:
  150. self._is_head_node = self._ip == dashboard_agent.gcs_address.split(":")[0]
  151. self._hostname = socket.gethostname()
  152. self._workers = set()
  153. self._network_stats_hist = [(0, (0.0, 0.0))] # time, (sent, recv)
  154. self._metrics_agent = MetricsAgent(
  155. "127.0.0.1" if self._ip == "127.0.0.1" else "",
  156. dashboard_agent.metrics_export_port,
  157. )
  158. self._key = (
  159. f"{reporter_consts.REPORTER_PREFIX}" f"{self._dashboard_agent.node_id}"
  160. )
  161. async def GetProfilingStats(self, request, context):
  162. pid = request.pid
  163. duration = request.duration
  164. profiling_file_path = os.path.join(
  165. ray._private.utils.get_ray_temp_dir(), f"{pid}_profiling.txt"
  166. )
  167. sudo = "sudo" if ray._private.utils.get_user() != "root" else ""
  168. process = await asyncio.create_subprocess_shell(
  169. f"{sudo} $(which py-spy) record "
  170. f"-o {profiling_file_path} -p {pid} -d {duration} -f speedscope",
  171. stdout=subprocess.PIPE,
  172. stderr=subprocess.PIPE,
  173. shell=True,
  174. )
  175. stdout, stderr = await process.communicate()
  176. if process.returncode != 0:
  177. profiling_stats = ""
  178. else:
  179. with open(profiling_file_path, "r") as f:
  180. profiling_stats = f.read()
  181. return reporter_pb2.GetProfilingStatsReply(
  182. profiling_stats=profiling_stats, std_out=stdout, std_err=stderr
  183. )
  184. async def ReportOCMetrics(self, request, context):
  185. # This function receives a GRPC containing OpenCensus (OC) metrics
  186. # from a Ray process, then exposes those metrics to Prometheus.
  187. try:
  188. self._metrics_agent.record_metric_points_from_protobuf(request.metrics)
  189. except Exception:
  190. logger.error(traceback.format_exc())
  191. return reporter_pb2.ReportOCMetricsReply()
  192. @staticmethod
  193. def _get_cpu_percent():
  194. if IN_KUBERNETES_POD:
  195. return k8s_utils.cpu_percent()
  196. else:
  197. return psutil.cpu_percent()
  198. @staticmethod
  199. def _get_gpu_usage():
  200. global enable_gpu_usage_check
  201. if gpustat is None or not enable_gpu_usage_check:
  202. return []
  203. gpu_utilizations = []
  204. gpus = []
  205. try:
  206. gpus = gpustat.new_query().gpus
  207. except Exception as e:
  208. logger.debug(f"gpustat failed to retrieve GPU information: {e}")
  209. # gpustat calls pynvml.nvmlInit()
  210. # On machines without GPUs, this can run subprocesses that spew to
  211. # stderr. Then with log_to_driver=True, we get log spew from every
  212. # single raylet. To avoid this, disable the GPU usage check on
  213. # certain errors.
  214. # https://github.com/ray-project/ray/issues/14305
  215. # https://github.com/ray-project/ray/pull/21686
  216. if type(e).__name__ == "NVMLError_DriverNotLoaded":
  217. enable_gpu_usage_check = False
  218. for gpu in gpus:
  219. # Note the keys in this dict have periods which throws
  220. # off javascript so we change .s to _s
  221. gpu_data = {"_".join(key.split(".")): val for key, val in gpu.entry.items()}
  222. gpu_utilizations.append(gpu_data)
  223. return gpu_utilizations
  224. @staticmethod
  225. def _get_boot_time():
  226. if IN_KUBERNETES_POD:
  227. # Return start time of container entrypoint
  228. return psutil.Process(pid=1).create_time()
  229. else:
  230. return psutil.boot_time()
  231. @staticmethod
  232. def _get_network_stats():
  233. ifaces = [
  234. v for k, v in psutil.net_io_counters(pernic=True).items() if k[0] == "e"
  235. ]
  236. sent = sum((iface.bytes_sent for iface in ifaces))
  237. recv = sum((iface.bytes_recv for iface in ifaces))
  238. return sent, recv
  239. @staticmethod
  240. def _get_mem_usage():
  241. total = ray._private.utils.get_system_memory()
  242. used = ray._private.utils.get_used_memory()
  243. available = total - used
  244. percent = round(used / total, 3) * 100
  245. return total, available, percent, used
  246. @staticmethod
  247. def _get_disk_usage():
  248. if IN_KUBERNETES_POD:
  249. # If in a K8s pod, disable disk display by passing in dummy values.
  250. return {
  251. "/": psutil._common.sdiskusage(total=1, used=0, free=1, percent=0.0)
  252. }
  253. root = os.environ["USERPROFILE"] if sys.platform == "win32" else os.sep
  254. tmp = ray._private.utils.get_user_temp_dir()
  255. return {
  256. "/": psutil.disk_usage(root),
  257. tmp: psutil.disk_usage(tmp),
  258. }
  259. def _get_workers(self):
  260. raylet_proc = self._get_raylet_proc()
  261. if raylet_proc is None:
  262. return []
  263. else:
  264. workers = set(raylet_proc.children())
  265. self._workers.intersection_update(workers)
  266. self._workers.update(workers)
  267. self._workers.discard(psutil.Process())
  268. return [
  269. w.as_dict(
  270. attrs=[
  271. "pid",
  272. "create_time",
  273. "cpu_percent",
  274. "cpu_times",
  275. "cmdline",
  276. "memory_info",
  277. ]
  278. )
  279. for w in self._workers
  280. if w.status() != psutil.STATUS_ZOMBIE
  281. ]
  282. @staticmethod
  283. def _get_raylet_proc():
  284. try:
  285. curr_proc = psutil.Process()
  286. # Here, parent is always raylet because the
  287. # dashboard agent is a child of the raylet process.
  288. parent = curr_proc.parent()
  289. if parent is not None:
  290. if parent.pid == 1:
  291. return None
  292. if parent.status() == psutil.STATUS_ZOMBIE:
  293. return None
  294. return parent
  295. except (psutil.AccessDenied, ProcessLookupError):
  296. pass
  297. return None
  298. def _get_raylet(self):
  299. raylet_proc = self._get_raylet_proc()
  300. if raylet_proc is None:
  301. return {}
  302. else:
  303. return raylet_proc.as_dict(
  304. attrs=[
  305. "pid",
  306. "create_time",
  307. "cpu_percent",
  308. "cpu_times",
  309. "cmdline",
  310. "memory_info",
  311. ]
  312. )
  313. def _get_load_avg(self):
  314. if sys.platform == "win32":
  315. cpu_percent = psutil.cpu_percent()
  316. load = (cpu_percent, cpu_percent, cpu_percent)
  317. else:
  318. load = os.getloadavg()
  319. per_cpu_load = tuple((round(x / self._cpu_counts[0], 2) for x in load))
  320. return load, per_cpu_load
  321. def _get_all_stats(self):
  322. now = dashboard_utils.to_posix_time(datetime.datetime.utcnow())
  323. network_stats = self._get_network_stats()
  324. self._network_stats_hist.append((now, network_stats))
  325. self._network_stats_hist = self._network_stats_hist[-7:]
  326. then, prev_network_stats = self._network_stats_hist[0]
  327. prev_send, prev_recv = prev_network_stats
  328. now_send, now_recv = network_stats
  329. network_speed_stats = (
  330. (now_send - prev_send) / (now - then),
  331. (now_recv - prev_recv) / (now - then),
  332. )
  333. return {
  334. "now": now,
  335. "hostname": self._hostname,
  336. "ip": self._ip,
  337. "cpu": self._get_cpu_percent(),
  338. "cpus": self._cpu_counts,
  339. "mem": self._get_mem_usage(),
  340. "workers": self._get_workers(),
  341. "raylet": self._get_raylet(),
  342. "bootTime": self._get_boot_time(),
  343. "loadAvg": self._get_load_avg(),
  344. "disk": self._get_disk_usage(),
  345. "gpus": self._get_gpu_usage(),
  346. "network": network_stats,
  347. "network_speed": network_speed_stats,
  348. # Deprecated field, should be removed with frontend.
  349. "cmdline": self._get_raylet().get("cmdline", []),
  350. }
  351. def _record_stats(self, stats, cluster_stats):
  352. records_reported = []
  353. ip = stats["ip"]
  354. # -- Instance count of cluster --
  355. # Only report cluster stats on head node
  356. if "autoscaler_report" in cluster_stats and self._is_head_node:
  357. active_nodes = cluster_stats["autoscaler_report"]["active_nodes"]
  358. for node_type, active_node_count in active_nodes.items():
  359. records_reported.append(
  360. Record(
  361. gauge=METRICS_GAUGES["cluster_active_nodes"],
  362. value=active_node_count,
  363. tags={"node_type": node_type},
  364. )
  365. )
  366. failed_nodes = cluster_stats["autoscaler_report"]["failed_nodes"]
  367. failed_nodes_dict = {}
  368. for node_ip, node_type in failed_nodes:
  369. if node_type in failed_nodes_dict:
  370. failed_nodes_dict[node_type] += 1
  371. else:
  372. failed_nodes_dict[node_type] = 1
  373. for node_type, failed_node_count in failed_nodes_dict.items():
  374. records_reported.append(
  375. Record(
  376. gauge=METRICS_GAUGES["cluster_failed_nodes"],
  377. value=failed_node_count,
  378. tags={"node_type": node_type},
  379. )
  380. )
  381. pending_nodes = cluster_stats["autoscaler_report"]["pending_nodes"]
  382. pending_nodes_dict = {}
  383. for node_ip, node_type, status_message in pending_nodes:
  384. if node_type in pending_nodes_dict:
  385. pending_nodes_dict[node_type] += 1
  386. else:
  387. pending_nodes_dict[node_type] = 1
  388. for node_type, pending_node_count in pending_nodes_dict.items():
  389. records_reported.append(
  390. Record(
  391. gauge=METRICS_GAUGES["cluster_pending_nodes"],
  392. value=pending_node_count,
  393. tags={"node_type": node_type},
  394. )
  395. )
  396. # -- CPU per node --
  397. cpu_usage = float(stats["cpu"])
  398. cpu_record = Record(
  399. gauge=METRICS_GAUGES["node_cpu_utilization"],
  400. value=cpu_usage,
  401. tags={"ip": ip},
  402. )
  403. cpu_count, _ = stats["cpus"]
  404. cpu_count_record = Record(
  405. gauge=METRICS_GAUGES["node_cpu_count"], value=cpu_count, tags={"ip": ip}
  406. )
  407. # -- Mem per node --
  408. mem_total, mem_available, _, mem_used = stats["mem"]
  409. mem_used_record = Record(
  410. gauge=METRICS_GAUGES["node_mem_used"], value=mem_used, tags={"ip": ip}
  411. )
  412. mem_available_record = Record(
  413. gauge=METRICS_GAUGES["node_mem_available"],
  414. value=mem_available,
  415. tags={"ip": ip},
  416. )
  417. mem_total_record = Record(
  418. gauge=METRICS_GAUGES["node_mem_total"], value=mem_total, tags={"ip": ip}
  419. )
  420. # -- GPU per node --
  421. gpus = stats["gpus"]
  422. gpus_available = len(gpus)
  423. if gpus_available:
  424. gpus_utilization, gram_used, gram_total = 0, 0, 0
  425. for gpu in gpus:
  426. gpus_utilization += gpu["utilization_gpu"]
  427. gram_used += gpu["memory_used"]
  428. gram_total += gpu["memory_total"]
  429. gram_available = gram_total - gram_used
  430. gpus_available_record = Record(
  431. gauge=METRICS_GAUGES["node_gpus_available"],
  432. value=gpus_available,
  433. tags={"ip": ip},
  434. )
  435. gpus_utilization_record = Record(
  436. gauge=METRICS_GAUGES["node_gpus_utilization"],
  437. value=gpus_utilization,
  438. tags={"ip": ip},
  439. )
  440. gram_used_record = Record(
  441. gauge=METRICS_GAUGES["node_gram_used"], value=gram_used, tags={"ip": ip}
  442. )
  443. gram_available_record = Record(
  444. gauge=METRICS_GAUGES["node_gram_available"],
  445. value=gram_available,
  446. tags={"ip": ip},
  447. )
  448. records_reported.extend(
  449. [
  450. gpus_available_record,
  451. gpus_utilization_record,
  452. gram_used_record,
  453. gram_available_record,
  454. ]
  455. )
  456. # -- Disk per node --
  457. used, free = 0, 0
  458. for entry in stats["disk"].values():
  459. used += entry.used
  460. free += entry.free
  461. disk_utilization = float(used / (used + free)) * 100
  462. disk_usage_record = Record(
  463. gauge=METRICS_GAUGES["node_disk_usage"], value=used, tags={"ip": ip}
  464. )
  465. disk_free_record = Record(
  466. gauge=METRICS_GAUGES["node_disk_free"], value=free, tags={"ip": ip}
  467. )
  468. disk_utilization_percentage_record = Record(
  469. gauge=METRICS_GAUGES["node_disk_utilization_percentage"],
  470. value=disk_utilization,
  471. tags={"ip": ip},
  472. )
  473. # -- Network speed (send/receive) stats per node --
  474. network_stats = stats["network"]
  475. network_sent_record = Record(
  476. gauge=METRICS_GAUGES["node_network_sent"],
  477. value=network_stats[0],
  478. tags={"ip": ip},
  479. )
  480. network_received_record = Record(
  481. gauge=METRICS_GAUGES["node_network_received"],
  482. value=network_stats[1],
  483. tags={"ip": ip},
  484. )
  485. # -- Network speed (send/receive) per node --
  486. network_speed_stats = stats["network_speed"]
  487. network_send_speed_record = Record(
  488. gauge=METRICS_GAUGES["node_network_send_speed"],
  489. value=network_speed_stats[0],
  490. tags={"ip": ip},
  491. )
  492. network_receive_speed_record = Record(
  493. gauge=METRICS_GAUGES["node_network_receive_speed"],
  494. value=network_speed_stats[1],
  495. tags={"ip": ip},
  496. )
  497. raylet_stats = stats["raylet"]
  498. if raylet_stats:
  499. raylet_pid = str(raylet_stats["pid"])
  500. # -- raylet CPU --
  501. raylet_cpu_usage = float(raylet_stats["cpu_percent"]) * 100
  502. raylet_cpu_record = Record(
  503. gauge=METRICS_GAUGES["raylet_cpu"],
  504. value=raylet_cpu_usage,
  505. tags={"ip": ip, "pid": raylet_pid},
  506. )
  507. # -- raylet mem --
  508. raylet_mem_usage = float(raylet_stats["memory_info"].rss) / 1e6
  509. raylet_mem_record = Record(
  510. gauge=METRICS_GAUGES["raylet_mem"],
  511. value=raylet_mem_usage,
  512. tags={"ip": ip, "pid": raylet_pid},
  513. )
  514. records_reported.extend([raylet_cpu_record, raylet_mem_record])
  515. records_reported.extend(
  516. [
  517. cpu_record,
  518. cpu_count_record,
  519. mem_used_record,
  520. mem_available_record,
  521. mem_total_record,
  522. disk_usage_record,
  523. disk_free_record,
  524. disk_utilization_percentage_record,
  525. network_sent_record,
  526. network_received_record,
  527. network_send_speed_record,
  528. network_receive_speed_record,
  529. ]
  530. )
  531. return records_reported
  532. async def _perform_iteration(self, publish):
  533. """Get any changes to the log files and push updates to Redis."""
  534. while True:
  535. try:
  536. formatted_status_string = internal_kv._internal_kv_get(
  537. DEBUG_AUTOSCALING_STATUS
  538. )
  539. cluster_stats = (
  540. json.loads(formatted_status_string.decode())
  541. if formatted_status_string
  542. else {}
  543. )
  544. stats = self._get_all_stats()
  545. records_reported = self._record_stats(stats, cluster_stats)
  546. self._metrics_agent.record_reporter_stats(records_reported)
  547. await publish(self._key, jsonify_asdict(stats))
  548. except Exception:
  549. logger.exception("Error publishing node physical stats.")
  550. await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)
  551. async def run(self, server):
  552. reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server)
  553. if gcs_pubsub_enabled():
  554. gcs_addr = self._dashboard_agent.gcs_address
  555. if gcs_addr is None:
  556. aioredis_client = await aioredis.create_redis_pool(
  557. address=self._dashboard_agent.redis_address,
  558. password=self._dashboard_agent.redis_password,
  559. )
  560. gcs_addr = await aioredis_client.get("GcsServerAddress")
  561. gcs_addr = gcs_addr.decode()
  562. publisher = GcsAioPublisher(address=gcs_addr)
  563. async def publish(key: str, data: str):
  564. await publisher.publish_resource_usage(key, data)
  565. else:
  566. aioredis_client = await aioredis.create_redis_pool(
  567. address=self._dashboard_agent.redis_address,
  568. password=self._dashboard_agent.redis_password,
  569. )
  570. async def publish(key: str, data: str):
  571. await aioredis_client.publish(key, data)
  572. await self._perform_iteration(publish)
  573. @staticmethod
  574. def is_minimal_module():
  575. return False