123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- import time
- from cereal import log
- import cereal.messaging as messaging
- from openpilot.common.realtime import DT_DMON
- from openpilot.tools.sim.lib.camerad import Camerad
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from openpilot.tools.sim.lib.common import World, SimulatorState
- class SimulatedSensors:
- """Simulates the C3 sensors (acc, gyro, gps, peripherals, dm state, cameras) to OpenPilot"""
- def __init__(self, dual_camera=False):
- self.pm = messaging.PubMaster(['accelerometer', 'gyroscope', 'gpsLocationExternal', 'driverStateV2', 'driverMonitoringState', 'peripheralState'])
- self.camerad = Camerad(dual_camera=dual_camera)
- self.last_perp_update = 0
- self.last_dmon_update = 0
- def send_imu_message(self, simulator_state: 'SimulatorState'):
- for _ in range(5):
- dat = messaging.new_message('accelerometer', valid=True)
- dat.accelerometer.sensor = 4
- dat.accelerometer.type = 0x10
- dat.accelerometer.timestamp = dat.logMonoTime # TODO: use the IMU timestamp
- dat.accelerometer.init('acceleration')
- dat.accelerometer.acceleration.v = [simulator_state.imu.accelerometer.x, simulator_state.imu.accelerometer.y, simulator_state.imu.accelerometer.z]
- self.pm.send('accelerometer', dat)
- # copied these numbers from locationd
- dat = messaging.new_message('gyroscope', valid=True)
- dat.gyroscope.sensor = 5
- dat.gyroscope.type = 0x10
- dat.gyroscope.timestamp = dat.logMonoTime # TODO: use the IMU timestamp
- dat.gyroscope.init('gyroUncalibrated')
- dat.gyroscope.gyroUncalibrated.v = [simulator_state.imu.gyroscope.x, simulator_state.imu.gyroscope.y, simulator_state.imu.gyroscope.z]
- self.pm.send('gyroscope', dat)
- def send_gps_message(self, simulator_state: 'SimulatorState'):
- if not simulator_state.valid:
- return
- # transform from vel to NED
- velNED = [
- -simulator_state.velocity.y,
- simulator_state.velocity.x,
- simulator_state.velocity.z,
- ]
- for _ in range(10):
- dat = messaging.new_message('gpsLocationExternal', valid=True)
- dat.gpsLocationExternal = {
- "unixTimestampMillis": int(time.time() * 1000),
- "flags": 1, # valid fix
- "horizontalAccuracy": 1.0,
- "verticalAccuracy": 1.0,
- "speedAccuracy": 0.1,
- "bearingAccuracyDeg": 0.1,
- "vNED": velNED,
- "bearingDeg": simulator_state.imu.bearing,
- "latitude": simulator_state.gps.latitude,
- "longitude": simulator_state.gps.longitude,
- "altitude": simulator_state.gps.altitude,
- "speed": simulator_state.speed,
- "source": log.GpsLocationData.SensorSource.ublox,
- }
- self.pm.send('gpsLocationExternal', dat)
- def send_peripheral_state(self):
- dat = messaging.new_message('peripheralState')
- dat.valid = True
- dat.peripheralState = {
- 'pandaType': log.PandaState.PandaType.blackPanda,
- 'voltage': 12000,
- 'current': 5678,
- 'fanSpeedRpm': 1000
- }
- self.pm.send('peripheralState', dat)
- def send_fake_driver_monitoring(self):
- # dmonitoringmodeld output
- dat = messaging.new_message('driverStateV2')
- dat.driverStateV2.leftDriverData.faceOrientation = [0., 0., 0.]
- dat.driverStateV2.leftDriverData.faceProb = 1.0
- dat.driverStateV2.rightDriverData.faceOrientation = [0., 0., 0.]
- dat.driverStateV2.rightDriverData.faceProb = 1.0
- self.pm.send('driverStateV2', dat)
- # dmonitoringd output
- dat = messaging.new_message('driverMonitoringState', valid=True)
- dat.driverMonitoringState = {
- "faceDetected": True,
- "isDistracted": False,
- "awarenessStatus": 1.,
- }
- self.pm.send('driverMonitoringState', dat)
- def send_camera_images(self, world: 'World'):
- world.image_lock.acquire()
- yuv = self.camerad.rgb_to_yuv(world.road_image)
- self.camerad.cam_send_yuv_road(yuv)
- if world.dual_camera:
- yuv = self.camerad.rgb_to_yuv(world.wide_road_image)
- self.camerad.cam_send_yuv_wide_road(yuv)
- def update(self, simulator_state: 'SimulatorState', world: 'World'):
- now = time.time()
- self.send_imu_message(simulator_state)
- self.send_gps_message(simulator_state)
- if (now - self.last_dmon_update) > DT_DMON/2:
- self.send_fake_driver_monitoring()
- self.last_dmon_update = now
- if (now - self.last_perp_update) > 0.25:
- self.send_peripheral_state()
- self.last_perp_update = now
|