plannerd.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. #!/usr/bin/env python3
  2. from cereal import car
  3. from openpilot.common.params import Params
  4. from openpilot.common.realtime import Priority, config_realtime_process
  5. from openpilot.common.swaglog import cloudlog
  6. from openpilot.selfdrive.controls.lib.ldw import LaneDepartureWarning
  7. from openpilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlanner
  8. import cereal.messaging as messaging
  9. def main():
  10. config_realtime_process(5, Priority.CTRL_LOW)
  11. cloudlog.info("plannerd is waiting for CarParams")
  12. params = Params()
  13. CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
  14. cloudlog.info("plannerd got CarParams: %s", CP.carName)
  15. ldw = LaneDepartureWarning()
  16. longitudinal_planner = LongitudinalPlanner(CP)
  17. pm = messaging.PubMaster(['longitudinalPlan', 'driverAssistance'])
  18. sm = messaging.SubMaster(['carControl', 'carState', 'controlsState', 'radarState', 'modelV2', 'selfdriveState'],
  19. poll='modelV2', ignore_avg_freq=['radarState'])
  20. while True:
  21. sm.update()
  22. if sm.updated['modelV2']:
  23. longitudinal_planner.update(sm)
  24. longitudinal_planner.publish(sm, pm)
  25. ldw.update(sm.frame, sm['modelV2'], sm['carState'], sm['carControl'])
  26. msg = messaging.new_message('driverAssistance')
  27. msg.valid = sm.all_checks(['carState', 'carControl', 'modelV2'])
  28. msg.driverAssistance.leftLaneDeparture = ldw.left
  29. msg.driverAssistance.rightLaneDeparture = ldw.right
  30. pm.send('driverAssistance', msg)
  31. if __name__ == "__main__":
  32. main()