glue.py 11 KB


  1. import os
  2. import time
  3. from typing import Optional, List
  4. from ray_release.alerts.handle import handle_result
  5. from ray_release.anyscale_util import get_cluster_name
  6. from ray_release.buildkite.output import buildkite_group, buildkite_open_last
  7. from ray_release.cluster_manager.full import FullClusterManager
  8. from ray_release.command_runner.client_runner import ClientRunner
  9. from ray_release.command_runner.job_runner import JobRunner
  10. from ray_release.command_runner.sdk_runner import SDKRunner
  11. from ray_release.config import (
  12. Test,
  13. DEFAULT_BUILD_TIMEOUT,
  14. DEFAULT_CLUSTER_TIMEOUT,
  15. DEFAULT_COMMAND_TIMEOUT,
  16. DEFAULT_WAIT_FOR_NODES_TIMEOUT,
  17. RELEASE_PACKAGE_DIR,
  18. DEFAULT_AUTOSUSPEND_MINS,
  19. validate_test,
  20. )
  21. from ray_release.template import load_test_cluster_env, load_test_cluster_compute
  22. from ray_release.exception import (
  23. ReleaseTestConfigError,
  24. ReleaseTestSetupError,
  25. CommandError,
  26. PrepareCommandError,
  27. CommandTimeout,
  28. PrepareCommandTimeout,
  29. TestCommandError,
  30. TestCommandTimeout,
  31. LocalEnvSetupError,
  32. ClusterEnvCreateError,
  33. )
  34. from ray_release.file_manager.job_file_manager import JobFileManager
  35. from ray_release.file_manager.remote_task import RemoteTaskFileManager
  36. from ray_release.file_manager.session_controller import SessionControllerFileManager
  37. from ray_release.logger import logger
  38. from ray_release.reporter.reporter import Reporter
  39. from ray_release.result import Result, handle_exception
  40. from ray_release.util import (
  41. run_bash_script,
  42. get_pip_packages,
  43. reinstall_anyscale_dependencies,
  44. )
  45. type_str_to_command_runner = {
  46. "command": SDKRunner,
  47. "sdk_command": SDKRunner,
  48. "job": JobRunner,
  49. "client": ClientRunner,
  50. }
  51. command_runner_to_cluster_manager = {
  52. SDKRunner: FullClusterManager,
  53. ClientRunner: FullClusterManager,
  54. JobRunner: FullClusterManager,
  55. }
  56. file_manager_str_to_file_manager = {
  57. "sdk": SessionControllerFileManager,
  58. "client": RemoteTaskFileManager,
  59. "job": JobFileManager,
  60. }
  61. command_runner_to_file_manager = {
  62. SDKRunner: SessionControllerFileManager,
  63. ClientRunner: RemoteTaskFileManager,
  64. JobFileManager: JobFileManager,
  65. }
  66. uploader_str_to_uploader = {"client": None, "s3": None, "command_runner": None}
  67. def run_release_test(
  68. test: Test,
  69. anyscale_project: str,
  70. result: Result,
  71. ray_wheels_url: str,
  72. reporters: Optional[List[Reporter]] = None,
  73. smoke_test: bool = False,
  74. cluster_id: Optional[str] = None,
  75. cluster_env_id: Optional[str] = None,
  76. no_terminate: bool = False,
  77. ) -> Result:
  78. buildkite_group(":spiral_note_pad: Loading test configuration")
  79. validate_test(test)
  80. result.wheels_url = ray_wheels_url
  81. result.stable = test.get("stable", True)
  82. result.smoke_test = smoke_test
  83. buildkite_url = os.getenv("BUILDKITE_BUILD_URL", "")
  84. if buildkite_url:
  85. buildkite_url += "#" + os.getenv("BUILDKITE_JOB_ID", "")
  86. result.buildkite_url = buildkite_url
  87. working_dir = test["working_dir"]
  88. old_wd = os.getcwd()
  89. new_wd = os.path.join(RELEASE_PACKAGE_DIR, working_dir)
  90. os.chdir(new_wd)
  91. start_time = time.monotonic()
  92. run_type = test["run"].get("type", "sdk_command")
  93. command_runner_cls = type_str_to_command_runner.get(run_type)
  94. if not command_runner_cls:
  95. raise ReleaseTestConfigError(
  96. f"Unknown command runner type: {run_type}. Must be one of "
  97. f"{list(type_str_to_command_runner.keys())}"
  98. )
  99. cluster_manager_cls = command_runner_to_cluster_manager[command_runner_cls]
  100. file_manager_str = test["run"].get("file_manager", None)
  101. if file_manager_str:
  102. if file_manager_str not in file_manager_str_to_file_manager:
  103. raise ReleaseTestConfigError(
  104. f"Unknown file manager: {file_manager_str}. Must be one of "
  105. f"{list(file_manager_str_to_file_manager.keys())}"
  106. )
  107. file_manager_cls = file_manager_str_to_file_manager[file_manager_str]
  108. else:
  109. file_manager_cls = command_runner_to_file_manager[command_runner_cls]
  110. # Instantiate managers and command runner
  111. try:
  112. cluster_manager = cluster_manager_cls(test["name"], anyscale_project)
  113. file_manager = file_manager_cls(cluster_manager=cluster_manager)
  114. command_runner = command_runner_cls(cluster_manager, file_manager, working_dir)
  115. except Exception as e:
  116. raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e
  117. pipeline_exception = None
  118. try:
  119. # Load configs
  120. cluster_env = load_test_cluster_env(test, ray_wheels_url=ray_wheels_url)
  121. cluster_compute = load_test_cluster_compute(test)
  122. if cluster_env_id:
  123. try:
  124. cluster_manager.cluster_env_id = cluster_env_id
  125. cluster_manager.build_cluster_env()
  126. cluster_manager.fetch_build_info()
  127. logger.info(
  128. "Using overridden cluster environment with ID "
  129. f"{cluster_env_id} and build ID "
  130. f"{cluster_manager.cluster_env_build_id}"
  131. )
  132. except Exception as e:
  133. raise ClusterEnvCreateError(
  134. f"Could not get existing overridden cluster environment "
  135. f"{cluster_env_id}: {e}"
  136. ) from e
  137. else:
  138. cluster_manager.set_cluster_env(cluster_env)
  139. cluster_manager.set_cluster_compute(cluster_compute)
  140. buildkite_group(":nut_and_bolt: Setting up local environment")
  141. driver_setup_script = test.get("driver_setup", None)
  142. if driver_setup_script:
  143. try:
  144. run_bash_script(driver_setup_script)
  145. except Exception as e:
  146. raise LocalEnvSetupError(f"Driver setup script failed: {e}") from e
  147. # Install local dependencies
  148. command_runner.prepare_local_env(ray_wheels_url)
  149. command_timeout = test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT)
  150. # Re-install anyscale package as local dependencies might have changed
  151. # from local env setup
  152. reinstall_anyscale_dependencies()
  153. # Print installed pip packages
  154. buildkite_group(":bulb: Local environment information")
  155. pip_packages = get_pip_packages()
  156. pip_package_string = "\n".join(pip_packages)
  157. logger.info(f"Installed python packages:\n{pip_package_string}")
  158. # Start cluster
  159. if cluster_id:
  160. buildkite_group(":rocket: Using existing cluster")
  161. # Re-use existing cluster ID for development
  162. cluster_manager.cluster_id = cluster_id
  163. cluster_manager.cluster_name = get_cluster_name(cluster_id)
  164. else:
  165. buildkite_group(":gear: Building cluster environment")
  166. build_timeout = test["run"].get("build_timeout", DEFAULT_BUILD_TIMEOUT)
  167. if cluster_env_id:
  168. cluster_manager.cluster_env_id = cluster_env_id
  169. cluster_manager.build_configs(timeout=build_timeout)
  170. cluster_timeout = test["run"].get(
  171. "session_timeout", DEFAULT_CLUSTER_TIMEOUT
  172. )
  173. autosuspend_mins = test["cluster"].get("autosuspend_mins", None)
  174. if autosuspend_mins:
  175. cluster_manager.autosuspend_minutes = autosuspend_mins
  176. else:
  177. cluster_manager.autosuspend_minutes = min(
  178. DEFAULT_AUTOSUSPEND_MINS, int(command_timeout / 60) + 10
  179. )
  180. buildkite_group(":rocket: Starting up cluster")
  181. cluster_manager.start_cluster(timeout=cluster_timeout)
  182. result.cluster_url = cluster_manager.get_cluster_url()
  183. # Upload files
  184. buildkite_group(":wrench: Preparing remote environment")
  185. command_runner.prepare_remote_env()
  186. wait_for_nodes = test["run"].get("wait_for_nodes", None)
  187. if wait_for_nodes:
  188. buildkite_group(":stopwatch: Waiting for nodes to come up")
  189. num_nodes = test["run"]["wait_for_nodes"]["num_nodes"]
  190. wait_timeout = test["run"]["wait_for_nodes"].get(
  191. "timeout", DEFAULT_WAIT_FOR_NODES_TIMEOUT
  192. )
  193. command_runner.wait_for_nodes(num_nodes, wait_timeout)
  194. prepare_cmd = test["run"].get("prepare", None)
  195. if prepare_cmd:
  196. prepare_timeout = test["run"].get("prepare_timeout", command_timeout)
  197. try:
  198. command_runner.run_prepare_command(prepare_cmd, timeout=prepare_timeout)
  199. except CommandError as e:
  200. raise PrepareCommandError(e)
  201. except CommandTimeout as e:
  202. raise PrepareCommandTimeout(e)
  203. buildkite_group(":runner: Running test script")
  204. command = test["run"]["script"]
  205. command_env = {}
  206. if smoke_test:
  207. command = f"{command} --smoke-test"
  208. command_env["IS_SMOKE_TEST"] = "1"
  209. is_long_running = test["run"].get("long_running", False)
  210. try:
  211. command_runner.run_command(
  212. command, env=command_env, timeout=command_timeout
  213. )
  214. except CommandError as e:
  215. raise TestCommandError(e)
  216. except CommandTimeout as e:
  217. if not is_long_running:
  218. # Only raise error if command is not long running
  219. raise TestCommandTimeout(e)
  220. buildkite_group(":floppy_disk: Fetching results")
  221. try:
  222. command_results = command_runner.fetch_results()
  223. except Exception as e:
  224. logger.error("Could not fetch results for test command")
  225. logger.exception(e)
  226. command_results = {}
  227. # Postprocess result:
  228. if "last_update" in command_results:
  229. command_results["last_update_diff"] = time.time() - command_results.get(
  230. "last_update", 0.0
  231. )
  232. if smoke_test:
  233. command_results["smoke_test"] = True
  234. result.results = command_results
  235. result.status = "finished"
  236. except Exception as e:
  237. logger.exception(e)
  238. buildkite_open_last()
  239. pipeline_exception = e
  240. try:
  241. last_logs = command_runner.get_last_logs()
  242. except Exception as e:
  243. logger.error(f"Error fetching logs: {e}")
  244. last_logs = "No logs could be retrieved."
  245. result.last_logs = last_logs
  246. if not no_terminate:
  247. buildkite_group(":earth_africa: Terminating cluster")
  248. try:
  249. cluster_manager.terminate_cluster(wait=False)
  250. except Exception as e:
  251. logger.error(f"Could not terminate cluster: {e}")
  252. time_taken = time.monotonic() - start_time
  253. result.runtime = time_taken
  254. os.chdir(old_wd)
  255. if not pipeline_exception:
  256. buildkite_group(":mag: Interpreting results")
  257. # Only handle results if we didn't run into issues earlier
  258. try:
  259. handle_result(test, result)
  260. except Exception as e:
  261. pipeline_exception = e
  262. if pipeline_exception:
  263. buildkite_group(":rotating_light: Handling errors")
  264. exit_code, error_type, runtime = handle_exception(pipeline_exception)
  265. result.return_code = exit_code.value
  266. result.status = error_type
  267. if runtime is not None:
  268. result.runtime = runtime
  269. buildkite_group(":memo: Reporting results", open=True)
  270. reporters = reporters or []
  271. for reporter in reporters:
  272. try:
  273. reporter.report_result(test, result)
  274. except Exception as e:
  275. logger.error(f"Error reporting results via {type(reporter)}: {e}")
  276. if pipeline_exception:
  277. raise pipeline_exception
  278. return result