test_fw_fingerprint.py 14 KB


  1. #!/usr/bin/env python3
  2. import random
  3. import time
  4. import unittest
  5. from collections import defaultdict
  6. from parameterized import parameterized
  7. from unittest import mock
  8. from cereal import car
  9. from openpilot.selfdrive.car.car_helpers import interfaces
  10. from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
  11. from openpilot.selfdrive.car.fw_versions import FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \
  12. match_fw_to_car, get_brand_ecu_matches, get_fw_versions, get_present_ecus
  13. from openpilot.selfdrive.car.vin import get_vin
  14. CarFw = car.CarParams.CarFw
  15. Ecu = car.CarParams.Ecu
  16. ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()}
  17. class FakeSocket:
  18. def receive(self, non_blocking=False):
  19. pass
  20. def send(self, msg):
  21. pass
  22. class TestFwFingerprint(unittest.TestCase):
  23. def assertFingerprints(self, candidates, expected):
  24. candidates = list(candidates)
  25. self.assertEqual(len(candidates), 1, f"got more than one candidate: {candidates}")
  26. self.assertEqual(candidates[0], expected)
  27. @parameterized.expand([(b, c, e[c], n) for b, e in VERSIONS.items() for c in e for n in (True, False)])
  28. def test_exact_match(self, brand, car_model, ecus, test_non_essential):
  29. config = FW_QUERY_CONFIGS[brand]
  30. CP = car.CarParams.new_message()
  31. for _ in range(100):
  32. fw = []
  33. for ecu, fw_versions in ecus.items():
  34. # Assume non-essential ECUs apply to all cars, so we catch cases where Car A with
  35. # missing ECUs won't match to Car B where only Car B has labeled non-essential ECUs
  36. if ecu[0] in config.non_essential_ecus and test_non_essential:
  37. continue
  38. ecu_name, addr, sub_addr = ecu
  39. fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand,
  40. "address": addr, "subAddress": 0 if sub_addr is None else sub_addr})
  41. CP.carFw = fw
  42. _, matches = match_fw_to_car(CP.carFw, allow_fuzzy=False)
  43. if not test_non_essential:
  44. self.assertFingerprints(matches, car_model)
  45. else:
  46. # if we're removing ECUs we expect some match loss, but it shouldn't mismatch
  47. if len(matches) != 0:
  48. self.assertFingerprints(matches, car_model)
  49. @parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e])
  50. def test_custom_fuzzy_match(self, brand, car_model, ecus):
  51. # Assert brand-specific fuzzy fingerprinting function doesn't disagree with standard fuzzy function
  52. config = FW_QUERY_CONFIGS[brand]
  53. if config.match_fw_to_car_fuzzy is None:
  54. raise unittest.SkipTest("Brand does not implement custom fuzzy fingerprinting function")
  55. CP = car.CarParams.new_message()
  56. for _ in range(5):
  57. fw = []
  58. for ecu, fw_versions in ecus.items():
  59. ecu_name, addr, sub_addr = ecu
  60. fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand,
  61. "address": addr, "subAddress": 0 if sub_addr is None else sub_addr})
  62. CP.carFw = fw
  63. _, matches = match_fw_to_car(CP.carFw, allow_exact=False, log=False)
  64. brand_matches = config.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw), VERSIONS[brand])
  65. # If both have matches, they must agree
  66. if len(matches) == 1 and len(brand_matches) == 1:
  67. self.assertEqual(matches, brand_matches)
  68. @parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e])
  69. def test_fuzzy_match_ecu_count(self, brand, car_model, ecus):
  70. # Asserts that fuzzy matching does not count matching FW, but ECU address keys
  71. valid_ecus = [e for e in ecus if e[0] not in FUZZY_EXCLUDE_ECUS]
  72. if not len(valid_ecus):
  73. raise unittest.SkipTest("Car model has no compatible ECUs for fuzzy matching")
  74. fw = []
  75. for ecu in valid_ecus:
  76. ecu_name, addr, sub_addr = ecu
  77. for _ in range(5):
  78. # Add multiple FW versions to simulate ECU returning to multiple queries in a brand
  79. fw.append({"ecu": ecu_name, "fwVersion": random.choice(ecus[ecu]), 'brand': brand,
  80. "address": addr, "subAddress": 0 if sub_addr is None else sub_addr})
  81. CP = car.CarParams.new_message(carFw=fw)
  82. _, matches = match_fw_to_car(CP.carFw, allow_exact=False, log=False)
  83. # Assert no match if there are not enough unique ECUs
  84. unique_ecus = {(f['address'], f['subAddress']) for f in fw}
  85. if len(unique_ecus) < 2:
  86. self.assertEqual(len(matches), 0, car_model)
  87. # There won't always be a match due to shared FW, but if there is it should be correct
  88. elif len(matches):
  89. self.assertFingerprints(matches, car_model)
  90. def test_fw_version_lists(self):
  91. for car_model, ecus in FW_VERSIONS.items():
  92. with self.subTest(car_model=car_model.value):
  93. for ecu, ecu_fw in ecus.items():
  94. with self.subTest(ecu):
  95. duplicates = {fw for fw in ecu_fw if ecu_fw.count(fw) > 1}
  96. self.assertFalse(len(duplicates), f'{car_model}: Duplicate FW versions: Ecu.{ECU_NAME[ecu[0]]}, {duplicates}')
  97. self.assertGreater(len(ecu_fw), 0, f'{car_model}: No FW versions: Ecu.{ECU_NAME[ecu[0]]}')
  98. def test_all_addrs_map_to_one_ecu(self):
  99. for brand, cars in VERSIONS.items():
  100. addr_to_ecu = defaultdict(set)
  101. for ecus in cars.values():
  102. for ecu_type, addr, sub_addr in ecus.keys():
  103. addr_to_ecu[(addr, sub_addr)].add(ecu_type)
  104. ecus_for_addr = addr_to_ecu[(addr, sub_addr)]
  105. ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_for_addr])
  106. self.assertLessEqual(len(ecus_for_addr), 1, f"{brand} has multiple ECUs that map to one address: {ecu_strings} -> ({hex(addr)}, {sub_addr})")
  107. def test_data_collection_ecus(self):
  108. # Asserts no extra ECUs are in the fingerprinting database
  109. for brand, config in FW_QUERY_CONFIGS.items():
  110. for car_model, ecus in VERSIONS[brand].items():
  111. bad_ecus = set(ecus).intersection(config.extra_ecus)
  112. with self.subTest(car_model=car_model.value):
  113. self.assertFalse(len(bad_ecus), f'{car_model}: Fingerprints contain ECUs added for data collection: {bad_ecus}')
  114. def test_blacklisted_ecus(self):
  115. blacklisted_addrs = (0x7c4, 0x7d0) # includes A/C ecu and an unknown ecu
  116. for car_model, ecus in FW_VERSIONS.items():
  117. with self.subTest(car_model=car_model.value):
  118. CP = interfaces[car_model][0].get_non_essential_params(car_model)
  119. if CP.carName == 'subaru':
  120. for ecu in ecus.keys():
  121. self.assertNotIn(ecu[1], blacklisted_addrs, f'{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})')
  122. elif CP.carName == "chrysler":
  123. # Some HD trucks have a combined TCM and ECM
  124. if CP.carFingerprint.startswith("RAM HD"):
  125. for ecu in ecus.keys():
  126. self.assertNotEqual(ecu[0], Ecu.transmission, f"{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})")
  127. def test_missing_versions_and_configs(self):
  128. brand_versions = set(VERSIONS.keys())
  129. brand_configs = set(FW_QUERY_CONFIGS.keys())
  130. if len(brand_configs - brand_versions):
  131. with self.subTest():
  132. self.fail(f"Brands do not implement FW_VERSIONS: {brand_configs - brand_versions}")
  133. if len(brand_versions - brand_configs):
  134. with self.subTest():
  135. self.fail(f"Brands do not implement FW_QUERY_CONFIG: {brand_versions - brand_configs}")
  136. # Ensure each brand has at least 1 ECU to query, and extra ECU retrieval
  137. for brand, config in FW_QUERY_CONFIGS.items():
  138. self.assertEqual(len(config.get_all_ecus({}, include_extra_ecus=False)), 0)
  139. self.assertEqual(config.get_all_ecus({}), set(config.extra_ecus))
  140. self.assertGreater(len(config.get_all_ecus(VERSIONS[brand])), 0)
  141. def test_fw_request_ecu_whitelist(self):
  142. for brand, config in FW_QUERY_CONFIGS.items():
  143. with self.subTest(brand=brand):
  144. whitelisted_ecus = {ecu for r in config.requests for ecu in r.whitelist_ecus}
  145. brand_ecus = {fw[0] for car_fw in VERSIONS[brand].values() for fw in car_fw}
  146. brand_ecus |= {ecu[0] for ecu in config.extra_ecus}
  147. # each ecu in brand's fw versions + extra ecus needs to be whitelisted at least once
  148. ecus_not_whitelisted = brand_ecus - whitelisted_ecus
  149. ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_not_whitelisted])
  150. self.assertFalse(len(whitelisted_ecus) and len(ecus_not_whitelisted),
  151. f'{brand.title()}: ECUs not in any FW query whitelists: {ecu_strings}')
  152. def test_fw_requests(self):
  153. # Asserts equal length request and response lists
  154. for brand, config in FW_QUERY_CONFIGS.items():
  155. with self.subTest(brand=brand):
  156. for request_obj in config.requests:
  157. self.assertEqual(len(request_obj.request), len(request_obj.response))
  158. # No request on the OBD port (bus 1, multiplexed) should be run on an aux panda
  159. self.assertFalse(request_obj.auxiliary and request_obj.bus == 1 and request_obj.obd_multiplexing,
  160. f"{brand.title()}: OBD multiplexed request is marked auxiliary: {request_obj}")
  161. def test_brand_ecu_matches(self):
  162. empty_response = {brand: set() for brand in FW_QUERY_CONFIGS}
  163. self.assertEqual(get_brand_ecu_matches(set()), empty_response)
  164. # we ignore bus
  165. expected_response = empty_response | {'toyota': {(0x750, 0xf)}}
  166. self.assertEqual(get_brand_ecu_matches({(0x758, 0xf, 99)}), expected_response)
  167. class TestFwFingerprintTiming(unittest.TestCase):
  168. N: int = 5
  169. TOL: float = 0.05
  170. # for patched functions
  171. current_obd_multiplexing: bool
  172. total_time: float
  173. def fake_set_obd_multiplexing(self, _, obd_multiplexing):
  174. """The 10Hz blocking params loop adds on average 50ms to the query time for each OBD multiplexing change"""
  175. if obd_multiplexing != self.current_obd_multiplexing:
  176. self.current_obd_multiplexing = obd_multiplexing
  177. self.total_time += 0.1 / 2
  178. def fake_get_data(self, timeout):
  179. self.total_time += timeout
  180. return {}
  181. def _benchmark_brand(self, brand, num_pandas):
  182. fake_socket = FakeSocket()
  183. self.total_time = 0
  184. with (mock.patch("openpilot.selfdrive.car.fw_versions.set_obd_multiplexing", self.fake_set_obd_multiplexing),
  185. mock.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data)):
  186. for _ in range(self.N):
  187. # Treat each brand as the most likely (aka, the first) brand with OBD multiplexing initially on
  188. self.current_obd_multiplexing = True
  189. t = time.perf_counter()
  190. get_fw_versions(fake_socket, fake_socket, brand, num_pandas=num_pandas)
  191. self.total_time += time.perf_counter() - t
  192. return self.total_time / self.N
  193. def _assert_timing(self, avg_time, ref_time):
  194. self.assertLess(avg_time, ref_time + self.TOL)
  195. self.assertGreater(avg_time, ref_time - self.TOL, "Performance seems to have improved, update test refs.")
  196. def test_startup_timing(self):
  197. # Tests worse-case VIN query time and typical present ECU query time
  198. vin_ref_times = {'worst': 1.2, 'best': 0.6} # best assumes we go through all queries to get a match
  199. present_ecu_ref_time = 0.75
  200. def fake_get_ecu_addrs(*_, timeout):
  201. self.total_time += timeout
  202. return set()
  203. fake_socket = FakeSocket()
  204. self.total_time = 0.0
  205. with (mock.patch("openpilot.selfdrive.car.fw_versions.set_obd_multiplexing", self.fake_set_obd_multiplexing),
  206. mock.patch("openpilot.selfdrive.car.fw_versions.get_ecu_addrs", fake_get_ecu_addrs)):
  207. for _ in range(self.N):
  208. self.current_obd_multiplexing = True
  209. get_present_ecus(fake_socket, fake_socket, num_pandas=2)
  210. self._assert_timing(self.total_time / self.N, present_ecu_ref_time)
  211. print(f'get_present_ecus, query time={self.total_time / self.N} seconds')
  212. for name, args in (('worst', {}), ('best', {'retry': 1})):
  213. with self.subTest(name=name):
  214. self.total_time = 0.0
  215. with (mock.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data)):
  216. for _ in range(self.N):
  217. get_vin(fake_socket, fake_socket, (0, 1), **args)
  218. self._assert_timing(self.total_time / self.N, vin_ref_times[name])
  219. print(f'get_vin {name} case, query time={self.total_time / self.N} seconds')
  220. def test_fw_query_timing(self):
  221. total_ref_time = {1: 8.6, 2: 9.5}
  222. brand_ref_times = {
  223. 1: {
  224. 'gm': 1.0,
  225. 'body': 0.1,
  226. 'chrysler': 0.3,
  227. 'ford': 1.5,
  228. 'honda': 0.55,
  229. 'hyundai': 1.05,
  230. 'mazda': 0.1,
  231. 'nissan': 0.8,
  232. 'subaru': 0.65,
  233. 'tesla': 0.3,
  234. 'toyota': 1.6,
  235. 'volkswagen': 0.65,
  236. },
  237. 2: {
  238. 'ford': 1.6,
  239. 'hyundai': 1.85,
  240. 'tesla': 0.3,
  241. }
  242. }
  243. total_times = {1: 0.0, 2: 0.0}
  244. for num_pandas in (1, 2):
  245. for brand, config in FW_QUERY_CONFIGS.items():
  246. with self.subTest(brand=brand, num_pandas=num_pandas):
  247. avg_time = self._benchmark_brand(brand, num_pandas)
  248. total_times[num_pandas] += avg_time
  249. avg_time = round(avg_time, 2)
  250. ref_time = brand_ref_times[num_pandas].get(brand)
  251. if ref_time is None:
  252. # ref time should be same as 1 panda if no aux queries
  253. ref_time = brand_ref_times[num_pandas - 1][brand]
  254. self._assert_timing(avg_time, ref_time)
  255. print(f'{brand=}, {num_pandas=}, {len(config.requests)=}, avg FW query time={avg_time} seconds')
  256. for num_pandas in (1, 2):
  257. with self.subTest(brand='all_brands', num_pandas=num_pandas):
  258. total_time = round(total_times[num_pandas], 2)
  259. self._assert_timing(total_time, total_ref_time[num_pandas])
  260. print(f'all brands, total FW query time={total_time} seconds')
  261. if __name__ == "__main__":
  262. unittest.main()