full.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import time
  2. from ray_release.exception import (
  3. ClusterCreationError,
  4. ClusterStartupError,
  5. ClusterStartupTimeout,
  6. ClusterStartupFailed,
  7. )
  8. from ray_release.logger import logger
  9. from ray_release.cluster_manager.minimal import MinimalClusterManager
  10. from ray_release.util import (
  11. format_link,
  12. anyscale_cluster_url,
  13. exponential_backoff_retry,
  14. )
  15. REPORT_S = 30.0
  16. class FullClusterManager(MinimalClusterManager):
  17. """Full manager.
  18. Builds app config and compute template and starts/terminated session
  19. using SDK.
  20. """
  21. def start_cluster(self, timeout: float = 600.0):
  22. logger.info(f"Creating cluster {self.cluster_name}")
  23. logger.info(f"Autosuspend time: {self.autosuspend_minutes} minutes")
  24. logger.info(f"Auto terminate after: {self.maximum_uptime_minutes} minutes")
  25. try:
  26. result = self.sdk.create_cluster(
  27. dict(
  28. name=self.cluster_name,
  29. project_id=self.project_id,
  30. cluster_environment_build_id=self.cluster_env_build_id,
  31. cluster_compute_id=self.cluster_compute_id,
  32. idle_timeout_minutes=self.autosuspend_minutes,
  33. )
  34. )
  35. self.cluster_id = result.result.id
  36. except Exception as e:
  37. raise ClusterCreationError(f"Error creating cluster: {e}") from e
  38. # Trigger session start
  39. logger.info(f"Starting cluster {self.cluster_name} ({self.cluster_id})")
  40. cluster_url = anyscale_cluster_url(
  41. project_id=self.project_id, cluster_id=self.cluster_id
  42. )
  43. logger.info(f"Link to cluster: {format_link(cluster_url)}")
  44. try:
  45. result = self.sdk.start_cluster(self.cluster_id, start_cluster_options={})
  46. cop_id = result.result.id
  47. completed = result.result.completed
  48. except Exception as e:
  49. raise ClusterStartupError(
  50. f"Error starting cluster with name "
  51. f"{self.cluster_name} and {self.cluster_id} ({cluster_url}): "
  52. f"{e}"
  53. ) from e
  54. # Wait for session
  55. logger.info(f"Waiting for cluster {self.cluster_name}...")
  56. start_time = time.monotonic()
  57. timeout_at = start_time + timeout
  58. next_status = start_time + 30
  59. while not completed:
  60. now = time.monotonic()
  61. if now >= timeout_at:
  62. raise ClusterStartupTimeout(
  63. f"Time out when creating cluster {self.cluster_name}"
  64. )
  65. if now >= next_status:
  66. logger.info(
  67. f"... still waiting for cluster {self.cluster_name} "
  68. f"({int(now - start_time)} seconds) ..."
  69. )
  70. next_status += 30
  71. # Sleep 1 sec before next check.
  72. time.sleep(1)
  73. result = exponential_backoff_retry(
  74. lambda: self.sdk.get_cluster_operation(cop_id, _request_timeout=30),
  75. retry_exceptions=Exception,
  76. initial_retry_delay_s=2,
  77. max_retries=3,
  78. )
  79. completed = result.result.completed
  80. result = self.sdk.get_cluster(self.cluster_id)
  81. if result.result.state != "Running":
  82. raise ClusterStartupFailed(
  83. f"Cluster did not come up - most likely the nodes are currently "
  84. f"not available. Please check the cluster startup logs: "
  85. f"{cluster_url} (cluster state: {result.result.state})"
  86. )
  87. def terminate_cluster_ex(self, wait: bool = False):
  88. if self.cluster_id:
  89. logger.info(f"Terminating cluster with ID {self.cluster_id}")
  90. # Just trigger a request. No need to wait until session shutdown.
  91. result = self.sdk.terminate_cluster(
  92. cluster_id=self.cluster_id, terminate_cluster_options={}
  93. )
  94. logger.info(f"Terminate request for cluster with ID {self.cluster_id} sent")
  95. if not wait:
  96. return
  97. # Only do this when waiting
  98. cop_id = result.result.id
  99. completed = result.result.completed
  100. while not completed:
  101. # Sleep 1 sec before next check.
  102. time.sleep(1)
  103. cluster_operation_response = self.sdk.get_cluster_operation(
  104. cop_id, _request_timeout=30
  105. )
  106. cluster_operation = cluster_operation_response.result
  107. completed = cluster_operation.completed
  108. result = self.sdk.get_cluster(self.cluster_id)
  109. while result.result.state != "Terminated":
  110. time.sleep(1)
  111. result = self.sdk.get_cluster(self.cluster_id)