123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- #!/usr/bin/env python3
- import math
- import os
- import threading
- import requests
- import numpy as np
- import cereal.messaging as messaging
- from cereal import log
- from common.api import Api
- from common.params import Params
- from common.realtime import Ratekeeper
- from common.transformations.coordinates import ecef2geodetic
- from selfdrive.navd.helpers import (Coordinate, coordinate_from_param,
- distance_along_geometry, maxspeed_to_ms,
- minimum_distance,
- parse_banner_instructions)
- from system.swaglog import cloudlog
- REROUTE_DISTANCE = 25
- MANEUVER_TRANSITION_THRESHOLD = 10
- VALID_POS_STD = 50.0
- class RouteEngine:
- def __init__(self, sm, pm):
- self.sm = sm
- self.pm = pm
- self.params = Params()
- # Get last gps position from params
- self.last_position = coordinate_from_param("LastGPSPosition", self.params)
- self.last_bearing = None
- self.gps_ok = False
- self.localizer_valid = False
- self.nav_destination = None
- self.step_idx = None
- self.route = None
- self.route_geometry = None
- self.recompute_backoff = 0
- self.recompute_countdown = 0
- self.ui_pid = None
- if "MAPBOX_TOKEN" in os.environ:
- self.mapbox_token = os.environ["MAPBOX_TOKEN"]
- self.mapbox_host = "https://api.mapbox.com"
- else:
- try:
- self.mapbox_token = Api(self.params.get("DongleId", encoding='utf8')).get_token(expiry_hours=4 * 7 * 24)
- except FileNotFoundError:
- cloudlog.exception("Failed to generate mapbox token due to missing private key. Ensure device is registered.")
- self.mapbox_token = ""
- self.mapbox_host = "https://maps.comma.ai"
- def update(self):
- self.sm.update(0)
- if self.sm.updated["managerState"]:
- ui_pid = [p.pid for p in self.sm["managerState"].processes if p.name == "ui" and p.running]
- if ui_pid:
- if self.ui_pid and self.ui_pid != ui_pid[0]:
- cloudlog.warning("UI restarting, sending route")
- threading.Timer(5.0, self.send_route).start()
- self.ui_pid = ui_pid[0]
- self.update_location()
- self.recompute_route()
- self.send_instruction()
- def update_location(self):
- location = self.sm['liveLocationKalman']
- laikad = self.sm['gnssMeasurements']
- locationd_valid = (location.status == log.LiveLocationKalman.Status.valid) and location.positionGeodetic.valid
- laikad_valid = laikad.positionECEF.valid and np.linalg.norm(laikad.positionECEF.std) < VALID_POS_STD
- self.localizer_valid = locationd_valid or laikad_valid
- self.gps_ok = location.gpsOK or laikad_valid
- if locationd_valid:
- self.last_bearing = math.degrees(location.calibratedOrientationNED.value[2])
- self.last_position = Coordinate(location.positionGeodetic.value[0], location.positionGeodetic.value[1])
- elif laikad_valid:
- geodetic = ecef2geodetic(laikad.positionECEF.value)
- self.last_position = Coordinate(geodetic[0], geodetic[1])
- self.last_bearing = None
- def recompute_route(self):
- if self.last_position is None:
- return
- new_destination = coordinate_from_param("NavDestination", self.params)
- if new_destination is None:
- self.clear_route()
- return
- should_recompute = self.should_recompute()
- if new_destination != self.nav_destination:
- cloudlog.warning(f"Got new destination from NavDestination param {new_destination}")
- should_recompute = True
- # Don't recompute when GPS drifts in tunnels
- if not self.gps_ok and self.step_idx is not None:
- return
- if self.recompute_countdown == 0 and should_recompute:
- self.recompute_countdown = 2**self.recompute_backoff
- self.recompute_backoff = min(6, self.recompute_backoff + 1)
- self.calculate_route(new_destination)
- else:
- self.recompute_countdown = max(0, self.recompute_countdown - 1)
- def calculate_route(self, destination):
- cloudlog.warning(f"Calculating route {self.last_position} -> {destination}")
- self.nav_destination = destination
- lang = self.params.get('LanguageSetting', encoding='utf8')
- if lang is not None:
- lang = lang.replace('main_', '')
- params = {
- 'access_token': self.mapbox_token,
- 'annotations': 'maxspeed',
- 'geometries': 'geojson',
- 'overview': 'full',
- 'steps': 'true',
- 'banner_instructions': 'true',
- 'alternatives': 'false',
- 'language': lang,
- }
- if self.last_bearing is not None:
- params['bearings'] = f"{(self.last_bearing + 360) % 360:.0f},90;"
- url = self.mapbox_host + f'/directions/v5/mapbox/driving-traffic/{self.last_position.longitude},{self.last_position.latitude};{destination.longitude},{destination.latitude}'
- try:
- resp = requests.get(url, params=params, timeout=10)
- resp.raise_for_status()
- r = resp.json()
- if len(r['routes']):
- self.route = r['routes'][0]['legs'][0]['steps']
- self.route_geometry = []
- maxspeed_idx = 0
- maxspeeds = r['routes'][0]['legs'][0]['annotation']['maxspeed']
- # Convert coordinates
- for step in self.route:
- coords = []
- for c in step['geometry']['coordinates']:
- coord = Coordinate.from_mapbox_tuple(c)
- # Last step does not have maxspeed
- if (maxspeed_idx < len(maxspeeds)):
- maxspeed = maxspeeds[maxspeed_idx]
- if ('unknown' not in maxspeed) and ('none' not in maxspeed):
- coord.annotations['maxspeed'] = maxspeed_to_ms(maxspeed)
- coords.append(coord)
- maxspeed_idx += 1
- self.route_geometry.append(coords)
- maxspeed_idx -= 1 # Every segment ends with the same coordinate as the start of the next
- self.step_idx = 0
- else:
- cloudlog.warning("Got empty route response")
- self.clear_route()
- except requests.exceptions.RequestException:
- cloudlog.exception("failed to get route")
- self.clear_route()
- self.send_route()
- def send_instruction(self):
- msg = messaging.new_message('navInstruction')
- if self.step_idx is None:
- msg.valid = False
- self.pm.send('navInstruction', msg)
- return
- step = self.route[self.step_idx]
- geometry = self.route_geometry[self.step_idx]
- along_geometry = distance_along_geometry(geometry, self.last_position)
- distance_to_maneuver_along_geometry = step['distance'] - along_geometry
- # Current instruction
- msg.navInstruction.maneuverDistance = distance_to_maneuver_along_geometry
- parse_banner_instructions(msg.navInstruction, step['bannerInstructions'], distance_to_maneuver_along_geometry)
- # Compute total remaining time and distance
- remaining = 1.0 - along_geometry / max(step['distance'], 1)
- total_distance = step['distance'] * remaining
- total_time = step['duration'] * remaining
- total_time_typical = step['duration_typical'] * remaining
- # Add up totals for future steps
- for i in range(self.step_idx + 1, len(self.route)):
- total_distance += self.route[i]['distance']
- total_time += self.route[i]['duration']
- total_time_typical += self.route[i]['duration_typical']
- msg.navInstruction.distanceRemaining = total_distance
- msg.navInstruction.timeRemaining = total_time
- msg.navInstruction.timeRemainingTypical = total_time_typical
- # Speed limit
- closest_idx, closest = min(enumerate(geometry), key=lambda p: p[1].distance_to(self.last_position))
- if closest_idx > 0:
- # If we are not past the closest point, show previous
- if along_geometry < distance_along_geometry(geometry, geometry[closest_idx]):
- closest = geometry[closest_idx - 1]
- if ('maxspeed' in closest.annotations) and self.localizer_valid:
- msg.navInstruction.speedLimit = closest.annotations['maxspeed']
- # Speed limit sign type
- if 'speedLimitSign' in step:
- if step['speedLimitSign'] == 'mutcd':
- msg.navInstruction.speedLimitSign = log.NavInstruction.SpeedLimitSign.mutcd
- elif step['speedLimitSign'] == 'vienna':
- msg.navInstruction.speedLimitSign = log.NavInstruction.SpeedLimitSign.vienna
- self.pm.send('navInstruction', msg)
- # Transition to next route segment
- if distance_to_maneuver_along_geometry < -MANEUVER_TRANSITION_THRESHOLD:
- if self.step_idx + 1 < len(self.route):
- self.step_idx += 1
- self.recompute_backoff = 0
- self.recompute_countdown = 0
- else:
- cloudlog.warning("Destination reached")
- Params().remove("NavDestination")
- # Clear route if driving away from destination
- dist = self.nav_destination.distance_to(self.last_position)
- if dist > REROUTE_DISTANCE:
- self.clear_route()
- def send_route(self):
- coords = []
- if self.route is not None:
- for path in self.route_geometry:
- coords += [c.as_dict() for c in path]
- msg = messaging.new_message('navRoute')
- msg.navRoute.coordinates = coords
- self.pm.send('navRoute', msg)
- def clear_route(self):
- self.route = None
- self.route_geometry = None
- self.step_idx = None
- self.nav_destination = None
- def should_recompute(self):
- if self.step_idx is None or self.route is None:
- return True
- # Don't recompute in last segment, assume destination is reached
- if self.step_idx == len(self.route) - 1:
- return False
- # Compute closest distance to all line segments in the current path
- min_d = REROUTE_DISTANCE + 1
- path = self.route_geometry[self.step_idx]
- for i in range(len(path) - 1):
- a = path[i]
- b = path[i + 1]
- if a.distance_to(b) < 1.0:
- continue
- min_d = min(min_d, minimum_distance(a, b, self.last_position))
- return min_d > REROUTE_DISTANCE
- # TODO: Check for going wrong way in segment
- def main(sm=None, pm=None):
- if sm is None:
- sm = messaging.SubMaster(['liveLocationKalman', 'gnssMeasurements', 'managerState'])
- if pm is None:
- pm = messaging.PubMaster(['navInstruction', 'navRoute'])
- rk = Ratekeeper(1.0)
- route_engine = RouteEngine(sm, pm)
- while True:
- route_engine.update()
- rk.keep_time()
- if __name__ == "__main__":
- main()
|