# azure_container.py
import os
import time
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from pathlib import Path
from typing import IO
  6. TOKEN_PATH = Path("/data/azure_token")
  7. @lru_cache
  8. def get_azure_credential():
  9. if "AZURE_TOKEN" in os.environ:
  10. return os.environ["AZURE_TOKEN"]
  11. elif TOKEN_PATH.is_file():
  12. return TOKEN_PATH.read_text().strip()
  13. else:
  14. from azure.identity import AzureCliCredential
  15. return AzureCliCredential()
  16. @lru_cache
  17. def get_container_sas(account_name: str, container_name: str):
  18. from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas
  19. start_time = datetime.utcnow()
  20. expiry_time = start_time + timedelta(hours=1)
  21. blob_service = BlobServiceClient(
  22. account_url=f"https://{account_name}.blob.core.windows.net",
  23. credential=get_azure_credential(),
  24. )
  25. return generate_container_sas(
  26. account_name,
  27. container_name,
  28. user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time),
  29. permission=ContainerSasPermissions(read=True, write=True, list=True),
  30. expiry=expiry_time,
  31. )
  32. class AzureContainer:
  33. def __init__(self, account, container):
  34. self.ACCOUNT = account
  35. self.CONTAINER = container
  36. @property
  37. def ACCOUNT_URL(self) -> str:
  38. return f"https://{self.ACCOUNT}.blob.core.windows.net"
  39. @property
  40. def BASE_URL(self) -> str:
  41. return f"{self.ACCOUNT_URL}/{self.CONTAINER}/"
  42. def get_client_and_key(self):
  43. from azure.storage.blob import ContainerClient
  44. client = ContainerClient(self.ACCOUNT_URL, self.CONTAINER, credential=get_azure_credential())
  45. key = get_container_sas(self.ACCOUNT, self.CONTAINER)
  46. return client, key
  47. def get_url(self, route_name: str, segment_num, log_type="rlog") -> str:
  48. ext = "hevc" if log_type.endswith('camera') else "bz2"
  49. return self.BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}"
  50. def upload_bytes(self, data: bytes | IO, blob_name: str) -> str:
  51. from azure.storage.blob import BlobClient
  52. blob = BlobClient(
  53. account_url=self.ACCOUNT_URL,
  54. container_name=self.CONTAINER,
  55. blob_name=blob_name,
  56. credential=get_azure_credential(),
  57. overwrite=False,
  58. )
  59. blob.upload_blob(data)
  60. return self.BASE_URL + blob_name
  61. def upload_file(self, path: str | os.PathLike, blob_name: str) -> str:
  62. with open(path, "rb") as f:
  63. return self.upload_bytes(f, blob_name)