minimal.py 11 KB


  1. import time
  2. from ray_release.exception import (
  3. ClusterEnvBuildError,
  4. ClusterEnvBuildTimeout,
  5. ClusterEnvCreateError,
  6. ClusterComputeCreateError,
  7. )
  8. from ray_release.logger import logger
  9. from ray_release.cluster_manager.cluster_manager import ClusterManager
  10. from ray_release.util import format_link, anyscale_cluster_env_build_url
  11. from retry import retry
  # Seconds between "... still waiting for build ..." progress log lines
  # emitted while build_cluster_env polls for a build to finish.
  REPORT_S = 30.0
  class MinimalClusterManager(ClusterManager):
      """Minimal manager.

      Builds app config and compute template but does not start or stop session.

      NOTE(review): the methods below read and write attributes such as
      ``sdk``, ``test``, ``project_id``, ``cluster_env_*`` and
      ``cluster_compute_*`` that are not defined in this class; they are
      presumably provided by ``ClusterManager`` -- confirm against the base
      class.
      """

      @retry(ClusterEnvCreateError, delay=10, jitter=5, tries=2)
      def create_cluster_env(self):
          """Look up the cluster env named ``cluster_env_name`` or create it.

          Pages through ``sdk.search_cluster_environments`` looking for an
          exact name match and stores the matching ID in
          ``self.cluster_env_id``. If no match is found, a new cluster env is
          created through ``sdk.create_byod_cluster_environment`` using the
          test's BYOD image and runtime env.

          Raises:
              AssertionError: If ``cluster_env_id`` is already set or
                  ``cluster_env_name`` is empty. ``build_configs`` catches
                  this and logs its message.
              ClusterEnvCreateError: If the create call raises. The ``retry``
                  decorator re-runs this method on that error (``tries=2``)
                  before letting it propagate.
          """
          assert self.cluster_env_id is None, "cluster_env_id is already set"
          assert self.cluster_env_name, "cluster_env_name is not set"

          logger.info(
              f"Test uses a cluster env with name "
              f"{self.cluster_env_name}. Looking up existing "
              f"cluster envs with this name."
          )

          paging_token = None
          while not self.cluster_env_id:
              result = self.sdk.search_cluster_environments(
                  dict(
                      name=dict(equals=self.cluster_env_name),
                      paging=dict(count=50, paging_token=paging_token),
                      project_id=None,
                  )
              )
              paging_token = result.metadata.next_paging_token

              for res in result.results:
                  if res.name == self.cluster_env_name:
                      self.cluster_env_id = res.id
                      logger.info(
                          f"Cluster env already exists with ID {self.cluster_env_id}"
                      )
                      break

              # A found ID ends the loop through the ``while`` condition; an
              # exhausted paging token ends it here.
              if not paging_token:
                  break

          if self.cluster_env_id:
              return

          logger.info("Cluster env not found. Creating new one.")
          try:
              result = self.sdk.create_byod_cluster_environment(
                  dict(
                      name=self.cluster_env_name,
                      config_json=dict(
                          docker_image=self.test.get_anyscale_byod_image(),
                          ray_version="nightly",
                          env_vars=self.test.get_byod_runtime_env(),
                      ),
                  )
              )
              self.cluster_env_id = result.result.id
          except Exception as e:
              # The sleep announced here is performed by the ``retry``
              # decorator, not by this method.
              logger.warning(
                  f"Got exception when trying to create cluster "
                  f"env: {e}. Sleeping for 10 seconds with jitter and then "
                  f"try again..."
              )
              raise ClusterEnvCreateError("Could not create cluster env.") from e

          logger.info(f"Cluster env created with ID {self.cluster_env_id}")

      def build_cluster_env(self, timeout: float = 600.0):
          """Make sure the cluster env has a succeeded build and record its ID.

          Takes the build with the greatest ``created_at`` for
          ``self.cluster_env_id``. A succeeded build is reused as is. A failed
          build is retried by creating a new build with the same
          ``config_json``. Any build that has not succeeded is then polled
          once per second until it succeeds, fails, reports an unknown status,
          or ``timeout`` elapses. On success the build ID is stored in
          ``self.cluster_env_build_id``.

          Args:
              timeout: Maximum number of seconds to keep polling.

          Raises:
              AssertionError: If ``cluster_env_id`` is not set or
                  ``cluster_env_build_id`` is already set.
              ClusterEnvBuildError: If no build exists, the polled build
                  fails, or it reports a status other than ``in_progress`` /
                  ``pending`` / ``succeeded`` / ``failed``.
              ClusterEnvBuildTimeout: If the build is still running after
                  ``timeout`` seconds.
          """
          assert self.cluster_env_id, "cluster_env_id is not set"
          assert (
              self.cluster_env_build_id is None
          ), "cluster_env_build_id is already set"

          # Fetch build
          result = self.sdk.list_cluster_environment_builds(self.cluster_env_id)
          if not result or not result.results:
              raise ClusterEnvBuildError(f"No build found for cluster env: {result}")

          build = sorted(result.results, key=lambda b: b.created_at)[-1]
          build_id = build.id

          if build.status == "succeeded":
              logger.info(
                  f"Link to succeeded cluster env build: "
                  f"{format_link(anyscale_cluster_env_build_url(build_id))}"
              )
              self.cluster_env_build_id = build_id
              return

          if build.status == "failed":
              logger.info(f"Previous cluster env build failed: {build.error_message}")
              logger.info("Starting new cluster env build...")

              # Retry build
              result = self.sdk.create_cluster_environment_build(
                  dict(
                      cluster_environment_id=self.cluster_env_id,
                      config_json=build.config_json,
                  )
              )
              build_id = result.result.id

              logger.info(
                  f"Link to created cluster env build: "
                  f"{format_link(anyscale_cluster_env_build_url(build_id))}"
              )

          # Build found but not failed/finished yet.
          # Use the monotonic clock for both the progress reports and the
          # timeout so wall-clock adjustments cannot skew either of them.
          start_wait = time.monotonic()
          next_report = start_wait + REPORT_S
          timeout_at = start_wait + timeout

          logger.info(f"Waiting for build {build_id} to finish...")
          logger.info(
              f"Track progress here: "
              f"{format_link(anyscale_cluster_env_build_url(build_id))}"
          )

          # The loop only ends through ``return`` or ``raise``.
          while True:
              now = time.monotonic()
              if now > next_report:
                  logger.info(
                      f"... still waiting for build {build_id} to finish "
                      f"({int(now - start_wait)} seconds) ..."
                  )
                  next_report += REPORT_S

              build = self.sdk.get_build(build_id).result

              if build.status == "failed":
                  raise ClusterEnvBuildError(
                      f"Cluster env build failed. Please see "
                      f"{anyscale_cluster_env_build_url(build_id)} for details. "
                      f"Error message: {build.error_message}"
                  )

              if build.status == "succeeded":
                  logger.info("Build succeeded.")
                  self.cluster_env_build_id = build_id
                  return

              if build.status not in ("in_progress", "pending"):
                  raise ClusterEnvBuildError(
                      f"Unknown build status: {build.status}. Please see "
                      f"{anyscale_cluster_env_build_url(build_id)} for details"
                  )

              if time.monotonic() > timeout_at:
                  raise ClusterEnvBuildTimeout(
                      f"Time out when building cluster env {self.cluster_env_name}"
                  )

              time.sleep(1)

      def create_cluster_compute(self, _repeat: bool = True):
          """Look up the compute template named ``cluster_compute_name`` or create it.

          Does nothing beyond the initial assertion when ``self.cluster_compute``
          is falsy. Otherwise pages through ``sdk.search_cluster_computes`` for
          an exact name match and stores the matching ID in
          ``self.cluster_compute_id``; if none is found, a new cluster compute
          is created from ``self.cluster_compute``.

          Args:
              _repeat: Internal flag. When true, a failed create call is
                  retried once after a 10 second sleep by calling this method
                  again with ``_repeat=False``.

          Raises:
              AssertionError: If ``cluster_compute_id`` is already set.
              ClusterComputeCreateError: If creation fails and no retry is
                  left.
          """
          assert self.cluster_compute_id is None, "cluster_compute_id is already set"

          if not self.cluster_compute:
              return

          logger.info(
              f"Tests uses compute template "
              f"with name {self.cluster_compute_name}. "
              f"Looking up existing cluster computes."
          )

          paging_token = None
          while not self.cluster_compute_id:
              result = self.sdk.search_cluster_computes(
                  dict(
                      project_id=self.project_id,
                      name=dict(equals=self.cluster_compute_name),
                      include_anonymous=True,
                      paging=dict(paging_token=paging_token),
                  )
              )
              paging_token = result.metadata.next_paging_token

              for res in result.results:
                  if res.name == self.cluster_compute_name:
                      self.cluster_compute_id = res.id
                      logger.info(
                          f"Cluster compute already exists "
                          f"with ID {self.cluster_compute_id}"
                      )
                      break

              if not paging_token:
                  break

          if self.cluster_compute_id:
              return

          logger.info(
              f"Cluster compute not found. "
              f"Creating with name {self.cluster_compute_name}."
          )
          try:
              result = self.sdk.create_cluster_compute(
                  dict(
                      name=self.cluster_compute_name,
                      project_id=self.project_id,
                      config=self.cluster_compute,
                  )
              )
              self.cluster_compute_id = result.result.id
          except Exception as e:
              if _repeat:
                  logger.warning(
                      f"Got exception when trying to create cluster "
                      f"compute: {e}. Sleeping for 10 seconds and then "
                      f"try again once..."
                  )
                  time.sleep(10)
                  # The retry runs the whole lookup again before creating.
                  return self.create_cluster_compute(_repeat=False)

              raise ClusterComputeCreateError("Could not create cluster compute") from e

          logger.info(
              f"Cluster compute template created with "
              f"name {self.cluster_compute_name} and "
              f"ID {self.cluster_compute_id}"
          )

      def build_configs(self, timeout: float = 30.0):
          """Create the cluster compute and cluster env, then build the env.

          Runs ``create_cluster_compute``, ``create_cluster_env`` and
          ``build_cluster_env`` in that order. An ``AssertionError`` from any
          step is logged as a warning and the next step still runs. Each
          step's own error types propagate unchanged; any other exception is
          wrapped in that step's error type.

          Args:
              timeout: Seconds passed to ``build_cluster_env`` as its polling
                  timeout.

          Raises:
              ClusterComputeCreateError: If creating the cluster compute fails.
              ClusterEnvCreateError: If creating the cluster env fails.
              ClusterEnvBuildError: If building the cluster env fails.
              ClusterEnvBuildTimeout: If the build does not finish in time.
          """
          try:
              self.create_cluster_compute()
          except AssertionError as e:
              # If already exists, ignore
              logger.warning(str(e))
          except ClusterComputeCreateError:
              raise
          except Exception as e:
              raise ClusterComputeCreateError(
                  f"Unexpected cluster compute build error: {e}"
              ) from e

          try:
              self.create_cluster_env()
          except AssertionError as e:
              # If already exists, ignore
              logger.warning(str(e))
          except ClusterEnvCreateError:
              raise
          except Exception as e:
              raise ClusterEnvCreateError(
                  f"Unexpected cluster env create error: {e}"
              ) from e

          try:
              self.build_cluster_env(timeout=timeout)
          except AssertionError as e:
              # If already exists, ignore
              logger.warning(str(e))
          except (ClusterEnvBuildError, ClusterEnvBuildTimeout):
              raise
          except Exception as e:
              raise ClusterEnvBuildError(
                  f"Unexpected cluster env build error: {e}"
              ) from e

      def delete_configs(self):
          """Delete every resource whose ID is currently set on this manager.

          Issues the SDK delete calls in a fixed order: cluster, cluster env
          build, cluster env, cluster compute. An exception from one call
          propagates and the remaining deletions are not attempted.
          """
          if self.cluster_id:
              self.sdk.delete_cluster(self.cluster_id)
          if self.cluster_env_build_id:
              self.sdk.delete_cluster_environment_build(self.cluster_env_build_id)
          if self.cluster_env_id:
              self.sdk.delete_cluster_environment(self.cluster_env_id)
          if self.cluster_compute_id:
              self.sdk.delete_cluster_compute(self.cluster_compute_id)

      def start_cluster(self, timeout: float = 600.0):
          """Do nothing; the minimal manager does not start a session.

          Args:
              timeout: Accepted for signature compatibility and ignored.
          """

      def terminate_cluster_ex(self, wait: bool = False):
          """Do nothing; the minimal manager does not stop a session.

          Args:
              wait: Accepted for signature compatibility and ignored.
          """

      def get_cluster_address(self) -> str:
          """Return ``anyscale://<project_name>/<cluster_name>`` for this manager."""
          return f"anyscale://{self.project_name}/{self.cluster_name}"