1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- import os
- from datetime import datetime, timedelta
- from functools import lru_cache
- from pathlib import Path
- from typing import IO
- TOKEN_PATH = Path("/data/azure_token")
- @lru_cache
- def get_azure_credential():
- if "AZURE_TOKEN" in os.environ:
- return os.environ["AZURE_TOKEN"]
- elif TOKEN_PATH.is_file():
- return TOKEN_PATH.read_text().strip()
- else:
- from azure.identity import AzureCliCredential
- return AzureCliCredential()
- @lru_cache
- def get_container_sas(account_name: str, container_name: str):
- from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas
- start_time = datetime.utcnow()
- expiry_time = start_time + timedelta(hours=1)
- blob_service = BlobServiceClient(
- account_url=f"https://{account_name}.blob.core.windows.net",
- credential=get_azure_credential(),
- )
- return generate_container_sas(
- account_name,
- container_name,
- user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time),
- permission=ContainerSasPermissions(read=True, write=True, list=True),
- expiry=expiry_time,
- )
- class AzureContainer:
- def __init__(self, account, container):
- self.ACCOUNT = account
- self.CONTAINER = container
- @property
- def ACCOUNT_URL(self) -> str:
- return f"https://{self.ACCOUNT}.blob.core.windows.net"
- @property
- def BASE_URL(self) -> str:
- return f"{self.ACCOUNT_URL}/{self.CONTAINER}/"
- def get_client_and_key(self):
- from azure.storage.blob import ContainerClient
- client = ContainerClient(self.ACCOUNT_URL, self.CONTAINER, credential=get_azure_credential())
- key = get_container_sas(self.ACCOUNT, self.CONTAINER)
- return client, key
- def get_url(self, route_name: str, segment_num, log_type="rlog") -> str:
- ext = "hevc" if log_type.endswith('camera') else "bz2"
- return self.BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}"
- def upload_bytes(self, data: bytes | IO, blob_name: str) -> str:
- from azure.storage.blob import BlobClient
- blob = BlobClient(
- account_url=self.ACCOUNT_URL,
- container_name=self.CONTAINER,
- blob_name=blob_name,
- credential=get_azure_credential(),
- overwrite=False,
- )
- blob.upload_blob(data)
- return self.BASE_URL + blob_name
- def upload_file(self, path: str | os.PathLike, blob_name: str) -> str:
- with open(path, "rb") as f:
- return self.upload_bytes(f, blob_name)
|