ecu_addrs.py 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #!/usr/bin/env python3
  2. import capnp
  3. import time
  4. import cereal.messaging as messaging
  5. from panda.python.uds import SERVICE_TYPE
  6. from openpilot.selfdrive.car import make_can_msg
  7. from openpilot.selfdrive.car.fw_query_definitions import EcuAddrBusType
  8. from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp
  9. from openpilot.common.swaglog import cloudlog
  10. def make_tester_present_msg(addr, bus, subaddr=None):
  11. dat = [0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0]
  12. if subaddr is not None:
  13. dat.insert(0, subaddr)
  14. dat.extend([0x0] * (8 - len(dat)))
  15. return make_can_msg(addr, bytes(dat), bus)
  16. def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: int = None) -> bool:
  17. # ISO-TP messages are always padded to 8 bytes
  18. # tester present response is always a single frame
  19. dat_offset = 1 if subaddr is not None else 0
  20. if len(msg.dat) == 8 and 1 <= msg.dat[dat_offset] <= 7:
  21. # success response
  22. if msg.dat[dat_offset + 1] == (SERVICE_TYPE.TESTER_PRESENT + 0x40):
  23. return True
  24. # error response
  25. if msg.dat[dat_offset + 1] == 0x7F and msg.dat[dat_offset + 2] == SERVICE_TYPE.TESTER_PRESENT:
  26. return True
  27. return False
  28. def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> set[EcuAddrBusType]:
  29. addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)]
  30. queries: set[EcuAddrBusType] = {(addr, None, bus) for addr in addr_list}
  31. responses = queries
  32. return get_ecu_addrs(logcan, sendcan, queries, responses, timeout=timeout, debug=debug)
  33. def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: set[EcuAddrBusType],
  34. responses: set[EcuAddrBusType], timeout: float = 1, debug: bool = False) -> set[EcuAddrBusType]:
  35. ecu_responses: set[EcuAddrBusType] = set() # set((addr, subaddr, bus),)
  36. try:
  37. msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries]
  38. messaging.drain_sock_raw(logcan)
  39. sendcan.send(can_list_to_can_capnp(msgs, msgtype='sendcan'))
  40. start_time = time.monotonic()
  41. while time.monotonic() - start_time < timeout:
  42. can_packets = messaging.drain_sock(logcan, wait_for_one=True)
  43. for packet in can_packets:
  44. for msg in packet.can:
  45. if not len(msg.dat):
  46. cloudlog.warning("ECU addr scan: skipping empty remote frame")
  47. continue
  48. subaddr = None if (msg.address, None, msg.src) in responses else msg.dat[0]
  49. if (msg.address, subaddr, msg.src) in responses and is_tester_present_response(msg, subaddr):
  50. if debug:
  51. print(f"CAN-RX: {hex(msg.address)} - 0x{bytes.hex(msg.dat)}")
  52. if (msg.address, subaddr, msg.src) in ecu_responses:
  53. print(f"Duplicate ECU address: {hex(msg.address)}")
  54. ecu_responses.add((msg.address, subaddr, msg.src))
  55. except Exception:
  56. cloudlog.exception("ECU addr scan exception")
  57. return ecu_responses
  58. if __name__ == "__main__":
  59. import argparse
  60. parser = argparse.ArgumentParser(description='Get addresses of all ECUs')
  61. parser.add_argument('--debug', action='store_true')
  62. parser.add_argument('--bus', type=int, default=1)
  63. parser.add_argument('--timeout', type=float, default=1.0)
  64. args = parser.parse_args()
  65. logcan = messaging.sub_sock('can')
  66. sendcan = messaging.pub_sock('sendcan')
  67. time.sleep(1.0)
  68. print("Getting ECU addresses ...")
  69. ecu_addrs = get_all_ecu_addrs(logcan, sendcan, args.bus, args.timeout, debug=args.debug)
  70. print()
  71. print("Found ECUs on addresses:")
  72. for addr, subaddr, _ in ecu_addrs:
  73. msg = f" 0x{hex(addr)}"
  74. if subaddr is not None:
  75. msg += f" (sub-address: {hex(subaddr)})"
  76. print(msg)