modeld.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. #!/usr/bin/env python3
  2. import os
  3. import time
  4. import pickle
  5. import numpy as np
  6. import cereal.messaging as messaging
  7. from cereal import car, log
  8. from pathlib import Path
  9. from setproctitle import setproctitle
  10. from cereal.messaging import PubMaster, SubMaster
  11. from cereal.visionipc import VisionIpcClient, VisionStreamType, VisionBuf
  12. from openpilot.common.swaglog import cloudlog
  13. from openpilot.common.params import Params
  14. from openpilot.common.filter_simple import FirstOrderFilter
  15. from openpilot.common.realtime import config_realtime_process
  16. from openpilot.common.transformations.model import get_warp_matrix
  17. from openpilot.selfdrive import sentry
  18. from openpilot.selfdrive.car.car_helpers import get_demo_car_params
  19. from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
  20. from openpilot.selfdrive.modeld.runners import ModelRunner, Runtime
  21. from openpilot.selfdrive.modeld.parse_model_outputs import Parser
  22. from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState
  23. from openpilot.selfdrive.modeld.constants import ModelConstants
  24. from openpilot.selfdrive.modeld.models.commonmodel_pyx import ModelFrame, CLContext
  25. PROCESS_NAME = "selfdrive.modeld.modeld"
  26. SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
  27. MODEL_PATHS = {
  28. ModelRunner.THNEED: Path(__file__).parent / 'models/supercombo.thneed',
  29. ModelRunner.ONNX: Path(__file__).parent / 'models/supercombo.onnx'}
  30. METADATA_PATH = Path(__file__).parent / 'models/supercombo_metadata.pkl'
  31. class FrameMeta:
  32. frame_id: int = 0
  33. timestamp_sof: int = 0
  34. timestamp_eof: int = 0
  35. def __init__(self, vipc=None):
  36. if vipc is not None:
  37. self.frame_id, self.timestamp_sof, self.timestamp_eof = vipc.frame_id, vipc.timestamp_sof, vipc.timestamp_eof
  38. class ModelState:
  39. frame: ModelFrame
  40. wide_frame: ModelFrame
  41. inputs: dict[str, np.ndarray]
  42. output: np.ndarray
  43. prev_desire: np.ndarray # for tracking the rising edge of the pulse
  44. model: ModelRunner
  45. def __init__(self, context: CLContext):
  46. self.frame = ModelFrame(context)
  47. self.wide_frame = ModelFrame(context)
  48. self.prev_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
  49. self.inputs = {
  50. 'desire': np.zeros(ModelConstants.DESIRE_LEN * (ModelConstants.HISTORY_BUFFER_LEN+1), dtype=np.float32),
  51. 'traffic_convention': np.zeros(ModelConstants.TRAFFIC_CONVENTION_LEN, dtype=np.float32),
  52. 'lateral_control_params': np.zeros(ModelConstants.LATERAL_CONTROL_PARAMS_LEN, dtype=np.float32),
  53. 'prev_desired_curv': np.zeros(ModelConstants.PREV_DESIRED_CURV_LEN * (ModelConstants.HISTORY_BUFFER_LEN+1), dtype=np.float32),
  54. 'nav_features': np.zeros(ModelConstants.NAV_FEATURE_LEN, dtype=np.float32),
  55. 'nav_instructions': np.zeros(ModelConstants.NAV_INSTRUCTION_LEN, dtype=np.float32),
  56. 'features_buffer': np.zeros(ModelConstants.HISTORY_BUFFER_LEN * ModelConstants.FEATURE_LEN, dtype=np.float32),
  57. }
  58. with open(METADATA_PATH, 'rb') as f:
  59. model_metadata = pickle.load(f)
  60. self.output_slices = model_metadata['output_slices']
  61. net_output_size = model_metadata['output_shapes']['outputs'][1]
  62. self.output = np.zeros(net_output_size, dtype=np.float32)
  63. self.parser = Parser()
  64. self.model = ModelRunner(MODEL_PATHS, self.output, Runtime.GPU, False, context)
  65. self.model.addInput("input_imgs", None)
  66. self.model.addInput("big_input_imgs", None)
  67. for k,v in self.inputs.items():
  68. self.model.addInput(k, v)
  69. def slice_outputs(self, model_outputs: np.ndarray) -> dict[str, np.ndarray]:
  70. parsed_model_outputs = {k: model_outputs[np.newaxis, v] for k,v in self.output_slices.items()}
  71. if SEND_RAW_PRED:
  72. parsed_model_outputs['raw_pred'] = model_outputs.copy()
  73. return parsed_model_outputs
  74. def run(self, buf: VisionBuf, wbuf: VisionBuf, transform: np.ndarray, transform_wide: np.ndarray,
  75. inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None:
  76. # Model decides when action is completed, so desire input is just a pulse triggered on rising edge
  77. inputs['desire'][0] = 0
  78. self.inputs['desire'][:-ModelConstants.DESIRE_LEN] = self.inputs['desire'][ModelConstants.DESIRE_LEN:]
  79. self.inputs['desire'][-ModelConstants.DESIRE_LEN:] = np.where(inputs['desire'] - self.prev_desire > .99, inputs['desire'], 0)
  80. self.prev_desire[:] = inputs['desire']
  81. self.inputs['traffic_convention'][:] = inputs['traffic_convention']
  82. self.inputs['lateral_control_params'][:] = inputs['lateral_control_params']
  83. self.inputs['nav_features'][:] = inputs['nav_features']
  84. self.inputs['nav_instructions'][:] = inputs['nav_instructions']
  85. # if getCLBuffer is not None, frame will be None
  86. self.model.setInputBuffer("input_imgs", self.frame.prepare(buf, transform.flatten(), self.model.getCLBuffer("input_imgs")))
  87. if wbuf is not None:
  88. self.model.setInputBuffer("big_input_imgs", self.wide_frame.prepare(wbuf, transform_wide.flatten(), self.model.getCLBuffer("big_input_imgs")))
  89. if prepare_only:
  90. return None
  91. self.model.execute()
  92. outputs = self.parser.parse_outputs(self.slice_outputs(self.output))
  93. self.inputs['features_buffer'][:-ModelConstants.FEATURE_LEN] = self.inputs['features_buffer'][ModelConstants.FEATURE_LEN:]
  94. self.inputs['features_buffer'][-ModelConstants.FEATURE_LEN:] = outputs['hidden_state'][0, :]
  95. self.inputs['prev_desired_curv'][:-ModelConstants.PREV_DESIRED_CURV_LEN] = self.inputs['prev_desired_curv'][ModelConstants.PREV_DESIRED_CURV_LEN:]
  96. self.inputs['prev_desired_curv'][-ModelConstants.PREV_DESIRED_CURV_LEN:] = outputs['desired_curvature'][0, :]
  97. return outputs
  98. def main(demo=False):
  99. cloudlog.warning("modeld init")
  100. sentry.set_tag("daemon", PROCESS_NAME)
  101. cloudlog.bind(daemon=PROCESS_NAME)
  102. setproctitle(PROCESS_NAME)
  103. config_realtime_process(7, 54)
  104. cloudlog.warning("setting up CL context")
  105. cl_context = CLContext()
  106. cloudlog.warning("CL context ready; loading model")
  107. model = ModelState(cl_context)
  108. cloudlog.warning("models loaded, modeld starting")
  109. # visionipc clients
  110. while True:
  111. available_streams = VisionIpcClient.available_streams("camerad", block=False)
  112. if available_streams:
  113. use_extra_client = VisionStreamType.VISION_STREAM_WIDE_ROAD in available_streams and VisionStreamType.VISION_STREAM_ROAD in available_streams
  114. main_wide_camera = VisionStreamType.VISION_STREAM_ROAD not in available_streams
  115. break
  116. time.sleep(.1)
  117. vipc_client_main_stream = VisionStreamType.VISION_STREAM_WIDE_ROAD if main_wide_camera else VisionStreamType.VISION_STREAM_ROAD
  118. vipc_client_main = VisionIpcClient("camerad", vipc_client_main_stream, True, cl_context)
  119. vipc_client_extra = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_WIDE_ROAD, False, cl_context)
  120. cloudlog.warning(f"vision stream set up, main_wide_camera: {main_wide_camera}, use_extra_client: {use_extra_client}")
  121. while not vipc_client_main.connect(False):
  122. time.sleep(0.1)
  123. while use_extra_client and not vipc_client_extra.connect(False):
  124. time.sleep(0.1)
  125. cloudlog.warning(f"connected main cam with buffer size: {vipc_client_main.buffer_len} ({vipc_client_main.width} x {vipc_client_main.height})")
  126. if use_extra_client:
  127. cloudlog.warning(f"connected extra cam with buffer size: {vipc_client_extra.buffer_len} ({vipc_client_extra.width} x {vipc_client_extra.height})")
  128. # messaging
  129. pm = PubMaster(["modelV2", "cameraOdometry"])
  130. sm = SubMaster(["carState", "roadCameraState", "liveCalibration", "driverMonitoringState", "navModel", "navInstruction", "carControl"])
  131. publish_state = PublishState()
  132. params = Params()
  133. # setup filter to track dropped frames
  134. frame_dropped_filter = FirstOrderFilter(0., 10., 1. / ModelConstants.MODEL_FREQ)
  135. frame_id = 0
  136. last_vipc_frame_id = 0
  137. run_count = 0
  138. model_transform_main = np.zeros((3, 3), dtype=np.float32)
  139. model_transform_extra = np.zeros((3, 3), dtype=np.float32)
  140. live_calib_seen = False
  141. nav_features = np.zeros(ModelConstants.NAV_FEATURE_LEN, dtype=np.float32)
  142. nav_instructions = np.zeros(ModelConstants.NAV_INSTRUCTION_LEN, dtype=np.float32)
  143. buf_main, buf_extra = None, None
  144. meta_main = FrameMeta()
  145. meta_extra = FrameMeta()
  146. if demo:
  147. CP = get_demo_car_params()
  148. else:
  149. with car.CarParams.from_bytes(params.get("CarParams", block=True)) as msg:
  150. CP = msg
  151. cloudlog.info("modeld got CarParams: %s", CP.carName)
  152. # TODO this needs more thought, use .2s extra for now to estimate other delays
  153. steer_delay = CP.steerActuatorDelay + .2
  154. DH = DesireHelper()
  155. while True:
  156. # Keep receiving frames until we are at least 1 frame ahead of previous extra frame
  157. while meta_main.timestamp_sof < meta_extra.timestamp_sof + 25000000:
  158. buf_main = vipc_client_main.recv()
  159. meta_main = FrameMeta(vipc_client_main)
  160. if buf_main is None:
  161. break
  162. if buf_main is None:
  163. cloudlog.error("vipc_client_main no frame")
  164. continue
  165. if use_extra_client:
  166. # Keep receiving extra frames until frame id matches main camera
  167. while True:
  168. buf_extra = vipc_client_extra.recv()
  169. meta_extra = FrameMeta(vipc_client_extra)
  170. if buf_extra is None or meta_main.timestamp_sof < meta_extra.timestamp_sof + 25000000:
  171. break
  172. if buf_extra is None:
  173. cloudlog.error("vipc_client_extra no frame")
  174. continue
  175. if abs(meta_main.timestamp_sof - meta_extra.timestamp_sof) > 10000000:
  176. cloudlog.error("frames out of sync! main: {} ({:.5f}), extra: {} ({:.5f})".format(
  177. meta_main.frame_id, meta_main.timestamp_sof / 1e9,
  178. meta_extra.frame_id, meta_extra.timestamp_sof / 1e9))
  179. else:
  180. # Use single camera
  181. buf_extra = buf_main
  182. meta_extra = meta_main
  183. sm.update(0)
  184. desire = DH.desire
  185. is_rhd = sm["driverMonitoringState"].isRHD
  186. frame_id = sm["roadCameraState"].frameId
  187. lateral_control_params = np.array([sm["carState"].vEgo, steer_delay], dtype=np.float32)
  188. if sm.updated["liveCalibration"]:
  189. device_from_calib_euler = np.array(sm["liveCalibration"].rpyCalib, dtype=np.float32)
  190. model_transform_main = get_warp_matrix(device_from_calib_euler, main_wide_camera, False).astype(np.float32)
  191. model_transform_extra = get_warp_matrix(device_from_calib_euler, True, True).astype(np.float32)
  192. live_calib_seen = True
  193. traffic_convention = np.zeros(2)
  194. traffic_convention[int(is_rhd)] = 1
  195. vec_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
  196. if desire >= 0 and desire < ModelConstants.DESIRE_LEN:
  197. vec_desire[desire] = 1
  198. # Enable/disable nav features
  199. timestamp_llk = sm["navModel"].locationMonoTime
  200. nav_valid = sm.valid["navModel"] # and (nanos_since_boot() - timestamp_llk < 1e9)
  201. nav_enabled = nav_valid and params.get_bool("ExperimentalMode")
  202. if not nav_enabled:
  203. nav_features[:] = 0
  204. nav_instructions[:] = 0
  205. if nav_enabled and sm.updated["navModel"]:
  206. nav_features = np.array(sm["navModel"].features)
  207. if nav_enabled and sm.updated["navInstruction"]:
  208. nav_instructions[:] = 0
  209. for maneuver in sm["navInstruction"].allManeuvers:
  210. distance_idx = 25 + int(maneuver.distance / 20)
  211. direction_idx = 0
  212. if maneuver.modifier in ("left", "slight left", "sharp left"):
  213. direction_idx = 1
  214. if maneuver.modifier in ("right", "slight right", "sharp right"):
  215. direction_idx = 2
  216. if 0 <= distance_idx < 50:
  217. nav_instructions[distance_idx*3 + direction_idx] = 1
  218. # tracked dropped frames
  219. vipc_dropped_frames = max(0, meta_main.frame_id - last_vipc_frame_id - 1)
  220. frames_dropped = frame_dropped_filter.update(min(vipc_dropped_frames, 10))
  221. if run_count < 10: # let frame drops warm up
  222. frame_dropped_filter.x = 0.
  223. frames_dropped = 0.
  224. run_count = run_count + 1
  225. frame_drop_ratio = frames_dropped / (1 + frames_dropped)
  226. prepare_only = vipc_dropped_frames > 0
  227. if prepare_only:
  228. cloudlog.error(f"skipping model eval. Dropped {vipc_dropped_frames} frames")
  229. inputs:dict[str, np.ndarray] = {
  230. 'desire': vec_desire,
  231. 'traffic_convention': traffic_convention,
  232. 'lateral_control_params': lateral_control_params,
  233. 'nav_features': nav_features,
  234. 'nav_instructions': nav_instructions}
  235. mt1 = time.perf_counter()
  236. model_output = model.run(buf_main, buf_extra, model_transform_main, model_transform_extra, inputs, prepare_only)
  237. mt2 = time.perf_counter()
  238. model_execution_time = mt2 - mt1
  239. if model_output is not None:
  240. modelv2_send = messaging.new_message('modelV2')
  241. posenet_send = messaging.new_message('cameraOdometry')
  242. fill_model_msg(modelv2_send, model_output, publish_state, meta_main.frame_id, meta_extra.frame_id, frame_id, frame_drop_ratio,
  243. meta_main.timestamp_eof, timestamp_llk, model_execution_time, nav_enabled, live_calib_seen)
  244. desire_state = modelv2_send.modelV2.meta.desireState
  245. l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
  246. r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
  247. lane_change_prob = l_lane_change_prob + r_lane_change_prob
  248. DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
  249. modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
  250. modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
  251. fill_pose_msg(posenet_send, model_output, meta_main.frame_id, vipc_dropped_frames, meta_main.timestamp_eof, live_calib_seen)
  252. pm.send('modelV2', modelv2_send)
  253. pm.send('cameraOdometry', posenet_send)
  254. last_vipc_frame_id = meta_main.frame_id
  255. if __name__ == "__main__":
  256. try:
  257. import argparse
  258. parser = argparse.ArgumentParser()
  259. parser.add_argument('--demo', action='store_true', help='A boolean for demo mode.')
  260. args = parser.parse_args()
  261. main(demo=args.demo)
  262. except KeyboardInterrupt:
  263. cloudlog.warning(f"child {PROCESS_NAME} got SIGINT")
  264. except Exception:
  265. sentry.capture_exception()
  266. raise