measure_steering_accuracy.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. #!/usr/bin/env python3
  2. # type: ignore
  3. import os
  4. import time
  5. import argparse
  6. import signal
  7. from collections import defaultdict
  8. import cereal.messaging as messaging
  9. from openpilot.tools.lib.logreader import LogReader
  10. def sigint_handler(signal, frame):
  11. exit(0)
  12. signal.signal(signal.SIGINT, sigint_handler)
  13. class SteeringAccuracyTool:
  14. all_groups = {"germany": (45, "45 - up m/s // 162 - up km/h // 101 - up mph"),
  15. "veryfast": (35, "35 - 45 m/s // 126 - 162 km/h // 78 - 101 mph"),
  16. "fast": (25, "25 - 35 m/s // 90 - 126 km/h // 56 - 78 mph"),
  17. "medium": (15, "15 - 25 m/s // 54 - 90 km/h // 34 - 56 mph"),
  18. "slow": (5, " 5 - 15 m/s // 18 - 54 km/h // 11 - 34 mph"),
  19. "crawl": (0, " 0 - 5 m/s // 0 - 18 km/h // 0 - 11 mph")}
  20. def __init__(self, args):
  21. self.msg_cnt = 0
  22. self.cnt = 0
  23. self.total_error = 0
  24. if args.group == "all":
  25. self.display_groups = self.all_groups.keys()
  26. elif args.group in self.all_groups.keys():
  27. self.display_groups = [args.group]
  28. else:
  29. raise ValueError("invalid speed group, see help")
  30. self.speed_group_stats = {}
  31. for group in self.all_groups:
  32. self.speed_group_stats[group] = defaultdict(lambda: {'err': 0, "cnt": 0, "=": 0, "+": 0, "-": 0, "steer": 0, "limited": 0, "saturated": 0, "dpp": 0})
  33. def update(self, sm):
  34. self.msg_cnt += 1
  35. lateralControlState = sm['controlsState'].lateralControlState
  36. control_type = list(lateralControlState.to_dict().keys())[0]
  37. control_state = lateralControlState.__getattr__(control_type)
  38. v_ego = sm['carState'].vEgo
  39. active = sm['controlsState'].active
  40. steer = sm['carOutput'].actuatorsOutput.steer
  41. standstill = sm['carState'].standstill
  42. steer_limited = abs(sm['carControl'].actuators.steer - sm['carControl'].actuatorsOutput.steer) > 1e-2
  43. overriding = sm['carState'].steeringPressed
  44. changing_lanes = sm['modelV2'].meta.laneChangeState != 0
  45. model_points = sm['modelV2'].position.y
  46. # must be engaged, not at standstill, not overriding steering, and not changing lanes
  47. if active and not standstill and not overriding and not changing_lanes:
  48. self.cnt += 1
  49. # wait 5 seconds after engage / standstill / override / lane change
  50. if self.cnt >= 500:
  51. actual_angle = control_state.steeringAngleDeg
  52. desired_angle = control_state.steeringAngleDesiredDeg
  53. # calculate error before rounding, then round for stats grouping
  54. angle_error = abs(desired_angle - actual_angle)
  55. actual_angle = round(actual_angle, 1)
  56. desired_angle = round(desired_angle, 1)
  57. angle_error = round(angle_error, 2)
  58. angle_abs = int(abs(round(desired_angle, 0)))
  59. for group, group_props in self.all_groups.items():
  60. if v_ego > group_props[0]:
  61. # collect stats
  62. self.speed_group_stats[group][angle_abs]["cnt"] += 1
  63. self.speed_group_stats[group][angle_abs]["err"] += angle_error
  64. self.speed_group_stats[group][angle_abs]["steer"] += abs(steer)
  65. if len(model_points):
  66. self.speed_group_stats[group][angle_abs]["dpp"] += abs(model_points[0])
  67. if steer_limited:
  68. self.speed_group_stats[group][angle_abs]["limited"] += 1
  69. if control_state.saturated:
  70. self.speed_group_stats[group][angle_abs]["saturated"] += 1
  71. if actual_angle == desired_angle:
  72. self.speed_group_stats[group][angle_abs]["="] += 1
  73. else:
  74. if desired_angle == 0.:
  75. overshoot = True
  76. else:
  77. overshoot = desired_angle < actual_angle if desired_angle > 0. else desired_angle > actual_angle
  78. self.speed_group_stats[group][angle_abs]["+" if overshoot else "-"] += 1
  79. break
  80. else:
  81. self.cnt = 0
  82. if self.msg_cnt % 100 == 0:
  83. print(chr(27) + "[2J")
  84. if self.cnt != 0:
  85. print("COLLECTING ...\n")
  86. else:
  87. print("DISABLED (not active, standstill, steering override, or lane change)\n")
  88. for group in self.display_groups:
  89. if len(self.speed_group_stats[group]) > 0:
  90. print(f"speed group: {group:10s} {self.all_groups[group][1]:>96s}")
  91. print(f" {'-'*118}")
  92. for k in sorted(self.speed_group_stats[group].keys()):
  93. v = self.speed_group_stats[group][k]
  94. print(f' {k:#2}° | actuator:{int(v["steer"] / v["cnt"] * 100):#3}% ' +
  95. f'| error: {round(v["err"] / v["cnt"], 2):2.2f}° | -:{int(v["-"] / v["cnt"] * 100):#3}% ' +
  96. f'| =:{int(v["="] / v["cnt"] * 100):#3}% | +:{int(v["+"] / v["cnt"] * 100):#3}% | lim:{v["limited"]:#5} ' +
  97. f'| sat:{v["saturated"]:#5} | path dev: {round(v["dpp"] / v["cnt"], 2):2.2f}m | total: {v["cnt"]:#5}')
  98. print("")
  99. if __name__ == "__main__":
  100. parser = argparse.ArgumentParser(description='Steering accuracy measurement tool')
  101. parser.add_argument('--route', help="route name")
  102. parser.add_argument('--addr', default='127.0.0.1', help="IP address for optional ZMQ listener, default to msgq")
  103. parser.add_argument('--group', default='all', help="speed group to display, [crawl|slow|medium|fast|veryfast|germany|all], default to all")
  104. parser.add_argument('--cache', default=False, action='store_true', help="use cached data, default to False")
  105. args = parser.parse_args()
  106. if args.cache:
  107. os.environ['FILEREADER_CACHE'] = '1'
  108. tool = SteeringAccuracyTool(args)
  109. if args.route is not None:
  110. print(f"loading {args.route}...")
  111. lr = LogReader(args.route, sort_by_time=True)
  112. sm = {}
  113. for msg in lr:
  114. if msg.which() == 'carState':
  115. sm['carState'] = msg.carState
  116. elif msg.which() == 'carControl':
  117. sm['carControl'] = msg.carControl
  118. elif msg.which() == 'controlsState':
  119. sm['controlsState'] = msg.controlsState
  120. elif msg.which() == 'modelV2':
  121. sm['modelV2'] = msg.modelV2
  122. if msg.which() == 'carControl' and 'carState' in sm and 'controlsState' in sm and 'modelV2' in sm:
  123. tool.update(sm)
  124. else:
  125. if args.addr != "127.0.0.1":
  126. os.environ["ZMQ"] = "1"
  127. messaging.reset_context()
  128. carControl = messaging.sub_sock('carControl', addr=args.addr, conflate=True)
  129. sm = messaging.SubMaster(['carState', 'carControl', 'carOutput', 'controlsState', 'modelV2'], addr=args.addr)
  130. time.sleep(1) # Make sure all submaster data is available before going further
  131. print("waiting for messages...")
  132. while messaging.recv_one(carControl):
  133. sm.update()
  134. tool.update(sm)