test_fw_query_on_routes.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. #!/usr/bin/env python3
  2. # type: ignore
  3. from collections import defaultdict
  4. import argparse
  5. import os
  6. import traceback
  7. from tqdm import tqdm
  8. from tools.lib.logreader import LogReader
  9. from tools.lib.route import Route
  10. from selfdrive.car.interfaces import get_interface_attr
  11. from selfdrive.car.car_helpers import interface_names
  12. from selfdrive.car.fw_versions import match_fw_to_car
  13. NO_API = "NO_API" in os.environ
  14. VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True)
  15. SUPPORTED_BRANDS = VERSIONS.keys()
  16. SUPPORTED_CARS = [brand for brand in SUPPORTED_BRANDS for brand in interface_names[brand]]
  17. UNKNOWN_BRAND = "unknown"
  18. try:
  19. from xx.pipeline.c.CarState import migration
  20. except ImportError:
  21. migration = {}
  22. if __name__ == "__main__":
  23. parser = argparse.ArgumentParser(description='Run FW fingerprint on Qlog of route or list of routes')
  24. parser.add_argument('route', help='Route or file with list of routes')
  25. parser.add_argument('--car', help='Force comparison fingerprint to known car')
  26. args = parser.parse_args()
  27. if os.path.exists(args.route):
  28. routes = list(open(args.route))
  29. else:
  30. routes = [args.route]
  31. mismatches = defaultdict(list)
  32. not_fingerprinted = 0
  33. solved_by_fuzzy = 0
  34. good_exact = 0
  35. wrong_fuzzy = 0
  36. good_fuzzy = 0
  37. dongles = []
  38. for route in tqdm(routes):
  39. route = route.rstrip()
  40. dongle_id, time = route.split('|')
  41. if dongle_id in dongles:
  42. continue
  43. if NO_API:
  44. qlog_path = f"cd:/{dongle_id}/{time}/0/qlog.bz2"
  45. else:
  46. route = Route(route)
  47. qlog_path = next((p for p in route.qlog_paths() if p is not None), None)
  48. if qlog_path is None:
  49. continue
  50. try:
  51. lr = LogReader(qlog_path)
  52. dongles.append(dongle_id)
  53. CP = None
  54. for msg in lr:
  55. if msg.which() == "pandaStates":
  56. if msg.pandaStates[0].pandaType in ('unknown', 'whitePanda', 'greyPanda', 'pedal'):
  57. print("wrong panda type")
  58. break
  59. elif msg.which() == "carParams":
  60. CP = msg.carParams
  61. car_fw = CP.carFw
  62. if len(car_fw) == 0:
  63. print("no fw")
  64. break
  65. live_fingerprint = CP.carFingerprint
  66. live_fingerprint = migration.get(live_fingerprint, live_fingerprint)
  67. if args.car is not None:
  68. live_fingerprint = args.car
  69. if live_fingerprint not in SUPPORTED_CARS:
  70. print("not in supported cars")
  71. break
  72. _, exact_matches = match_fw_to_car(car_fw, allow_exact=True, allow_fuzzy=False)
  73. _, fuzzy_matches = match_fw_to_car(car_fw, allow_exact=False, allow_fuzzy=True)
  74. if (len(exact_matches) == 1) and (list(exact_matches)[0] == live_fingerprint):
  75. good_exact += 1
  76. print(f"Correct! Live: {live_fingerprint} - Fuzzy: {fuzzy_matches}")
  77. # Check if fuzzy match was correct
  78. if len(fuzzy_matches) == 1:
  79. if list(fuzzy_matches)[0] != live_fingerprint:
  80. wrong_fuzzy += 1
  81. print(f"{dongle_id}|{time}")
  82. print("Fuzzy match wrong! Fuzzy:", fuzzy_matches, "Live:", live_fingerprint)
  83. else:
  84. good_fuzzy += 1
  85. break
  86. print(f"{dongle_id}|{time}")
  87. print("Old style:", live_fingerprint, "Vin", CP.carVin)
  88. print("New style (exact):", exact_matches)
  89. print("New style (fuzzy):", fuzzy_matches)
  90. padding = max([len(fw.brand or UNKNOWN_BRAND) for fw in car_fw])
  91. for version in sorted(car_fw, key=lambda fw: fw.brand):
  92. subaddr = None if version.subAddress == 0 else hex(version.subAddress)
  93. print(f" Brand: {version.brand or UNKNOWN_BRAND:{padding}}, bus: {version.bus} - (Ecu.{version.ecu}, {hex(version.address)}, {subaddr}): [{version.fwVersion}],")
  94. print("Mismatches")
  95. found = False
  96. for brand in SUPPORTED_BRANDS:
  97. car_fws = VERSIONS[brand]
  98. if live_fingerprint in car_fws:
  99. found = True
  100. expected = car_fws[live_fingerprint]
  101. for (_, expected_addr, expected_sub_addr), v in expected.items():
  102. for version in car_fw:
  103. if version.brand != brand and len(version.brand):
  104. continue
  105. sub_addr = None if version.subAddress == 0 else version.subAddress
  106. addr = version.address
  107. if (addr, sub_addr) == (expected_addr, expected_sub_addr):
  108. if version.fwVersion not in v:
  109. print(f"({hex(addr)}, {'None' if sub_addr is None else hex(sub_addr)}) - {version.fwVersion}")
  110. # Add to global list of mismatches
  111. mismatch = (addr, sub_addr, version.fwVersion)
  112. if mismatch not in mismatches[live_fingerprint]:
  113. mismatches[live_fingerprint].append(mismatch)
  114. # No FW versions for this car yet, add them all to mismatch list
  115. if not found:
  116. for version in car_fw:
  117. sub_addr = None if version.subAddress == 0 else version.subAddress
  118. addr = version.address
  119. mismatch = (addr, sub_addr, version.fwVersion)
  120. if mismatch not in mismatches[live_fingerprint]:
  121. mismatches[live_fingerprint].append(mismatch)
  122. print()
  123. not_fingerprinted += 1
  124. if len(fuzzy_matches) == 1:
  125. if list(fuzzy_matches)[0] == live_fingerprint:
  126. solved_by_fuzzy += 1
  127. else:
  128. wrong_fuzzy += 1
  129. print("Fuzzy match wrong! Fuzzy:", fuzzy_matches, "Live:", live_fingerprint)
  130. break
  131. if CP is None:
  132. print("no CarParams in logs")
  133. except Exception:
  134. traceback.print_exc()
  135. except KeyboardInterrupt:
  136. break
  137. print()
  138. # Print FW versions that need to be added separated out by car and address
  139. for car, m in sorted(mismatches.items()):
  140. print(car)
  141. addrs = defaultdict(list)
  142. for (addr, sub_addr, version) in m:
  143. addrs[(addr, sub_addr)].append(version)
  144. for (addr, sub_addr), versions in addrs.items():
  145. print(f" ({hex(addr)}, {'None' if sub_addr is None else hex(sub_addr)}): [")
  146. for v in versions:
  147. print(f" {v},")
  148. print(" ]")
  149. print()
  150. print()
  151. print(f"Number of dongle ids checked: {len(dongles)}")
  152. print(f"Fingerprinted: {good_exact}")
  153. print(f"Not fingerprinted: {not_fingerprinted}")
  154. print(f" of which had a fuzzy match: {solved_by_fuzzy}")
  155. print()
  156. print(f"Correct fuzzy matches: {good_fuzzy}")
  157. print(f"Wrong fuzzy matches: {wrong_fuzzy}")
  158. print()