123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- #!/usr/bin/env python3
- import sys
- import argparse
- import multiprocessing
- import rerun as rr
- import rerun.blueprint as rrb
- from functools import partial
- from collections import defaultdict
- from cereal.services import SERVICE_LIST
- from openpilot.tools.rerun.camera_reader import probe_packet_info, CameraReader, CameraConfig, CameraType
- from openpilot.tools.lib.logreader import LogReader
- from openpilot.tools.lib.route import Route, SegmentRange
- NUM_CPUS = multiprocessing.cpu_count()
- DEMO_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19"
- RR_TIMELINE_NAME = "Timeline"
- RR_WIN = "openpilot logs"
- """
- Relevant upstream Rerun issues:
- - loading videos directly: https://github.com/rerun-io/rerun/issues/6532
- """
- class Rerunner:
- def __init__(self, route, segment_range, camera_config):
- self.lr = LogReader(route_or_segment_name)
- # hevc files don't have start_time. We get it from qcamera.ts
- start_time = 0
- dat = probe_packet_info(route.qcamera_paths()[0])
- for d in dat:
- if d.startswith("pts_time="):
- start_time = float(d.split('=')[1])
- break
- qcam, fcam, ecam, dcam = camera_config
- self.camera_readers = {}
- if qcam:
- self.camera_readers[CameraType.qcam] = CameraReader(route.qcamera_paths(), start_time, segment_range.seg_idxs)
- if fcam:
- self.camera_readers[CameraType.fcam] = CameraReader(route.camera_paths(), start_time, segment_range.seg_idxs)
- if ecam:
- self.camera_readers[CameraType.ecam] = CameraReader(route.ecamera_paths(), start_time, segment_range.seg_idxs)
- if dcam:
- self.camera_readers[CameraType.dcam] = CameraReader(route.dcamera_paths(), start_time, segment_range.seg_idxs)
- def _create_blueprint(self):
- blueprint = None
- service_views = []
- for topic in sorted(SERVICE_LIST.keys()):
- View = rrb.TimeSeriesView if topic != "thumbnail" else rrb.Spatial2DView
- service_views.append(View(name=topic, origin=f"/{topic}/", visible=False))
- rr.log(topic, rr.SeriesLine(name=topic), timeless=True)
- center_view = [rrb.Vertical(*service_views, name="streams")]
- if len(self.camera_readers):
- center_view.append(rrb.Vertical(*[rrb.Spatial2DView(name=cam_type, origin=cam_type) for cam_type in self.camera_readers.keys()], name="cameras"))
- blueprint = rrb.Blueprint(
- rrb.Horizontal(
- *center_view
- ),
- rrb.SelectionPanel(expanded=False),
- rrb.TimePanel(expanded=False),
- )
- return blueprint
- @staticmethod
- def _parse_msg(msg, parent_key=''):
- stack = [(msg, parent_key)]
- while stack:
- current_msg, current_parent_key = stack.pop()
- if isinstance(current_msg, list):
- for index, item in enumerate(current_msg):
- new_key = f"{current_parent_key}/{index}"
- if isinstance(item, (int, float)):
- yield new_key, item
- elif isinstance(item, dict):
- stack.append((item, new_key))
- elif isinstance(current_msg, dict):
- for key, value in current_msg.items():
- new_key = f"{current_parent_key}/{key}"
- if isinstance(value, (int, float)):
- yield new_key, value
- elif isinstance(value, dict):
- stack.append((value, new_key))
- elif isinstance(value, list):
- for index, item in enumerate(value):
- if isinstance(item, (int, float)):
- yield f"{new_key}/{index}", item
- else:
- pass # Not a plottable value
- @staticmethod
- @rr.shutdown_at_exit
- def _process_log_msgs(blueprint, lr):
- rr.init(RR_WIN)
- rr.connect()
- rr.send_blueprint(blueprint)
- log_msgs = defaultdict(lambda: defaultdict(list))
- for msg in lr:
- msg_type = msg.which()
- if msg_type == "thumbnail":
- continue
- for entity_path, dat in Rerunner._parse_msg(msg.to_dict()[msg_type], msg_type):
- log_msgs[entity_path]["times"].append(msg.logMonoTime)
- log_msgs[entity_path]["data"].append(dat)
- for entity_path, log_msg in log_msgs.items():
- rr.send_columns(
- entity_path,
- times=[rr.TimeNanosColumn(RR_TIMELINE_NAME, log_msg["times"])],
- components=[rr.components.ScalarBatch(log_msg["data"])]
- )
- return []
- @staticmethod
- @rr.shutdown_at_exit
- def _process_cam_readers(blueprint, cam_type, h, w, fr):
- rr.init(RR_WIN)
- rr.connect()
- rr.send_blueprint(blueprint)
- for ts, frame in fr:
- rr.set_time_nanos(RR_TIMELINE_NAME, int(ts * 1e9))
- rr.log(cam_type, rr.Image(bytes=frame, width=w, height=h, pixel_format=rr.PixelFormat.NV12))
- def load_data(self):
- rr.init(RR_WIN, spawn=True)
- startup_blueprint = self._create_blueprint()
- self.lr.run_across_segments(NUM_CPUS, partial(self._process_log_msgs, startup_blueprint), desc="Log messages")
- for cam_type, cr in self.camera_readers.items():
- cr.run_across_segments(NUM_CPUS, partial(self._process_cam_readers, startup_blueprint, cam_type, cr.h, cr.w), desc=cam_type)
- rr.send_blueprint(self._create_blueprint())
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(description="A helper to run rerun on openpilot routes",
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument("--demo", action="store_true", help="Use the demo route instead of providing one")
- parser.add_argument("--qcam", action="store_true", help="Show low-res road camera")
- parser.add_argument("--fcam", action="store_true", help="Show driving camera")
- parser.add_argument("--ecam", action="store_true", help="Show wide camera")
- parser.add_argument("--dcam", action="store_true", help="Show driver monitoring camera")
- parser.add_argument("route_or_segment_name", nargs='?', help="The route or segment name to plot")
- args = parser.parse_args()
- if not args.demo and not args.route_or_segment_name:
- parser.print_help()
- sys.exit()
- camera_config = CameraConfig(args.qcam, args.fcam, args.ecam, args.dcam)
- route_or_segment_name = DEMO_ROUTE if args.demo else args.route_or_segment_name.strip()
- sr = SegmentRange(route_or_segment_name)
- r = Route(sr.route_name)
- hevc_requested = any(camera_config[1:])
- if len(sr.seg_idxs) > 1 and hevc_requested:
- print("You're requesting more than 1 segment with hevc videos, " + \
- "please be aware that might take a lot of memory " + \
- "since rerun isn't yet well supported for high resolution video logging")
- response = input("Do you wish to continue? (Y/n): ")
- if response.strip().lower() != "y":
- sys.exit()
- rerunner = Rerunner(r, sr, camera_config)
- rerunner.load_data()
|