fw_versions.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. #!/usr/bin/env python3
  2. from collections import defaultdict
  3. from collections.abc import Iterator
  4. from typing import Any, Protocol, TypeVar
  5. from tqdm import tqdm
  6. import capnp
  7. import panda.python.uds as uds
  8. from cereal import car
  9. from openpilot.common.params import Params
  10. from openpilot.common.swaglog import cloudlog
  11. from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs
  12. from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
  13. from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions
  14. from openpilot.selfdrive.car.interfaces import get_interface_attr
  15. from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
  16. Ecu = car.CarParams.Ecu
  17. ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa]
  18. FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug]
  19. FW_QUERY_CONFIGS: dict[str, FwQueryConfig] = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True)
  20. VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True)
  21. MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e}
  22. REQUESTS = [(brand, config, r) for brand, config in FW_QUERY_CONFIGS.items() for r in config.requests]
  23. T = TypeVar('T')
  24. def chunks(l: list[T], n: int = 128) -> Iterator[list[T]]:
  25. for i in range(0, len(l), n):
  26. yield l[i:i + n]
  27. def is_brand(brand: str, filter_brand: str | None) -> bool:
  28. """Returns if brand matches filter_brand or no brand filter is specified"""
  29. return filter_brand is None or brand == filter_brand
  30. def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], filter_brand: str = None) -> dict[AddrType, set[bytes]]:
  31. fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set)
  32. for fw in fw_versions:
  33. if is_brand(fw.brand, filter_brand) and not fw.logging:
  34. sub_addr = fw.subAddress if fw.subAddress != 0 else None
  35. fw_versions_dict[(fw.address, sub_addr)].add(fw.fwVersion)
  36. return dict(fw_versions_dict)
  37. class MatchFwToCar(Protocol):
  38. def __call__(self, live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True) -> set[str]:
  39. ...
  40. def match_fw_to_car_fuzzy(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, exclude: str = None) -> set[str]:
  41. """Do a fuzzy FW match. This function will return a match, and the number of firmware version
  42. that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars
  43. the match is rejected."""
  44. # Build lookup table from (addr, sub_addr, fw) to list of candidate cars
  45. all_fw_versions = defaultdict(list)
  46. for candidate, fw_by_addr in FW_VERSIONS.items():
  47. if not is_brand(MODEL_TO_BRAND[candidate], match_brand):
  48. continue
  49. if candidate == exclude:
  50. continue
  51. for addr, fws in fw_by_addr.items():
  52. # These ECUs are known to be shared between models (EPS only between hybrid/ICE version)
  53. # Getting this exactly right isn't crucial, but excluding camera and radar makes it almost
  54. # impossible to get 3 matching versions, even if two models with shared parts are released at the same
  55. # time and only one is in our database.
  56. if addr[0] in FUZZY_EXCLUDE_ECUS:
  57. continue
  58. for f in fws:
  59. all_fw_versions[(addr[1], addr[2], f)].append(candidate)
  60. matched_ecus = set()
  61. match: str | None = None
  62. for addr, versions in live_fw_versions.items():
  63. ecu_key = (addr[0], addr[1])
  64. for version in versions:
  65. # All cars that have this FW response on the specified address
  66. candidates = all_fw_versions[(*ecu_key, version)]
  67. if len(candidates) == 1:
  68. matched_ecus.add(ecu_key)
  69. if match is None:
  70. match = candidates[0]
  71. # We uniquely matched two different cars. No fuzzy match possible
  72. elif match != candidates[0]:
  73. return set()
  74. # Note that it is possible to match to a candidate without all its ECUs being present
  75. # if there are enough matches. FIXME: parameterize this or require all ECUs to exist like exact matching
  76. if match and len(matched_ecus) >= 2:
  77. if log:
  78. cloudlog.error(f"Fingerprinted {match} using fuzzy match. {len(matched_ecus)} matching ECUs")
  79. return {match}
  80. else:
  81. return set()
  82. def match_fw_to_car_exact(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, extra_fw_versions: dict = None) -> set[str]:
  83. """Do an exact FW match. Returns all cars that match the given
  84. FW versions for a list of "essential" ECUs. If an ECU is not considered
  85. essential the FW version can be missing to get a fingerprint, but if it's present it
  86. needs to match the database."""
  87. if extra_fw_versions is None:
  88. extra_fw_versions = {}
  89. invalid = set()
  90. candidates = {c: f for c, f in FW_VERSIONS.items() if
  91. is_brand(MODEL_TO_BRAND[c], match_brand)}
  92. for candidate, fws in candidates.items():
  93. config = FW_QUERY_CONFIGS[MODEL_TO_BRAND[candidate]]
  94. for ecu, expected_versions in fws.items():
  95. expected_versions = expected_versions + extra_fw_versions.get(candidate, {}).get(ecu, [])
  96. ecu_type = ecu[0]
  97. addr = ecu[1:]
  98. found_versions = live_fw_versions.get(addr, set())
  99. if not len(found_versions):
  100. # Some models can sometimes miss an ecu, or show on two different addresses
  101. # FIXME: this logic can be improved to be more specific, should require one of the two addresses
  102. if candidate in config.non_essential_ecus.get(ecu_type, []):
  103. continue
  104. # Ignore non essential ecus
  105. if ecu_type not in ESSENTIAL_ECUS:
  106. continue
  107. # Virtual debug ecu doesn't need to match the database
  108. if ecu_type == Ecu.debug:
  109. continue
  110. if not any(found_version in expected_versions for found_version in found_versions):
  111. invalid.add(candidate)
  112. break
  113. return set(candidates.keys()) - invalid
  114. def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], allow_exact: bool = True, allow_fuzzy: bool = True,
  115. log: bool = True) -> tuple[bool, set[str]]:
  116. # Try exact matching first
  117. exact_matches: list[tuple[bool, MatchFwToCar]] = []
  118. if allow_exact:
  119. exact_matches = [(True, match_fw_to_car_exact)]
  120. if allow_fuzzy:
  121. exact_matches.append((False, match_fw_to_car_fuzzy))
  122. for exact_match, match_func in exact_matches:
  123. # For each brand, attempt to fingerprint using all FW returned from its queries
  124. matches: set[str] = set()
  125. for brand in VERSIONS.keys():
  126. fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand)
  127. matches |= match_func(fw_versions_dict, match_brand=brand, log=log)
  128. # If specified and no matches so far, fall back to brand's fuzzy fingerprinting function
  129. config = FW_QUERY_CONFIGS[brand]
  130. if not exact_match and not len(matches) and config.match_fw_to_car_fuzzy is not None:
  131. matches |= config.match_fw_to_car_fuzzy(fw_versions_dict, VERSIONS[brand])
  132. if len(matches):
  133. return exact_match, matches
  134. return True, set()
  135. def get_present_ecus(logcan, sendcan, num_pandas: int = 1) -> set[EcuAddrBusType]:
  136. params = Params()
  137. # queries are split by OBD multiplexing mode
  138. queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []}
  139. parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []}
  140. responses: set[EcuAddrBusType] = set()
  141. for brand, config, r in REQUESTS:
  142. # Skip query if no panda available
  143. if r.bus > num_pandas * 4 - 1:
  144. continue
  145. for ecu_type, addr, sub_addr in config.get_all_ecus(VERSIONS[brand]):
  146. # Only query ecus in whitelist if whitelist is not empty
  147. if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus:
  148. a = (addr, sub_addr, r.bus)
  149. # Build set of queries
  150. if sub_addr is None:
  151. if a not in parallel_queries[r.obd_multiplexing]:
  152. parallel_queries[r.obd_multiplexing].append(a)
  153. else: # subaddresses must be queried one by one
  154. if [a] not in queries[r.obd_multiplexing]:
  155. queries[r.obd_multiplexing].append([a])
  156. # Build set of expected responses to filter
  157. response_addr = uds.get_rx_addr_for_tx_addr(addr, r.rx_offset)
  158. responses.add((response_addr, sub_addr, r.bus))
  159. for obd_multiplexing in queries:
  160. queries[obd_multiplexing].insert(0, parallel_queries[obd_multiplexing])
  161. ecu_responses = set()
  162. for obd_multiplexing in queries:
  163. set_obd_multiplexing(params, obd_multiplexing)
  164. for query in queries[obd_multiplexing]:
  165. ecu_responses.update(get_ecu_addrs(logcan, sendcan, set(query), responses, timeout=0.1))
  166. return ecu_responses
  167. def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, set[AddrType]]:
  168. """Returns dictionary of brands and matches with ECUs in their FW versions"""
  169. brand_addrs = {brand: {(addr, subaddr) for _, addr, subaddr in config.get_all_ecus(VERSIONS[brand])} for
  170. brand, config in FW_QUERY_CONFIGS.items()}
  171. brand_matches: dict[str, set[AddrType]] = {brand: set() for brand, _, _ in REQUESTS}
  172. brand_rx_offsets = {(brand, r.rx_offset) for brand, _, r in REQUESTS}
  173. for addr, sub_addr, _ in ecu_rx_addrs:
  174. # Since we can't know what request an ecu responded to, add matches for all possible rx offsets
  175. for brand, rx_offset in brand_rx_offsets:
  176. a = (uds.get_rx_addr_for_tx_addr(addr, -rx_offset), sub_addr)
  177. if a in brand_addrs[brand]:
  178. brand_matches[brand].add(a)
  179. return brand_matches
  180. def set_obd_multiplexing(params: Params, obd_multiplexing: bool):
  181. if params.get_bool("ObdMultiplexingEnabled") != obd_multiplexing:
  182. cloudlog.warning(f"Setting OBD multiplexing to {obd_multiplexing}")
  183. params.remove("ObdMultiplexingChanged")
  184. params.put_bool("ObdMultiplexingEnabled", obd_multiplexing)
  185. params.get_bool("ObdMultiplexingChanged", block=True)
  186. cloudlog.warning("OBD multiplexing set successfully")
  187. def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1,
  188. debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
  189. """Queries for FW versions ordering brands by likelihood, breaks when exact match is found"""
  190. all_car_fw = []
  191. brand_matches = get_brand_ecu_matches(ecu_rx_addrs)
  192. for brand in sorted(brand_matches, key=lambda b: len(brand_matches[b]), reverse=True):
  193. # Skip this brand if there are no matching present ECUs
  194. if not len(brand_matches[brand]):
  195. continue
  196. car_fw = get_fw_versions(logcan, sendcan, query_brand=brand, timeout=timeout, num_pandas=num_pandas, debug=debug, progress=progress)
  197. all_car_fw.extend(car_fw)
  198. # If there is a match using this brand's FW alone, finish querying early
  199. _, matches = match_fw_to_car(car_fw, log=False)
  200. if len(matches) == 1:
  201. break
  202. return all_car_fw
  203. def get_fw_versions(logcan, sendcan, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1,
  204. debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
  205. versions = VERSIONS.copy()
  206. params = Params()
  207. if query_brand is not None:
  208. versions = {query_brand: versions[query_brand]}
  209. if extra is not None:
  210. versions.update(extra)
  211. # Extract ECU addresses to query from fingerprints
  212. # ECUs using a subaddress need be queried one by one, the rest can be done in parallel
  213. addrs = []
  214. parallel_addrs = []
  215. ecu_types = {}
  216. for brand, brand_versions in versions.items():
  217. config = FW_QUERY_CONFIGS[brand]
  218. for ecu_type, addr, sub_addr in config.get_all_ecus(brand_versions):
  219. a = (brand, addr, sub_addr)
  220. if a not in ecu_types:
  221. ecu_types[a] = ecu_type
  222. if sub_addr is None:
  223. if a not in parallel_addrs:
  224. parallel_addrs.append(a)
  225. else:
  226. if [a] not in addrs:
  227. addrs.append([a])
  228. addrs.insert(0, parallel_addrs)
  229. # Get versions and build capnp list to put into CarParams
  230. car_fw = []
  231. requests = [(brand, config, r) for brand, config, r in REQUESTS if is_brand(brand, query_brand)]
  232. for addr_group in tqdm(addrs, disable=not progress): # split by subaddr, if any
  233. for addr_chunk in chunks(addr_group):
  234. for brand, config, r in requests:
  235. # Skip query if no panda available
  236. if r.bus > num_pandas * 4 - 1:
  237. continue
  238. # Toggle OBD multiplexing for each request
  239. if r.bus % 4 == 1:
  240. set_obd_multiplexing(params, r.obd_multiplexing)
  241. try:
  242. query_addrs = [(a, s) for (b, a, s) in addr_chunk if b in (brand, 'any') and
  243. (len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)]
  244. if query_addrs:
  245. query = IsoTpParallelQuery(sendcan, logcan, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug)
  246. for (tx_addr, sub_addr), version in query.get_data(timeout).items():
  247. f = car.CarParams.CarFw.new_message()
  248. f.ecu = ecu_types.get((brand, tx_addr, sub_addr), Ecu.unknown)
  249. f.fwVersion = version
  250. f.address = tx_addr
  251. f.responseAddress = uds.get_rx_addr_for_tx_addr(tx_addr, r.rx_offset)
  252. f.request = r.request
  253. f.brand = brand
  254. f.bus = r.bus
  255. f.logging = r.logging or (f.ecu, tx_addr, sub_addr) in config.extra_ecus
  256. f.obdMultiplexing = r.obd_multiplexing
  257. if sub_addr is not None:
  258. f.subAddress = sub_addr
  259. car_fw.append(f)
  260. except Exception:
  261. cloudlog.exception("FW query exception")
  262. return car_fw
  263. if __name__ == "__main__":
  264. import time
  265. import argparse
  266. import cereal.messaging as messaging
  267. from openpilot.selfdrive.car.vin import get_vin
  268. parser = argparse.ArgumentParser(description='Get firmware version of ECUs')
  269. parser.add_argument('--scan', action='store_true')
  270. parser.add_argument('--debug', action='store_true')
  271. parser.add_argument('--brand', help='Only query addresses/with requests for this brand')
  272. args = parser.parse_args()
  273. logcan = messaging.sub_sock('can')
  274. pandaStates_sock = messaging.sub_sock('pandaStates')
  275. sendcan = messaging.pub_sock('sendcan')
  276. # Set up params for boardd
  277. params = Params()
  278. params.remove("FirmwareQueryDone")
  279. params.put_bool("IsOnroad", False)
  280. time.sleep(0.2) # thread is 10 Hz
  281. params.put_bool("IsOnroad", True)
  282. extra: Any = None
  283. if args.scan:
  284. extra = {}
  285. # Honda
  286. for i in range(256):
  287. extra[(Ecu.unknown, 0x18da00f1 + (i << 8), None)] = []
  288. extra[(Ecu.unknown, 0x700 + i, None)] = []
  289. extra[(Ecu.unknown, 0x750, i)] = []
  290. extra = {"any": {"debug": extra}}
  291. num_pandas = len(messaging.recv_one_retry(pandaStates_sock).pandaStates)
  292. t = time.time()
  293. print("Getting vin...")
  294. vin_rx_addr, vin_rx_bus, vin = get_vin(logcan, sendcan, (0, 1), retry=10, debug=args.debug)
  295. print(f'RX: {hex(vin_rx_addr)}, BUS: {vin_rx_bus}, VIN: {vin}')
  296. print(f"Getting VIN took {time.time() - t:.3f} s")
  297. print()
  298. t = time.time()
  299. fw_vers = get_fw_versions(logcan, sendcan, query_brand=args.brand, extra=extra, num_pandas=num_pandas, debug=args.debug, progress=True)
  300. _, candidates = match_fw_to_car(fw_vers)
  301. print()
  302. print("Found FW versions")
  303. print("{")
  304. padding = max([len(fw.brand) for fw in fw_vers] or [0])
  305. for version in fw_vers:
  306. subaddr = None if version.subAddress == 0 else hex(version.subAddress)
  307. print(f" Brand: {version.brand:{padding}}, bus: {version.bus} - (Ecu.{version.ecu}, {hex(version.address)}, {subaddr}): [{version.fwVersion}]")
  308. print("}")
  309. print()
  310. print("Possible matches:", candidates)
  311. print(f"Getting fw took {time.time() - t:.3f} s")