test_models.py 19 KB


  1. #!/usr/bin/env python3
  2. import capnp
  3. import os
  4. import importlib
  5. import pytest
  6. import random
  7. import unittest
  8. from collections import defaultdict, Counter
  9. import hypothesis.strategies as st
  10. from hypothesis import Phase, given, settings
  11. from parameterized import parameterized_class
  12. from cereal import messaging, log, car
  13. from openpilot.common.basedir import BASEDIR
  14. from openpilot.common.params import Params
  15. from openpilot.common.realtime import DT_CTRL
  16. from openpilot.selfdrive.car import gen_empty_fingerprint
  17. from openpilot.selfdrive.car.fingerprints import all_known_cars
  18. from openpilot.selfdrive.car.car_helpers import FRAME_FINGERPRINT, interfaces
  19. from openpilot.selfdrive.car.honda.values import CAR as HONDA, HondaFlags
  20. from openpilot.selfdrive.car.tests.routes import non_tested_cars, routes, CarTestRoute
  21. from openpilot.selfdrive.car.values import PLATFORMS, Platform
  22. from openpilot.selfdrive.controls.controlsd import Controls
  23. from openpilot.selfdrive.test.helpers import read_segment_list
  24. from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT
  25. from openpilot.tools.lib.logreader import LogReader, internal_source, openpilotci_source
  26. from openpilot.tools.lib.route import SegmentName
  27. from panda.tests.libpanda import libpanda_py
  28. EventName = car.CarEvent.EventName
  29. PandaType = log.PandaState.PandaType
  30. SafetyModel = car.CarParams.SafetyModel
  31. NUM_JOBS = int(os.environ.get("NUM_JOBS", "1"))
  32. JOB_ID = int(os.environ.get("JOB_ID", "0"))
  33. INTERNAL_SEG_LIST = os.environ.get("INTERNAL_SEG_LIST", "")
  34. INTERNAL_SEG_CNT = int(os.environ.get("INTERNAL_SEG_CNT", "0"))
  35. MAX_EXAMPLES = int(os.environ.get("MAX_EXAMPLES", "300"))
  36. CI = os.environ.get("CI", None) is not None
  37. def get_test_cases() -> list[tuple[str, CarTestRoute | None]]:
  38. # build list of test cases
  39. test_cases = []
  40. if not len(INTERNAL_SEG_LIST):
  41. routes_by_car = defaultdict(set)
  42. for r in routes:
  43. routes_by_car[r.car_model].add(r)
  44. for i, c in enumerate(sorted(all_known_cars())):
  45. if i % NUM_JOBS == JOB_ID:
  46. test_cases.extend(sorted((c, r) for r in routes_by_car.get(c, (None,))))
  47. else:
  48. segment_list = read_segment_list(os.path.join(BASEDIR, INTERNAL_SEG_LIST))
  49. segment_list = random.sample(segment_list, INTERNAL_SEG_CNT or len(segment_list))
  50. for platform, segment in segment_list:
  51. segment_name = SegmentName(segment)
  52. test_cases.append((platform, CarTestRoute(segment_name.route_name.canonical_name, platform,
  53. segment=segment_name.segment_num)))
  54. return test_cases
  55. @pytest.mark.slow
  56. @pytest.mark.shared_download_cache
  57. class TestCarModelBase(unittest.TestCase):
  58. platform: Platform | None = None
  59. test_route: CarTestRoute | None = None
  60. test_route_on_bucket: bool = True # whether the route is on the preserved CI bucket
  61. can_msgs: list[capnp.lib.capnp._DynamicStructReader]
  62. fingerprint: dict[int, dict[int, int]]
  63. elm_frame: int | None
  64. car_safety_mode_frame: int | None
  65. @classmethod
  66. def get_testing_data_from_logreader(cls, lr):
  67. car_fw = []
  68. can_msgs = []
  69. cls.elm_frame = None
  70. cls.car_safety_mode_frame = None
  71. cls.fingerprint = gen_empty_fingerprint()
  72. experimental_long = False
  73. for msg in lr:
  74. if msg.which() == "can":
  75. can_msgs.append(msg)
  76. if len(can_msgs) <= FRAME_FINGERPRINT:
  77. for m in msg.can:
  78. if m.src < 64:
  79. cls.fingerprint[m.src][m.address] = len(m.dat)
  80. elif msg.which() == "carParams":
  81. car_fw = msg.carParams.carFw
  82. if msg.carParams.openpilotLongitudinalControl:
  83. experimental_long = True
  84. if cls.platform is None and not cls.ci:
  85. cls.platform = PLATFORMS.get(msg.carParams.carFingerprint)
  86. # Log which can frame the panda safety mode left ELM327, for CAN validity checks
  87. elif msg.which() == 'pandaStates':
  88. for ps in msg.pandaStates:
  89. if cls.elm_frame is None and ps.safetyModel != SafetyModel.elm327:
  90. cls.elm_frame = len(can_msgs)
  91. if cls.car_safety_mode_frame is None and ps.safetyModel not in \
  92. (SafetyModel.elm327, SafetyModel.noOutput):
  93. cls.car_safety_mode_frame = len(can_msgs)
  94. elif msg.which() == 'pandaStateDEPRECATED':
  95. if cls.elm_frame is None and msg.pandaStateDEPRECATED.safetyModel != SafetyModel.elm327:
  96. cls.elm_frame = len(can_msgs)
  97. if cls.car_safety_mode_frame is None and msg.pandaStateDEPRECATED.safetyModel not in \
  98. (SafetyModel.elm327, SafetyModel.noOutput):
  99. cls.car_safety_mode_frame = len(can_msgs)
  100. if len(can_msgs) > int(50 / DT_CTRL):
  101. return car_fw, can_msgs, experimental_long
  102. raise Exception("no can data found")
  103. @classmethod
  104. def get_testing_data(cls):
  105. test_segs = (2, 1, 0)
  106. if cls.test_route.segment is not None:
  107. test_segs = (cls.test_route.segment,)
  108. is_internal = len(INTERNAL_SEG_LIST)
  109. for seg in test_segs:
  110. segment_range = f"{cls.test_route.route}/{seg}"
  111. try:
  112. lr = LogReader(segment_range, default_source=internal_source if is_internal else openpilotci_source)
  113. return cls.get_testing_data_from_logreader(lr)
  114. except Exception:
  115. pass
  116. # Route is not in CI bucket, assume either user has access (private), or it is public
  117. # test_route_on_ci_bucket will fail when running in CI
  118. if not is_internal:
  119. cls.test_route_on_bucket = False
  120. for seg in test_segs:
  121. segment_range = f"{cls.test_route.route}/{seg}"
  122. try:
  123. lr = LogReader(segment_range)
  124. return cls.get_testing_data_from_logreader(lr)
  125. except Exception:
  126. pass
  127. raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?")
  128. @classmethod
  129. def setUpClass(cls):
  130. if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'):
  131. raise unittest.SkipTest
  132. if cls.test_route is None:
  133. if cls.platform in non_tested_cars:
  134. print(f"Skipping tests for {cls.platform}: missing route")
  135. raise unittest.SkipTest
  136. raise Exception(f"missing test route for {cls.platform}")
  137. car_fw, can_msgs, experimental_long = cls.get_testing_data()
  138. # if relay is expected to be open in the route
  139. cls.openpilot_enabled = cls.car_safety_mode_frame is not None
  140. cls.can_msgs = sorted(can_msgs, key=lambda msg: msg.logMonoTime)
  141. cls.CarInterface, cls.CarController, cls.CarState = interfaces[cls.platform]
  142. cls.CP = cls.CarInterface.get_params(cls.platform, cls.fingerprint, car_fw, experimental_long, docs=False)
  143. assert cls.CP
  144. assert cls.CP.carFingerprint == cls.platform
  145. os.environ["COMMA_CACHE"] = DEFAULT_DOWNLOAD_CACHE_ROOT
  146. @classmethod
  147. def tearDownClass(cls):
  148. del cls.can_msgs
  149. def setUp(self):
  150. self.CI = self.CarInterface(self.CP.copy(), self.CarController, self.CarState)
  151. assert self.CI
  152. Params().put_bool("OpenpilotEnabledToggle", self.openpilot_enabled)
  153. # TODO: check safetyModel is in release panda build
  154. self.safety = libpanda_py.libpanda
  155. cfg = self.CP.safetyConfigs[-1]
  156. set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam)
  157. self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
  158. self.safety.init_tests()
  159. def test_car_params(self):
  160. if self.CP.dashcamOnly:
  161. self.skipTest("no need to check carParams for dashcamOnly")
  162. # make sure car params are within a valid range
  163. self.assertGreater(self.CP.mass, 1)
  164. if self.CP.steerControlType != car.CarParams.SteerControlType.angle:
  165. tuning = self.CP.lateralTuning.which()
  166. if tuning == 'pid':
  167. self.assertTrue(len(self.CP.lateralTuning.pid.kpV))
  168. elif tuning == 'torque':
  169. self.assertTrue(self.CP.lateralTuning.torque.kf > 0)
  170. else:
  171. raise Exception("unknown tuning")
  172. def test_car_interface(self):
  173. # TODO: also check for checksum violations from can parser
  174. can_invalid_cnt = 0
  175. can_valid = False
  176. CC = car.CarControl.new_message()
  177. for i, msg in enumerate(self.can_msgs):
  178. CS = self.CI.update(CC, (msg.as_builder().to_bytes(),))
  179. self.CI.apply(CC, msg.logMonoTime)
  180. if CS.canValid:
  181. can_valid = True
  182. # wait max of 2s for low frequency msgs to be seen
  183. if i > 200 or can_valid:
  184. can_invalid_cnt += not CS.canValid
  185. self.assertEqual(can_invalid_cnt, 0)
  186. def test_radar_interface(self):
  187. RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface
  188. RI = RadarInterface(self.CP)
  189. assert RI
  190. # Since OBD port is multiplexed to bus 1 (commonly radar bus) while fingerprinting,
  191. # start parsing CAN messages after we've left ELM mode and can expect CAN traffic
  192. error_cnt = 0
  193. for i, msg in enumerate(self.can_msgs[self.elm_frame:]):
  194. rr = RI.update((msg.as_builder().to_bytes(),))
  195. if rr is not None and i > 50:
  196. error_cnt += car.RadarData.Error.canError in rr.errors
  197. self.assertEqual(error_cnt, 0)
  198. def test_panda_safety_rx_checks(self):
  199. if self.CP.dashcamOnly:
  200. self.skipTest("no need to check panda safety for dashcamOnly")
  201. start_ts = self.can_msgs[0].logMonoTime
  202. failed_addrs = Counter()
  203. for can in self.can_msgs:
  204. # update panda timer
  205. t = (can.logMonoTime - start_ts) / 1e3
  206. self.safety.set_timer(int(t))
  207. # run all msgs through the safety RX hook
  208. for msg in can.can:
  209. if msg.src >= 64:
  210. continue
  211. to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
  212. if self.safety.safety_rx_hook(to_send) != 1:
  213. failed_addrs[hex(msg.address)] += 1
  214. # ensure all msgs defined in the addr checks are valid
  215. self.safety.safety_tick_current_safety_config()
  216. if t > 1e6:
  217. self.assertTrue(self.safety.safety_config_valid())
  218. # Don't check relay malfunction on disabled routes (relay closed),
  219. # or before fingerprinting is done (elm327 and noOutput)
  220. if self.openpilot_enabled and t / 1e4 > self.car_safety_mode_frame:
  221. self.assertFalse(self.safety.get_relay_malfunction())
  222. else:
  223. self.safety.set_relay_malfunction(False)
  224. self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}")
  225. # ensure RX checks go invalid after small time with no traffic
  226. self.safety.set_timer(int(t + (2*1e6)))
  227. self.safety.safety_tick_current_safety_config()
  228. self.assertFalse(self.safety.safety_config_valid())
  229. def test_panda_safety_tx_cases(self, data=None):
  230. """Asserts we can tx common messages"""
  231. if self.CP.notCar:
  232. self.skipTest("Skipping test for notCar")
  233. def test_car_controller(car_control):
  234. now_nanos = 0
  235. msgs_sent = 0
  236. CI = self.CarInterface(self.CP, self.CarController, self.CarState)
  237. for _ in range(round(10.0 / DT_CTRL)): # make sure we hit the slowest messages
  238. CI.update(car_control, [])
  239. _, sendcan = CI.apply(car_control, now_nanos)
  240. now_nanos += DT_CTRL * 1e9
  241. msgs_sent += len(sendcan)
  242. for addr, _, dat, bus in sendcan:
  243. to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat)
  244. self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus))
  245. # Make sure we attempted to send messages
  246. self.assertGreater(msgs_sent, 50)
  247. # Make sure we can send all messages while inactive
  248. CC = car.CarControl.new_message()
  249. test_car_controller(CC)
  250. # Test cancel + general messages (controls_allowed=False & cruise_engaged=True)
  251. self.safety.set_cruise_engaged_prev(True)
  252. CC = car.CarControl.new_message(cruiseControl={'cancel': True})
  253. test_car_controller(CC)
  254. # Test resume + general messages (controls_allowed=True & cruise_engaged=True)
  255. self.safety.set_controls_allowed(True)
  256. CC = car.CarControl.new_message(cruiseControl={'resume': True})
  257. test_car_controller(CC)
  258. # Skip stdout/stderr capture with pytest, causes elevated memory usage
  259. @pytest.mark.nocapture
  260. @settings(max_examples=MAX_EXAMPLES, deadline=None,
  261. phases=(Phase.reuse, Phase.generate, Phase.shrink))
  262. @given(data=st.data())
  263. def test_panda_safety_carstate_fuzzy(self, data):
  264. """
  265. For each example, pick a random CAN message on the bus and fuzz its data,
  266. checking for panda state mismatches.
  267. """
  268. if self.CP.dashcamOnly:
  269. self.skipTest("no need to check panda safety for dashcamOnly")
  270. valid_addrs = [(addr, bus, size) for bus, addrs in self.fingerprint.items() for addr, size in addrs.items()]
  271. address, bus, size = data.draw(st.sampled_from(valid_addrs))
  272. msg_strategy = st.binary(min_size=size, max_size=size)
  273. msgs = data.draw(st.lists(msg_strategy, min_size=20))
  274. CC = car.CarControl.new_message()
  275. for dat in msgs:
  276. # due to panda updating state selectively, only edges are expected to match
  277. # TODO: warm up CarState with real CAN messages to check edge of both sources
  278. # (eg. toyota's gasPressed is the inverse of a signal being set)
  279. prev_panda_gas = self.safety.get_gas_pressed_prev()
  280. prev_panda_brake = self.safety.get_brake_pressed_prev()
  281. prev_panda_regen_braking = self.safety.get_regen_braking_prev()
  282. prev_panda_vehicle_moving = self.safety.get_vehicle_moving()
  283. prev_panda_cruise_engaged = self.safety.get_cruise_engaged_prev()
  284. prev_panda_acc_main_on = self.safety.get_acc_main_on()
  285. to_send = libpanda_py.make_CANPacket(address, bus, dat)
  286. self.safety.safety_rx_hook(to_send)
  287. can = messaging.new_message('can', 1)
  288. can.can = [log.CanData(address=address, dat=dat, src=bus)]
  289. CS = self.CI.update(CC, (can.to_bytes(),))
  290. if self.safety.get_gas_pressed_prev() != prev_panda_gas:
  291. self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev())
  292. if self.safety.get_brake_pressed_prev() != prev_panda_brake:
  293. # TODO: remove this exception once this mismatch is resolved
  294. brake_pressed = CS.brakePressed
  295. if CS.brakePressed and not self.safety.get_brake_pressed_prev():
  296. if self.CP.carFingerprint in (HONDA.PILOT, HONDA.RIDGELINE) and CS.brake > 0.05:
  297. brake_pressed = False
  298. self.assertEqual(brake_pressed, self.safety.get_brake_pressed_prev())
  299. if self.safety.get_regen_braking_prev() != prev_panda_regen_braking:
  300. self.assertEqual(CS.regenBraking, self.safety.get_regen_braking_prev())
  301. if self.safety.get_vehicle_moving() != prev_panda_vehicle_moving:
  302. self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving())
  303. if not (self.CP.carName == "honda" and not (self.CP.flags & HondaFlags.BOSCH)):
  304. if self.safety.get_cruise_engaged_prev() != prev_panda_cruise_engaged:
  305. self.assertEqual(CS.cruiseState.enabled, self.safety.get_cruise_engaged_prev())
  306. if self.CP.carName == "honda":
  307. if self.safety.get_acc_main_on() != prev_panda_acc_main_on:
  308. self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on())
  309. def test_panda_safety_carstate(self):
  310. """
  311. Assert that panda safety matches openpilot's carState
  312. """
  313. if self.CP.dashcamOnly:
  314. self.skipTest("no need to check panda safety for dashcamOnly")
  315. CC = car.CarControl.new_message()
  316. # warm up pass, as initial states may be different
  317. for can in self.can_msgs[:300]:
  318. self.CI.update(CC, (can.as_builder().to_bytes(), ))
  319. for msg in filter(lambda m: m.src in range(64), can.can):
  320. to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
  321. self.safety.safety_rx_hook(to_send)
  322. controls_allowed_prev = False
  323. CS_prev = car.CarState.new_message()
  324. checks = defaultdict(int)
  325. controlsd = Controls(CI=self.CI)
  326. controlsd.initialized = True
  327. for idx, can in enumerate(self.can_msgs):
  328. CS = self.CI.update(CC, (can.as_builder().to_bytes(), ))
  329. for msg in filter(lambda m: m.src in range(64), can.can):
  330. to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
  331. ret = self.safety.safety_rx_hook(to_send)
  332. self.assertEqual(1, ret, f"safety rx failed ({ret=}): {to_send}")
  333. # Skip first frame so CS_prev is properly initialized
  334. if idx == 0:
  335. CS_prev = CS
  336. # Button may be left pressed in warm up period
  337. if not self.CP.pcmCruise:
  338. self.safety.set_controls_allowed(0)
  339. continue
  340. # TODO: check rest of panda's carstate (steering, ACC main on, etc.)
  341. checks['gasPressed'] += CS.gasPressed != self.safety.get_gas_pressed_prev()
  342. checks['standstill'] += CS.standstill == self.safety.get_vehicle_moving()
  343. # TODO: remove this exception once this mismatch is resolved
  344. brake_pressed = CS.brakePressed
  345. if CS.brakePressed and not self.safety.get_brake_pressed_prev():
  346. if self.CP.carFingerprint in (HONDA.PILOT, HONDA.RIDGELINE) and CS.brake > 0.05:
  347. brake_pressed = False
  348. checks['brakePressed'] += brake_pressed != self.safety.get_brake_pressed_prev()
  349. checks['regenBraking'] += CS.regenBraking != self.safety.get_regen_braking_prev()
  350. if self.CP.pcmCruise:
  351. # On most pcmCruise cars, openpilot's state is always tied to the PCM's cruise state.
  352. # On Honda Nidec, we always engage on the rising edge of the PCM cruise state, but
  353. # openpilot brakes to zero even if the min ACC speed is non-zero (i.e. the PCM disengages).
  354. if self.CP.carName == "honda" and not (self.CP.flags & HondaFlags.BOSCH):
  355. # only the rising edges are expected to match
  356. if CS.cruiseState.enabled and not CS_prev.cruiseState.enabled:
  357. checks['controlsAllowed'] += not self.safety.get_controls_allowed()
  358. else:
  359. checks['controlsAllowed'] += not CS.cruiseState.enabled and self.safety.get_controls_allowed()
  360. # TODO: fix notCar mismatch
  361. if not self.CP.notCar:
  362. checks['cruiseState'] += CS.cruiseState.enabled != self.safety.get_cruise_engaged_prev()
  363. else:
  364. # Check for enable events on rising edge of controls allowed
  365. controlsd.update_events(CS)
  366. controlsd.CS_prev = CS
  367. button_enable = (any(evt.enable for evt in CS.events) and
  368. not any(evt == EventName.pedalPressed for evt in controlsd.events.names))
  369. mismatch = button_enable != (self.safety.get_controls_allowed() and not controls_allowed_prev)
  370. checks['controlsAllowed'] += mismatch
  371. controls_allowed_prev = self.safety.get_controls_allowed()
  372. if button_enable and not mismatch:
  373. self.safety.set_controls_allowed(False)
  374. if self.CP.carName == "honda":
  375. checks['mainOn'] += CS.cruiseState.available != self.safety.get_acc_main_on()
  376. CS_prev = CS
  377. failed_checks = {k: v for k, v in checks.items() if v > 0}
  378. self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}")
  379. @unittest.skipIf(not CI, "Accessing non CI-bucket routes is allowed only when not in CI")
  380. def test_route_on_ci_bucket(self):
  381. self.assertTrue(self.test_route_on_bucket, "Route not on CI bucket. " +
  382. "This is fine to fail for WIP car ports, just let us know and we can upload your routes to the CI bucket.")
  383. @parameterized_class(('platform', 'test_route'), get_test_cases())
  384. @pytest.mark.xdist_group_class_property('test_route')
  385. class TestCarModel(TestCarModelBase):
  386. pass
  387. if __name__ == "__main__":
  388. unittest.main()