isotp_parallel_query.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import time
  2. from collections import defaultdict
  3. from functools import partial
  4. import cereal.messaging as messaging
  5. from openpilot.common.swaglog import cloudlog
  6. from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp
  7. from openpilot.selfdrive.car.fw_query_definitions import AddrType
  8. from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_addr_for_tx_addr
  9. class IsoTpParallelQuery:
  10. def __init__(self, sendcan: messaging.PubSocket, logcan: messaging.SubSocket, bus: int, addrs: list[int] | list[AddrType],
  11. request: list[bytes], response: list[bytes], response_offset: int = 0x8,
  12. functional_addrs: list[int] = None, debug: bool = False, response_pending_timeout: float = 10) -> None:
  13. self.sendcan = sendcan
  14. self.logcan = logcan
  15. self.bus = bus
  16. self.request = request
  17. self.response = response
  18. self.functional_addrs = functional_addrs or []
  19. self.debug = debug
  20. self.response_pending_timeout = response_pending_timeout
  21. real_addrs = [a if isinstance(a, tuple) else (a, None) for a in addrs]
  22. for tx_addr, _ in real_addrs:
  23. assert tx_addr not in FUNCTIONAL_ADDRS, f"Functional address should be defined in functional_addrs: {hex(tx_addr)}"
  24. self.msg_addrs = {tx_addr: get_rx_addr_for_tx_addr(tx_addr[0], rx_offset=response_offset) for tx_addr in real_addrs}
  25. self.msg_buffer: dict[int, list[tuple[int, int, bytes, int]]] = defaultdict(list)
  26. def rx(self):
  27. """Drain can socket and sort messages into buffers based on address"""
  28. can_packets = messaging.drain_sock(self.logcan, wait_for_one=True)
  29. for packet in can_packets:
  30. for msg in packet.can:
  31. if msg.src == self.bus and msg.address in self.msg_addrs.values():
  32. self.msg_buffer[msg.address].append((msg.address, msg.busTime, msg.dat, msg.src))
  33. def _can_tx(self, tx_addr, dat, bus):
  34. """Helper function to send single message"""
  35. msg = [tx_addr, 0, dat, bus]
  36. self.sendcan.send(can_list_to_can_capnp([msg], msgtype='sendcan'))
  37. def _can_rx(self, addr, sub_addr=None):
  38. """Helper function to retrieve message with specified address and subadress from buffer"""
  39. keep_msgs = []
  40. if sub_addr is None:
  41. msgs = self.msg_buffer[addr]
  42. else:
  43. # Filter based on subadress
  44. msgs = []
  45. for m in self.msg_buffer[addr]:
  46. first_byte = m[2][0]
  47. if first_byte == sub_addr:
  48. msgs.append(m)
  49. else:
  50. keep_msgs.append(m)
  51. self.msg_buffer[addr] = keep_msgs
  52. return msgs
  53. def _drain_rx(self):
  54. messaging.drain_sock_raw(self.logcan)
  55. self.msg_buffer = defaultdict(list)
  56. def _create_isotp_msg(self, tx_addr: int, sub_addr: int | None, rx_addr: int):
  57. can_client = CanClient(self._can_tx, partial(self._can_rx, rx_addr, sub_addr=sub_addr), tx_addr, rx_addr,
  58. self.bus, sub_addr=sub_addr, debug=self.debug)
  59. max_len = 8 if sub_addr is None else 7
  60. # uses iso-tp frame separation time of 10 ms
  61. # TODO: use single_frame_mode so ECUs can send as fast as they want,
  62. # as well as reduces chances we process messages from previous queries
  63. return IsoTpMessage(can_client, timeout=0, separation_time=0.01, debug=self.debug, max_len=max_len)
  64. def get_data(self, timeout: float, total_timeout: float = 60.) -> dict[AddrType, bytes]:
  65. self._drain_rx()
  66. # Create message objects
  67. msgs = {}
  68. request_counter = {}
  69. request_done = {}
  70. for tx_addr, rx_addr in self.msg_addrs.items():
  71. msgs[tx_addr] = self._create_isotp_msg(*tx_addr, rx_addr)
  72. request_counter[tx_addr] = 0
  73. request_done[tx_addr] = False
  74. # Send first request to functional addrs, subsequent responses are handled on physical addrs
  75. if len(self.functional_addrs):
  76. for addr in self.functional_addrs:
  77. self._create_isotp_msg(addr, None, -1).send(self.request[0])
  78. # Send first frame (single or first) to all addresses and receive asynchronously in the loop below.
  79. # If querying functional addrs, only set up physical IsoTpMessages to send consecutive frames
  80. for msg in msgs.values():
  81. msg.send(self.request[0], setup_only=len(self.functional_addrs) > 0)
  82. results = {}
  83. start_time = time.monotonic()
  84. addrs_responded = set() # track addresses that have ever sent a valid iso-tp frame for timeout logging
  85. response_timeouts = {tx_addr: start_time + timeout for tx_addr in self.msg_addrs}
  86. while True:
  87. self.rx()
  88. for tx_addr, msg in msgs.items():
  89. try:
  90. dat, rx_in_progress = msg.recv()
  91. except Exception:
  92. cloudlog.exception(f"Error processing UDS response: {tx_addr}")
  93. request_done[tx_addr] = True
  94. continue
  95. # Extend timeout for each consecutive ISO-TP frame to avoid timing out on long responses
  96. if rx_in_progress:
  97. addrs_responded.add(tx_addr)
  98. response_timeouts[tx_addr] = time.monotonic() + timeout
  99. if dat is None:
  100. continue
  101. # Log unexpected empty responses
  102. if len(dat) == 0:
  103. cloudlog.error(f"iso-tp query empty response: {tx_addr}")
  104. request_done[tx_addr] = True
  105. continue
  106. counter = request_counter[tx_addr]
  107. expected_response = self.response[counter]
  108. response_valid = dat.startswith(expected_response)
  109. if response_valid:
  110. if counter + 1 < len(self.request):
  111. response_timeouts[tx_addr] = time.monotonic() + timeout
  112. msg.send(self.request[counter + 1])
  113. request_counter[tx_addr] += 1
  114. else:
  115. results[tx_addr] = dat[len(expected_response):]
  116. request_done[tx_addr] = True
  117. else:
  118. error_code = dat[2] if len(dat) > 2 else -1
  119. if error_code == 0x78:
  120. response_timeouts[tx_addr] = time.monotonic() + self.response_pending_timeout
  121. cloudlog.error(f"iso-tp query response pending: {tx_addr}")
  122. else:
  123. request_done[tx_addr] = True
  124. cloudlog.error(f"iso-tp query bad response: {tx_addr} - 0x{dat.hex()}")
  125. # Mark request done if address timed out
  126. cur_time = time.monotonic()
  127. for tx_addr in response_timeouts:
  128. if cur_time - response_timeouts[tx_addr] > 0:
  129. if not request_done[tx_addr]:
  130. if request_counter[tx_addr] > 0:
  131. cloudlog.error(f"iso-tp query timeout after receiving partial response: {tx_addr}")
  132. elif tx_addr in addrs_responded:
  133. cloudlog.error(f"iso-tp query timeout while receiving response: {tx_addr}")
  134. # TODO: handle functional addresses
  135. # else:
  136. # cloudlog.error(f"iso-tp query timeout with no response: {tx_addr}")
  137. request_done[tx_addr] = True
  138. # Break if all requests are done (finished or timed out)
  139. if all(request_done.values()):
  140. break
  141. if cur_time - start_time > total_timeout:
  142. cloudlog.error("iso-tp query timeout while receiving data")
  143. break
  144. return results