controlsd.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. #!/usr/bin/env python3
  2. import math
  3. from typing import SupportsFloat
  4. from cereal import car, log
  5. import cereal.messaging as messaging
  6. from openpilot.common.conversions import Conversions as CV
  7. from openpilot.common.params import Params
  8. from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper
  9. from openpilot.common.swaglog import cloudlog
  10. from opendbc.car.car_helpers import get_car_interface
  11. from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature
  12. from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
  13. from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
  14. from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
  15. from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
  16. from openpilot.selfdrive.controls.lib.longcontrol import LongControl
  17. from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
  18. from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
  19. State = log.SelfdriveState.OpenpilotState
  20. LaneChangeState = log.LaneChangeState
  21. LaneChangeDirection = log.LaneChangeDirection
  22. ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
  23. class Controls:
  24. def __init__(self) -> None:
  25. self.params = Params()
  26. cloudlog.info("controlsd is waiting for CarParams")
  27. self.CP = messaging.log_from_bytes(self.params.get("CarParams", block=True), car.CarParams)
  28. cloudlog.info("controlsd got CarParams")
  29. self.CI = get_car_interface(self.CP)
  30. self.sm = messaging.SubMaster(['liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState',
  31. 'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput',
  32. 'driverMonitoringState', 'onroadEvents', 'driverAssistance'], poll='selfdriveState')
  33. self.pm = messaging.PubMaster(['carControl', 'controlsState'])
  34. self.steer_limited = False
  35. self.desired_curvature = 0.0
  36. self.pose_calibrator = PoseCalibrator()
  37. self.calibrated_pose: Pose|None = None
  38. self.LoC = LongControl(self.CP)
  39. self.VM = VehicleModel(self.CP)
  40. self.LaC: LatControl
  41. if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
  42. self.LaC = LatControlAngle(self.CP, self.CI)
  43. elif self.CP.lateralTuning.which() == 'pid':
  44. self.LaC = LatControlPID(self.CP, self.CI)
  45. elif self.CP.lateralTuning.which() == 'torque':
  46. self.LaC = LatControlTorque(self.CP, self.CI)
  47. def update(self):
  48. self.sm.update(15)
  49. if self.sm.updated["liveCalibration"]:
  50. self.pose_calibrator.feed_live_calib(self.sm['liveCalibration'])
  51. if self.sm.updated["livePose"]:
  52. device_pose = Pose.from_live_pose(self.sm['livePose'])
  53. self.calibrated_pose = self.pose_calibrator.build_calibrated_pose(device_pose)
  54. def state_control(self):
  55. CS = self.sm['carState']
  56. # Update VehicleModel
  57. lp = self.sm['liveParameters']
  58. x = max(lp.stiffnessFactor, 0.1)
  59. sr = max(lp.steerRatio, 0.1)
  60. self.VM.update_params(x, sr)
  61. # Update Torque Params
  62. if self.CP.lateralTuning.which() == 'torque':
  63. torque_params = self.sm['liveTorqueParameters']
  64. if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams:
  65. self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,
  66. torque_params.frictionCoefficientFiltered)
  67. long_plan = self.sm['longitudinalPlan']
  68. model_v2 = self.sm['modelV2']
  69. CC = car.CarControl.new_message()
  70. CC.enabled = self.sm['selfdriveState'].enabled
  71. # Check which actuators can be enabled
  72. standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill
  73. CC.latActive = self.sm['selfdriveState'].active and not CS.steerFaultTemporary and not CS.steerFaultPermanent and not standstill
  74. CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in self.sm['onroadEvents']) and self.CP.openpilotLongitudinalControl
  75. actuators = CC.actuators
  76. actuators.longControlState = self.LoC.long_control_state
  77. # Enable blinkers while lane changing
  78. if model_v2.meta.laneChangeState != LaneChangeState.off:
  79. CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
  80. CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
  81. if not CC.latActive:
  82. self.LaC.reset()
  83. if not CC.longActive:
  84. self.LoC.reset()
  85. # accel PID loop
  86. pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, CS.vCruise * CV.KPH_TO_MS)
  87. actuators.accel = self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop, pid_accel_limits)
  88. # Steering PID loop and lateral MPC
  89. self.desired_curvature = clip_curvature(CS.vEgo, self.desired_curvature, model_v2.action.desiredCurvature)
  90. actuators.curvature = self.desired_curvature
  91. actuators.steer, actuators.steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
  92. self.steer_limited, self.desired_curvature,
  93. self.calibrated_pose) # TODO what if not available
  94. # Ensure no NaNs/Infs
  95. for p in ACTUATOR_FIELDS:
  96. attr = getattr(actuators, p)
  97. if not isinstance(attr, SupportsFloat):
  98. continue
  99. if not math.isfinite(attr):
  100. cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
  101. setattr(actuators, p, 0.0)
  102. return CC, lac_log
  103. def publish(self, CC, lac_log):
  104. CS = self.sm['carState']
  105. # Orientation and angle rates can be useful for carcontroller
  106. # Only calibrated (car) frame is relevant for the carcontroller
  107. if self.calibrated_pose is not None:
  108. CC.orientationNED = self.calibrated_pose.orientation.xyz.tolist()
  109. CC.angularVelocity = self.calibrated_pose.angular_velocity.xyz.tolist()
  110. CC.cruiseControl.override = CC.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl
  111. CC.cruiseControl.cancel = CS.cruiseState.enabled and (not CC.enabled or not self.CP.pcmCruise)
  112. speeds = self.sm['longitudinalPlan'].speeds
  113. if len(speeds):
  114. CC.cruiseControl.resume = CC.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1
  115. hudControl = CC.hudControl
  116. hudControl.setSpeed = float(CS.vCruiseCluster * CV.KPH_TO_MS)
  117. hudControl.speedVisible = CC.enabled
  118. hudControl.lanesVisible = CC.enabled
  119. hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
  120. hudControl.leadDistanceBars = self.sm['selfdriveState'].personality.raw + 1
  121. hudControl.visualAlert = self.sm['selfdriveState'].alertHudVisual
  122. hudControl.rightLaneVisible = True
  123. hudControl.leftLaneVisible = True
  124. if self.sm.valid['driverAssistance']:
  125. hudControl.leftLaneDepart = self.sm['driverAssistance'].leftLaneDeparture
  126. hudControl.rightLaneDepart = self.sm['driverAssistance'].rightLaneDeparture
  127. if self.sm['selfdriveState'].active:
  128. CO = self.sm['carOutput']
  129. if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
  130. self.steer_limited = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
  131. STEER_ANGLE_SATURATION_THRESHOLD
  132. else:
  133. self.steer_limited = abs(CC.actuators.steer - CO.actuatorsOutput.steer) > 1e-2
  134. # TODO: both controlsState and carControl valids should be set by
  135. # sm.all_checks(), but this creates a circular dependency
  136. # controlsState
  137. dat = messaging.new_message('controlsState')
  138. dat.valid = CS.canValid
  139. cs = dat.controlsState
  140. lp = self.sm['liveParameters']
  141. steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
  142. cs.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
  143. cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
  144. cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
  145. cs.desiredCurvature = self.desired_curvature
  146. cs.longControlState = self.LoC.long_control_state
  147. cs.upAccelCmd = float(self.LoC.pid.p)
  148. cs.uiAccelCmd = float(self.LoC.pid.i)
  149. cs.ufAccelCmd = float(self.LoC.pid.f)
  150. cs.forceDecel = bool((self.sm['driverMonitoringState'].awarenessStatus < 0.) or
  151. (self.sm['selfdriveState'].state == State.softDisabling))
  152. lat_tuning = self.CP.lateralTuning.which()
  153. if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
  154. cs.lateralControlState.angleState = lac_log
  155. elif lat_tuning == 'pid':
  156. cs.lateralControlState.pidState = lac_log
  157. elif lat_tuning == 'torque':
  158. cs.lateralControlState.torqueState = lac_log
  159. self.pm.send('controlsState', dat)
  160. # carControl
  161. cc_send = messaging.new_message('carControl')
  162. cc_send.valid = CS.canValid
  163. cc_send.carControl = CC
  164. self.pm.send('carControl', cc_send)
  165. def run(self):
  166. rk = Ratekeeper(100, print_delay_threshold=None)
  167. while True:
  168. self.update()
  169. CC, lac_log = self.state_control()
  170. self.publish(CC, lac_log)
  171. rk.keep_time()
  172. def main():
  173. config_realtime_process(4, Priority.CTRL_HIGH)
  174. controls = Controls()
  175. controls.run()
  176. if __name__ == "__main__":
  177. main()