123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- from openpilot.common.numpy_fast import clip, interp
- from opendbc.can.packer import CANPacker
- from openpilot.selfdrive.car import apply_driver_steer_torque_limits, common_fault_avoidance
- from openpilot.selfdrive.car.interfaces import CarControllerBase
- from openpilot.selfdrive.car.subaru import subarucan
- from openpilot.selfdrive.car.subaru.values import DBC, GLOBAL_ES_ADDR, CanBus, CarControllerParams, SubaruFlags
- # FIXME: These limits aren't exact. The real limit is more than likely over a larger time period and
- # involves the total steering angle change rather than rate, but these limits work well for now
- MAX_STEER_RATE = 25 # deg/s
- MAX_STEER_RATE_FRAMES = 7 # tx control frames needed before torque can be cut
- class CarController(CarControllerBase):
- def __init__(self, dbc_name, CP, VM):
- self.CP = CP
- self.apply_steer_last = 0
- self.frame = 0
- self.cruise_button_prev = 0
- self.steer_rate_counter = 0
- self.p = CarControllerParams(CP)
- self.packer = CANPacker(DBC[CP.carFingerprint]['pt'])
- def update(self, CC, CS, now_nanos):
- actuators = CC.actuators
- hud_control = CC.hudControl
- pcm_cancel_cmd = CC.cruiseControl.cancel
- can_sends = []
- # *** steering ***
- if (self.frame % self.p.STEER_STEP) == 0:
- apply_steer = int(round(actuators.steer * self.p.STEER_MAX))
- # limits due to driver torque
- new_steer = int(round(apply_steer))
- apply_steer = apply_driver_steer_torque_limits(new_steer, self.apply_steer_last, CS.out.steeringTorque, self.p)
- if not CC.latActive:
- apply_steer = 0
- if self.CP.flags & SubaruFlags.PREGLOBAL:
- can_sends.append(subarucan.create_preglobal_steering_control(self.packer, self.frame // self.p.STEER_STEP, apply_steer, CC.latActive))
- else:
- apply_steer_req = CC.latActive
- if self.CP.flags & SubaruFlags.STEER_RATE_LIMITED:
- # Steering rate fault prevention
- self.steer_rate_counter, apply_steer_req = \
- common_fault_avoidance(abs(CS.out.steeringRateDeg) > MAX_STEER_RATE, apply_steer_req,
- self.steer_rate_counter, MAX_STEER_RATE_FRAMES)
- can_sends.append(subarucan.create_steering_control(self.packer, apply_steer, apply_steer_req))
- self.apply_steer_last = apply_steer
- # *** longitudinal ***
- if CC.longActive:
- apply_throttle = int(round(interp(actuators.accel, CarControllerParams.THROTTLE_LOOKUP_BP, CarControllerParams.THROTTLE_LOOKUP_V)))
- apply_rpm = int(round(interp(actuators.accel, CarControllerParams.RPM_LOOKUP_BP, CarControllerParams.RPM_LOOKUP_V)))
- apply_brake = int(round(interp(actuators.accel, CarControllerParams.BRAKE_LOOKUP_BP, CarControllerParams.BRAKE_LOOKUP_V)))
- # limit min and max values
- cruise_throttle = clip(apply_throttle, CarControllerParams.THROTTLE_MIN, CarControllerParams.THROTTLE_MAX)
- cruise_rpm = clip(apply_rpm, CarControllerParams.RPM_MIN, CarControllerParams.RPM_MAX)
- cruise_brake = clip(apply_brake, CarControllerParams.BRAKE_MIN, CarControllerParams.BRAKE_MAX)
- else:
- cruise_throttle = CarControllerParams.THROTTLE_INACTIVE
- cruise_rpm = CarControllerParams.RPM_MIN
- cruise_brake = CarControllerParams.BRAKE_MIN
- # *** alerts and pcm cancel ***
- if self.CP.flags & SubaruFlags.PREGLOBAL:
- if self.frame % 5 == 0:
- # 1 = main, 2 = set shallow, 3 = set deep, 4 = resume shallow, 5 = resume deep
- # disengage ACC when OP is disengaged
- if pcm_cancel_cmd:
- cruise_button = 1
- # turn main on if off and past start-up state
- elif not CS.out.cruiseState.available and CS.ready:
- cruise_button = 1
- else:
- cruise_button = CS.cruise_button
- # unstick previous mocked button press
- if cruise_button == 1 and self.cruise_button_prev == 1:
- cruise_button = 0
- self.cruise_button_prev = cruise_button
- can_sends.append(subarucan.create_preglobal_es_distance(self.packer, cruise_button, CS.es_distance_msg))
- else:
- if self.frame % 10 == 0:
- can_sends.append(subarucan.create_es_dashstatus(self.packer, self.frame // 10, CS.es_dashstatus_msg, CC.enabled,
- self.CP.openpilotLongitudinalControl, CC.longActive, hud_control.leadVisible))
- can_sends.append(subarucan.create_es_lkas_state(self.packer, self.frame // 10, CS.es_lkas_state_msg, CC.enabled, hud_control.visualAlert,
- hud_control.leftLaneVisible, hud_control.rightLaneVisible,
- hud_control.leftLaneDepart, hud_control.rightLaneDepart))
- if self.CP.flags & SubaruFlags.SEND_INFOTAINMENT:
- can_sends.append(subarucan.create_es_infotainment(self.packer, self.frame // 10, CS.es_infotainment_msg, hud_control.visualAlert))
- if self.CP.openpilotLongitudinalControl:
- if self.frame % 5 == 0:
- can_sends.append(subarucan.create_es_status(self.packer, self.frame // 5, CS.es_status_msg,
- self.CP.openpilotLongitudinalControl, CC.longActive, cruise_rpm))
- can_sends.append(subarucan.create_es_brake(self.packer, self.frame // 5, CS.es_brake_msg,
- self.CP.openpilotLongitudinalControl, CC.longActive, cruise_brake))
- can_sends.append(subarucan.create_es_distance(self.packer, self.frame // 5, CS.es_distance_msg, 0, pcm_cancel_cmd,
- self.CP.openpilotLongitudinalControl, cruise_brake > 0, cruise_throttle))
- else:
- if pcm_cancel_cmd:
- if not (self.CP.flags & SubaruFlags.HYBRID):
- bus = CanBus.alt if self.CP.flags & SubaruFlags.GLOBAL_GEN2 else CanBus.main
- can_sends.append(subarucan.create_es_distance(self.packer, CS.es_distance_msg["COUNTER"] + 1, CS.es_distance_msg, bus, pcm_cancel_cmd))
- if self.CP.flags & SubaruFlags.DISABLE_EYESIGHT:
- # Tester present (keeps eyesight disabled)
- if self.frame % 100 == 0:
- can_sends.append([GLOBAL_ES_ADDR, 0, b"\x02\x3E\x80\x00\x00\x00\x00\x00", CanBus.camera])
- # Create all of the other eyesight messages to keep the rest of the car happy when eyesight is disabled
- if self.frame % 5 == 0:
- can_sends.append(subarucan.create_es_highbeamassist(self.packer))
- if self.frame % 10 == 0:
- can_sends.append(subarucan.create_es_static_1(self.packer))
- if self.frame % 2 == 0:
- can_sends.append(subarucan.create_es_static_2(self.packer))
- new_actuators = actuators.copy()
- new_actuators.steer = self.apply_steer_last / self.p.STEER_MAX
- new_actuators.steerOutputCan = self.apply_steer_last
- self.frame += 1
- return new_actuators, can_sends
|