# car_helpers.py
  1. import os
  2. from typing import Dict, List
  3. from cereal import car
  4. from common.params import Params
  5. from common.basedir import BASEDIR
  6. from system.version import is_comma_remote, is_tested_branch
  7. from selfdrive.car.interfaces import get_interface_attr
  8. from selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars
  9. from selfdrive.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN
  10. from selfdrive.car.fw_versions import get_fw_versions_ordered, match_fw_to_car, get_present_ecus
  11. from system.swaglog import cloudlog
  12. import cereal.messaging as messaging
  13. from selfdrive.car import gen_empty_fingerprint
  14. EventName = car.CarEvent.EventName
  15. def get_startup_event(car_recognized, controller_available, fw_seen):
  16. if is_comma_remote() and is_tested_branch():
  17. event = EventName.startup
  18. else:
  19. event = EventName.startupMaster
  20. if not car_recognized:
  21. if fw_seen:
  22. event = EventName.startupNoCar
  23. else:
  24. event = EventName.startupNoFw
  25. elif car_recognized and not controller_available:
  26. event = EventName.startupNoControl
  27. return event
  28. def get_one_can(logcan):
  29. while True:
  30. can = messaging.recv_one_retry(logcan)
  31. if len(can.can) > 0:
  32. return can
  33. def load_interfaces(brand_names):
  34. ret = {}
  35. for brand_name in brand_names:
  36. path = f'selfdrive.car.{brand_name}'
  37. CarInterface = __import__(path + '.interface', fromlist=['CarInterface']).CarInterface
  38. if os.path.exists(BASEDIR + '/' + path.replace('.', '/') + '/carstate.py'):
  39. CarState = __import__(path + '.carstate', fromlist=['CarState']).CarState
  40. else:
  41. CarState = None
  42. if os.path.exists(BASEDIR + '/' + path.replace('.', '/') + '/carcontroller.py'):
  43. CarController = __import__(path + '.carcontroller', fromlist=['CarController']).CarController
  44. else:
  45. CarController = None
  46. for model_name in brand_names[brand_name]:
  47. ret[model_name] = (CarInterface, CarController, CarState)
  48. return ret
  49. def _get_interface_names() -> Dict[str, List[str]]:
  50. # returns a dict of brand name and its respective models
  51. brand_names = {}
  52. for brand_name, model_names in get_interface_attr("CAR").items():
  53. model_names = [getattr(model_names, c) for c in model_names.__dict__.keys() if not c.startswith("__")]
  54. brand_names[brand_name] = model_names
  55. return brand_names
  56. # imports from directory selfdrive/car/<name>/
  57. interface_names = _get_interface_names()
  58. interfaces = load_interfaces(interface_names)
# **** for live use only ****
  60. def fingerprint(logcan, sendcan, num_pandas):
  61. fixed_fingerprint = os.environ.get('FINGERPRINT', "")
  62. skip_fw_query = os.environ.get('SKIP_FW_QUERY', False)
  63. ecu_rx_addrs = set()
  64. if not skip_fw_query:
  65. # Vin query only reliably works through OBDII
  66. bus = 1
  67. cached_params = Params().get("CarParamsCache")
  68. if cached_params is not None:
  69. cached_params = car.CarParams.from_bytes(cached_params)
  70. if cached_params.carName == "mock":
  71. cached_params = None
  72. if cached_params is not None and len(cached_params.carFw) > 0 and cached_params.carVin is not VIN_UNKNOWN:
  73. cloudlog.warning("Using cached CarParams")
  74. vin, vin_rx_addr = cached_params.carVin, 0
  75. car_fw = list(cached_params.carFw)
  76. cached = True
  77. else:
  78. cloudlog.warning("Getting VIN & FW versions")
  79. vin_rx_addr, vin = get_vin(logcan, sendcan, bus)
  80. ecu_rx_addrs = get_present_ecus(logcan, sendcan)
  81. car_fw = get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, num_pandas=num_pandas)
  82. cached = False
  83. exact_fw_match, fw_candidates = match_fw_to_car(car_fw)
  84. else:
  85. vin, vin_rx_addr = VIN_UNKNOWN, 0
  86. exact_fw_match, fw_candidates, car_fw = True, set(), []
  87. cached = False
  88. if not is_valid_vin(vin):
  89. cloudlog.event("Malformed VIN", vin=vin, error=True)
  90. vin = VIN_UNKNOWN
  91. cloudlog.warning("VIN %s", vin)
  92. Params().put("CarVin", vin)
  93. finger = gen_empty_fingerprint()
  94. candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1
  95. frame = 0
  96. frame_fingerprint = 100 # 1s
  97. car_fingerprint = None
  98. done = False
  99. # drain CAN socket so we always get the latest messages
  100. messaging.drain_sock_raw(logcan)
  101. while not done:
  102. a = get_one_can(logcan)
  103. for can in a.can:
  104. # The fingerprint dict is generated for all buses, this way the car interface
  105. # can use it to detect a (valid) multipanda setup and initialize accordingly
  106. if can.src < 128:
  107. if can.src not in finger:
  108. finger[can.src] = {}
  109. finger[can.src][can.address] = len(can.dat)
  110. for b in candidate_cars:
  111. # Ignore extended messages and VIN query response.
  112. if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8):
  113. candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b])
  114. # if we only have one car choice and the time since we got our first
  115. # message has elapsed, exit
  116. for b in candidate_cars:
  117. if len(candidate_cars[b]) == 1 and frame > frame_fingerprint:
  118. # fingerprint done
  119. car_fingerprint = candidate_cars[b][0]
  120. # bail if no cars left or we've been waiting for more than 2s
  121. failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > frame_fingerprint) or frame > 200
  122. succeeded = car_fingerprint is not None
  123. done = failed or succeeded
  124. frame += 1
  125. exact_match = True
  126. source = car.CarParams.FingerprintSource.can
  127. # If FW query returns exactly 1 candidate, use it
  128. if len(fw_candidates) == 1:
  129. car_fingerprint = list(fw_candidates)[0]
  130. source = car.CarParams.FingerprintSource.fw
  131. exact_match = exact_fw_match
  132. if fixed_fingerprint:
  133. car_fingerprint = fixed_fingerprint
  134. source = car.CarParams.FingerprintSource.fixed
  135. cloudlog.event("fingerprinted", car_fingerprint=car_fingerprint, source=source, fuzzy=not exact_match, cached=cached,
  136. fw_count=len(car_fw), ecu_responses=list(ecu_rx_addrs), vin_rx_addr=vin_rx_addr, error=True)
  137. return car_fingerprint, finger, vin, car_fw, source, exact_match
  138. def get_car(logcan, sendcan, num_pandas=1):
  139. candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(logcan, sendcan, num_pandas)
  140. if candidate is None:
  141. cloudlog.warning("car doesn't match any fingerprints: %r", fingerprints)
  142. candidate = "mock"
  143. experimental_long = Params().get_bool("ExperimentalLongitudinalEnabled")
  144. CarInterface, CarController, CarState = interfaces[candidate]
  145. CP = CarInterface.get_params(candidate, fingerprints, car_fw, experimental_long)
  146. CP.carVin = vin
  147. CP.carFw = car_fw
  148. CP.fingerprintSource = source
  149. CP.fuzzyFingerprint = not exact_match
  150. return CarInterface(CP, CarController, CarState), CP