camera_reader.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import tqdm
  2. import subprocess
  3. import multiprocessing
  4. from enum import StrEnum
  5. from functools import partial
  6. from collections import namedtuple
  7. from openpilot.tools.lib.framereader import ffprobe
  8. CameraConfig = namedtuple("CameraConfig", ["qcam", "fcam", "ecam", "dcam"])
  9. class CameraType(StrEnum):
  10. qcam = "qcamera"
  11. fcam = "fcamera"
  12. ecam = "ecamera"
  13. dcam = "dcamera"
  14. def probe_packet_info(camera_path):
  15. args = ["ffprobe", "-v", "quiet", "-show_packets", "-probesize", "10M", camera_path]
  16. dat = subprocess.check_output(args)
  17. dat = dat.decode().split()
  18. return dat
  19. class _FrameReader:
  20. def __init__(self, camera_path, segment, h, w, start_time):
  21. self.camera_path = camera_path
  22. self.segment = segment
  23. self.h = h
  24. self.w = w
  25. self.start_time = start_time
  26. self.ts = self._get_ts()
  27. def _read_stream_nv12(self):
  28. frame_sz = self.w * self.h * 3 // 2
  29. proc = subprocess.Popen(
  30. ["ffmpeg", "-v", "quiet", "-i", self.camera_path, "-f", "rawvideo", "-pix_fmt", "nv12", "-"],
  31. stdin=subprocess.PIPE,
  32. stdout=subprocess.PIPE,
  33. stderr=subprocess.DEVNULL
  34. )
  35. try:
  36. while True:
  37. dat = proc.stdout.read(frame_sz)
  38. if len(dat) == 0:
  39. break
  40. yield dat
  41. finally:
  42. proc.kill()
  43. def _get_ts(self):
  44. dat = probe_packet_info(self.camera_path)
  45. try:
  46. ret = [float(d.split('=')[1]) for d in dat if d.startswith("pts_time=")]
  47. except ValueError:
  48. # pts_times aren't available. Infer timestamps from duration_times
  49. ret = [d for d in dat if d.startswith("duration_time")]
  50. ret = [float(d.split('=')[1])*(i+1)+(self.segment*60)+self.start_time for i, d in enumerate(ret)]
  51. return ret
  52. def __iter__(self):
  53. for i, frame in enumerate(self._read_stream_nv12()):
  54. yield self.ts[i], frame
  55. class CameraReader:
  56. def __init__(self, camera_paths, start_time, seg_idxs):
  57. self.seg_idxs = seg_idxs
  58. self.camera_paths = camera_paths
  59. self.start_time = start_time
  60. probe = ffprobe(camera_paths[0])["streams"][0]
  61. self.h = probe["height"]
  62. self.w = probe["width"]
  63. self.__frs = {}
  64. def _get_fr(self, i):
  65. if i not in self.__frs:
  66. self.__frs[i] = _FrameReader(self.camera_paths[i], segment=i, h=self.h, w=self.w, start_time=self.start_time)
  67. return self.__frs[i]
  68. def _run_on_segment(self, func, i):
  69. return func(self._get_fr(i))
  70. def run_across_segments(self, num_processes, func, desc=None):
  71. with multiprocessing.Pool(num_processes) as pool:
  72. num_segs = len(self.seg_idxs)
  73. for _ in tqdm.tqdm(pool.imap_unordered(partial(self._run_on_segment, func), self.seg_idxs), total=num_segs, desc=desc):
  74. continue