# url_file.py
  1. import logging
  2. import os
  3. import socket
  4. import time
  5. from hashlib import sha256
  6. from urllib3 import PoolManager, Retry
  7. from urllib3.response import BaseHTTPResponse
  8. from urllib3.util import Timeout
  9. from openpilot.common.file_helpers import atomic_write_in_dir
  10. from openpilot.system.hardware.hw import Paths
# Cache chunk size
K = 1000
CHUNK_SIZE = 1000 * K  # 1 MB (decimal): unit of on-disk caching and ranged downloads

# urllib3 logs every request at INFO/DEBUG; only surface warnings and above.
logging.getLogger("urllib3").setLevel(logging.WARNING)
  15. def hash_256(link: str) -> str:
  16. hsh = str(sha256((link.split("?")[0]).encode('utf-8')).hexdigest())
  17. return hsh
  18. class URLFileException(Exception):
  19. pass
class URLFile:
    """Seekable, read-only file-like object backed by an HTTP(S) URL.

    Data can optionally be cached on disk in CHUNK_SIZE-aligned pieces under
    Paths.download_cache_root(). Caching is off by default; it is enabled when
    the FILEREADER_CACHE environment variable is set to a non-zero value, and
    can be forced on or off per instance via the `cache` argument.
    """

    # Connection pool shared by all instances (dropped after fork; see
    # os.register_at_fork at module level).
    _pool_manager: PoolManager|None = None

    @staticmethod
    def reset() -> None:
        """Discard the shared pool manager so the next request builds a fresh one."""
        URLFile._pool_manager = None

    @staticmethod
    def pool_manager() -> PoolManager:
        """Lazily create and return the shared urllib3 PoolManager."""
        if URLFile._pool_manager is None:
            # TCP keep-alive on pooled sockets; retry transient statuses with backoff.
            socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),]
            retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[409, 429, 503, 504])
            URLFile._pool_manager = PoolManager(num_pools=10, maxsize=100, socket_options=socket_options, retries=retries)
        return URLFile._pool_manager

    def __init__(self, url: str, timeout: int=10, debug: bool=False, cache: bool|None=None):
        """Open *url* for reading.

        timeout: connect and read timeout, in seconds, for each HTTP request.
        debug:   when True, print a line for GET requests slower than 100 ms.
        cache:   True/False overrides the FILEREADER_CACHE env default; None keeps it.
        """
        self._url = url
        self._timeout = Timeout(connect=timeout, read=timeout)
        self._pos = 0  # current read offset into the remote file
        self._length: int|None = None  # total file length, fetched lazily
        self._debug = debug
        # True by default, false if FILEREADER_CACHE is defined, but can be overwritten by the cache input
        self._force_download = not int(os.environ.get("FILEREADER_CACHE", "0"))
        if cache is not None:
            self._force_download = not cache

        if not self._force_download:
            os.makedirs(Paths.download_cache_root(), exist_ok=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Nothing to release: connections belong to the shared pool.
        pass

    def _request(self, method: str, url: str, headers: dict[str, str]|None=None) -> BaseHTTPResponse:
        """Issue one HTTP request through the shared pool."""
        return URLFile.pool_manager().request(method, url, timeout=self._timeout, headers=headers)

    def get_length_online(self) -> int:
        """HEAD the URL and return its content-length, or -1 on a non-2xx status."""
        response = self._request('HEAD', self._url)
        if not (200 <= response.status <= 299):
            return -1
        length = response.headers.get('content-length', 0)
        return int(length)

    def get_length(self) -> int:
        """Return the remote file length, using/updating the on-disk length cache when enabled."""
        if self._length is not None:
            return self._length

        file_length_path = os.path.join(Paths.download_cache_root(), hash_256(self._url) + "_length")
        if not self._force_download and os.path.exists(file_length_path):
            with open(file_length_path) as file_length:
                content = file_length.read()
                self._length = int(content)
                return self._length

        self._length = self.get_length_online()
        # Only cache successful lookups (-1 means the HEAD request failed).
        if not self._force_download and self._length != -1:
            with atomic_write_in_dir(file_length_path, mode="w") as file_length:
                file_length.write(str(self._length))
        return self._length

    def read(self, ll: int|None=None) -> bytes:
        """Read *ll* bytes from the current position (or the rest of the file if ll is None).

        With caching enabled, data is fetched and stored in CHUNK_SIZE-aligned
        pieces so later reads of the same region are served from disk.
        """
        if self._force_download:
            return self.read_aux(ll=ll)

        file_begin = self._pos
        file_end = self._pos + ll if ll is not None else self.get_length()
        assert file_end != -1, f"Remote file is empty or doesn't exist: {self._url}"
        # We have to align with chunks we store. Position is the beginning of the latest chunk that starts before or at our file
        position = (file_begin // CHUNK_SIZE) * CHUNK_SIZE
        response = b""
        while True:
            self._pos = position
            # NOTE(review): true division yields a float, so cached chunk filenames
            # end in e.g. "_0.0" — presumably kept so existing cache files stay
            # valid; confirm before changing this to //.
            chunk_number = self._pos / CHUNK_SIZE
            file_name = hash_256(self._url) + "_" + str(chunk_number)
            full_path = os.path.join(Paths.download_cache_root(), str(file_name))
            data = None
            # If we don't have a file, download it
            if not os.path.exists(full_path):
                data = self.read_aux(ll=CHUNK_SIZE)
                with atomic_write_in_dir(full_path, mode="wb") as new_cached_file:
                    new_cached_file.write(data)
            else:
                with open(full_path, "rb") as cached_file:
                    data = cached_file.read()

            # Keep only the part of this chunk that overlaps the requested range.
            response += data[max(0, file_begin - position): min(CHUNK_SIZE, file_end - position)]
            position += CHUNK_SIZE
            if position >= file_end:
                self._pos = file_end
                return response

    def read_aux(self, ll: int|None=None) -> bytes:
        """Download bytes directly (no cache), using an HTTP Range request when needed.

        Advances self._pos by the number of bytes returned. Raises
        URLFileException on unexpected HTTP status codes.
        """
        download_range = False
        headers = {}
        # A partial read (non-zero offset or bounded length) needs a Range header.
        if self._pos != 0 or ll is not None:
            if ll is None:
                end = self.get_length() - 1
            else:
                end = min(self._pos + ll, self.get_length()) - 1
            if self._pos >= end:
                return b""
            headers['Range'] = f"bytes={self._pos}-{end}"
            download_range = True

        if self._debug:
            t1 = time.time()

        response = self._request('GET', self._url, headers=headers)
        ret = response.data

        if self._debug:
            t2 = time.time()
            if t2 - t1 > 0.1:
                print(f"get {self._url} {headers!r} {t2 - t1:.3f} slow")

        response_code = response.status
        if response_code == 416:  # Requested Range Not Satisfiable
            raise URLFileException(f"Error, range out of bounds {response_code} {headers} ({self._url}): {repr(ret)[:500]}")
        if download_range and response_code != 206:  # Partial Content
            raise URLFileException(f"Error, requested range but got unexpected response {response_code} {headers} ({self._url}): {repr(ret)[:500]}")
        if (not download_range) and response_code != 200:  # OK
            raise URLFileException(f"Error {response_code} {headers} ({self._url}): {repr(ret)[:500]}")

        self._pos += len(ret)
        return ret

    def seek(self, pos: int) -> None:
        """Move the read position to absolute offset *pos*."""
        self._pos = pos

    @property
    def name(self) -> str:
        """The URL this object reads from."""
        return self._url
  132. os.register_at_fork(after_in_child=URLFile.reset)