azure_container.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import os
  2. from datetime import datetime, timedelta, UTC
  3. from functools import lru_cache
  4. from pathlib import Path
  5. from typing import IO
# On-disk token file. get_azure_credential() reads it (whitespace-stripped)
# when the AZURE_TOKEN environment variable is not set.
TOKEN_PATH = Path("/data/azure_token")
  7. @lru_cache
  8. def get_azure_credential():
  9. if "AZURE_TOKEN" in os.environ:
  10. return os.environ["AZURE_TOKEN"]
  11. elif TOKEN_PATH.is_file():
  12. return TOKEN_PATH.read_text().strip()
  13. else:
  14. from azure.identity import AzureCliCredential
  15. return AzureCliCredential()
  16. @lru_cache
  17. def get_container_sas(account_name: str, container_name: str):
  18. from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas
  19. start_time = datetime.now(UTC).replace(tzinfo=None)
  20. expiry_time = start_time + timedelta(hours=1)
  21. blob_service = BlobServiceClient(
  22. account_url=f"https://{account_name}.blob.core.windows.net",
  23. credential=get_azure_credential(),
  24. )
  25. return generate_container_sas(
  26. account_name,
  27. container_name,
  28. user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time),
  29. permission=ContainerSasPermissions(read=True, write=True, list=True),
  30. expiry=expiry_time,
  31. )
  32. class AzureContainer:
  33. def __init__(self, account, container):
  34. self.ACCOUNT = account
  35. self.CONTAINER = container
  36. @property
  37. def ACCOUNT_URL(self) -> str:
  38. return f"https://{self.ACCOUNT}.blob.core.windows.net"
  39. @property
  40. def BASE_URL(self) -> str:
  41. return f"{self.ACCOUNT_URL}/{self.CONTAINER}/"
  42. def get_client_and_key(self):
  43. from azure.storage.blob import ContainerClient
  44. client = ContainerClient(self.ACCOUNT_URL, self.CONTAINER, credential=get_azure_credential())
  45. key = get_container_sas(self.ACCOUNT, self.CONTAINER)
  46. return client, key
  47. def get_url(self, route_name: str, segment_num: str, filename: str) -> str:
  48. return self.BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{filename}"
  49. def upload_bytes(self, data: bytes | IO, blob_name: str, overwrite=False) -> str:
  50. from azure.storage.blob import BlobClient
  51. blob = BlobClient(
  52. account_url=self.ACCOUNT_URL,
  53. container_name=self.CONTAINER,
  54. blob_name=blob_name,
  55. credential=get_azure_credential(),
  56. overwrite=overwrite,
  57. )
  58. blob.upload_blob(data, overwrite=overwrite)
  59. return self.BASE_URL + blob_name
  60. def upload_file(self, path: str | os.PathLike, blob_name: str, overwrite=False) -> str:
  61. with open(path, "rb") as f:
  62. return self.upload_bytes(f, blob_name, overwrite)