123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- #!/usr/bin/env python3
- import math
- from typing import SupportsFloat
- from cereal import car, log
- import cereal.messaging as messaging
- from openpilot.common.conversions import Conversions as CV
- from openpilot.common.params import Params
- from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper
- from openpilot.common.swaglog import cloudlog
- from opendbc.car.car_helpers import get_car_interface
- from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature
- from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
- from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
- from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
- from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
- from openpilot.selfdrive.controls.lib.longcontrol import LongControl
- from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
- from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
- State = log.SelfdriveState.OpenpilotState
- LaneChangeState = log.LaneChangeState
- LaneChangeDirection = log.LaneChangeDirection
- ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
- class Controls:
- def __init__(self) -> None:
- self.params = Params()
- cloudlog.info("controlsd is waiting for CarParams")
- self.CP = messaging.log_from_bytes(self.params.get("CarParams", block=True), car.CarParams)
- cloudlog.info("controlsd got CarParams")
- self.CI = get_car_interface(self.CP)
- self.sm = messaging.SubMaster(['liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState',
- 'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput',
- 'driverMonitoringState', 'onroadEvents', 'driverAssistance'], poll='selfdriveState')
- self.pm = messaging.PubMaster(['carControl', 'controlsState'])
- self.steer_limited = False
- self.desired_curvature = 0.0
- self.pose_calibrator = PoseCalibrator()
- self.calibrated_pose: Pose|None = None
- self.LoC = LongControl(self.CP)
- self.VM = VehicleModel(self.CP)
- self.LaC: LatControl
- if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
- self.LaC = LatControlAngle(self.CP, self.CI)
- elif self.CP.lateralTuning.which() == 'pid':
- self.LaC = LatControlPID(self.CP, self.CI)
- elif self.CP.lateralTuning.which() == 'torque':
- self.LaC = LatControlTorque(self.CP, self.CI)
- def update(self):
- self.sm.update(15)
- if self.sm.updated["liveCalibration"]:
- self.pose_calibrator.feed_live_calib(self.sm['liveCalibration'])
- if self.sm.updated["livePose"]:
- device_pose = Pose.from_live_pose(self.sm['livePose'])
- self.calibrated_pose = self.pose_calibrator.build_calibrated_pose(device_pose)
- def state_control(self):
- CS = self.sm['carState']
- # Update VehicleModel
- lp = self.sm['liveParameters']
- x = max(lp.stiffnessFactor, 0.1)
- sr = max(lp.steerRatio, 0.1)
- self.VM.update_params(x, sr)
- # Update Torque Params
- if self.CP.lateralTuning.which() == 'torque':
- torque_params = self.sm['liveTorqueParameters']
- if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams:
- self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,
- torque_params.frictionCoefficientFiltered)
- long_plan = self.sm['longitudinalPlan']
- model_v2 = self.sm['modelV2']
- CC = car.CarControl.new_message()
- CC.enabled = self.sm['selfdriveState'].enabled
- # Check which actuators can be enabled
- standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill
- CC.latActive = self.sm['selfdriveState'].active and not CS.steerFaultTemporary and not CS.steerFaultPermanent and not standstill
- CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in self.sm['onroadEvents']) and self.CP.openpilotLongitudinalControl
- actuators = CC.actuators
- actuators.longControlState = self.LoC.long_control_state
- # Enable blinkers while lane changing
- if model_v2.meta.laneChangeState != LaneChangeState.off:
- CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
- CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
- if not CC.latActive:
- self.LaC.reset()
- if not CC.longActive:
- self.LoC.reset()
- # accel PID loop
- pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, CS.vCruise * CV.KPH_TO_MS)
- actuators.accel = self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop, pid_accel_limits)
- # Steering PID loop and lateral MPC
- self.desired_curvature = clip_curvature(CS.vEgo, self.desired_curvature, model_v2.action.desiredCurvature)
- actuators.curvature = self.desired_curvature
- actuators.steer, actuators.steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
- self.steer_limited, self.desired_curvature,
- self.calibrated_pose) # TODO what if not available
- # Ensure no NaNs/Infs
- for p in ACTUATOR_FIELDS:
- attr = getattr(actuators, p)
- if not isinstance(attr, SupportsFloat):
- continue
- if not math.isfinite(attr):
- cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
- setattr(actuators, p, 0.0)
- return CC, lac_log
- def publish(self, CC, lac_log):
- CS = self.sm['carState']
- # Orientation and angle rates can be useful for carcontroller
- # Only calibrated (car) frame is relevant for the carcontroller
- if self.calibrated_pose is not None:
- CC.orientationNED = self.calibrated_pose.orientation.xyz.tolist()
- CC.angularVelocity = self.calibrated_pose.angular_velocity.xyz.tolist()
- CC.cruiseControl.override = CC.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl
- CC.cruiseControl.cancel = CS.cruiseState.enabled and (not CC.enabled or not self.CP.pcmCruise)
- speeds = self.sm['longitudinalPlan'].speeds
- if len(speeds):
- CC.cruiseControl.resume = CC.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1
- hudControl = CC.hudControl
- hudControl.setSpeed = float(CS.vCruiseCluster * CV.KPH_TO_MS)
- hudControl.speedVisible = CC.enabled
- hudControl.lanesVisible = CC.enabled
- hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
- hudControl.leadDistanceBars = self.sm['selfdriveState'].personality.raw + 1
- hudControl.visualAlert = self.sm['selfdriveState'].alertHudVisual
- hudControl.rightLaneVisible = True
- hudControl.leftLaneVisible = True
- if self.sm.valid['driverAssistance']:
- hudControl.leftLaneDepart = self.sm['driverAssistance'].leftLaneDeparture
- hudControl.rightLaneDepart = self.sm['driverAssistance'].rightLaneDeparture
- if self.sm['selfdriveState'].active:
- CO = self.sm['carOutput']
- if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
- self.steer_limited = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
- STEER_ANGLE_SATURATION_THRESHOLD
- else:
- self.steer_limited = abs(CC.actuators.steer - CO.actuatorsOutput.steer) > 1e-2
- # TODO: both controlsState and carControl valids should be set by
- # sm.all_checks(), but this creates a circular dependency
- # controlsState
- dat = messaging.new_message('controlsState')
- dat.valid = CS.canValid
- cs = dat.controlsState
- lp = self.sm['liveParameters']
- steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
- cs.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
- cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
- cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
- cs.desiredCurvature = self.desired_curvature
- cs.longControlState = self.LoC.long_control_state
- cs.upAccelCmd = float(self.LoC.pid.p)
- cs.uiAccelCmd = float(self.LoC.pid.i)
- cs.ufAccelCmd = float(self.LoC.pid.f)
- cs.forceDecel = bool((self.sm['driverMonitoringState'].awarenessStatus < 0.) or
- (self.sm['selfdriveState'].state == State.softDisabling))
- lat_tuning = self.CP.lateralTuning.which()
- if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
- cs.lateralControlState.angleState = lac_log
- elif lat_tuning == 'pid':
- cs.lateralControlState.pidState = lac_log
- elif lat_tuning == 'torque':
- cs.lateralControlState.torqueState = lac_log
- self.pm.send('controlsState', dat)
- # carControl
- cc_send = messaging.new_message('carControl')
- cc_send.valid = CS.canValid
- cc_send.carControl = CC
- self.pm.send('carControl', cc_send)
- def run(self):
- rk = Ratekeeper(100, print_delay_threshold=None)
- while True:
- self.update()
- CC, lac_log = self.state_control()
- self.publish(CC, lac_log)
- rk.keep_time()
- def main():
- config_realtime_process(4, Priority.CTRL_HIGH)
- controls = Controls()
- controls.run()
- if __name__ == "__main__":
- main()
|