123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- import collections
- import hashlib
- import json
- import os
- import random
- import string
- import subprocess
- import time
- from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
- import requests
- from ray_release.logger import logger
- from ray_release.configs.global_config import get_global_config
- if TYPE_CHECKING:
- from anyscale.sdk.anyscale_client.sdk import AnyscaleSDK
class DeferredEnvVar:
    """An environment variable whose value is looked up lazily.

    The lookup happens every time the object is converted to ``str``,
    rather than once at construction time, so later changes to the
    environment are observed.
    """

    def __init__(self, var: str, default: Optional[str] = None):
        self._var = var
        self._default = default

    def __str__(self):
        # Re-read the environment on every conversion.
        current = os.environ.get(self._var)
        return current if current is not None else self._default
# Anyscale console endpoint; resolved lazily from the environment at use time.
ANYSCALE_HOST = DeferredEnvVar("ANYSCALE_HOST", "https://console.anyscale.com")
# Cloud-storage provider identifiers.
S3_CLOUD_STORAGE = "s3"
GS_CLOUD_STORAGE = "gs"
# Google Cloud Storage bucket name — presumably the OSS dev bucket; verify against callers.
GS_BUCKET = "anyscale-oss-dev-bucket"
# Substrings that mark a log line as an error — usage not shown in this chunk.
ERROR_LOG_PATTERNS = [
    "ERROR",
    "Traceback (most recent call last)",
]
def get_read_state_machine_aws_bucket(allow_pr_bucket: bool = False) -> str:
    """Return the AWS bucket test-state data is read from.

    Reading defaults to the branch bucket, since most use cases are on
    branch pipelines. Pass ``allow_pr_bucket=True`` to instead pick the
    bucket matching the current pipeline (PR or branch).
    """
    if not allow_pr_bucket:
        return get_global_config()["state_machine_branch_aws_bucket"]
    return get_write_state_machine_aws_bucket()
def get_write_state_machine_aws_bucket() -> str:
    """Return the AWS bucket test-state data is written to.

    PR and branch pipelines write to different buckets because they carry
    different permissions; data written on branch pipelines is protected.

    Raises:
        AssertionError: if the current pipeline is neither a PR nor a
            branch pipeline.
    """
    pipeline_id = os.environ.get("BUILDKITE_PIPELINE_ID")
    pr_pipelines = get_global_config()["ci_pipeline_premerge"]
    branch_pipelines = get_global_config()["ci_pipeline_postmerge"]
    assert pipeline_id in pr_pipelines + branch_pipelines, (
        "Test state machine is only supported for branch or pr pipeline, "
        f"{pipeline_id} is given"
    )
    bucket_key = (
        "state_machine_pr_aws_bucket"
        if pipeline_id in pr_pipelines
        else "state_machine_branch_aws_bucket"
    )
    return get_global_config()[bucket_key]
def deep_update(d, u) -> Dict:
    """Recursively merge mapping ``u`` into dict ``d`` in place.

    Nested mappings are merged key by key; any other value in ``u``
    overwrites the corresponding entry in ``d``. Returns ``d``.
    """
    for key, value in u.items():
        if isinstance(value, collections.abc.Mapping):
            # Merge into the existing sub-dict (or a fresh one).
            d[key] = deep_update(d.get(key, {}), value)
        else:
            d[key] = value
    return d
def dict_hash(dt: Dict[Any, Any]) -> str:
    """Return a stable SHA-256 hex digest of a JSON-serializable dict.

    Keys are sorted before serialization, so dicts with equal content hash
    identically regardless of insertion order.
    """
    canonical = json.dumps(dt, sort_keys=True, ensure_ascii=True)
    return hashlib.sha256(canonical.encode()).hexdigest()
def url_exists(url: str) -> bool:
    """Return True iff a HEAD request to ``url`` (following redirects) gives 200.

    Network failures are logged and reported as False rather than raised.
    """
    try:
        response = requests.head(url, allow_redirects=True)
    except requests.exceptions.RequestException:
        logger.exception(f"Failed to check url exists: {url}")
        return False
    return response.status_code == 200
def resolve_url(url: str) -> str:
    """Follow any redirects of ``url`` via a HEAD request; return the final URL."""
    response = requests.head(url, allow_redirects=True)
    return response.url
def format_link(link: str) -> str:
    """Make ``link`` clickable in Buildkite log output via an ANSI escape code.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output.
    Outside Buildkite (no BUILDKITE_COMMIT env var), or for an empty link,
    the link is returned unchanged.
    """
    on_buildkite = bool(os.environ.get("BUILDKITE_COMMIT"))
    if not (on_buildkite and link):
        return link
    return "\033]1339;url='" + link + "'\a\n"
def anyscale_project_url(project_id: str) -> str:
    """Console URL for an Anyscale project's session list."""
    path = f"/o/anyscale-internal/projects/{project_id}/?tab=session-list"
    return f"{ANYSCALE_HOST}{path}"
def anyscale_cluster_url(project_id: str, cluster_id: str) -> str:
    """Console URL for a cluster within an Anyscale project."""
    path = f"/o/anyscale-internal/projects/{project_id}/clusters/{cluster_id}"
    return f"{ANYSCALE_HOST}{path}"
def anyscale_cluster_compute_url(compute_tpl_id: str) -> str:
    """Console URL for a cluster-compute configuration."""
    path = f"/o/anyscale-internal/configurations/cluster-computes/{compute_tpl_id}"
    return f"{ANYSCALE_HOST}{path}"
def anyscale_cluster_env_build_url(build_id: str) -> str:
    """Console URL for a cluster-environment (app config) build."""
    path = f"/o/anyscale-internal/configurations/app-config-details/{build_id}"
    return f"{ANYSCALE_HOST}{path}"
def anyscale_job_url(job_id: str) -> str:
    """Console URL for an Anyscale job."""
    path = f"/o/anyscale-internal/jobs/{job_id}"
    return f"{ANYSCALE_HOST}{path}"
# Module-level cache for the SDK client; populated on first use.
_anyscale_sdk = None


def get_anyscale_sdk(use_cache: bool = True) -> "AnyscaleSDK":
    """Return an AnyscaleSDK client.

    Reuses the module-level cached instance when ``use_cache`` is True;
    otherwise (or on first call) constructs a fresh client against
    ANYSCALE_HOST and stores it in the cache.
    """
    # Imported lazily to avoid the dependency at module-import time.
    from anyscale.sdk.anyscale_client.sdk import AnyscaleSDK

    global _anyscale_sdk
    if not (use_cache and _anyscale_sdk):
        _anyscale_sdk = AnyscaleSDK(host=str(ANYSCALE_HOST))
    return _anyscale_sdk
def exponential_backoff_retry(
    f, retry_exceptions, initial_retry_delay_s, max_retries
) -> Any:
    """Call ``f()`` and retry with exponential backoff on failure.

    Args:
        f: Zero-argument callable to invoke.
        retry_exceptions: Exception type (or tuple of types) that triggers
            a retry; anything else propagates immediately.
        initial_retry_delay_s: Sleep before the first retry; doubled after
            each subsequent failure.
        max_retries: Maximum number of retries before re-raising.

    Returns:
        Whatever ``f()`` returns. (The previous ``-> None`` annotation was
        incorrect: the result of ``f()`` is returned on success.)

    Raises:
        The last caught exception, once ``max_retries`` is exceeded.
    """
    retry_cnt = 0
    retry_delay_s = initial_retry_delay_s
    while True:
        try:
            return f()
        except retry_exceptions as e:
            retry_cnt += 1
            if retry_cnt > max_retries:
                raise
            logger.exception(
                f"Retry function call failed due to {e} "
                f"in {retry_delay_s} seconds..."
            )
            time.sleep(retry_delay_s)
            retry_delay_s *= 2
def run_bash_script(bash_script: str) -> None:
    """Execute ``bash_script`` with bash.

    Raises:
        subprocess.CalledProcessError: if the script exits nonzero.
    """
    command = f"bash {bash_script}"
    subprocess.run(command, shell=True, check=True)
def reinstall_anyscale_dependencies() -> None:
    """Upgrade the ``anyscale`` package to the latest release via pip."""
    logger.info("Re-installing `anyscale` package")
    command = "pip install -U anyscale"
    subprocess.check_output(command, shell=True, text=True)
def get_pip_packages() -> List[str]:
    """Return the installed packages of the current environment, pip-freeze style."""
    # Imported lazily: pip internals are only needed here.
    from pip._internal.operations import freeze

    frozen = freeze.freeze()
    return list(frozen)
def python_version_str(python_version: Tuple[int, int]) -> str:
    """Render a (major, minor) version tuple as "XY", e.g. (3, 9) -> "39"."""
    return "".join(map(str, python_version))
def generate_tmp_cloud_storage_path() -> str:
    """Return a random 10-character lowercase name for a temporary storage path."""
    chars = [random.choice(string.ascii_lowercase) for _ in range(10)]
    return "".join(chars)
def join_cloud_storage_paths(*paths: str) -> str:
    """Join cloud-storage path fragments with "/" separators.

    Leading slashes on every fragment after the first are removed so that
    ``os.path.join`` does not discard the earlier fragments (it restarts
    at an absolute path), and trailing slashes are stripped from the
    result, e.g. ("s3://b", "/x", "y/") -> "s3://b/x/y".

    Args:
        paths: Path fragments to join.

    Returns:
        The joined path without a trailing slash. Empty fragments are
        tolerated (previously an empty non-first fragment raised
        IndexError).
    """
    parts = list(paths)
    for i in range(1, len(parts)):
        # lstrip handles empty strings safely, unlike indexing parts[i][0].
        parts[i] = parts[i].lstrip("/")
    joined_path = os.path.join(*parts)
    # rstrip tolerates an empty or all-slash result, unlike joined_path[-1].
    return joined_path.rstrip("/")
|