statsd.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. #!/usr/bin/env python3
  2. import os
  3. import zmq
  4. import time
  5. from pathlib import Path
  6. from collections import defaultdict
  7. from datetime import datetime, timezone
  8. from typing import NoReturn, Union, List, Dict
  9. from common.params import Params
  10. from cereal.messaging import SubMaster
  11. from system.swaglog import cloudlog
  12. from system.hardware import HARDWARE
  13. from common.file_helpers import atomic_write_in_dir
  14. from system.version import get_normalized_origin, get_short_branch, get_short_version, is_dirty
  15. from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S
  16. class METRIC_TYPE:
  17. GAUGE = 'g'
  18. SAMPLE = 'sa'
  19. class StatLog:
  20. def __init__(self):
  21. self.pid = None
  22. def connect(self) -> None:
  23. self.zctx = zmq.Context()
  24. self.sock = self.zctx.socket(zmq.PUSH)
  25. self.sock.setsockopt(zmq.LINGER, 10)
  26. self.sock.connect(STATS_SOCKET)
  27. self.pid = os.getpid()
  28. def _send(self, metric: str) -> None:
  29. if os.getpid() != self.pid:
  30. self.connect()
  31. try:
  32. self.sock.send_string(metric, zmq.NOBLOCK)
  33. except zmq.error.Again:
  34. # drop :/
  35. pass
  36. def gauge(self, name: str, value: float) -> None:
  37. self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")
  38. # Samples will be recorded in a buffer and at aggregation time,
  39. # statistical properties will be logged (mean, count, percentiles, ...)
  40. def sample(self, name: str, value: float):
  41. self._send(f"{name}:{value}|{METRIC_TYPE.SAMPLE}")
  42. def main() -> NoReturn:
  43. dongle_id = Params().get("DongleId", encoding='utf-8')
  44. def get_influxdb_line(measurement: str, value: Union[float, Dict[str, float]], timestamp: datetime, tags: dict) -> str:
  45. res = f"{measurement}"
  46. for k, v in tags.items():
  47. res += f",{k}={str(v)}"
  48. res += " "
  49. if isinstance(value, float):
  50. value = {'value': value}
  51. for k, v in value.items():
  52. res += f"{k}={v},"
  53. res += f"dongle_id=\"{dongle_id}\" {int(timestamp.timestamp() * 1e9)}\n"
  54. return res
  55. # open statistics socket
  56. ctx = zmq.Context().instance()
  57. sock = ctx.socket(zmq.PULL)
  58. sock.bind(STATS_SOCKET)
  59. # initialize stats directory
  60. Path(STATS_DIR).mkdir(parents=True, exist_ok=True)
  61. # initialize tags
  62. tags = {
  63. 'started': False,
  64. 'version': get_short_version(),
  65. 'branch': get_short_branch(),
  66. 'dirty': is_dirty(),
  67. 'origin': get_normalized_origin(),
  68. 'deviceType': HARDWARE.get_device_type(),
  69. }
  70. # subscribe to deviceState for started state
  71. sm = SubMaster(['deviceState'])
  72. idx = 0
  73. last_flush_time = time.monotonic()
  74. gauges = {}
  75. samples: Dict[str, List[float]] = defaultdict(list)
  76. while True:
  77. started_prev = sm['deviceState'].started
  78. sm.update()
  79. # Update metrics
  80. while True:
  81. try:
  82. metric = sock.recv_string(zmq.NOBLOCK)
  83. try:
  84. metric_type = metric.split('|')[1]
  85. metric_name = metric.split(':')[0]
  86. metric_value = float(metric.split('|')[0].split(':')[1])
  87. if metric_type == METRIC_TYPE.GAUGE:
  88. gauges[metric_name] = metric_value
  89. elif metric_type == METRIC_TYPE.SAMPLE:
  90. samples[metric_name].append(metric_value)
  91. else:
  92. cloudlog.event("unknown metric type", metric_type=metric_type)
  93. except Exception:
  94. cloudlog.event("malformed metric", metric=metric)
  95. except zmq.error.Again:
  96. break
  97. # flush when started state changes or after FLUSH_TIME_S
  98. if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
  99. result = ""
  100. current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
  101. tags['started'] = sm['deviceState'].started
  102. for key, value in gauges.items():
  103. result += get_influxdb_line(f"gauge.{key}", value, current_time, tags)
  104. for key, values in samples.items():
  105. values.sort()
  106. sample_count = len(values)
  107. sample_sum = sum(values)
  108. stats = {
  109. 'count': sample_count,
  110. 'min': values[0],
  111. 'max': values[-1],
  112. 'mean': sample_sum / sample_count,
  113. }
  114. for percentile in [0.05, 0.5, 0.95]:
  115. value = values[int(round(percentile * (sample_count - 1)))]
  116. stats[f"p{int(percentile * 100)}"] = value
  117. result += get_influxdb_line(f"sample.{key}", stats, current_time, tags)
  118. # clear intermediate data
  119. gauges.clear()
  120. samples.clear()
  121. last_flush_time = time.monotonic()
  122. # check that we aren't filling up the drive
  123. if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
  124. if len(result) > 0:
  125. stats_path = os.path.join(STATS_DIR, f"{current_time.timestamp():.0f}_{idx}")
  126. with atomic_write_in_dir(stats_path) as f:
  127. f.write(result)
  128. idx += 1
  129. else:
  130. cloudlog.error("stats dir full")
if __name__ == "__main__":
  # run as the statsd daemon process
  main()
else:
  # imported as a library: expose a module-level client for callers to
  # record metrics via statlog.gauge(...) / statlog.sample(...)
  statlog = StatLog()