car_helpers.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. import os
  2. import time
  3. from collections.abc import Callable
  4. from cereal import car
  5. from openpilot.common.params import Params
  6. from openpilot.common.basedir import BASEDIR
  7. from openpilot.selfdrive.car.values import PLATFORMS
  8. from openpilot.system.version import is_comma_remote, is_tested_branch
  9. from openpilot.selfdrive.car.interfaces import get_interface_attr
  10. from openpilot.selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars
  11. from openpilot.selfdrive.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN
  12. from openpilot.selfdrive.car.fw_versions import get_fw_versions_ordered, get_present_ecus, match_fw_to_car, set_obd_multiplexing
  13. from openpilot.selfdrive.car.mock.values import CAR as MOCK
  14. from openpilot.common.swaglog import cloudlog
  15. import cereal.messaging as messaging
  16. from openpilot.selfdrive.car import gen_empty_fingerprint
# Minimum number of can_fingerprint() loop iterations (one per received CAN packet)
# that must have elapsed before a CAN fingerprint result is accepted or rejected.
FRAME_FINGERPRINT = 100 # 1s
# Short alias for car.CarEvent.EventName, used by get_startup_event().
EventName = car.CarEvent.EventName
  19. def get_startup_event(car_recognized, controller_available, fw_seen):
  20. if is_comma_remote() and is_tested_branch():
  21. event = EventName.startup
  22. else:
  23. event = EventName.startupMaster
  24. if not car_recognized:
  25. if fw_seen:
  26. event = EventName.startupNoCar
  27. else:
  28. event = EventName.startupNoFw
  29. elif car_recognized and not controller_available:
  30. event = EventName.startupNoControl
  31. return event
  32. def get_one_can(logcan):
  33. while True:
  34. can = messaging.recv_one_retry(logcan)
  35. if len(can.can) > 0:
  36. return can
  37. def load_interfaces(brand_names):
  38. ret = {}
  39. for brand_name in brand_names:
  40. path = f'openpilot.selfdrive.car.{brand_name}'
  41. CarInterface = __import__(path + '.interface', fromlist=['CarInterface']).CarInterface
  42. if os.path.exists(BASEDIR + '/' + path.replace('.', '/') + '/carstate.py'):
  43. CarState = __import__(path + '.carstate', fromlist=['CarState']).CarState
  44. else:
  45. CarState = None
  46. if os.path.exists(BASEDIR + '/' + path.replace('.', '/') + '/carcontroller.py'):
  47. CarController = __import__(path + '.carcontroller', fromlist=['CarController']).CarController
  48. else:
  49. CarController = None
  50. for model_name in brand_names[brand_name]:
  51. ret[model_name] = (CarInterface, CarController, CarState)
  52. return ret
  53. def _get_interface_names() -> dict[str, list[str]]:
  54. # returns a dict of brand name and its respective models
  55. brand_names = {}
  56. for brand_name, brand_models in get_interface_attr("CAR").items():
  57. brand_names[brand_name] = [model.value for model in brand_models]
  58. return brand_names
# imports from directory selfdrive/car/<name>/
# Both tables are built once, when this module is imported.
# interface_names: brand name -> list of model values
interface_names = _get_interface_names()
# interfaces: model name -> (CarInterface, CarController, CarState) of the model's brand
interfaces = load_interfaces(interface_names)
  62. def can_fingerprint(next_can: Callable) -> tuple[str | None, dict[int, dict]]:
  63. finger = gen_empty_fingerprint()
  64. candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1
  65. frame = 0
  66. car_fingerprint = None
  67. done = False
  68. while not done:
  69. a = next_can()
  70. for can in a.can:
  71. # The fingerprint dict is generated for all buses, this way the car interface
  72. # can use it to detect a (valid) multipanda setup and initialize accordingly
  73. if can.src < 128:
  74. if can.src not in finger:
  75. finger[can.src] = {}
  76. finger[can.src][can.address] = len(can.dat)
  77. for b in candidate_cars:
  78. # Ignore extended messages and VIN query response.
  79. if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8):
  80. candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b])
  81. # if we only have one car choice and the time since we got our first
  82. # message has elapsed, exit
  83. for b in candidate_cars:
  84. if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT:
  85. # fingerprint done
  86. car_fingerprint = candidate_cars[b][0]
  87. # bail if no cars left or we've been waiting for more than 2s
  88. failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200
  89. succeeded = car_fingerprint is not None
  90. done = failed or succeeded
  91. frame += 1
  92. return car_fingerprint, finger
  93. # **** for use live only ****
  94. def fingerprint(logcan, sendcan, num_pandas):
  95. fixed_fingerprint = os.environ.get('FINGERPRINT', "")
  96. skip_fw_query = os.environ.get('SKIP_FW_QUERY', False)
  97. disable_fw_cache = os.environ.get('DISABLE_FW_CACHE', False)
  98. ecu_rx_addrs = set()
  99. params = Params()
  100. start_time = time.monotonic()
  101. if not skip_fw_query:
  102. cached_params = params.get("CarParamsCache")
  103. if cached_params is not None:
  104. with car.CarParams.from_bytes(cached_params) as cached_params:
  105. if cached_params.carName == "mock":
  106. cached_params = None
  107. if cached_params is not None and len(cached_params.carFw) > 0 and \
  108. cached_params.carVin is not VIN_UNKNOWN and not disable_fw_cache:
  109. cloudlog.warning("Using cached CarParams")
  110. vin_rx_addr, vin_rx_bus, vin = -1, -1, cached_params.carVin
  111. car_fw = list(cached_params.carFw)
  112. cached = True
  113. else:
  114. cloudlog.warning("Getting VIN & FW versions")
  115. # enable OBD multiplexing for VIN query
  116. # NOTE: this takes ~0.1s and is relied on to allow sendcan subscriber to connect in time
  117. set_obd_multiplexing(params, True)
  118. # VIN query only reliably works through OBDII
  119. vin_rx_addr, vin_rx_bus, vin = get_vin(logcan, sendcan, (0, 1))
  120. ecu_rx_addrs = get_present_ecus(logcan, sendcan, num_pandas=num_pandas)
  121. car_fw = get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, num_pandas=num_pandas)
  122. cached = False
  123. exact_fw_match, fw_candidates = match_fw_to_car(car_fw)
  124. else:
  125. vin_rx_addr, vin_rx_bus, vin = -1, -1, VIN_UNKNOWN
  126. exact_fw_match, fw_candidates, car_fw = True, set(), []
  127. cached = False
  128. if not is_valid_vin(vin):
  129. cloudlog.event("Malformed VIN", vin=vin, error=True)
  130. vin = VIN_UNKNOWN
  131. cloudlog.warning("VIN %s", vin)
  132. params.put("CarVin", vin)
  133. # disable OBD multiplexing for CAN fingerprinting and potential ECU knockouts
  134. set_obd_multiplexing(params, False)
  135. params.put_bool("FirmwareQueryDone", True)
  136. fw_query_time = time.monotonic() - start_time
  137. # CAN fingerprint
  138. # drain CAN socket so we get the latest messages
  139. messaging.drain_sock_raw(logcan)
  140. car_fingerprint, finger = can_fingerprint(lambda: get_one_can(logcan))
  141. exact_match = True
  142. source = car.CarParams.FingerprintSource.can
  143. # If FW query returns exactly 1 candidate, use it
  144. if len(fw_candidates) == 1:
  145. car_fingerprint = list(fw_candidates)[0]
  146. source = car.CarParams.FingerprintSource.fw
  147. exact_match = exact_fw_match
  148. if fixed_fingerprint:
  149. car_fingerprint = fixed_fingerprint
  150. source = car.CarParams.FingerprintSource.fixed
  151. cloudlog.event("fingerprinted", car_fingerprint=car_fingerprint, source=source, fuzzy=not exact_match, cached=cached,
  152. fw_count=len(car_fw), ecu_responses=list(ecu_rx_addrs), vin_rx_addr=vin_rx_addr, vin_rx_bus=vin_rx_bus,
  153. fingerprints=repr(finger), fw_query_time=fw_query_time, error=True)
  154. car_platform = PLATFORMS.get(car_fingerprint, MOCK.MOCK)
  155. return car_platform, finger, vin, car_fw, source, exact_match
  156. def get_car_interface(CP):
  157. CarInterface, CarController, CarState = interfaces[CP.carFingerprint]
  158. return CarInterface(CP, CarController, CarState)
  159. def get_car(logcan, sendcan, experimental_long_allowed, num_pandas=1):
  160. candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(logcan, sendcan, num_pandas)
  161. if candidate is None:
  162. cloudlog.event("car doesn't match any fingerprints", fingerprints=repr(fingerprints), error=True)
  163. candidate = "mock"
  164. CarInterface, _, _ = interfaces[candidate]
  165. CP = CarInterface.get_params(candidate, fingerprints, car_fw, experimental_long_allowed, docs=False)
  166. CP.carVin = vin
  167. CP.carFw = car_fw
  168. CP.fingerprintSource = source
  169. CP.fuzzyFingerprint = not exact_match
  170. return get_car_interface(CP), CP
  171. def write_car_param(platform=MOCK.MOCK):
  172. params = Params()
  173. CarInterface, _, _ = interfaces[platform]
  174. CP = CarInterface.get_non_essential_params(platform)
  175. params.put("CarParams", CP.to_bytes())
  176. def get_demo_car_params():
  177. platform = MOCK.MOCK
  178. CarInterface, _, _ = interfaces[platform]
  179. CP = CarInterface.get_non_essential_params(platform)
  180. return CP