run.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. #!/usr/bin/env python3
  2. import sys
  3. import argparse
  4. import multiprocessing
  5. import rerun as rr
  6. import rerun.blueprint as rrb
  7. from functools import partial
  8. from collections import defaultdict
  9. from cereal.services import SERVICE_LIST
  10. from openpilot.tools.rerun.camera_reader import probe_packet_info, CameraReader, CameraConfig, CameraType
  11. from openpilot.tools.lib.logreader import LogReader
  12. from openpilot.tools.lib.route import Route, SegmentRange
  13. NUM_CPUS = multiprocessing.cpu_count()
  14. DEMO_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19"
  15. RR_TIMELINE_NAME = "Timeline"
  16. RR_WIN = "openpilot logs"
  17. """
  18. Relevant upstream Rerun issues:
  19. - loading videos directly: https://github.com/rerun-io/rerun/issues/6532
  20. """
  21. class Rerunner:
  22. def __init__(self, route, segment_range, camera_config):
  23. self.lr = LogReader(route_or_segment_name)
  24. # hevc files don't have start_time. We get it from qcamera.ts
  25. start_time = 0
  26. dat = probe_packet_info(route.qcamera_paths()[0])
  27. for d in dat:
  28. if d.startswith("pts_time="):
  29. start_time = float(d.split('=')[1])
  30. break
  31. qcam, fcam, ecam, dcam = camera_config
  32. self.camera_readers = {}
  33. if qcam:
  34. self.camera_readers[CameraType.qcam] = CameraReader(route.qcamera_paths(), start_time, segment_range.seg_idxs)
  35. if fcam:
  36. self.camera_readers[CameraType.fcam] = CameraReader(route.camera_paths(), start_time, segment_range.seg_idxs)
  37. if ecam:
  38. self.camera_readers[CameraType.ecam] = CameraReader(route.ecamera_paths(), start_time, segment_range.seg_idxs)
  39. if dcam:
  40. self.camera_readers[CameraType.dcam] = CameraReader(route.dcamera_paths(), start_time, segment_range.seg_idxs)
  41. def _create_blueprint(self):
  42. blueprint = None
  43. service_views = []
  44. for topic in sorted(SERVICE_LIST.keys()):
  45. View = rrb.TimeSeriesView if topic != "thumbnail" else rrb.Spatial2DView
  46. service_views.append(View(name=topic, origin=f"/{topic}/", visible=False))
  47. rr.log(topic, rr.SeriesLine(name=topic), timeless=True)
  48. center_view = [rrb.Vertical(*service_views, name="streams")]
  49. if len(self.camera_readers):
  50. center_view.append(rrb.Vertical(*[rrb.Spatial2DView(name=cam_type, origin=cam_type) for cam_type in self.camera_readers.keys()], name="cameras"))
  51. blueprint = rrb.Blueprint(
  52. rrb.Horizontal(
  53. *center_view
  54. ),
  55. rrb.SelectionPanel(expanded=False),
  56. rrb.TimePanel(expanded=False),
  57. )
  58. return blueprint
  59. @staticmethod
  60. def _parse_msg(msg, parent_key=''):
  61. stack = [(msg, parent_key)]
  62. while stack:
  63. current_msg, current_parent_key = stack.pop()
  64. if isinstance(current_msg, list):
  65. for index, item in enumerate(current_msg):
  66. new_key = f"{current_parent_key}/{index}"
  67. if isinstance(item, (int, float)):
  68. yield new_key, item
  69. elif isinstance(item, dict):
  70. stack.append((item, new_key))
  71. elif isinstance(current_msg, dict):
  72. for key, value in current_msg.items():
  73. new_key = f"{current_parent_key}/{key}"
  74. if isinstance(value, (int, float)):
  75. yield new_key, value
  76. elif isinstance(value, dict):
  77. stack.append((value, new_key))
  78. elif isinstance(value, list):
  79. for index, item in enumerate(value):
  80. if isinstance(item, (int, float)):
  81. yield f"{new_key}/{index}", item
  82. else:
  83. pass # Not a plottable value
  84. @staticmethod
  85. @rr.shutdown_at_exit
  86. def _process_log_msgs(blueprint, lr):
  87. rr.init(RR_WIN)
  88. rr.connect()
  89. rr.send_blueprint(blueprint)
  90. log_msgs = defaultdict(lambda: defaultdict(list))
  91. for msg in lr:
  92. msg_type = msg.which()
  93. if msg_type == "thumbnail":
  94. continue
  95. for entity_path, dat in Rerunner._parse_msg(msg.to_dict()[msg_type], msg_type):
  96. log_msgs[entity_path]["times"].append(msg.logMonoTime)
  97. log_msgs[entity_path]["data"].append(dat)
  98. for entity_path, log_msg in log_msgs.items():
  99. rr.send_columns(
  100. entity_path,
  101. times=[rr.TimeNanosColumn(RR_TIMELINE_NAME, log_msg["times"])],
  102. components=[rr.components.ScalarBatch(log_msg["data"])]
  103. )
  104. return []
  105. @staticmethod
  106. @rr.shutdown_at_exit
  107. def _process_cam_readers(blueprint, cam_type, h, w, fr):
  108. rr.init(RR_WIN)
  109. rr.connect()
  110. rr.send_blueprint(blueprint)
  111. for ts, frame in fr:
  112. rr.set_time_nanos(RR_TIMELINE_NAME, int(ts * 1e9))
  113. rr.log(cam_type, rr.Image(bytes=frame, width=w, height=h, pixel_format=rr.PixelFormat.NV12))
  114. def load_data(self):
  115. rr.init(RR_WIN, spawn=True)
  116. startup_blueprint = self._create_blueprint()
  117. self.lr.run_across_segments(NUM_CPUS, partial(self._process_log_msgs, startup_blueprint), desc="Log messages")
  118. for cam_type, cr in self.camera_readers.items():
  119. cr.run_across_segments(NUM_CPUS, partial(self._process_cam_readers, startup_blueprint, cam_type, cr.h, cr.w), desc=cam_type)
  120. rr.send_blueprint(self._create_blueprint())
  121. if __name__ == '__main__':
  122. parser = argparse.ArgumentParser(description="A helper to run rerun on openpilot routes",
  123. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  124. parser.add_argument("--demo", action="store_true", help="Use the demo route instead of providing one")
  125. parser.add_argument("--qcam", action="store_true", help="Show low-res road camera")
  126. parser.add_argument("--fcam", action="store_true", help="Show driving camera")
  127. parser.add_argument("--ecam", action="store_true", help="Show wide camera")
  128. parser.add_argument("--dcam", action="store_true", help="Show driver monitoring camera")
  129. parser.add_argument("route_or_segment_name", nargs='?', help="The route or segment name to plot")
  130. args = parser.parse_args()
  131. if not args.demo and not args.route_or_segment_name:
  132. parser.print_help()
  133. sys.exit()
  134. camera_config = CameraConfig(args.qcam, args.fcam, args.ecam, args.dcam)
  135. route_or_segment_name = DEMO_ROUTE if args.demo else args.route_or_segment_name.strip()
  136. sr = SegmentRange(route_or_segment_name)
  137. r = Route(sr.route_name)
  138. hevc_requested = any(camera_config[1:])
  139. if len(sr.seg_idxs) > 1 and hevc_requested:
  140. print("You're requesting more than 1 segment with hevc videos, " + \
  141. "please be aware that might take a lot of memory " + \
  142. "since rerun isn't yet well supported for high resolution video logging")
  143. response = input("Do you wish to continue? (Y/n): ")
  144. if response.strip().lower() != "y":
  145. sys.exit()
  146. rerunner = Rerunner(r, sr, camera_config)
  147. rerunner.load_data()