simulated_sensors.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import time
  2. from cereal import log
  3. import cereal.messaging as messaging
  4. from openpilot.common.realtime import DT_DMON
  5. from openpilot.tools.sim.lib.camerad import Camerad
  6. from typing import TYPE_CHECKING
  7. if TYPE_CHECKING:
  8. from openpilot.tools.sim.lib.common import World, SimulatorState
  9. class SimulatedSensors:
  10. """Simulates the C3 sensors (acc, gyro, gps, peripherals, dm state, cameras) to OpenPilot"""
  11. def __init__(self, dual_camera=False):
  12. self.pm = messaging.PubMaster(['accelerometer', 'gyroscope', 'gpsLocationExternal', 'driverStateV2', 'driverMonitoringState', 'peripheralState'])
  13. self.camerad = Camerad(dual_camera=dual_camera)
  14. self.last_perp_update = 0
  15. self.last_dmon_update = 0
  16. def send_imu_message(self, simulator_state: 'SimulatorState'):
  17. for _ in range(5):
  18. dat = messaging.new_message('accelerometer', valid=True)
  19. dat.accelerometer.sensor = 4
  20. dat.accelerometer.type = 0x10
  21. dat.accelerometer.timestamp = dat.logMonoTime # TODO: use the IMU timestamp
  22. dat.accelerometer.init('acceleration')
  23. dat.accelerometer.acceleration.v = [simulator_state.imu.accelerometer.x, simulator_state.imu.accelerometer.y, simulator_state.imu.accelerometer.z]
  24. self.pm.send('accelerometer', dat)
  25. # copied these numbers from locationd
  26. dat = messaging.new_message('gyroscope', valid=True)
  27. dat.gyroscope.sensor = 5
  28. dat.gyroscope.type = 0x10
  29. dat.gyroscope.timestamp = dat.logMonoTime # TODO: use the IMU timestamp
  30. dat.gyroscope.init('gyroUncalibrated')
  31. dat.gyroscope.gyroUncalibrated.v = [simulator_state.imu.gyroscope.x, simulator_state.imu.gyroscope.y, simulator_state.imu.gyroscope.z]
  32. self.pm.send('gyroscope', dat)
  33. def send_gps_message(self, simulator_state: 'SimulatorState'):
  34. if not simulator_state.valid:
  35. return
  36. # transform from vel to NED
  37. velNED = [
  38. -simulator_state.velocity.y,
  39. simulator_state.velocity.x,
  40. simulator_state.velocity.z,
  41. ]
  42. for _ in range(10):
  43. dat = messaging.new_message('gpsLocationExternal', valid=True)
  44. dat.gpsLocationExternal = {
  45. "unixTimestampMillis": int(time.time() * 1000),
  46. "flags": 1, # valid fix
  47. "horizontalAccuracy": 1.0,
  48. "verticalAccuracy": 1.0,
  49. "speedAccuracy": 0.1,
  50. "bearingAccuracyDeg": 0.1,
  51. "vNED": velNED,
  52. "bearingDeg": simulator_state.imu.bearing,
  53. "latitude": simulator_state.gps.latitude,
  54. "longitude": simulator_state.gps.longitude,
  55. "altitude": simulator_state.gps.altitude,
  56. "speed": simulator_state.speed,
  57. "source": log.GpsLocationData.SensorSource.ublox,
  58. }
  59. self.pm.send('gpsLocationExternal', dat)
  60. def send_peripheral_state(self):
  61. dat = messaging.new_message('peripheralState')
  62. dat.valid = True
  63. dat.peripheralState = {
  64. 'pandaType': log.PandaState.PandaType.blackPanda,
  65. 'voltage': 12000,
  66. 'current': 5678,
  67. 'fanSpeedRpm': 1000
  68. }
  69. self.pm.send('peripheralState', dat)
  70. def send_fake_driver_monitoring(self):
  71. # dmonitoringmodeld output
  72. dat = messaging.new_message('driverStateV2')
  73. dat.driverStateV2.leftDriverData.faceOrientation = [0., 0., 0.]
  74. dat.driverStateV2.leftDriverData.faceProb = 1.0
  75. dat.driverStateV2.rightDriverData.faceOrientation = [0., 0., 0.]
  76. dat.driverStateV2.rightDriverData.faceProb = 1.0
  77. self.pm.send('driverStateV2', dat)
  78. # dmonitoringd output
  79. dat = messaging.new_message('driverMonitoringState', valid=True)
  80. dat.driverMonitoringState = {
  81. "faceDetected": True,
  82. "isDistracted": False,
  83. "awarenessStatus": 1.,
  84. }
  85. self.pm.send('driverMonitoringState', dat)
  86. def send_camera_images(self, world: 'World'):
  87. world.image_lock.acquire()
  88. yuv = self.camerad.rgb_to_yuv(world.road_image)
  89. self.camerad.cam_send_yuv_road(yuv)
  90. if world.dual_camera:
  91. yuv = self.camerad.rgb_to_yuv(world.wide_road_image)
  92. self.camerad.cam_send_yuv_wide_road(yuv)
  93. def update(self, simulator_state: 'SimulatorState', world: 'World'):
  94. now = time.time()
  95. self.send_imu_message(simulator_state)
  96. self.send_gps_message(simulator_state)
  97. if (now - self.last_dmon_update) > DT_DMON/2:
  98. self.send_fake_driver_monitoring()
  99. self.last_dmon_update = now
  100. if (now - self.last_perp_update) > 0.25:
  101. self.send_peripheral_state()
  102. self.last_perp_update = now