util.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import collections
  2. import hashlib
  3. import json
  4. import os
  5. import random
  6. import string
  7. import subprocess
  8. import time
  9. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
  10. import requests
  11. from ray_release.logger import logger
  12. if TYPE_CHECKING:
  13. from anyscale.sdk.anyscale_client.sdk import AnyscaleSDK
  14. class DeferredEnvVar:
  15. def __init__(self, var: str, default: Optional[str] = None):
  16. self._var = var
  17. self._default = default
  18. def __str__(self):
  19. return os.environ.get(self._var, self._default)
# Anyscale host URL. Resolved lazily: each str() conversion reads the
# ANYSCALE_HOST environment variable and falls back to the URL given here.
ANYSCALE_HOST = DeferredEnvVar("ANYSCALE_HOST", "https://console.anyscale.com")

# Short identifiers for the supported cloud storage providers.
# NOTE(review): presumably these double as URL schemes ("s3://", "gs://");
# confirm against callers.
S3_CLOUD_STORAGE = "s3"
GS_CLOUD_STORAGE = "gs"

# Bucket name. NOTE(review): judging by the name this is the bucket used with
# the "gs" provider; usage is not visible in this file section.
GS_BUCKET = "anyscale-oss-dev-bucket"

# Substrings that, going by the name, mark an error in log output. The code
# that matches against them is not part of this file section.
ERROR_LOG_PATTERNS = [
    "ERROR",
    "Traceback (most recent call last)",
]
  28. def deep_update(d, u) -> Dict:
  29. for k, v in u.items():
  30. if isinstance(v, collections.abc.Mapping):
  31. d[k] = deep_update(d.get(k, {}), v)
  32. else:
  33. d[k] = v
  34. return d
  35. def dict_hash(dt: Dict[Any, Any]) -> str:
  36. json_str = json.dumps(dt, sort_keys=True, ensure_ascii=True)
  37. sha = hashlib.sha256()
  38. sha.update(json_str.encode())
  39. return sha.hexdigest()
  40. def url_exists(url: str) -> bool:
  41. try:
  42. return requests.head(url, allow_redirects=True).status_code == 200
  43. except requests.exceptions.RequestException:
  44. logger.exception(f"Failed to check url exists: {url}")
  45. return False
  46. def resolve_url(url: str) -> str:
  47. return requests.head(url, allow_redirects=True).url
  48. def format_link(link: str) -> str:
  49. # Use ANSI escape code to allow link to be clickable
  50. # https://buildkite.com/docs/pipelines/links-and-images
  51. # -in-log-output
  52. if os.environ.get("BUILDKITE_COMMIT") and link:
  53. return "\033]1339;url='" + link + "'\a\n"
  54. # Else, no buildkite:
  55. return link
  56. def anyscale_project_url(project_id: str) -> str:
  57. return (
  58. f"{ANYSCALE_HOST}"
  59. f"/o/anyscale-internal/projects/{project_id}"
  60. f"/?tab=session-list"
  61. )
  62. def anyscale_cluster_url(project_id: str, cluster_id: str) -> str:
  63. return (
  64. f"{ANYSCALE_HOST}"
  65. f"/o/anyscale-internal/projects/{project_id}"
  66. f"/clusters/{cluster_id}"
  67. )
  68. def anyscale_cluster_compute_url(compute_tpl_id: str) -> str:
  69. return (
  70. f"{ANYSCALE_HOST}"
  71. f"/o/anyscale-internal/configurations/cluster-computes"
  72. f"/{compute_tpl_id}"
  73. )
  74. def anyscale_cluster_env_build_url(build_id: str) -> str:
  75. return (
  76. f"{ANYSCALE_HOST}"
  77. f"/o/anyscale-internal/configurations/app-config-details"
  78. f"/{build_id}"
  79. )
  80. def anyscale_job_url(job_id: str) -> str:
  81. return f"{ANYSCALE_HOST}/o/anyscale-internal/jobs/{job_id}"
  82. _anyscale_sdk = None
  83. def get_anyscale_sdk(use_cache: bool = True) -> "AnyscaleSDK":
  84. from anyscale.sdk.anyscale_client.sdk import AnyscaleSDK
  85. global _anyscale_sdk
  86. if use_cache and _anyscale_sdk:
  87. return _anyscale_sdk
  88. _anyscale_sdk = AnyscaleSDK(host=str(ANYSCALE_HOST))
  89. return _anyscale_sdk
  90. def exponential_backoff_retry(
  91. f, retry_exceptions, initial_retry_delay_s, max_retries
  92. ) -> None:
  93. retry_cnt = 0
  94. retry_delay_s = initial_retry_delay_s
  95. while True:
  96. try:
  97. return f()
  98. except retry_exceptions as e:
  99. retry_cnt += 1
  100. if retry_cnt > max_retries:
  101. raise
  102. logger.exception(
  103. f"Retry function call failed due to {e} "
  104. f"in {retry_delay_s} seconds..."
  105. )
  106. time.sleep(retry_delay_s)
  107. retry_delay_s *= 2
  108. def run_bash_script(bash_script: str) -> None:
  109. subprocess.run(f"bash {bash_script}", shell=True, check=True)
  110. def reinstall_anyscale_dependencies() -> None:
  111. logger.info("Re-installing `anyscale` package")
  112. subprocess.check_output(
  113. "pip install -U anyscale",
  114. shell=True,
  115. text=True,
  116. )
  117. def get_pip_packages() -> List[str]:
  118. from pip._internal.operations import freeze
  119. return list(freeze.freeze())
  120. def python_version_str(python_version: Tuple[int, int]) -> str:
  121. """From (X, Y) to XY"""
  122. return "".join([str(x) for x in python_version])
  123. def generate_tmp_cloud_storage_path() -> str:
  124. return "".join(random.choice(string.ascii_lowercase) for i in range(10))
  125. def join_cloud_storage_paths(*paths: str):
  126. paths = list(paths)
  127. if len(paths) > 1:
  128. for i in range(1, len(paths)):
  129. while paths[i][0] == "/":
  130. paths[i] = paths[i][1:]
  131. joined_path = os.path.join(*paths)
  132. while joined_path[-1] == "/":
  133. joined_path = joined_path[:-1]
  134. return joined_path