123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- #!/usr/bin/env python3
- import os
- import zmq
- import time
- from pathlib import Path
- from collections import defaultdict
- from datetime import datetime, timezone
- from typing import NoReturn, Union, List, Dict
- from common.params import Params
- from cereal.messaging import SubMaster
- from system.swaglog import cloudlog
- from system.hardware import HARDWARE
- from common.file_helpers import atomic_write_in_dir
- from system.version import get_normalized_origin, get_short_branch, get_short_version, is_dirty
- from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S
class METRIC_TYPE:
  # statsd-style one/two-letter markers appended to each wire-format metric
  # string ("name:value|type"); parsed back out by the aggregator in main().
  GAUGE = 'g'
  SAMPLE = 'sa'
class StatLog:
  """Client side of the stats pipeline: pushes metric strings to the
  aggregator in main() over a ZMQ PUSH socket.

  The socket is created lazily and re-created after a fork (the owning
  pid is tracked so children don't share the parent's socket).
  """

  def __init__(self):
    # no socket yet; first _send() will trigger connect()
    self.pid = None

  def connect(self) -> None:
    """Create a fresh context + PUSH socket bound to the current process."""
    self.zctx = zmq.Context()
    self.sock = self.zctx.socket(zmq.PUSH)
    self.sock.setsockopt(zmq.LINGER, 10)
    self.sock.connect(STATS_SOCKET)
    self.pid = os.getpid()

  def _send(self, metric: str) -> None:
    # reconnect if we've never connected or we're in a forked child
    if self.pid != os.getpid():
      self.connect()
    try:
      self.sock.send_string(metric, zmq.NOBLOCK)
    except zmq.error.Again:
      # best-effort: queue is full, silently drop the metric
      pass

  def gauge(self, name: str, value: float) -> None:
    """Record an instantaneous value; only the latest one matters."""
    self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")

  # Samples will be recorded in a buffer and at aggregation time,
  # statistical properties will be logged (mean, count, percentiles, ...)
  def sample(self, name: str, value: float):
    self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
def main() -> NoReturn:
  """Aggregate metrics from the stats socket and periodically flush them.

  Pulls "name:value|type" strings from the ZMQ PULL socket, accumulates
  gauges (last value wins) and samples (full buffer), and on every flush
  interval (or started-state change) writes InfluxDB line-protocol files
  into STATS_DIR for later upload. Runs forever.
  """
  dongle_id = Params().get("DongleId", encoding='utf-8')

  def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str:
    """Format one measurement as a single InfluxDB line-protocol record."""
    res = measurement
    for k, v in tags.items():
      res += f",{k}={v}"
    res += " "
    # a bare float is promoted to a single "value" field
    if isinstance(value, float):
      value = {'value': value}
    for k, v in value.items():
      res += f"{k}={v},"
    res += f"dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
    return res

  # open statistics socket
  # NOTE: zmq.Context.instance() is the classmethod; calling it on a freshly
  # constructed Context() would leak that throwaway context.
  ctx = zmq.Context.instance()
  sock = ctx.socket(zmq.PULL)
  sock.bind(STATS_SOCKET)

  # initialize stats directory
  Path(STATS_DIR).mkdir(parents=True, exist_ok=True)

  # initialize tags
  tags = {
    'started': False,
    'version': get_short_version(),
    'branch': get_short_branch(),
    'dirty': is_dirty(),
    'origin': get_normalized_origin(),
    'deviceType': HARDWARE.get_device_type(),
  }

  # subscribe to deviceState for started state
  sm = SubMaster(['deviceState'])

  idx = 0
  last_flush_time = time.monotonic()
  gauges: Dict[str, float] = {}
  samples: Dict[str, List[float]] = defaultdict(list)
  while True:
    started_prev = sm['deviceState'].started
    sm.update()

    # Update metrics: drain everything queued on the socket
    while True:
      try:
        metric = sock.recv_string(zmq.NOBLOCK)
        try:
          # wire format is "name:value|type"; parse it in one pass
          data, _, metric_type = metric.partition('|')
          metric_name, _, value_str = data.partition(':')
          if not metric_type:
            raise ValueError("missing metric type")
          metric_value = float(value_str)

          if metric_type == METRIC_TYPE.GAUGE:
            gauges[metric_name] = metric_value
          elif metric_type == METRIC_TYPE.SAMPLE:
            samples[metric_name].append(metric_value)
          else:
            cloudlog.event("unknown metric type", metric_type=metric_type)
        except Exception:
          cloudlog.event("malformed metric", metric=metric)
      except zmq.error.Again:
        break

    # flush when started state changes or after FLUSH_TIME_S
    if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
      result = ""
      current_time = datetime.now(timezone.utc)  # utcnow() is deprecated and returns a naive datetime
      tags['started'] = sm['deviceState'].started

      for key, value in gauges.items():
        result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)

      for key, values in samples.items():
        values.sort()
        sample_count = len(values)
        sample_sum = sum(values)

        stats = {
          'count': sample_count,
          'min': values[0],
          'max': values[-1],
          'mean': sample_sum / sample_count,
        }
        for percentile in [0.05, 0.5, 0.95]:
          # nearest-rank percentile on the sorted buffer
          value = values[int(round(percentile * (sample_count - 1)))]
          stats[f"p{int(percentile * 100)}"] = value

        result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)

      # clear intermediate data
      gauges.clear()
      samples.clear()
      last_flush_time = time.monotonic()

      # check that we aren't filling up the drive
      if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
        if len(result) > 0:
          stats_path = os.path.join(STATS_DIR, f"{current_time.timestamp():.0f}_{idx}")
          with atomic_write_in_dir(stats_path) as f:
            f.write(result)
          idx += 1
      else:
        cloudlog.error("stats dir full")
if __name__ == "__main__":
  main()
else:
  # Imported as a library: expose a process-wide StatLog client so callers
  # can do `from ... import statlog; statlog.gauge(...)`.
  statlog = StatLog()
|