interfaces.py 17 KB


  1. import yaml
  2. import os
  3. import time
  4. from abc import abstractmethod, ABC
  5. from typing import Any, Dict, Optional, Tuple, List, Callable
  6. from cereal import car
  7. from common.basedir import BASEDIR
  8. from common.conversions import Conversions as CV
  9. from common.kalman.simple_kalman import KF1D
  10. from common.numpy_fast import clip, interp
  11. from common.realtime import DT_CTRL
  12. from selfdrive.car import apply_hysteresis, gen_empty_fingerprint, scale_rot_inertia, scale_tire_stiffness
  13. from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX, apply_center_deadzone
  14. from selfdrive.controls.lib.events import Events
  15. from selfdrive.controls.lib.vehicle_model import VehicleModel
  16. ButtonType = car.CarState.ButtonEvent.Type
  17. GearShifter = car.CarState.GearShifter
  18. EventName = car.CarEvent.EventName
  19. TorqueFromLateralAccelCallbackType = Callable[[float, car.CarParams.LateralTorqueTuning, float, float, bool], float]
  20. MAX_CTRL_SPEED = (V_CRUISE_MAX + 4) * CV.KPH_TO_MS
  21. ACCEL_MAX = 2.0
  22. ACCEL_MIN = -3.5
  23. FRICTION_THRESHOLD = 0.3
  24. TORQUE_PARAMS_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/params.yaml')
  25. TORQUE_OVERRIDE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/override.yaml')
  26. TORQUE_SUBSTITUTE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/substitute.yaml')
  27. def get_torque_params(candidate):
  28. with open(TORQUE_SUBSTITUTE_PATH) as f:
  29. sub = yaml.load(f, Loader=yaml.CSafeLoader)
  30. if candidate in sub:
  31. candidate = sub[candidate]
  32. with open(TORQUE_PARAMS_PATH) as f:
  33. params = yaml.load(f, Loader=yaml.CSafeLoader)
  34. with open(TORQUE_OVERRIDE_PATH) as f:
  35. override = yaml.load(f, Loader=yaml.CSafeLoader)
  36. # Ensure no overlap
  37. if sum([candidate in x for x in [sub, params, override]]) > 1:
  38. raise RuntimeError(f'{candidate} is defined twice in torque config')
  39. if candidate in override:
  40. out = override[candidate]
  41. elif candidate in params:
  42. out = params[candidate]
  43. else:
  44. raise NotImplementedError(f"Did not find torque params for {candidate}")
  45. return {key: out[i] for i, key in enumerate(params['legend'])}
  46. # generic car and radar interfaces
  47. class CarInterfaceBase(ABC):
  48. def __init__(self, CP, CarController, CarState):
  49. self.CP = CP
  50. self.VM = VehicleModel(CP)
  51. self.frame = 0
  52. self.steering_unpressed = 0
  53. self.low_speed_alert = False
  54. self.silent_steer_warning = True
  55. self.v_ego_cluster_seen = False
  56. self.CS = None
  57. self.can_parsers = []
  58. if CarState is not None:
  59. self.CS = CarState(CP)
  60. self.cp = self.CS.get_can_parser(CP)
  61. self.cp_cam = self.CS.get_cam_can_parser(CP)
  62. self.cp_adas = self.CS.get_adas_can_parser(CP)
  63. self.cp_body = self.CS.get_body_can_parser(CP)
  64. self.cp_loopback = self.CS.get_loopback_can_parser(CP)
  65. self.can_parsers = [self.cp, self.cp_cam, self.cp_adas, self.cp_body, self.cp_loopback]
  66. self.CC = None
  67. if CarController is not None:
  68. self.CC = CarController(self.cp.dbc_name, CP, self.VM)
  69. @staticmethod
  70. def get_pid_accel_limits(CP, current_speed, cruise_speed):
  71. return ACCEL_MIN, ACCEL_MAX
  72. @classmethod
  73. def get_params(cls, candidate: str, fingerprint: Optional[Dict[int, Dict[int, int]]] = None, car_fw: Optional[List[car.CarParams.CarFw]] = None, experimental_long: bool = False):
  74. if fingerprint is None:
  75. fingerprint = gen_empty_fingerprint()
  76. if car_fw is None:
  77. car_fw = list()
  78. ret = CarInterfaceBase.get_std_params(candidate)
  79. ret = cls._get_params(ret, candidate, fingerprint, car_fw, experimental_long)
  80. # Set common params using fields set by the car interface
  81. # TODO: get actual value, for now starting with reasonable value for
  82. # civic and scaling by mass and wheelbase
  83. ret.rotationalInertia = scale_rot_inertia(ret.mass, ret.wheelbase)
  84. # TODO: some car interfaces set stiffness factor
  85. if ret.tireStiffnessFront == 0 or ret.tireStiffnessRear == 0:
  86. # TODO: start from empirically derived lateral slip stiffness for the civic and scale by
  87. # mass and CG position, so all cars will have approximately similar dyn behaviors
  88. ret.tireStiffnessFront, ret.tireStiffnessRear = scale_tire_stiffness(ret.mass, ret.wheelbase, ret.centerToFront)
  89. return ret
  90. @staticmethod
  91. @abstractmethod
  92. def _get_params(ret: car.CarParams, candidate: str, fingerprint: Dict[int, Dict[int, int]], car_fw: List[car.CarParams.CarFw], experimental_long: bool):
  93. raise NotImplementedError
  94. @staticmethod
  95. def init(CP, logcan, sendcan):
  96. pass
  97. @staticmethod
  98. def get_steer_feedforward_default(desired_angle, v_ego):
  99. # Proportional to realigning tire momentum: lateral acceleration.
  100. # TODO: something with lateralPlan.curvatureRates
  101. return desired_angle * (v_ego**2)
  102. def get_steer_feedforward_function(self):
  103. return self.get_steer_feedforward_default
  104. @staticmethod
  105. def torque_from_lateral_accel_linear(lateral_accel_value, torque_params, lateral_accel_error, lateral_accel_deadzone, friction_compensation):
  106. # The default is a linear relationship between torque and lateral acceleration (accounting for road roll and steering friction)
  107. friction_interp = interp(
  108. apply_center_deadzone(lateral_accel_error, lateral_accel_deadzone),
  109. [-FRICTION_THRESHOLD, FRICTION_THRESHOLD],
  110. [-torque_params.friction, torque_params.friction]
  111. )
  112. friction = friction_interp if friction_compensation else 0.0
  113. return (lateral_accel_value / torque_params.latAccelFactor) + friction
  114. def torque_from_lateral_accel(self) -> TorqueFromLateralAccelCallbackType:
  115. return self.torque_from_lateral_accel_linear
  116. # returns a set of default params to avoid repetition in car specific params
  117. @staticmethod
  118. def get_std_params(candidate):
  119. ret = car.CarParams.new_message()
  120. ret.carFingerprint = candidate
  121. # Car docs fields
  122. ret.maxLateralAccel = get_torque_params(candidate)['MAX_LAT_ACCEL_MEASURED']
  123. ret.autoResumeSng = True # describes whether car can resume from a stop automatically
  124. # standard ALC params
  125. ret.steerControlType = car.CarParams.SteerControlType.torque
  126. ret.minSteerSpeed = 0.
  127. ret.wheelSpeedFactor = 1.0
  128. ret.pcmCruise = True # openpilot's state is tied to the PCM's cruise state on most cars
  129. ret.minEnableSpeed = -1. # enable is done by stock ACC, so ignore this
  130. ret.steerRatioRear = 0. # no rear steering, at least on the listed cars aboveA
  131. ret.openpilotLongitudinalControl = False
  132. ret.stopAccel = -2.0
  133. ret.stoppingDecelRate = 0.8 # brake_travel/s while trying to stop
  134. ret.vEgoStopping = 0.5
  135. ret.vEgoStarting = 0.5
  136. ret.stoppingControl = True
  137. ret.longitudinalTuning.deadzoneBP = [0.]
  138. ret.longitudinalTuning.deadzoneV = [0.]
  139. ret.longitudinalTuning.kf = 1.
  140. ret.longitudinalTuning.kpBP = [0.]
  141. ret.longitudinalTuning.kpV = [1.]
  142. ret.longitudinalTuning.kiBP = [0.]
  143. ret.longitudinalTuning.kiV = [1.]
  144. # TODO estimate car specific lag, use .15s for now
  145. ret.longitudinalActuatorDelayLowerBound = 0.15
  146. ret.longitudinalActuatorDelayUpperBound = 0.15
  147. ret.steerLimitTimer = 1.0
  148. return ret
  149. @staticmethod
  150. def configure_torque_tune(candidate, tune, steering_angle_deadzone_deg=0.0, use_steering_angle=True):
  151. params = get_torque_params(candidate)
  152. tune.init('torque')
  153. tune.torque.useSteeringAngle = use_steering_angle
  154. tune.torque.kp = 1.0
  155. tune.torque.kf = 1.0
  156. tune.torque.ki = 0.1
  157. tune.torque.friction = params['FRICTION']
  158. tune.torque.latAccelFactor = params['LAT_ACCEL_FACTOR']
  159. tune.torque.latAccelOffset = 0.0
  160. tune.torque.steeringAngleDeadzoneDeg = steering_angle_deadzone_deg
  161. @abstractmethod
  162. def _update(self, c: car.CarControl) -> car.CarState:
  163. pass
  164. def update(self, c: car.CarControl, can_strings: List[bytes]) -> car.CarState:
  165. # parse can
  166. for cp in self.can_parsers:
  167. if cp is not None:
  168. cp.update_strings(can_strings)
  169. # get CarState
  170. ret = self._update(c)
  171. ret.canValid = all(cp.can_valid for cp in self.can_parsers if cp is not None)
  172. ret.canTimeout = any(cp.bus_timeout for cp in self.can_parsers if cp is not None)
  173. if ret.vEgoCluster == 0.0 and not self.v_ego_cluster_seen:
  174. ret.vEgoCluster = ret.vEgo
  175. else:
  176. self.v_ego_cluster_seen = True
  177. # Many cars apply hysteresis to the ego dash speed
  178. if self.CS is not None:
  179. ret.vEgoCluster = apply_hysteresis(ret.vEgoCluster, self.CS.out.vEgoCluster, self.CS.cluster_speed_hyst_gap)
  180. if abs(ret.vEgo) < self.CS.cluster_min_speed:
  181. ret.vEgoCluster = 0.0
  182. if ret.cruiseState.speedCluster == 0:
  183. ret.cruiseState.speedCluster = ret.cruiseState.speed
  184. # copy back for next iteration
  185. reader = ret.as_reader()
  186. if self.CS is not None:
  187. self.CS.out = reader
  188. return reader
  189. @abstractmethod
  190. def apply(self, c: car.CarControl) -> Tuple[car.CarControl.Actuators, List[bytes]]:
  191. pass
  192. def create_common_events(self, cs_out, extra_gears=None, pcm_enable=True, allow_enable=True,
  193. enable_buttons=(ButtonType.accelCruise, ButtonType.decelCruise)):
  194. events = Events()
  195. if cs_out.doorOpen:
  196. events.add(EventName.doorOpen)
  197. if cs_out.seatbeltUnlatched:
  198. events.add(EventName.seatbeltNotLatched)
  199. if cs_out.gearShifter != GearShifter.drive and (extra_gears is None or
  200. cs_out.gearShifter not in extra_gears):
  201. events.add(EventName.wrongGear)
  202. if cs_out.gearShifter == GearShifter.reverse:
  203. events.add(EventName.reverseGear)
  204. if not cs_out.cruiseState.available:
  205. events.add(EventName.wrongCarMode)
  206. if cs_out.espDisabled:
  207. events.add(EventName.espDisabled)
  208. if cs_out.stockFcw:
  209. events.add(EventName.stockFcw)
  210. if cs_out.stockAeb:
  211. events.add(EventName.stockAeb)
  212. if cs_out.vEgo > MAX_CTRL_SPEED:
  213. events.add(EventName.speedTooHigh)
  214. if cs_out.cruiseState.nonAdaptive:
  215. events.add(EventName.wrongCruiseMode)
  216. if cs_out.brakeHoldActive and self.CP.openpilotLongitudinalControl:
  217. events.add(EventName.brakeHold)
  218. if cs_out.parkingBrake:
  219. events.add(EventName.parkBrake)
  220. if cs_out.accFaulted:
  221. events.add(EventName.accFaulted)
  222. if cs_out.steeringPressed:
  223. events.add(EventName.steerOverride)
  224. # Handle button presses
  225. for b in cs_out.buttonEvents:
  226. # Enable OP long on falling edge of enable buttons (defaults to accelCruise and decelCruise, overridable per-port)
  227. if not self.CP.pcmCruise and (b.type in enable_buttons and not b.pressed):
  228. events.add(EventName.buttonEnable)
  229. # Disable on rising and falling edge of cancel for both stock and OP long
  230. if b.type == ButtonType.cancel:
  231. events.add(EventName.buttonCancel)
  232. # Handle permanent and temporary steering faults
  233. self.steering_unpressed = 0 if cs_out.steeringPressed else self.steering_unpressed + 1
  234. if cs_out.steerFaultTemporary:
  235. # if the user overrode recently, show a less harsh alert
  236. if self.silent_steer_warning or cs_out.standstill or self.steering_unpressed < int(1.5 / DT_CTRL):
  237. self.silent_steer_warning = True
  238. events.add(EventName.steerTempUnavailableSilent)
  239. else:
  240. events.add(EventName.steerTempUnavailable)
  241. else:
  242. self.silent_steer_warning = False
  243. if cs_out.steerFaultPermanent:
  244. events.add(EventName.steerUnavailable)
  245. # we engage when pcm is active (rising edge)
  246. # enabling can optionally be blocked by the car interface
  247. if pcm_enable:
  248. if cs_out.cruiseState.enabled and not self.CS.out.cruiseState.enabled and allow_enable:
  249. events.add(EventName.pcmEnable)
  250. elif not cs_out.cruiseState.enabled:
  251. events.add(EventName.pcmDisable)
  252. return events
  253. class RadarInterfaceBase(ABC):
  254. def __init__(self, CP):
  255. self.rcp = None
  256. self.pts = {}
  257. self.delay = 0
  258. self.radar_ts = CP.radarTimeStep
  259. self.no_radar_sleep = 'NO_RADAR_SLEEP' in os.environ
  260. def update(self, can_strings):
  261. ret = car.RadarData.new_message()
  262. if not self.no_radar_sleep:
  263. time.sleep(self.radar_ts) # radard runs on RI updates
  264. return ret
  265. class CarStateBase(ABC):
  266. def __init__(self, CP):
  267. self.CP = CP
  268. self.car_fingerprint = CP.carFingerprint
  269. self.out = car.CarState.new_message()
  270. self.cruise_buttons = 0
  271. self.left_blinker_cnt = 0
  272. self.right_blinker_cnt = 0
  273. self.steering_pressed_cnt = 0
  274. self.left_blinker_prev = False
  275. self.right_blinker_prev = False
  276. self.cluster_speed_hyst_gap = 0.0
  277. self.cluster_min_speed = 0.0 # min speed before dropping to 0
  278. # Q = np.matrix([[0.0, 0.0], [0.0, 100.0]])
  279. # R = 0.3
  280. self.v_ego_kf = KF1D(x0=[[0.0], [0.0]],
  281. A=[[1.0, DT_CTRL], [0.0, 1.0]],
  282. C=[1.0, 0.0],
  283. K=[[0.17406039], [1.65925647]])
  284. def update_speed_kf(self, v_ego_raw):
  285. if abs(v_ego_raw - self.v_ego_kf.x[0][0]) > 2.0: # Prevent large accelerations when car starts at non zero speed
  286. self.v_ego_kf.x = [[v_ego_raw], [0.0]]
  287. v_ego_x = self.v_ego_kf.update(v_ego_raw)
  288. return float(v_ego_x[0]), float(v_ego_x[1])
  289. def get_wheel_speeds(self, fl, fr, rl, rr, unit=CV.KPH_TO_MS):
  290. factor = unit * self.CP.wheelSpeedFactor
  291. wheelSpeeds = car.CarState.WheelSpeeds.new_message()
  292. wheelSpeeds.fl = fl * factor
  293. wheelSpeeds.fr = fr * factor
  294. wheelSpeeds.rl = rl * factor
  295. wheelSpeeds.rr = rr * factor
  296. return wheelSpeeds
  297. def update_blinker_from_lamp(self, blinker_time: int, left_blinker_lamp: bool, right_blinker_lamp: bool):
  298. """Update blinkers from lights. Enable output when light was seen within the last `blinker_time`
  299. iterations"""
  300. # TODO: Handle case when switching direction. Now both blinkers can be on at the same time
  301. self.left_blinker_cnt = blinker_time if left_blinker_lamp else max(self.left_blinker_cnt - 1, 0)
  302. self.right_blinker_cnt = blinker_time if right_blinker_lamp else max(self.right_blinker_cnt - 1, 0)
  303. return self.left_blinker_cnt > 0, self.right_blinker_cnt > 0
  304. def update_steering_pressed(self, steering_pressed, steering_pressed_min_count):
  305. """Applies filtering on steering pressed for noisy driver torque signals."""
  306. self.steering_pressed_cnt += 1 if steering_pressed else -1
  307. self.steering_pressed_cnt = clip(self.steering_pressed_cnt, 0, steering_pressed_min_count * 2)
  308. return self.steering_pressed_cnt > steering_pressed_min_count
  309. def update_blinker_from_stalk(self, blinker_time: int, left_blinker_stalk: bool, right_blinker_stalk: bool):
  310. """Update blinkers from stalk position. When stalk is seen the blinker will be on for at least blinker_time,
  311. or until the stalk is turned off, whichever is longer. If the opposite stalk direction is seen the blinker
  312. is forced to the other side. On a rising edge of the stalk the timeout is reset."""
  313. if left_blinker_stalk:
  314. self.right_blinker_cnt = 0
  315. if not self.left_blinker_prev:
  316. self.left_blinker_cnt = blinker_time
  317. if right_blinker_stalk:
  318. self.left_blinker_cnt = 0
  319. if not self.right_blinker_prev:
  320. self.right_blinker_cnt = blinker_time
  321. self.left_blinker_cnt = max(self.left_blinker_cnt - 1, 0)
  322. self.right_blinker_cnt = max(self.right_blinker_cnt - 1, 0)
  323. self.left_blinker_prev = left_blinker_stalk
  324. self.right_blinker_prev = right_blinker_stalk
  325. return bool(left_blinker_stalk or self.left_blinker_cnt > 0), bool(right_blinker_stalk or self.right_blinker_cnt > 0)
  326. @staticmethod
  327. def parse_gear_shifter(gear: Optional[str]) -> car.CarState.GearShifter:
  328. if gear is None:
  329. return GearShifter.unknown
  330. d: Dict[str, car.CarState.GearShifter] = {
  331. 'P': GearShifter.park, 'PARK': GearShifter.park,
  332. 'R': GearShifter.reverse, 'REVERSE': GearShifter.reverse,
  333. 'N': GearShifter.neutral, 'NEUTRAL': GearShifter.neutral,
  334. 'E': GearShifter.eco, 'ECO': GearShifter.eco,
  335. 'T': GearShifter.manumatic, 'MANUAL': GearShifter.manumatic,
  336. 'D': GearShifter.drive, 'DRIVE': GearShifter.drive,
  337. 'S': GearShifter.sport, 'SPORT': GearShifter.sport,
  338. 'L': GearShifter.low, 'LOW': GearShifter.low,
  339. 'B': GearShifter.brake, 'BRAKE': GearShifter.brake,
  340. }
  341. return d.get(gear.upper(), GearShifter.unknown)
  342. @staticmethod
  343. def get_cam_can_parser(CP):
  344. return None
  345. @staticmethod
  346. def get_adas_can_parser(CP):
  347. return None
  348. @staticmethod
  349. def get_body_can_parser(CP):
  350. return None
  351. @staticmethod
  352. def get_loopback_can_parser(CP):
  353. return None
  354. # interface-specific helpers
  355. def get_interface_attr(attr: str, combine_brands: bool = False, ignore_none: bool = False) -> Dict[str, Any]:
  356. # read all the folders in selfdrive/car and return a dict where:
  357. # - keys are all the car models or brand names
  358. # - values are attr values from all car folders
  359. result = {}
  360. for car_folder in sorted([x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]):
  361. try:
  362. brand_name = car_folder.split('/')[-1]
  363. brand_values = __import__(f'selfdrive.car.{brand_name}.values', fromlist=[attr])
  364. if hasattr(brand_values, attr) or not ignore_none:
  365. attr_data = getattr(brand_values, attr, None)
  366. else:
  367. continue
  368. if combine_brands:
  369. if isinstance(attr_data, dict):
  370. for f, v in attr_data.items():
  371. result[f] = v
  372. else:
  373. result[brand_name] = attr_data
  374. except (ImportError, OSError):
  375. pass
  376. return result