123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640 |
- import asyncio
- import datetime
- import json
- import logging
- import os
- import socket
- import subprocess
- import sys
- import traceback
- import warnings
- import aioredis
- import ray
- import ray.dashboard.modules.reporter.reporter_consts as reporter_consts
- from ray.dashboard import k8s_utils
- import ray.dashboard.utils as dashboard_utils
- import ray.experimental.internal_kv as internal_kv
- from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsAioPublisher
- import ray._private.services
- import ray._private.utils
- from ray._private.gcs_utils import use_gcs_for_bootstrap
- from ray.core.generated import reporter_pb2
- from ray.core.generated import reporter_pb2_grpc
- from ray.ray_constants import DEBUG_AUTOSCALING_STATUS
- from ray._private.metrics_agent import MetricsAgent, Gauge, Record
- from ray.util.debug import log_once
- import psutil
- logger = logging.getLogger(__name__)
- enable_gpu_usage_check = True
- # Are we in a K8s pod?
- IN_KUBERNETES_POD = "KUBERNETES_SERVICE_HOST" in os.environ
- try:
- import gpustat.core as gpustat
- except (ModuleNotFoundError, ImportError):
- gpustat = None
- if log_once("gpustat_import_warning"):
- warnings.warn(
- "`gpustat` package is not installed. GPU monitoring is "
- "not available. To have full functionality of the "
- "dashboard please install `pip install ray["
- "default]`.)"
- )
- def recursive_asdict(o):
- if isinstance(o, tuple) and hasattr(o, "_asdict"):
- return recursive_asdict(o._asdict())
- if isinstance(o, (tuple, list)):
- L = []
- for k in o:
- L.append(recursive_asdict(k))
- return L
- if isinstance(o, dict):
- D = {k: recursive_asdict(v) for k, v in o.items()}
- return D
- return o
- def jsonify_asdict(o) -> str:
- return json.dumps(dashboard_utils.to_google_style(recursive_asdict(o)))
- # A list of gauges to record and export metrics.
- METRICS_GAUGES = {
- "node_cpu_utilization": Gauge(
- "node_cpu_utilization", "Total CPU usage on a ray node", "percentage", ["ip"]
- ),
- "node_cpu_count": Gauge(
- "node_cpu_count", "Total CPUs available on a ray node", "cores", ["ip"]
- ),
- "node_mem_used": Gauge(
- "node_mem_used", "Memory usage on a ray node", "bytes", ["ip"]
- ),
- "node_mem_available": Gauge(
- "node_mem_available", "Memory available on a ray node", "bytes", ["ip"]
- ),
- "node_mem_total": Gauge(
- "node_mem_total", "Total memory on a ray node", "bytes", ["ip"]
- ),
- "node_gpus_available": Gauge(
- "node_gpus_available",
- "Total GPUs available on a ray node",
- "percentage",
- ["ip"],
- ),
- "node_gpus_utilization": Gauge(
- "node_gpus_utilization", "Total GPUs usage on a ray node", "percentage", ["ip"]
- ),
- "node_gram_used": Gauge(
- "node_gram_used", "Total GPU RAM usage on a ray node", "bytes", ["ip"]
- ),
- "node_gram_available": Gauge(
- "node_gram_available", "Total GPU RAM available on a ray node", "bytes", ["ip"]
- ),
- "node_disk_usage": Gauge(
- "node_disk_usage", "Total disk usage (bytes) on a ray node", "bytes", ["ip"]
- ),
- "node_disk_free": Gauge(
- "node_disk_free", "Total disk free (bytes) on a ray node", "bytes", ["ip"]
- ),
- "node_disk_utilization_percentage": Gauge(
- "node_disk_utilization_percentage",
- "Total disk utilization (percentage) on a ray node",
- "percentage",
- ["ip"],
- ),
- "node_network_sent": Gauge(
- "node_network_sent", "Total network sent", "bytes", ["ip"]
- ),
- "node_network_received": Gauge(
- "node_network_received", "Total network received", "bytes", ["ip"]
- ),
- "node_network_send_speed": Gauge(
- "node_network_send_speed", "Network send speed", "bytes/sec", ["ip"]
- ),
- "node_network_receive_speed": Gauge(
- "node_network_receive_speed", "Network receive speed", "bytes/sec", ["ip"]
- ),
- "raylet_cpu": Gauge(
- "raylet_cpu", "CPU usage of the raylet on a node.", "percentage", ["ip", "pid"]
- ),
- "raylet_mem": Gauge(
- "raylet_mem", "Memory usage of the raylet on a node", "mb", ["ip", "pid"]
- ),
- "cluster_active_nodes": Gauge(
- "cluster_active_nodes", "Active nodes on the cluster", "count", ["node_type"]
- ),
- "cluster_failed_nodes": Gauge(
- "cluster_failed_nodes", "Failed nodes on the cluster", "count", ["node_type"]
- ),
- "cluster_pending_nodes": Gauge(
- "cluster_pending_nodes", "Pending nodes on the cluster", "count", ["node_type"]
- ),
- }
- class ReporterAgent(
- dashboard_utils.DashboardAgentModule, reporter_pb2_grpc.ReporterServiceServicer
- ):
- """A monitor process for monitoring Ray nodes.
- Attributes:
- dashboard_agent: The DashboardAgent object contains global config
- """
- def __init__(self, dashboard_agent):
- """Initialize the reporter object."""
- super().__init__(dashboard_agent)
- if IN_KUBERNETES_POD:
- # psutil does not compute this correctly when in a K8s pod.
- # Use ray._private.utils instead.
- cpu_count = ray._private.utils.get_num_cpus()
- self._cpu_counts = (cpu_count, cpu_count)
- else:
- self._cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False))
- self._ip = dashboard_agent.ip
- if not use_gcs_for_bootstrap():
- self._redis_address, _ = dashboard_agent.redis_address
- self._is_head_node = self._ip == self._redis_address
- else:
- self._is_head_node = self._ip == dashboard_agent.gcs_address.split(":")[0]
- self._hostname = socket.gethostname()
- self._workers = set()
- self._network_stats_hist = [(0, (0.0, 0.0))] # time, (sent, recv)
- self._metrics_agent = MetricsAgent(
- "127.0.0.1" if self._ip == "127.0.0.1" else "",
- dashboard_agent.metrics_export_port,
- )
- self._key = (
- f"{reporter_consts.REPORTER_PREFIX}" f"{self._dashboard_agent.node_id}"
- )
- async def GetProfilingStats(self, request, context):
- pid = request.pid
- duration = request.duration
- profiling_file_path = os.path.join(
- ray._private.utils.get_ray_temp_dir(), f"{pid}_profiling.txt"
- )
- sudo = "sudo" if ray._private.utils.get_user() != "root" else ""
- process = await asyncio.create_subprocess_shell(
- f"{sudo} $(which py-spy) record "
- f"-o {profiling_file_path} -p {pid} -d {duration} -f speedscope",
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=True,
- )
- stdout, stderr = await process.communicate()
- if process.returncode != 0:
- profiling_stats = ""
- else:
- with open(profiling_file_path, "r") as f:
- profiling_stats = f.read()
- return reporter_pb2.GetProfilingStatsReply(
- profiling_stats=profiling_stats, std_out=stdout, std_err=stderr
- )
- async def ReportOCMetrics(self, request, context):
- # This function receives a GRPC containing OpenCensus (OC) metrics
- # from a Ray process, then exposes those metrics to Prometheus.
- try:
- self._metrics_agent.record_metric_points_from_protobuf(request.metrics)
- except Exception:
- logger.error(traceback.format_exc())
- return reporter_pb2.ReportOCMetricsReply()
- @staticmethod
- def _get_cpu_percent():
- if IN_KUBERNETES_POD:
- return k8s_utils.cpu_percent()
- else:
- return psutil.cpu_percent()
- @staticmethod
- def _get_gpu_usage():
- global enable_gpu_usage_check
- if gpustat is None or not enable_gpu_usage_check:
- return []
- gpu_utilizations = []
- gpus = []
- try:
- gpus = gpustat.new_query().gpus
- except Exception as e:
- logger.debug(f"gpustat failed to retrieve GPU information: {e}")
- # gpustat calls pynvml.nvmlInit()
- # On machines without GPUs, this can run subprocesses that spew to
- # stderr. Then with log_to_driver=True, we get log spew from every
- # single raylet. To avoid this, disable the GPU usage check on
- # certain errors.
- # https://github.com/ray-project/ray/issues/14305
- # https://github.com/ray-project/ray/pull/21686
- if type(e).__name__ == "NVMLError_DriverNotLoaded":
- enable_gpu_usage_check = False
- for gpu in gpus:
- # Note the keys in this dict have periods which throws
- # off javascript so we change .s to _s
- gpu_data = {"_".join(key.split(".")): val for key, val in gpu.entry.items()}
- gpu_utilizations.append(gpu_data)
- return gpu_utilizations
- @staticmethod
- def _get_boot_time():
- if IN_KUBERNETES_POD:
- # Return start time of container entrypoint
- return psutil.Process(pid=1).create_time()
- else:
- return psutil.boot_time()
- @staticmethod
- def _get_network_stats():
- ifaces = [
- v for k, v in psutil.net_io_counters(pernic=True).items() if k[0] == "e"
- ]
- sent = sum((iface.bytes_sent for iface in ifaces))
- recv = sum((iface.bytes_recv for iface in ifaces))
- return sent, recv
- @staticmethod
- def _get_mem_usage():
- total = ray._private.utils.get_system_memory()
- used = ray._private.utils.get_used_memory()
- available = total - used
- percent = round(used / total, 3) * 100
- return total, available, percent, used
- @staticmethod
- def _get_disk_usage():
- if IN_KUBERNETES_POD:
- # If in a K8s pod, disable disk display by passing in dummy values.
- return {
- "/": psutil._common.sdiskusage(total=1, used=0, free=1, percent=0.0)
- }
- root = os.environ["USERPROFILE"] if sys.platform == "win32" else os.sep
- tmp = ray._private.utils.get_user_temp_dir()
- return {
- "/": psutil.disk_usage(root),
- tmp: psutil.disk_usage(tmp),
- }
- def _get_workers(self):
- raylet_proc = self._get_raylet_proc()
- if raylet_proc is None:
- return []
- else:
- workers = set(raylet_proc.children())
- self._workers.intersection_update(workers)
- self._workers.update(workers)
- self._workers.discard(psutil.Process())
- return [
- w.as_dict(
- attrs=[
- "pid",
- "create_time",
- "cpu_percent",
- "cpu_times",
- "cmdline",
- "memory_info",
- ]
- )
- for w in self._workers
- if w.status() != psutil.STATUS_ZOMBIE
- ]
- @staticmethod
- def _get_raylet_proc():
- try:
- curr_proc = psutil.Process()
- # Here, parent is always raylet because the
- # dashboard agent is a child of the raylet process.
- parent = curr_proc.parent()
- if parent is not None:
- if parent.pid == 1:
- return None
- if parent.status() == psutil.STATUS_ZOMBIE:
- return None
- return parent
- except (psutil.AccessDenied, ProcessLookupError):
- pass
- return None
- def _get_raylet(self):
- raylet_proc = self._get_raylet_proc()
- if raylet_proc is None:
- return {}
- else:
- return raylet_proc.as_dict(
- attrs=[
- "pid",
- "create_time",
- "cpu_percent",
- "cpu_times",
- "cmdline",
- "memory_info",
- ]
- )
- def _get_load_avg(self):
- if sys.platform == "win32":
- cpu_percent = psutil.cpu_percent()
- load = (cpu_percent, cpu_percent, cpu_percent)
- else:
- load = os.getloadavg()
- per_cpu_load = tuple((round(x / self._cpu_counts[0], 2) for x in load))
- return load, per_cpu_load
- def _get_all_stats(self):
- now = dashboard_utils.to_posix_time(datetime.datetime.utcnow())
- network_stats = self._get_network_stats()
- self._network_stats_hist.append((now, network_stats))
- self._network_stats_hist = self._network_stats_hist[-7:]
- then, prev_network_stats = self._network_stats_hist[0]
- prev_send, prev_recv = prev_network_stats
- now_send, now_recv = network_stats
- network_speed_stats = (
- (now_send - prev_send) / (now - then),
- (now_recv - prev_recv) / (now - then),
- )
- return {
- "now": now,
- "hostname": self._hostname,
- "ip": self._ip,
- "cpu": self._get_cpu_percent(),
- "cpus": self._cpu_counts,
- "mem": self._get_mem_usage(),
- "workers": self._get_workers(),
- "raylet": self._get_raylet(),
- "bootTime": self._get_boot_time(),
- "loadAvg": self._get_load_avg(),
- "disk": self._get_disk_usage(),
- "gpus": self._get_gpu_usage(),
- "network": network_stats,
- "network_speed": network_speed_stats,
- # Deprecated field, should be removed with frontend.
- "cmdline": self._get_raylet().get("cmdline", []),
- }
- def _record_stats(self, stats, cluster_stats):
- records_reported = []
- ip = stats["ip"]
- # -- Instance count of cluster --
- # Only report cluster stats on head node
- if "autoscaler_report" in cluster_stats and self._is_head_node:
- active_nodes = cluster_stats["autoscaler_report"]["active_nodes"]
- for node_type, active_node_count in active_nodes.items():
- records_reported.append(
- Record(
- gauge=METRICS_GAUGES["cluster_active_nodes"],
- value=active_node_count,
- tags={"node_type": node_type},
- )
- )
- failed_nodes = cluster_stats["autoscaler_report"]["failed_nodes"]
- failed_nodes_dict = {}
- for node_ip, node_type in failed_nodes:
- if node_type in failed_nodes_dict:
- failed_nodes_dict[node_type] += 1
- else:
- failed_nodes_dict[node_type] = 1
- for node_type, failed_node_count in failed_nodes_dict.items():
- records_reported.append(
- Record(
- gauge=METRICS_GAUGES["cluster_failed_nodes"],
- value=failed_node_count,
- tags={"node_type": node_type},
- )
- )
- pending_nodes = cluster_stats["autoscaler_report"]["pending_nodes"]
- pending_nodes_dict = {}
- for node_ip, node_type, status_message in pending_nodes:
- if node_type in pending_nodes_dict:
- pending_nodes_dict[node_type] += 1
- else:
- pending_nodes_dict[node_type] = 1
- for node_type, pending_node_count in pending_nodes_dict.items():
- records_reported.append(
- Record(
- gauge=METRICS_GAUGES["cluster_pending_nodes"],
- value=pending_node_count,
- tags={"node_type": node_type},
- )
- )
- # -- CPU per node --
- cpu_usage = float(stats["cpu"])
- cpu_record = Record(
- gauge=METRICS_GAUGES["node_cpu_utilization"],
- value=cpu_usage,
- tags={"ip": ip},
- )
- cpu_count, _ = stats["cpus"]
- cpu_count_record = Record(
- gauge=METRICS_GAUGES["node_cpu_count"], value=cpu_count, tags={"ip": ip}
- )
- # -- Mem per node --
- mem_total, mem_available, _, mem_used = stats["mem"]
- mem_used_record = Record(
- gauge=METRICS_GAUGES["node_mem_used"], value=mem_used, tags={"ip": ip}
- )
- mem_available_record = Record(
- gauge=METRICS_GAUGES["node_mem_available"],
- value=mem_available,
- tags={"ip": ip},
- )
- mem_total_record = Record(
- gauge=METRICS_GAUGES["node_mem_total"], value=mem_total, tags={"ip": ip}
- )
- # -- GPU per node --
- gpus = stats["gpus"]
- gpus_available = len(gpus)
- if gpus_available:
- gpus_utilization, gram_used, gram_total = 0, 0, 0
- for gpu in gpus:
- gpus_utilization += gpu["utilization_gpu"]
- gram_used += gpu["memory_used"]
- gram_total += gpu["memory_total"]
- gram_available = gram_total - gram_used
- gpus_available_record = Record(
- gauge=METRICS_GAUGES["node_gpus_available"],
- value=gpus_available,
- tags={"ip": ip},
- )
- gpus_utilization_record = Record(
- gauge=METRICS_GAUGES["node_gpus_utilization"],
- value=gpus_utilization,
- tags={"ip": ip},
- )
- gram_used_record = Record(
- gauge=METRICS_GAUGES["node_gram_used"], value=gram_used, tags={"ip": ip}
- )
- gram_available_record = Record(
- gauge=METRICS_GAUGES["node_gram_available"],
- value=gram_available,
- tags={"ip": ip},
- )
- records_reported.extend(
- [
- gpus_available_record,
- gpus_utilization_record,
- gram_used_record,
- gram_available_record,
- ]
- )
- # -- Disk per node --
- used, free = 0, 0
- for entry in stats["disk"].values():
- used += entry.used
- free += entry.free
- disk_utilization = float(used / (used + free)) * 100
- disk_usage_record = Record(
- gauge=METRICS_GAUGES["node_disk_usage"], value=used, tags={"ip": ip}
- )
- disk_free_record = Record(
- gauge=METRICS_GAUGES["node_disk_free"], value=free, tags={"ip": ip}
- )
- disk_utilization_percentage_record = Record(
- gauge=METRICS_GAUGES["node_disk_utilization_percentage"],
- value=disk_utilization,
- tags={"ip": ip},
- )
- # -- Network speed (send/receive) stats per node --
- network_stats = stats["network"]
- network_sent_record = Record(
- gauge=METRICS_GAUGES["node_network_sent"],
- value=network_stats[0],
- tags={"ip": ip},
- )
- network_received_record = Record(
- gauge=METRICS_GAUGES["node_network_received"],
- value=network_stats[1],
- tags={"ip": ip},
- )
- # -- Network speed (send/receive) per node --
- network_speed_stats = stats["network_speed"]
- network_send_speed_record = Record(
- gauge=METRICS_GAUGES["node_network_send_speed"],
- value=network_speed_stats[0],
- tags={"ip": ip},
- )
- network_receive_speed_record = Record(
- gauge=METRICS_GAUGES["node_network_receive_speed"],
- value=network_speed_stats[1],
- tags={"ip": ip},
- )
- raylet_stats = stats["raylet"]
- if raylet_stats:
- raylet_pid = str(raylet_stats["pid"])
- # -- raylet CPU --
- raylet_cpu_usage = float(raylet_stats["cpu_percent"]) * 100
- raylet_cpu_record = Record(
- gauge=METRICS_GAUGES["raylet_cpu"],
- value=raylet_cpu_usage,
- tags={"ip": ip, "pid": raylet_pid},
- )
- # -- raylet mem --
- raylet_mem_usage = float(raylet_stats["memory_info"].rss) / 1e6
- raylet_mem_record = Record(
- gauge=METRICS_GAUGES["raylet_mem"],
- value=raylet_mem_usage,
- tags={"ip": ip, "pid": raylet_pid},
- )
- records_reported.extend([raylet_cpu_record, raylet_mem_record])
- records_reported.extend(
- [
- cpu_record,
- cpu_count_record,
- mem_used_record,
- mem_available_record,
- mem_total_record,
- disk_usage_record,
- disk_free_record,
- disk_utilization_percentage_record,
- network_sent_record,
- network_received_record,
- network_send_speed_record,
- network_receive_speed_record,
- ]
- )
- return records_reported
- async def _perform_iteration(self, publish):
- """Get any changes to the log files and push updates to Redis."""
- while True:
- try:
- formatted_status_string = internal_kv._internal_kv_get(
- DEBUG_AUTOSCALING_STATUS
- )
- cluster_stats = (
- json.loads(formatted_status_string.decode())
- if formatted_status_string
- else {}
- )
- stats = self._get_all_stats()
- records_reported = self._record_stats(stats, cluster_stats)
- self._metrics_agent.record_reporter_stats(records_reported)
- await publish(self._key, jsonify_asdict(stats))
- except Exception:
- logger.exception("Error publishing node physical stats.")
- await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)
- async def run(self, server):
- reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server)
- if gcs_pubsub_enabled():
- gcs_addr = self._dashboard_agent.gcs_address
- if gcs_addr is None:
- aioredis_client = await aioredis.create_redis_pool(
- address=self._dashboard_agent.redis_address,
- password=self._dashboard_agent.redis_password,
- )
- gcs_addr = await aioredis_client.get("GcsServerAddress")
- gcs_addr = gcs_addr.decode()
- publisher = GcsAioPublisher(address=gcs_addr)
- async def publish(key: str, data: str):
- await publisher.publish_resource_usage(key, data)
- else:
- aioredis_client = await aioredis.create_redis_pool(
- address=self._dashboard_agent.redis_address,
- password=self._dashboard_agent.redis_password,
- )
- async def publish(key: str, data: str):
- await aioredis_client.publish(key, data)
- await self._perform_iteration(publish)
- @staticmethod
- def is_minimal_module():
- return False
|