minimal.py 11 KB


  1. import time
  2. from ray_release.exception import (
  3. ClusterEnvBuildError,
  4. ClusterEnvBuildTimeout,
  5. ClusterEnvCreateError,
  6. ClusterComputeCreateError,
  7. )
  8. from ray_release.logger import logger
  9. from ray_release.cluster_manager.cluster_manager import ClusterManager
  10. from ray_release.util import format_link, anyscale_cluster_env_build_url
  11. from retry import retry
# Interval, in seconds, between "still waiting" progress log messages emitted
# while polling a cluster env build for completion.
REPORT_S = 30.0
  13. class MinimalClusterManager(ClusterManager):
  14. """Minimal manager.
  15. Builds app config and compute template but does not start or stop session.
  16. """
  17. @retry((ClusterEnvCreateError), delay=10, jitter=5, tries=2)
  18. def create_cluster_env(self):
  19. assert self.cluster_env_id is None
  20. if self.cluster_env:
  21. assert self.cluster_env_name
  22. logger.info(
  23. f"Test uses a cluster env with name "
  24. f"{self.cluster_env_name}. Looking up existing "
  25. f"cluster envs with this name."
  26. )
  27. paging_token = None
  28. while not self.cluster_env_id:
  29. result = self.sdk.search_cluster_environments(
  30. dict(
  31. project_id=self.project_id,
  32. name=dict(equals=self.cluster_env_name),
  33. paging=dict(count=50, paging_token=paging_token),
  34. )
  35. )
  36. paging_token = result.metadata.next_paging_token
  37. for res in result.results:
  38. if res.name == self.cluster_env_name:
  39. self.cluster_env_id = res.id
  40. logger.info(
  41. f"Cluster env already exists with ID "
  42. f"{self.cluster_env_id}"
  43. )
  44. break
  45. if not paging_token or self.cluster_env_id:
  46. break
  47. if not self.cluster_env_id:
  48. logger.info("Cluster env not found. Creating new one.")
  49. try:
  50. result = self.sdk.create_cluster_environment(
  51. dict(
  52. name=self.cluster_env_name,
  53. project_id=self.project_id,
  54. config_json=self.cluster_env,
  55. )
  56. )
  57. self.cluster_env_id = result.result.id
  58. except Exception as e:
  59. logger.warning(
  60. f"Got exception when trying to create cluster "
  61. f"env: {e}. Sleeping for 10 seconds with jitter and then "
  62. f"try again..."
  63. )
  64. raise ClusterEnvCreateError("Could not create cluster env.") from e
  65. logger.info(f"Cluster env created with ID {self.cluster_env_id}")
  66. def build_cluster_env(self, timeout: float = 600.0):
  67. assert self.cluster_env_id
  68. assert self.cluster_env_build_id is None
  69. # Fetch build
  70. build_id = None
  71. last_status = None
  72. error_message = None
  73. config_json = None
  74. result = self.sdk.list_cluster_environment_builds(self.cluster_env_id)
  75. if not result or not result.results:
  76. raise ClusterEnvBuildError(f"No build found for cluster env: {result}")
  77. build = sorted(result.results, key=lambda b: b.created_at)[-1]
  78. build_id = build.id
  79. last_status = build.status
  80. error_message = build.error_message
  81. config_json = build.config_json
  82. if last_status == "succeeded":
  83. logger.info(
  84. f"Link to succeeded cluster env build: "
  85. f"{format_link(anyscale_cluster_env_build_url(build_id))}"
  86. )
  87. self.cluster_env_build_id = build_id
  88. return
  89. if last_status == "failed":
  90. logger.info(f"Previous cluster env build failed: {error_message}")
  91. logger.info("Starting new cluster env build...")
  92. # Retry build
  93. result = self.sdk.create_cluster_environment_build(
  94. dict(
  95. cluster_environment_id=self.cluster_env_id, config_json=config_json
  96. )
  97. )
  98. build_id = result.result.id
  99. logger.info(
  100. f"Link to created cluster env build: "
  101. f"{format_link(anyscale_cluster_env_build_url(build_id))}"
  102. )
  103. # Build found but not failed/finished yet
  104. completed = False
  105. start_wait = time.time()
  106. next_report = start_wait + REPORT_S
  107. timeout_at = time.monotonic() + timeout
  108. logger.info(f"Waiting for build {build_id} to finish...")
  109. logger.info(
  110. f"Track progress here: "
  111. f"{format_link(anyscale_cluster_env_build_url(build_id))}"
  112. )
  113. while not completed:
  114. now = time.time()
  115. if now > next_report:
  116. logger.info(
  117. f"... still waiting for build {build_id} to finish "
  118. f"({int(now - start_wait)} seconds) ..."
  119. )
  120. next_report = next_report + REPORT_S
  121. result = self.sdk.get_build(build_id)
  122. build = result.result
  123. if build.status == "failed":
  124. raise ClusterEnvBuildError(
  125. f"Cluster env build failed. Please see "
  126. f"{anyscale_cluster_env_build_url(build_id)} for details. "
  127. f"Error message: {build.error_message}"
  128. )
  129. if build.status == "succeeded":
  130. logger.info("Build succeeded.")
  131. self.cluster_env_build_id = build_id
  132. return
  133. completed = build.status not in ["in_progress", "pending"]
  134. if completed:
  135. raise ClusterEnvBuildError(
  136. f"Unknown build status: {build.status}. Please see "
  137. f"{anyscale_cluster_env_build_url(build_id)} for details"
  138. )
  139. if time.monotonic() > timeout_at:
  140. raise ClusterEnvBuildTimeout(
  141. f"Time out when building cluster env {self.cluster_env_name}"
  142. )
  143. time.sleep(1)
  144. self.cluster_env_build_id = build_id
  145. def fetch_build_info(self):
  146. assert self.cluster_env_build_id
  147. result = self.sdk.get_cluster_environment_build(self.cluster_env_build_id)
  148. self.cluster_env = result.result.config_json
  149. def create_cluster_compute(self, _repeat: bool = True):
  150. assert self.cluster_compute_id is None
  151. if self.cluster_compute:
  152. assert self.cluster_compute
  153. logger.info(
  154. f"Tests uses compute template "
  155. f"with name {self.cluster_compute_name}. "
  156. f"Looking up existing cluster computes."
  157. )
  158. paging_token = None
  159. while not self.cluster_compute_id:
  160. result = self.sdk.search_cluster_computes(
  161. dict(
  162. project_id=self.project_id,
  163. name=dict(equals=self.cluster_compute_name),
  164. include_anonymous=True,
  165. paging=dict(paging_token=paging_token),
  166. )
  167. )
  168. paging_token = result.metadata.next_paging_token
  169. for res in result.results:
  170. if res.name == self.cluster_compute_name:
  171. self.cluster_compute_id = res.id
  172. logger.info(
  173. f"Cluster compute already exists "
  174. f"with ID {self.cluster_compute_id}"
  175. )
  176. break
  177. if not paging_token:
  178. break
  179. if not self.cluster_compute_id:
  180. logger.info(
  181. f"Cluster compute not found. "
  182. f"Creating with name {self.cluster_compute_name}."
  183. )
  184. try:
  185. result = self.sdk.create_cluster_compute(
  186. dict(
  187. name=self.cluster_compute_name,
  188. project_id=self.project_id,
  189. config=self.cluster_compute,
  190. )
  191. )
  192. self.cluster_compute_id = result.result.id
  193. except Exception as e:
  194. if _repeat:
  195. logger.warning(
  196. f"Got exception when trying to create cluster "
  197. f"compute: {e}. Sleeping for 10 seconds and then "
  198. f"try again once..."
  199. )
  200. time.sleep(10)
  201. return self.create_cluster_compute(_repeat=False)
  202. raise ClusterComputeCreateError(
  203. "Could not create cluster compute"
  204. ) from e
  205. logger.info(
  206. f"Cluster compute template created with "
  207. f"name {self.cluster_compute_name} and "
  208. f"ID {self.cluster_compute_id}"
  209. )
  210. def build_configs(self, timeout: float = 30.0):
  211. try:
  212. self.create_cluster_compute()
  213. except AssertionError as e:
  214. # If already exists, ignore
  215. logger.warning(str(e))
  216. except ClusterComputeCreateError as e:
  217. raise e
  218. except Exception as e:
  219. raise ClusterComputeCreateError(
  220. f"Unexpected cluster compute build error: {e}"
  221. ) from e
  222. try:
  223. self.create_cluster_env()
  224. except AssertionError as e:
  225. # If already exists, ignore
  226. logger.warning(str(e))
  227. except ClusterEnvCreateError as e:
  228. raise e
  229. except Exception as e:
  230. raise ClusterEnvCreateError(
  231. f"Unexpected cluster env create error: {e}"
  232. ) from e
  233. try:
  234. self.build_cluster_env(timeout=timeout)
  235. except AssertionError as e:
  236. # If already exists, ignore
  237. logger.warning(str(e))
  238. except (ClusterEnvBuildError, ClusterEnvBuildTimeout) as e:
  239. raise e
  240. except Exception as e:
  241. raise ClusterEnvBuildError(
  242. f"Unexpected cluster env build error: {e}"
  243. ) from e
  244. def delete_configs(self):
  245. if self.cluster_id:
  246. self.sdk.delete_cluster(self.cluster_id)
  247. if self.cluster_env_build_id:
  248. self.sdk.delete_cluster_environment_build(self.cluster_env_build_id)
  249. if self.cluster_env_id:
  250. self.sdk.delete_cluster_environment(self.cluster_env_id)
  251. if self.cluster_compute_id:
  252. self.sdk.delete_cluster_compute(self.cluster_compute_id)
  253. def start_cluster(self, timeout: float = 600.0):
  254. pass
  255. def terminate_cluster_ex(self, wait: bool = False):
  256. pass
  257. def get_cluster_address(self) -> str:
  258. return f"anyscale://{self.project_name}/{self.cluster_name}"