process_replay.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800
  1. #!/usr/bin/env python3
  2. import os
  3. import time
  4. import copy
  5. import json
  6. import heapq
  7. import signal
  8. import platform
  9. from collections import OrderedDict
  10. from dataclasses import dataclass, field
  11. from typing import Any
  12. from collections.abc import Callable, Iterable
  13. from tqdm import tqdm
  14. import capnp
  15. import cereal.messaging as messaging
  16. from cereal import car
  17. from cereal.services import SERVICE_LIST
  18. from cereal.visionipc import VisionIpcServer, get_endpoint_name as vipc_get_endpoint_name
  19. from openpilot.common.params import Params
  20. from openpilot.common.prefix import OpenpilotPrefix
  21. from openpilot.common.timeout import Timeout
  22. from openpilot.common.realtime import DT_CTRL
  23. from panda.python import ALTERNATIVE_EXPERIENCE
  24. from openpilot.selfdrive.car.car_helpers import get_car, interfaces
  25. from openpilot.selfdrive.manager.process_config import managed_processes
  26. from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams
  27. from openpilot.selfdrive.test.process_replay.migration import migrate_all
  28. from openpilot.selfdrive.test.process_replay.capture import ProcessOutputCapture
  29. from openpilot.tools.lib.logreader import LogIterable
  30. from openpilot.tools.lib.framereader import BaseFrameReader
  31. # Numpy gives different results based on CPU features after version 19
  32. NUMPY_TOLERANCE = 1e-7
  33. PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__))
  34. FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/")
  35. class DummySocket:
  36. def __init__(self):
  37. self.data: list[bytes] = []
  38. def receive(self, non_blocking: bool = False) -> bytes | None:
  39. if non_blocking:
  40. return None
  41. return self.data.pop()
  42. def send(self, data: bytes):
  43. self.data.append(data)
  44. class LauncherWithCapture:
  45. def __init__(self, capture: ProcessOutputCapture, launcher: Callable):
  46. self.capture = capture
  47. self.launcher = launcher
  48. def __call__(self, *args, **kwargs):
  49. self.capture.link_with_current_proc()
  50. self.launcher(*args, **kwargs)
  51. class ReplayContext:
  52. def __init__(self, cfg):
  53. self.proc_name = cfg.proc_name
  54. self.pubs = cfg.pubs
  55. self.main_pub = cfg.main_pub
  56. self.main_pub_drained = cfg.main_pub_drained
  57. self.unlocked_pubs = cfg.unlocked_pubs
  58. assert(len(self.pubs) != 0 or self.main_pub is not None)
  59. def __enter__(self):
  60. self.open_context()
  61. return self
  62. def __exit__(self, exc_type, exc_obj, exc_tb):
  63. self.close_context()
  64. def open_context(self):
  65. messaging.toggle_fake_events(True)
  66. messaging.set_fake_prefix(self.proc_name)
  67. if self.main_pub is None:
  68. self.events = OrderedDict()
  69. pubs_with_events = [pub for pub in self.pubs if pub not in self.unlocked_pubs]
  70. for pub in pubs_with_events:
  71. self.events[pub] = messaging.fake_event_handle(pub, enable=True)
  72. else:
  73. self.events = {self.main_pub: messaging.fake_event_handle(self.main_pub, enable=True)}
  74. def close_context(self):
  75. del self.events
  76. messaging.toggle_fake_events(False)
  77. messaging.delete_fake_prefix()
  78. @property
  79. def all_recv_called_events(self):
  80. return [man.recv_called_event for man in self.events.values()]
  81. @property
  82. def all_recv_ready_events(self):
  83. return [man.recv_ready_event for man in self.events.values()]
  84. def send_sync(self, pm, endpoint, dat):
  85. self.events[endpoint].recv_called_event.wait()
  86. self.events[endpoint].recv_called_event.clear()
  87. pm.send(endpoint, dat)
  88. self.events[endpoint].recv_ready_event.set()
  89. def unlock_sockets(self):
  90. expected_sets = len(self.events)
  91. while expected_sets > 0:
  92. index = messaging.wait_for_one_event(self.all_recv_called_events)
  93. self.all_recv_called_events[index].clear()
  94. self.all_recv_ready_events[index].set()
  95. expected_sets -= 1
  96. def wait_for_recv_called(self):
  97. messaging.wait_for_one_event(self.all_recv_called_events)
  98. def wait_for_next_recv(self, trigger_empty_recv):
  99. index = messaging.wait_for_one_event(self.all_recv_called_events)
  100. if self.main_pub is not None and self.main_pub_drained and trigger_empty_recv:
  101. self.all_recv_called_events[index].clear()
  102. self.all_recv_ready_events[index].set()
  103. self.all_recv_called_events[index].wait()
  104. @dataclass
  105. class ProcessConfig:
  106. proc_name: str
  107. pubs: list[str]
  108. subs: list[str]
  109. ignore: list[str]
  110. config_callback: Callable | None = None
  111. init_callback: Callable | None = None
  112. should_recv_callback: Callable | None = None
  113. tolerance: float | None = None
  114. processing_time: float = 0.001
  115. timeout: int = 30
  116. simulation: bool = True
  117. main_pub: str | None = None
  118. main_pub_drained: bool = True
  119. vision_pubs: list[str] = field(default_factory=list)
  120. ignore_alive_pubs: list[str] = field(default_factory=list)
  121. unlocked_pubs: list[str] = field(default_factory=list)
  122. class ProcessContainer:
  123. def __init__(self, cfg: ProcessConfig):
  124. self.prefix = OpenpilotPrefix(clean_dirs_on_exit=False)
  125. self.cfg = copy.deepcopy(cfg)
  126. self.process = copy.deepcopy(managed_processes[cfg.proc_name])
  127. self.msg_queue: list[capnp._DynamicStructReader] = []
  128. self.cnt = 0
  129. self.pm: messaging.PubMaster | None = None
  130. self.sockets: list[messaging.SubSocket] | None = None
  131. self.rc: ReplayContext | None = None
  132. self.vipc_server: VisionIpcServer | None = None
  133. self.environ_config: dict[str, Any] | None = None
  134. self.capture: ProcessOutputCapture | None = None
  135. @property
  136. def has_empty_queue(self) -> bool:
  137. return len(self.msg_queue) == 0
  138. @property
  139. def pubs(self) -> list[str]:
  140. return self.cfg.pubs
  141. @property
  142. def subs(self) -> list[str]:
  143. return self.cfg.subs
  144. def _clean_env(self):
  145. for k in self.environ_config.keys():
  146. if k in os.environ:
  147. del os.environ[k]
  148. for k in ["PROC_NAME", "SIMULATION"]:
  149. if k in os.environ:
  150. del os.environ[k]
  151. def _setup_env(self, params_config: dict[str, Any], environ_config: dict[str, Any]):
  152. for k, v in environ_config.items():
  153. if len(v) != 0:
  154. os.environ[k] = v
  155. elif k in os.environ:
  156. del os.environ[k]
  157. os.environ["PROC_NAME"] = self.cfg.proc_name
  158. if self.cfg.simulation:
  159. os.environ["SIMULATION"] = "1"
  160. elif "SIMULATION" in os.environ:
  161. del os.environ["SIMULATION"]
  162. params = Params()
  163. for k, v in params_config.items():
  164. if isinstance(v, bool):
  165. params.put_bool(k, v)
  166. else:
  167. params.put(k, v)
  168. self.environ_config = environ_config
  169. def _setup_vision_ipc(self, all_msgs: LogIterable, frs: dict[str, Any]):
  170. assert len(self.cfg.vision_pubs) != 0
  171. vipc_server = VisionIpcServer("camerad")
  172. streams_metas = available_streams(all_msgs)
  173. for meta in streams_metas:
  174. if meta.camera_state in self.cfg.vision_pubs:
  175. frame_size = (frs[meta.camera_state].w, frs[meta.camera_state].h)
  176. vipc_server.create_buffers(meta.stream, 2, False, *frame_size)
  177. vipc_server.start_listener()
  178. self.vipc_server = vipc_server
  179. self.cfg.vision_pubs = [meta.camera_state for meta in streams_metas if meta.camera_state in self.cfg.vision_pubs]
  180. def _start_process(self):
  181. if self.capture is not None:
  182. self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher)
  183. self.process.prepare()
  184. self.process.start()
  185. def start(
  186. self, params_config: dict[str, Any], environ_config: dict[str, Any],
  187. all_msgs: LogIterable, frs: dict[str, BaseFrameReader] | None,
  188. fingerprint: str | None, capture_output: bool
  189. ):
  190. with self.prefix as p:
  191. self._setup_env(params_config, environ_config)
  192. if self.cfg.config_callback is not None:
  193. params = Params()
  194. self.cfg.config_callback(params, self.cfg, all_msgs)
  195. self.rc = ReplayContext(self.cfg)
  196. self.rc.open_context()
  197. self.pm = messaging.PubMaster(self.cfg.pubs)
  198. self.sockets = [messaging.sub_sock(s, timeout=100) for s in self.cfg.subs]
  199. if len(self.cfg.vision_pubs) != 0:
  200. assert frs is not None
  201. self._setup_vision_ipc(all_msgs, frs)
  202. assert self.vipc_server is not None
  203. if capture_output:
  204. self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix)
  205. self._start_process()
  206. if self.cfg.init_callback is not None:
  207. self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint)
  208. # wait for process to startup
  209. with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(self.cfg.proc_name)}"):
  210. while not all(self.pm.all_readers_updated(s) for s in self.cfg.pubs if s not in self.cfg.ignore_alive_pubs):
  211. time.sleep(0)
  212. def stop(self):
  213. with self.prefix:
  214. self.process.signal(signal.SIGKILL)
  215. self.process.stop()
  216. self.rc.close_context()
  217. self.prefix.clean_dirs()
  218. self._clean_env()
  219. def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, BaseFrameReader] | None) -> list[capnp._DynamicStructReader]:
  220. assert self.rc and self.pm and self.sockets and self.process.proc
  221. output_msgs = []
  222. with self.prefix, Timeout(self.cfg.timeout, error_msg=f"timed out testing process {repr(self.cfg.proc_name)}"):
  223. end_of_cycle = True
  224. if self.cfg.should_recv_callback is not None:
  225. end_of_cycle = self.cfg.should_recv_callback(msg, self.cfg, self.cnt)
  226. self.msg_queue.append(msg)
  227. if end_of_cycle:
  228. self.rc.wait_for_recv_called()
  229. # call recv to let sub-sockets reconnect, after we know the process is ready
  230. if self.cnt == 0:
  231. for s in self.sockets:
  232. messaging.recv_one_or_none(s)
  233. # empty recv on drained pub indicates the end of messages, only do that if there're any
  234. trigger_empty_recv = False
  235. if self.cfg.main_pub and self.cfg.main_pub_drained:
  236. trigger_empty_recv = next((True for m in self.msg_queue if m.which() == self.cfg.main_pub), False)
  237. for m in self.msg_queue:
  238. self.pm.send(m.which(), m.as_builder())
  239. # send frames if needed
  240. if self.vipc_server is not None and m.which() in self.cfg.vision_pubs:
  241. camera_state = getattr(m, m.which())
  242. camera_meta = meta_from_camera_state(m.which())
  243. assert frs is not None
  244. img = frs[m.which()].get(camera_state.frameId, pix_fmt="nv12")[0]
  245. self.vipc_server.send(camera_meta.stream, img.flatten().tobytes(),
  246. camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof)
  247. self.msg_queue = []
  248. self.rc.unlock_sockets()
  249. self.rc.wait_for_next_recv(trigger_empty_recv)
  250. for socket in self.sockets:
  251. ms = messaging.drain_sock(socket)
  252. for m in ms:
  253. m = m.as_builder()
  254. m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9)
  255. output_msgs.append(m.as_reader())
  256. self.cnt += 1
  257. assert self.process.proc.is_alive()
  258. return output_msgs
  259. def controlsd_fingerprint_callback(rc, pm, msgs, fingerprint):
  260. print("start fingerprinting")
  261. params = Params()
  262. canmsgs = [msg for msg in msgs if msg.which() == "can"][:300]
  263. # controlsd expects one arbitrary can and pandaState
  264. rc.send_sync(pm, "can", messaging.new_message("can", 1))
  265. pm.send("pandaStates", messaging.new_message("pandaStates", 1))
  266. rc.send_sync(pm, "can", messaging.new_message("can", 1))
  267. rc.wait_for_next_recv(True)
  268. # fingerprinting is done, when CarParams is set
  269. while params.get("CarParams") is None:
  270. if len(canmsgs) == 0:
  271. raise ValueError("Fingerprinting failed. Run out of can msgs")
  272. m = canmsgs.pop(0)
  273. rc.send_sync(pm, "can", m.as_builder().to_bytes())
  274. rc.wait_for_next_recv(False)
  275. def get_car_params_callback(rc, pm, msgs, fingerprint):
  276. params = Params()
  277. if fingerprint:
  278. CarInterface, _, _ = interfaces[fingerprint]
  279. CP = CarInterface.get_non_essential_params(fingerprint)
  280. else:
  281. can = DummySocket()
  282. sendcan = DummySocket()
  283. canmsgs = [msg for msg in msgs if msg.which() == "can"]
  284. has_cached_cp = params.get("CarParamsCache") is not None
  285. assert len(canmsgs) != 0, "CAN messages are required for fingerprinting"
  286. assert os.environ.get("SKIP_FW_QUERY", False) or has_cached_cp, \
  287. "CarParamsCache is required for fingerprinting. Make sure to keep carParams msgs in the logs."
  288. for m in canmsgs[:300]:
  289. can.send(m.as_builder().to_bytes())
  290. _, CP = get_car(can, sendcan, Params().get_bool("ExperimentalLongitudinalEnabled"))
  291. params.put("CarParams", CP.to_bytes())
  292. return CP
  293. def controlsd_rcv_callback(msg, cfg, frame):
  294. # no sendcan until controlsd is initialized
  295. if msg.which() != "can":
  296. return False
  297. socks = [
  298. s for s in cfg.subs if
  299. frame % int(SERVICE_LIST[msg.which()].frequency / SERVICE_LIST[s].frequency) == 0
  300. ]
  301. if "sendcan" in socks and (frame - 1) < 2000:
  302. socks.remove("sendcan")
  303. return len(socks) > 0
  304. def calibration_rcv_callback(msg, cfg, frame):
  305. # calibrationd publishes 1 calibrationData every 5 cameraOdometry packets.
  306. # should_recv always true to increment frame
  307. return (frame - 1) == 0 or msg.which() == 'cameraOdometry'
  308. def torqued_rcv_callback(msg, cfg, frame):
  309. # should_recv always true to increment frame
  310. return (frame - 1) == 0 or msg.which() == 'liveLocationKalman'
  311. def dmonitoringmodeld_rcv_callback(msg, cfg, frame):
  312. return msg.which() == "driverCameraState"
  313. class ModeldCameraSyncRcvCallback:
  314. def __init__(self):
  315. self.road_present = False
  316. self.wide_road_present = False
  317. self.is_dual_camera = True
  318. def __call__(self, msg, cfg, frame):
  319. self.is_dual_camera = len(cfg.vision_pubs) == 2
  320. if msg.which() == "roadCameraState":
  321. self.road_present = True
  322. elif msg.which() == "wideRoadCameraState":
  323. self.wide_road_present = True
  324. if self.road_present and self.wide_road_present:
  325. self.road_present, self.wide_road_present = False, False
  326. return True
  327. elif self.road_present and not self.is_dual_camera:
  328. self.road_present = False
  329. return True
  330. else:
  331. return False
  332. class MessageBasedRcvCallback:
  333. def __init__(self, trigger_msg_type):
  334. self.trigger_msg_type = trigger_msg_type
  335. def __call__(self, msg, cfg, frame):
  336. return msg.which() == self.trigger_msg_type
  337. class FrequencyBasedRcvCallback:
  338. def __init__(self, trigger_msg_type):
  339. self.trigger_msg_type = trigger_msg_type
  340. def __call__(self, msg, cfg, frame):
  341. if msg.which() != self.trigger_msg_type:
  342. return False
  343. resp_sockets = [
  344. s for s in cfg.subs
  345. if frame % max(1, int(SERVICE_LIST[msg.which()].frequency / SERVICE_LIST[s].frequency)) == 0
  346. ]
  347. return bool(len(resp_sockets))
  348. def controlsd_config_callback(params, cfg, lr):
  349. controlsState = None
  350. initialized = False
  351. for msg in lr:
  352. if msg.which() == "controlsState":
  353. controlsState = msg.controlsState
  354. if initialized:
  355. break
  356. elif msg.which() == "onroadEvents":
  357. initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.onroadEvents]
  358. assert controlsState is not None and initialized, "controlsState never initialized"
  359. params.put("ReplayControlsState", controlsState.as_builder().to_bytes())
  360. def locationd_config_pubsub_callback(params, cfg, lr):
  361. ublox = params.get_bool("UbloxAvailable")
  362. sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", })
  363. cfg.pubs = set(cfg.pubs) - sub_keys
  364. CONFIGS = [
  365. ProcessConfig(
  366. proc_name="controlsd",
  367. pubs=[
  368. "can", "deviceState", "pandaStates", "peripheralState", "liveCalibration", "driverMonitoringState",
  369. "longitudinalPlan", "liveLocationKalman", "liveParameters", "radarState",
  370. "modelV2", "driverCameraState", "roadCameraState", "wideRoadCameraState", "managerState",
  371. "testJoystick", "liveTorqueParameters", "accelerometer", "gyroscope"
  372. ],
  373. subs=["controlsState", "carState", "carControl", "sendcan", "onroadEvents", "carParams"],
  374. ignore=["logMonoTime", "controlsState.startMonoTime", "controlsState.cumLagMs"],
  375. config_callback=controlsd_config_callback,
  376. init_callback=controlsd_fingerprint_callback,
  377. should_recv_callback=controlsd_rcv_callback,
  378. tolerance=NUMPY_TOLERANCE,
  379. processing_time=0.004,
  380. main_pub="can",
  381. ),
  382. ProcessConfig(
  383. proc_name="radard",
  384. pubs=["can", "carState", "modelV2"],
  385. subs=["radarState", "liveTracks"],
  386. ignore=["logMonoTime", "radarState.cumLagMs"],
  387. init_callback=get_car_params_callback,
  388. should_recv_callback=MessageBasedRcvCallback("can"),
  389. main_pub="can",
  390. ),
  391. ProcessConfig(
  392. proc_name="plannerd",
  393. pubs=["modelV2", "carControl", "carState", "controlsState", "radarState"],
  394. subs=["longitudinalPlan", "uiPlan"],
  395. ignore=["logMonoTime", "longitudinalPlan.processingDelay", "longitudinalPlan.solverExecutionTime"],
  396. init_callback=get_car_params_callback,
  397. should_recv_callback=FrequencyBasedRcvCallback("modelV2"),
  398. tolerance=NUMPY_TOLERANCE,
  399. ),
  400. ProcessConfig(
  401. proc_name="calibrationd",
  402. pubs=["carState", "cameraOdometry", "carParams"],
  403. subs=["liveCalibration"],
  404. ignore=["logMonoTime"],
  405. should_recv_callback=calibration_rcv_callback,
  406. ),
  407. ProcessConfig(
  408. proc_name="dmonitoringd",
  409. pubs=["driverStateV2", "liveCalibration", "carState", "modelV2", "controlsState"],
  410. subs=["driverMonitoringState"],
  411. ignore=["logMonoTime"],
  412. should_recv_callback=FrequencyBasedRcvCallback("driverStateV2"),
  413. tolerance=NUMPY_TOLERANCE,
  414. ),
  415. ProcessConfig(
  416. proc_name="locationd",
  417. pubs=[
  418. "cameraOdometry", "accelerometer", "gyroscope", "gpsLocationExternal",
  419. "liveCalibration", "carState", "gpsLocation"
  420. ],
  421. subs=["liveLocationKalman"],
  422. ignore=["logMonoTime"],
  423. config_callback=locationd_config_pubsub_callback,
  424. tolerance=NUMPY_TOLERANCE,
  425. ),
  426. ProcessConfig(
  427. proc_name="paramsd",
  428. pubs=["liveLocationKalman", "carState"],
  429. subs=["liveParameters"],
  430. ignore=["logMonoTime"],
  431. init_callback=get_car_params_callback,
  432. should_recv_callback=FrequencyBasedRcvCallback("liveLocationKalman"),
  433. tolerance=NUMPY_TOLERANCE,
  434. processing_time=0.004,
  435. ),
  436. ProcessConfig(
  437. proc_name="ubloxd",
  438. pubs=["ubloxRaw"],
  439. subs=["ubloxGnss", "gpsLocationExternal"],
  440. ignore=["logMonoTime"],
  441. ),
  442. ProcessConfig(
  443. proc_name="torqued",
  444. pubs=["liveLocationKalman", "carState", "carControl"],
  445. subs=["liveTorqueParameters"],
  446. ignore=["logMonoTime"],
  447. init_callback=get_car_params_callback,
  448. should_recv_callback=torqued_rcv_callback,
  449. tolerance=NUMPY_TOLERANCE,
  450. ),
  451. ProcessConfig(
  452. proc_name="modeld",
  453. pubs=["roadCameraState", "wideRoadCameraState", "liveCalibration", "driverMonitoringState"],
  454. subs=["modelV2", "cameraOdometry"],
  455. ignore=["logMonoTime", "modelV2.frameDropPerc", "modelV2.modelExecutionTime"],
  456. should_recv_callback=ModeldCameraSyncRcvCallback(),
  457. tolerance=NUMPY_TOLERANCE,
  458. processing_time=0.020,
  459. main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("roadCameraState").stream),
  460. main_pub_drained=False,
  461. vision_pubs=["roadCameraState", "wideRoadCameraState"],
  462. ignore_alive_pubs=["wideRoadCameraState"],
  463. init_callback=get_car_params_callback,
  464. ),
  465. ProcessConfig(
  466. proc_name="dmonitoringmodeld",
  467. pubs=["liveCalibration", "driverCameraState"],
  468. subs=["driverStateV2"],
  469. ignore=["logMonoTime", "driverStateV2.modelExecutionTime", "driverStateV2.dspExecutionTime"],
  470. should_recv_callback=dmonitoringmodeld_rcv_callback,
  471. tolerance=NUMPY_TOLERANCE,
  472. processing_time=0.020,
  473. main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("driverCameraState").stream),
  474. main_pub_drained=False,
  475. vision_pubs=["driverCameraState"],
  476. ignore_alive_pubs=["driverCameraState"],
  477. ),
  478. ]
  479. def get_process_config(name: str) -> ProcessConfig:
  480. try:
  481. return copy.deepcopy(next(c for c in CONFIGS if c.proc_name == name))
  482. except StopIteration as ex:
  483. raise Exception(f"Cannot find process config with name: {name}") from ex
  484. def get_custom_params_from_lr(lr: LogIterable, initial_state: str = "first") -> dict[str, Any]:
  485. """
  486. Use this to get custom params dict based on provided logs.
  487. Useful when replaying following processes: calibrationd, paramsd, torqued
  488. The params may be based on first or last message of given type (carParams, liveCalibration, liveParameters, liveTorqueParameters) in the logs.
  489. """
  490. car_params = [m for m in lr if m.which() == "carParams"]
  491. live_calibration = [m for m in lr if m.which() == "liveCalibration"]
  492. live_parameters = [m for m in lr if m.which() == "liveParameters"]
  493. live_torque_parameters = [m for m in lr if m.which() == "liveTorqueParameters"]
  494. assert initial_state in ["first", "last"]
  495. msg_index = 0 if initial_state == "first" else -1
  496. assert len(car_params) > 0, "carParams required for initial state of liveParameters and CarParamsPrevRoute"
  497. CP = car_params[msg_index].carParams
  498. custom_params = {
  499. "CarParamsPrevRoute": CP.as_builder().to_bytes()
  500. }
  501. if len(live_calibration) > 0:
  502. custom_params["CalibrationParams"] = live_calibration[msg_index].as_builder().to_bytes()
  503. if len(live_parameters) > 0:
  504. lp_dict = live_parameters[msg_index].to_dict()
  505. lp_dict["carFingerprint"] = CP.carFingerprint
  506. custom_params["LiveParameters"] = json.dumps(lp_dict)
  507. if len(live_torque_parameters) > 0:
  508. custom_params["LiveTorqueParameters"] = live_torque_parameters[msg_index].as_builder().to_bytes()
  509. return custom_params
  510. def replay_process_with_name(name: str | Iterable[str], lr: LogIterable, *args, **kwargs) -> list[capnp._DynamicStructReader]:
  511. if isinstance(name, str):
  512. cfgs = [get_process_config(name)]
  513. elif isinstance(name, Iterable):
  514. cfgs = [get_process_config(n) for n in name]
  515. else:
  516. raise ValueError("name must be str or collections of strings")
  517. return replay_process(cfgs, lr, *args, **kwargs)
  518. def replay_process(
  519. cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] = None,
  520. fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None,
  521. captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False
  522. ) -> list[capnp._DynamicStructReader]:
  523. if isinstance(cfg, Iterable):
  524. cfgs = list(cfg)
  525. else:
  526. cfgs = [cfg]
  527. all_msgs = migrate_all(lr, old_logtime=True,
  528. manager_states=True,
  529. panda_states=any("pandaStates" in cfg.pubs for cfg in cfgs),
  530. camera_states=any(len(cfg.vision_pubs) != 0 for cfg in cfgs))
  531. process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, captured_output_store, disable_progress)
  532. if return_all_logs:
  533. keys = {m.which() for m in process_logs}
  534. modified_logs = [m for m in all_msgs if m.which() not in keys]
  535. modified_logs.extend(process_logs)
  536. modified_logs.sort(key=lambda m: int(m.logMonoTime))
  537. log_msgs = modified_logs
  538. else:
  539. log_msgs = process_logs
  540. return log_msgs
  541. def _replay_multi_process(
  542. cfgs: list[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] | None, fingerprint: str | None,
  543. custom_params: dict[str, Any] | None, captured_output_store: dict[str, dict[str, str]] | None, disable_progress: bool
  544. ) -> list[capnp._DynamicStructReader]:
  545. if fingerprint is not None:
  546. params_config = generate_params_config(lr=lr, fingerprint=fingerprint, custom_params=custom_params)
  547. env_config = generate_environ_config(fingerprint=fingerprint)
  548. else:
  549. CP = next((m.carParams for m in lr if m.which() == "carParams"), None)
  550. params_config = generate_params_config(lr=lr, CP=CP, custom_params=custom_params)
  551. env_config = generate_environ_config(CP=CP)
  552. # validate frs and vision pubs
  553. all_vision_pubs = [pub for cfg in cfgs for pub in cfg.vision_pubs]
  554. if len(all_vision_pubs) != 0:
  555. assert frs is not None, "frs must be provided when replaying process using vision streams"
  556. assert all(meta_from_camera_state(st) is not None for st in all_vision_pubs), \
  557. f"undefined vision stream spotted, probably misconfigured process: (vision pubs: {all_vision_pubs})"
  558. required_vision_pubs = {m.camera_state for m in available_streams(lr)} & set(all_vision_pubs)
  559. assert all(st in frs for st in required_vision_pubs), f"frs for this process must contain following vision streams: {required_vision_pubs}"
  560. all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
  561. log_msgs = []
  562. try:
  563. containers = []
  564. for cfg in cfgs:
  565. container = ProcessContainer(cfg)
  566. containers.append(container)
  567. container.start(params_config, env_config, all_msgs, frs, fingerprint, captured_output_store is not None)
  568. all_pubs = {pub for container in containers for pub in container.pubs}
  569. all_subs = {sub for container in containers for sub in container.subs}
  570. lr_pubs = all_pubs - all_subs
  571. pubs_to_containers = {pub: [container for container in containers if pub in container.pubs] for pub in all_pubs}
  572. pub_msgs = [msg for msg in all_msgs if msg.which() in lr_pubs]
  573. # external queue for messages taken from logs; internal queue for messages generated by processes, which will be republished
  574. external_pub_queue: list[capnp._DynamicStructReader] = pub_msgs.copy()
  575. internal_pub_queue: list[capnp._DynamicStructReader] = []
  576. # heap for maintaining the order of messages generated by processes, where each element: (logMonoTime, index in internal_pub_queue)
  577. internal_pub_index_heap: list[tuple[int, int]] = []
  578. pbar = tqdm(total=len(external_pub_queue), disable=disable_progress)
  579. while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)):
  580. if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]):
  581. msg = external_pub_queue.pop(0)
  582. pbar.update(1)
  583. else:
  584. _, index = heapq.heappop(internal_pub_index_heap)
  585. msg = internal_pub_queue[index]
  586. target_containers = pubs_to_containers[msg.which()]
  587. for container in target_containers:
  588. output_msgs = container.run_step(msg, frs)
  589. for m in output_msgs:
  590. if m.which() in all_pubs:
  591. internal_pub_queue.append(m)
  592. heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1))
  593. log_msgs.extend(output_msgs)
  594. finally:
  595. for container in containers:
  596. container.stop()
  597. if captured_output_store is not None:
  598. assert container.capture is not None
  599. out, err = container.capture.read_outerr()
  600. captured_output_store[container.cfg.proc_name] = {"out": out, "err": err}
  601. return log_msgs
  602. def generate_params_config(lr=None, CP=None, fingerprint=None, custom_params=None) -> dict[str, Any]:
  603. params_dict = {
  604. "OpenpilotEnabledToggle": True,
  605. "DisengageOnAccelerator": True,
  606. "DisableLogging": False,
  607. }
  608. if custom_params is not None:
  609. params_dict.update(custom_params)
  610. if lr is not None:
  611. has_ublox = any(msg.which() == "ubloxGnss" for msg in lr)
  612. params_dict["UbloxAvailable"] = has_ublox
  613. is_rhd = next((msg.driverMonitoringState.isRHD for msg in lr if msg.which() == "driverMonitoringState"), False)
  614. params_dict["IsRhdDetected"] = is_rhd
  615. if CP is not None:
  616. if CP.alternativeExperience == ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS:
  617. params_dict["DisengageOnAccelerator"] = False
  618. if fingerprint is None:
  619. if CP.fingerprintSource == "fw":
  620. params_dict["CarParamsCache"] = CP.as_builder().to_bytes()
  621. if CP.openpilotLongitudinalControl:
  622. params_dict["ExperimentalLongitudinalEnabled"] = True
  623. if CP.notCar:
  624. params_dict["JoystickDebugMode"] = True
  625. return params_dict
  626. def generate_environ_config(CP=None, fingerprint=None, log_dir=None) -> dict[str, Any]:
  627. environ_dict = {}
  628. if platform.system() != "Darwin":
  629. environ_dict["PARAMS_ROOT"] = "/dev/shm/params"
  630. if log_dir is not None:
  631. environ_dict["LOG_ROOT"] = log_dir
  632. environ_dict["REPLAY"] = "1"
  633. # Regen or python process
  634. if CP is not None and fingerprint is None:
  635. if CP.fingerprintSource == "fw":
  636. environ_dict['SKIP_FW_QUERY'] = ""
  637. environ_dict['FINGERPRINT'] = ""
  638. else:
  639. environ_dict['SKIP_FW_QUERY'] = "1"
  640. environ_dict['FINGERPRINT'] = CP.carFingerprint
  641. elif fingerprint is not None:
  642. environ_dict['SKIP_FW_QUERY'] = "1"
  643. environ_dict['FINGERPRINT'] = fingerprint
  644. else:
  645. environ_dict["SKIP_FW_QUERY"] = ""
  646. environ_dict["FINGERPRINT"] = ""
  647. return environ_dict
  648. def check_openpilot_enabled(msgs: LogIterable) -> bool:
  649. cur_enabled_count = 0
  650. max_enabled_count = 0
  651. for msg in msgs:
  652. if msg.which() == "carParams":
  653. if msg.carParams.notCar:
  654. return True
  655. elif msg.which() == "controlsState":
  656. if msg.controlsState.active:
  657. cur_enabled_count += 1
  658. else:
  659. cur_enabled_count = 0
  660. max_enabled_count = max(max_enabled_count, cur_enabled_count)
  661. return max_enabled_count > int(10. / DT_CTRL)