modeld.py

#!/usr/bin/env python3
import os
import time
import pickle
import numpy as np
import cereal.messaging as messaging
from cereal import car, log
from pathlib import Path
from setproctitle import setproctitle
from cereal.messaging import PubMaster, SubMaster
from msgq.visionipc import VisionIpcClient, VisionStreamType, VisionBuf
from opendbc.car.car_helpers import get_demo_car_params
from openpilot.common.swaglog import cloudlog
from openpilot.common.params import Params
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.realtime import config_realtime_process
from openpilot.common.transformations.camera import DEVICE_CAMERAS
from openpilot.common.transformations.model import get_warp_matrix
from openpilot.system import sentry
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.selfdrive.modeld.runners import ModelRunner, Runtime
from openpilot.selfdrive.modeld.parse_model_outputs import Parser
from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.modeld.models.commonmodel_pyx import ModelFrame, CLContext

PROCESS_NAME = "selfdrive.modeld.modeld"
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')

MODEL_PATHS = {
  ModelRunner.THNEED: Path(__file__).parent / 'models/supercombo.thneed',
  ModelRunner.ONNX: Path(__file__).parent / 'models/supercombo.onnx'}
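# Note: the .thneed file is supercombo precompiled for the device GPU; the
# .onnx file is the portable version used off-device (e.g. on PC).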

METADATA_PATH = Path(__file__).parent / 'models/supercombo_metadata.pkl'


class FrameMeta:
  frame_id: int = 0
  timestamp_sof: int = 0
  timestamp_eof: int = 0

  def __init__(self, vipc=None):
    if vipc is not None:
      self.frame_id, self.timestamp_sof, self.timestamp_eof = vipc.frame_id, vipc.timestamp_sof, vipc.timestamp_eof


class ModelState:
  frame: ModelFrame
  wide_frame: ModelFrame
  inputs: dict[str, np.ndarray]
  output: np.ndarray
  prev_desire: np.ndarray  # for tracking the rising edge of the pulse
  model: ModelRunner

  def __init__(self, context: CLContext):
    self.frame = ModelFrame(context)
    self.wide_frame = ModelFrame(context)
    self.prev_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
    self.full_features_20Hz = np.zeros((ModelConstants.FULL_HISTORY_BUFFER_LEN, ModelConstants.FEATURE_LEN), dtype=np.float32)
    self.desire_20Hz = np.zeros((ModelConstants.FULL_HISTORY_BUFFER_LEN + 1, ModelConstants.DESIRE_LEN), dtype=np.float32)
    self.prev_desired_curv_20hz = np.zeros((ModelConstants.FULL_HISTORY_BUFFER_LEN + 1, ModelConstants.PREV_DESIRED_CURV_LEN), dtype=np.float32)
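    # The model runs every frame (20Hz) but consumes temporal history at 5Hz:
    # full-rate 20Hz buffers are kept here and downsampled in run() below.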
    # img buffers are managed in openCL transform code
    self.inputs = {
      'desire': np.zeros(ModelConstants.DESIRE_LEN * (ModelConstants.HISTORY_BUFFER_LEN+1), dtype=np.float32),
      'traffic_convention': np.zeros(ModelConstants.TRAFFIC_CONVENTION_LEN, dtype=np.float32),
      'lateral_control_params': np.zeros(ModelConstants.LATERAL_CONTROL_PARAMS_LEN, dtype=np.float32),
      'prev_desired_curv': np.zeros(ModelConstants.PREV_DESIRED_CURV_LEN * (ModelConstants.HISTORY_BUFFER_LEN+1), dtype=np.float32),
      'features_buffer': np.zeros(ModelConstants.HISTORY_BUFFER_LEN * ModelConstants.FEATURE_LEN, dtype=np.float32),
    }

    with open(METADATA_PATH, 'rb') as f:
      model_metadata = pickle.load(f)

    self.output_slices = model_metadata['output_slices']
    net_output_size = model_metadata['output_shapes']['outputs'][1]
    self.output = np.zeros(net_output_size, dtype=np.float32)
    self.parser = Parser()

    self.model = ModelRunner(MODEL_PATHS, self.output, Runtime.GPU, False, context)
    self.model.addInput("input_imgs", None)
    self.model.addInput("big_input_imgs", None)
    for k, v in self.inputs.items():
      self.model.addInput(k, v)

  def slice_outputs(self, model_outputs: np.ndarray) -> dict[str, np.ndarray]:
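    # output_slices (from the metadata pickle) maps each named output to a
    # slice of the flat network output; np.newaxis restores a batch dimension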
    parsed_model_outputs = {k: model_outputs[np.newaxis, v] for k, v in self.output_slices.items()}
    if SEND_RAW_PRED:
      parsed_model_outputs['raw_pred'] = model_outputs.copy()
    return parsed_model_outputs

  def run(self, buf: VisionBuf, wbuf: VisionBuf, transform: np.ndarray, transform_wide: np.ndarray,
          inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None:
    # Model decides when action is completed, so desire input is just a pulse triggered on rising edge
    inputs['desire'][0] = 0
    new_desire = np.where(inputs['desire'] - self.prev_desire > .99, inputs['desire'], 0)
    self.prev_desire[:] = inputs['desire']

    self.desire_20Hz[:-1] = self.desire_20Hz[1:]
    self.desire_20Hz[-1] = new_desire
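    # Max-pool the 20Hz desire history down to the model's 5Hz input rate
    # (25 history steps x 4 frames each) so a one-frame pulse isn't lost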
    self.inputs['desire'][:] = self.desire_20Hz.reshape((25, 4, -1)).max(axis=1).flatten()
    self.inputs['traffic_convention'][:] = inputs['traffic_convention']
    self.inputs['lateral_control_params'][:] = inputs['lateral_control_params']

    self.model.setInputBuffer("input_imgs", self.frame.prepare(buf, transform.flatten(), self.model.getCLBuffer("input_imgs")))
    self.model.setInputBuffer("big_input_imgs", self.wide_frame.prepare(wbuf, transform_wide.flatten(), self.model.getCLBuffer("big_input_imgs")))

    if prepare_only:
      return None

    self.model.execute()
    outputs = self.parser.parse_outputs(self.slice_outputs(self.output))

    self.full_features_20Hz[:-1] = self.full_features_20Hz[1:]
    self.full_features_20Hz[-1] = outputs['hidden_state'][0, :]

    self.prev_desired_curv_20hz[:-1] = self.prev_desired_curv_20hz[1:]
    self.prev_desired_curv_20hz[-1] = outputs['desired_curvature'][0, :]
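    # Take every 4th frame of the 20Hz feature buffer (oldest first) to build
    # the 5Hz feature history input: idxs evaluates to [-96, -92, ..., -4]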
    idxs = np.arange(-4, -100, -4)[::-1]
    self.inputs['features_buffer'][:] = self.full_features_20Hz[idxs].flatten()
    # TODO model only uses last value now, once that changes we need to input strided action history buffer
    self.inputs['prev_desired_curv'][-ModelConstants.PREV_DESIRED_CURV_LEN:] = 0. * self.prev_desired_curv_20hz[-4, :]
    return outputs


def main(demo=False):
  cloudlog.warning("modeld init")

  sentry.set_tag("daemon", PROCESS_NAME)
  cloudlog.bind(daemon=PROCESS_NAME)
  setproctitle(PROCESS_NAME)
  config_realtime_process(7, 54)

  cloudlog.warning("setting up CL context")
  cl_context = CLContext()
  cloudlog.warning("CL context ready; loading model")
  model = ModelState(cl_context)
  cloudlog.warning("models loaded, modeld starting")

  # visionipc clients
  while True:
    available_streams = VisionIpcClient.available_streams("camerad", block=False)
    if available_streams:
      use_extra_client = VisionStreamType.VISION_STREAM_WIDE_ROAD in available_streams and VisionStreamType.VISION_STREAM_ROAD in available_streams
      main_wide_camera = VisionStreamType.VISION_STREAM_ROAD not in available_streams
      break
    time.sleep(.1)
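
  # If no narrow road camera stream is available (wide-only hardware), the
  # wide camera serves as the main model input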
  vipc_client_main_stream = VisionStreamType.VISION_STREAM_WIDE_ROAD if main_wide_camera else VisionStreamType.VISION_STREAM_ROAD
  vipc_client_main = VisionIpcClient("camerad", vipc_client_main_stream, True, cl_context)
  vipc_client_extra = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_WIDE_ROAD, False, cl_context)
  cloudlog.warning(f"vision stream set up, main_wide_camera: {main_wide_camera}, use_extra_client: {use_extra_client}")

  while not vipc_client_main.connect(False):
    time.sleep(0.1)
  while use_extra_client and not vipc_client_extra.connect(False):
    time.sleep(0.1)

  cloudlog.warning(f"connected main cam with buffer size: {vipc_client_main.buffer_len} ({vipc_client_main.width} x {vipc_client_main.height})")
  if use_extra_client:
    cloudlog.warning(f"connected extra cam with buffer size: {vipc_client_extra.buffer_len} ({vipc_client_extra.width} x {vipc_client_extra.height})")

  # messaging
  pm = PubMaster(["modelV2", "drivingModelData", "cameraOdometry"])
  sm = SubMaster(["deviceState", "carState", "roadCameraState", "liveCalibration", "driverMonitoringState", "carControl"])

  publish_state = PublishState()
  params = Params()

  # set up a filter to track dropped frames
  frame_dropped_filter = FirstOrderFilter(0., 10., 1. / ModelConstants.MODEL_FREQ)
  frame_id = 0
  last_vipc_frame_id = 0
  run_count = 0

  model_transform_main = np.zeros((3, 3), dtype=np.float32)
  model_transform_extra = np.zeros((3, 3), dtype=np.float32)
  live_calib_seen = False
  buf_main, buf_extra = None, None
  meta_main = FrameMeta()
  meta_extra = FrameMeta()

  if demo:
    CP = get_demo_car_params()
  else:
    CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
  cloudlog.info("modeld got CarParams: %s", CP.carName)

  # TODO this needs more thought, use .2s extra for now to estimate other delays
  steer_delay = CP.steerActuatorDelay + .2

  DH = DesireHelper()

  while True:
    # Keep receiving frames until we are at least 1 frame ahead of previous extra frame
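    # 25000000 ns = 25ms, half a frame period at the cameras' 20Hz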
    while meta_main.timestamp_sof < meta_extra.timestamp_sof + 25000000:
      buf_main = vipc_client_main.recv()
      meta_main = FrameMeta(vipc_client_main)
      if buf_main is None:
        break

    if buf_main is None:
      cloudlog.debug("vipc_client_main no frame")
      continue

    if use_extra_client:
      # Keep receiving extra frames until frame id matches main camera
      while True:
        buf_extra = vipc_client_extra.recv()
        meta_extra = FrameMeta(vipc_client_extra)
        if buf_extra is None or meta_main.timestamp_sof < meta_extra.timestamp_sof + 25000000:
          break

      if buf_extra is None:
        cloudlog.debug("vipc_client_extra no frame")
        continue

      if abs(meta_main.timestamp_sof - meta_extra.timestamp_sof) > 10000000:
        cloudlog.error(f"frames out of sync! main: {meta_main.frame_id} ({meta_main.timestamp_sof / 1e9:.5f}), "
                       f"extra: {meta_extra.frame_id} ({meta_extra.timestamp_sof / 1e9:.5f})")
    else:
      # Use single camera
      buf_extra = buf_main
      meta_extra = meta_main

    sm.update(0)
    desire = DH.desire
    is_rhd = sm["driverMonitoringState"].isRHD
    frame_id = sm["roadCameraState"].frameId
    v_ego = max(sm["carState"].vEgo, 0.)
    lateral_control_params = np.array([v_ego, steer_delay], dtype=np.float32)
    if sm.updated["liveCalibration"] and sm.seen['roadCameraState'] and sm.seen['deviceState']:
      device_from_calib_euler = np.array(sm["liveCalibration"].rpyCalib, dtype=np.float32)
      dc = DEVICE_CAMERAS[(str(sm['deviceState'].deviceType), str(sm['roadCameraState'].sensor))]
      model_transform_main = get_warp_matrix(device_from_calib_euler, dc.ecam.intrinsics if main_wide_camera else dc.fcam.intrinsics, False).astype(np.float32)
      model_transform_extra = get_warp_matrix(device_from_calib_euler, dc.ecam.intrinsics, True).astype(np.float32)
      live_calib_seen = True
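
    # traffic_convention is one-hot: index 0 = left-hand traffic, 1 = right-hand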
    traffic_convention = np.zeros(2)
    traffic_convention[int(is_rhd)] = 1

    vec_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
    if 0 <= desire < ModelConstants.DESIRE_LEN:
      vec_desire[desire] = 1

    # track dropped frames
    vipc_dropped_frames = max(0, meta_main.frame_id - last_vipc_frame_id - 1)
    frames_dropped = frame_dropped_filter.update(min(vipc_dropped_frames, 10))
    if run_count < 10:  # let frame drops warm up
      frame_dropped_filter.x = 0.
      frames_dropped = 0.
    run_count = run_count + 1
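    # frames_dropped is a low-pass estimate of drops per received frame, so
    # dropped / (dropped + received) gives the fraction of frames dropped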
    frame_drop_ratio = frames_dropped / (1 + frames_dropped)
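    # After a drop, still warp the images into the model's input buffers
    # (prepare) but skip the network evaluation for this iteration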
    prepare_only = vipc_dropped_frames > 0
    if prepare_only:
      cloudlog.error(f"skipping model eval. Dropped {vipc_dropped_frames} frames")

    inputs: dict[str, np.ndarray] = {
      'desire': vec_desire,
      'traffic_convention': traffic_convention,
      'lateral_control_params': lateral_control_params,
    }

    mt1 = time.perf_counter()
    model_output = model.run(buf_main, buf_extra, model_transform_main, model_transform_extra, inputs, prepare_only)
    mt2 = time.perf_counter()
    model_execution_time = mt2 - mt1

    if model_output is not None:
      modelv2_send = messaging.new_message('modelV2')
      drivingdata_send = messaging.new_message('drivingModelData')
      posenet_send = messaging.new_message('cameraOdometry')
      fill_model_msg(drivingdata_send, modelv2_send, model_output, publish_state, meta_main.frame_id, meta_extra.frame_id, frame_id,
                     frame_drop_ratio, meta_main.timestamp_eof, model_execution_time, live_calib_seen)

      desire_state = modelv2_send.modelV2.meta.desireState
      l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
      r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
      lane_change_prob = l_lane_change_prob + r_lane_change_prob
      DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
      modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
      modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
      drivingdata_send.drivingModelData.meta.laneChangeState = DH.lane_change_state
      drivingdata_send.drivingModelData.meta.laneChangeDirection = DH.lane_change_direction

      fill_pose_msg(posenet_send, model_output, meta_main.frame_id, vipc_dropped_frames, meta_main.timestamp_eof, live_calib_seen)
      pm.send('modelV2', modelv2_send)
      pm.send('drivingModelData', drivingdata_send)
      pm.send('cameraOdometry', posenet_send)

    last_vipc_frame_id = meta_main.frame_id


if __name__ == "__main__":
  try:
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--demo', action='store_true', help='A boolean for demo mode.')
    args = parser.parse_args()
    main(demo=args.demo)
  except KeyboardInterrupt:
    cloudlog.warning(f"child {PROCESS_NAME} got SIGINT")
  except Exception:
    sentry.capture_exception()
    raise