# util.py
  1. import collections
  2. import hashlib
  3. import json
  4. import os
  5. import random
  6. import string
  7. import subprocess
  8. import time
  9. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
  10. import requests
  11. from ray_release.logger import logger
  12. from ray_release.configs.global_config import get_global_config
  13. if TYPE_CHECKING:
  14. from anyscale.sdk.anyscale_client.sdk import AnyscaleSDK
  15. class DeferredEnvVar:
  16. def __init__(self, var: str, default: Optional[str] = None):
  17. self._var = var
  18. self._default = default
  19. def __str__(self):
  20. return os.environ.get(self._var, self._default)
# Resolved at use time (see DeferredEnvVar), not at import time.
ANYSCALE_HOST = DeferredEnvVar("ANYSCALE_HOST", "https://console.anyscale.com")
# Cloud storage scheme identifiers.
S3_CLOUD_STORAGE = "s3"
GS_CLOUD_STORAGE = "gs"
# NOTE(review): presumably the default GCS bucket for OSS dev runs —
# confirm against infra configuration.
GS_BUCKET = "anyscale-oss-dev-bucket"
# Substrings whose presence marks a log line as an error.
ERROR_LOG_PATTERNS = [
    "ERROR",
    "Traceback (most recent call last)",
]
  29. def get_read_state_machine_aws_bucket(allow_pr_bucket: bool = False) -> str:
  30. # We support by default reading from the branch bucket only, since most of the use
  31. # cases are on branch pipelines. Changing the default flag to read from the bucket
  32. # according to the current pipeline
  33. if allow_pr_bucket:
  34. return get_write_state_machine_aws_bucket()
  35. return get_global_config()["state_machine_branch_aws_bucket"]
  36. def get_write_state_machine_aws_bucket() -> str:
  37. # We support different buckets for writing test result data; one for pr and one for
  38. # branch. This is because pr and branch pipeline have different permissions, and we
  39. # want data on branch pipeline being protected.
  40. pipeline_id = os.environ.get("BUILDKITE_PIPELINE_ID")
  41. pr_pipelines = get_global_config()["ci_pipeline_premerge"]
  42. branch_pipelines = get_global_config()["ci_pipeline_postmerge"]
  43. assert pipeline_id in pr_pipelines + branch_pipelines, (
  44. "Test state machine is only supported for branch or pr pipeline, "
  45. f"{pipeline_id} is given"
  46. )
  47. if pipeline_id in pr_pipelines:
  48. return get_global_config()["state_machine_pr_aws_bucket"]
  49. return get_global_config()["state_machine_branch_aws_bucket"]
  50. def deep_update(d, u) -> Dict:
  51. for k, v in u.items():
  52. if isinstance(v, collections.abc.Mapping):
  53. d[k] = deep_update(d.get(k, {}), v)
  54. else:
  55. d[k] = v
  56. return d
  57. def dict_hash(dt: Dict[Any, Any]) -> str:
  58. json_str = json.dumps(dt, sort_keys=True, ensure_ascii=True)
  59. sha = hashlib.sha256()
  60. sha.update(json_str.encode())
  61. return sha.hexdigest()
  62. def url_exists(url: str) -> bool:
  63. try:
  64. return requests.head(url, allow_redirects=True).status_code == 200
  65. except requests.exceptions.RequestException:
  66. logger.exception(f"Failed to check url exists: {url}")
  67. return False
  68. def resolve_url(url: str) -> str:
  69. return requests.head(url, allow_redirects=True).url
  70. def format_link(link: str) -> str:
  71. # Use ANSI escape code to allow link to be clickable
  72. # https://buildkite.com/docs/pipelines/links-and-images
  73. # -in-log-output
  74. if os.environ.get("BUILDKITE_COMMIT") and link:
  75. return "\033]1339;url='" + link + "'\a\n"
  76. # Else, no buildkite:
  77. return link
  78. def anyscale_project_url(project_id: str) -> str:
  79. return (
  80. f"{ANYSCALE_HOST}"
  81. f"/o/anyscale-internal/projects/{project_id}"
  82. f"/?tab=session-list"
  83. )
  84. def anyscale_cluster_url(project_id: str, cluster_id: str) -> str:
  85. return (
  86. f"{ANYSCALE_HOST}"
  87. f"/o/anyscale-internal/projects/{project_id}"
  88. f"/clusters/{cluster_id}"
  89. )
  90. def anyscale_cluster_compute_url(compute_tpl_id: str) -> str:
  91. return (
  92. f"{ANYSCALE_HOST}"
  93. f"/o/anyscale-internal/configurations/cluster-computes"
  94. f"/{compute_tpl_id}"
  95. )
  96. def anyscale_cluster_env_build_url(build_id: str) -> str:
  97. return (
  98. f"{ANYSCALE_HOST}"
  99. f"/o/anyscale-internal/configurations/app-config-details"
  100. f"/{build_id}"
  101. )
  102. def anyscale_job_url(job_id: str) -> str:
  103. return f"{ANYSCALE_HOST}/o/anyscale-internal/jobs/{job_id}"
  104. _anyscale_sdk = None
  105. def get_anyscale_sdk(use_cache: bool = True) -> "AnyscaleSDK":
  106. from anyscale.sdk.anyscale_client.sdk import AnyscaleSDK
  107. global _anyscale_sdk
  108. if use_cache and _anyscale_sdk:
  109. return _anyscale_sdk
  110. _anyscale_sdk = AnyscaleSDK(host=str(ANYSCALE_HOST))
  111. return _anyscale_sdk
  112. def exponential_backoff_retry(
  113. f, retry_exceptions, initial_retry_delay_s, max_retries
  114. ) -> None:
  115. retry_cnt = 0
  116. retry_delay_s = initial_retry_delay_s
  117. while True:
  118. try:
  119. return f()
  120. except retry_exceptions as e:
  121. retry_cnt += 1
  122. if retry_cnt > max_retries:
  123. raise
  124. logger.exception(
  125. f"Retry function call failed due to {e} "
  126. f"in {retry_delay_s} seconds..."
  127. )
  128. time.sleep(retry_delay_s)
  129. retry_delay_s *= 2
  130. def run_bash_script(bash_script: str) -> None:
  131. subprocess.run(f"bash {bash_script}", shell=True, check=True)
  132. def reinstall_anyscale_dependencies() -> None:
  133. logger.info("Re-installing `anyscale` package")
  134. subprocess.check_output(
  135. "pip install -U anyscale",
  136. shell=True,
  137. text=True,
  138. )
  139. def get_pip_packages() -> List[str]:
  140. from pip._internal.operations import freeze
  141. return list(freeze.freeze())
  142. def python_version_str(python_version: Tuple[int, int]) -> str:
  143. """From (X, Y) to XY"""
  144. return "".join([str(x) for x in python_version])
  145. def generate_tmp_cloud_storage_path() -> str:
  146. return "".join(random.choice(string.ascii_lowercase) for i in range(10))
  147. def join_cloud_storage_paths(*paths: str):
  148. paths = list(paths)
  149. if len(paths) > 1:
  150. for i in range(1, len(paths)):
  151. while paths[i][0] == "/":
  152. paths[i] = paths[i][1:]
  153. joined_path = os.path.join(*paths)
  154. while joined_path[-1] == "/":
  155. joined_path = joined_path[:-1]
  156. return joined_path