glue.py

import os
import time
import traceback
from typing import Optional, List, Tuple

from ray_release.alerts.handle import handle_result, require_result
from ray_release.anyscale_util import get_cluster_name
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cluster_manager.full import FullClusterManager
from ray_release.cluster_manager.minimal import MinimalClusterManager
from ray_release.command_runner.job_runner import JobRunner
from ray_release.command_runner.command_runner import CommandRunner
from ray_release.command_runner.anyscale_job_runner import AnyscaleJobRunner
from ray_release.test import Test
from ray_release.config import (
    DEFAULT_BUILD_TIMEOUT,
    DEFAULT_CLUSTER_TIMEOUT,
    DEFAULT_COMMAND_TIMEOUT,
    DEFAULT_WAIT_FOR_NODES_TIMEOUT,
    RELEASE_PACKAGE_DIR,
    DEFAULT_AUTOSUSPEND_MINS,
)
from ray_release.template import load_test_cluster_compute
from ray_release.exception import (
    ReleaseTestConfigError,
    ReleaseTestSetupError,
    CommandError,
    PrepareCommandError,
    CommandTimeout,
    PrepareCommandTimeout,
    TestCommandError,
    TestCommandTimeout,
    ClusterEnvCreateError,
)
from ray_release.file_manager.job_file_manager import JobFileManager
from ray_release.logger import logger
from ray_release.reporter.reporter import Reporter
from ray_release.result import Result, ResultStatus, handle_exception
from ray_release.signal_handling import (
    setup_signal_handling,
    reset_signal_handling,
    register_handler,
)

type_str_to_command_runner = {
    "job": JobRunner,
    "anyscale_job": AnyscaleJobRunner,
}

command_runner_to_cluster_manager = {
    JobRunner: FullClusterManager,
    AnyscaleJobRunner: MinimalClusterManager,
}

DEFAULT_RUN_TYPE = "anyscale_job"
TIMEOUT_BUFFER_MINUTES = 15
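
# For reference, the test definition fields read by this module (a partial,
# informal sketch derived from the lookups below; the authoritative schema
# lives with the release test YAML definitions):
#
#   name: str
#   stable: bool
#   working_dir: str
#   cluster:
#     autosuspend_mins: int
#     maximum_uptime_minutes: int
#   run:
#     type: "job" | "anyscale_job"
#     script: str
#     timeout: int
#     build_timeout: int
#     session_timeout: int
#     prepare: str
#     prepare_timeout: int
#     long_running: bool
#     artifact_path: str
#     wait_for_nodes:
#       num_nodes: int
#       timeout: int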


def _get_extra_tags_from_env() -> dict:
    env_vars = (
        "BUILDKITE_JOB_ID",
        "BUILDKITE_PULL_REQUEST",
        "BUILDKITE_PIPELINE_SLUG",
        "BUILDKITE_SOURCE",
        "RELEASE_FREQUENCY",
    )
    return {key.lower(): os.getenv(key, "") for key in env_vars}
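
# With the Buildkite environment variables set, _get_extra_tags_from_env()
# returns a dict keyed by the lowercased variable names, e.g. (hypothetical
# values):
#   {"buildkite_job_id": "0192...", "buildkite_pull_request": "false",
#    "buildkite_pipeline_slug": "release", "buildkite_source": "schedule",
#    "release_frequency": "nightly"}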


def _load_test_configuration(
    test: Test,
    anyscale_project: str,
    result: Result,
    smoke_test: bool = False,
    no_terminate: bool = False,
    test_definition_root: Optional[str] = None,
) -> Tuple[ClusterManager, CommandRunner, str]:
    logger.info(f"Test config: {test}")

    # Populate result parameters
    result.stable = test.get("stable", True)
    result.smoke_test = smoke_test
    buildkite_url = os.getenv("BUILDKITE_BUILD_URL", "")
    buildkite_job_id = os.getenv("BUILDKITE_JOB_ID", "")
    if buildkite_url:
        buildkite_url += "#" + buildkite_job_id
    result.buildkite_url = buildkite_url
    result.buildkite_job_id = buildkite_job_id

    # Set up the working directory
    working_dir = test["working_dir"]
    new_wd = os.path.join(test_definition_root or RELEASE_PACKAGE_DIR, working_dir)
    os.chdir(new_wd)

    run_type = test["run"].get("type", DEFAULT_RUN_TYPE)

    # Workaround while Anyscale Jobs don't support leaving the cluster alive
    # after the job has finished.
    # TODO: Remove once we have support in Anyscale
    if no_terminate and run_type == "anyscale_job":
        logger.warning(
            "anyscale_job run type does not support --no-terminate. "
            "Switching to job (Ray Job) run type."
        )
        run_type = "job"

    command_runner_cls = type_str_to_command_runner.get(run_type)
    if not command_runner_cls:
        raise ReleaseTestConfigError(
            f"Unknown command runner type: {run_type}. Must be one of "
            f"{list(type_str_to_command_runner.keys())}"
        )

    cluster_manager_cls = command_runner_to_cluster_manager[command_runner_cls]
    logger.info(f"Got command runner cls: {command_runner_cls}")

    # Extra tags to be set on resources on the cloud provider's side
    extra_tags = _get_extra_tags_from_env()
    # We don't need other attributes as they can be derived from the name
    extra_tags["test_name"] = str(test["name"])
    extra_tags["test_smoke_test"] = str(result.smoke_test)
    result.extra_tags = extra_tags

    artifact_path = test["run"].get("artifact_path", None)

    # Instantiate managers and command runner
    try:
        cluster_manager = cluster_manager_cls(
            test,
            anyscale_project,
            smoke_test=smoke_test,
        )
        command_runner = command_runner_cls(
            cluster_manager,
            JobFileManager(cluster_manager=cluster_manager),
            working_dir,
            artifact_path=artifact_path,
        )
    except Exception as e:
        raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e

    return cluster_manager, command_runner, artifact_path


def _setup_cluster_environment(
    test: Test,
    result: Result,
    cluster_manager: ClusterManager,
    cluster_env_id: Optional[str],
    test_definition_root: Optional[str] = None,
) -> Tuple[Optional[str], int, int, int, int]:
    setup_signal_handling()
    # Load configs
    cluster_compute = load_test_cluster_compute(test, test_definition_root)

    if cluster_env_id:
        try:
            cluster_manager.cluster_env_id = cluster_env_id
            cluster_manager.build_cluster_env()
            cluster_manager.fetch_build_info()
            logger.info(
                "Using overridden cluster environment with ID "
                f"{cluster_env_id} and build ID "
                f"{cluster_manager.cluster_env_build_id}"
            )
        except Exception as e:
            raise ClusterEnvCreateError(
                f"Could not get existing overridden cluster environment "
                f"{cluster_env_id}: {e}"
            ) from e
    else:
        cluster_manager.set_cluster_env()

    # Load some timeouts
    build_timeout = int(test["run"].get("build_timeout", DEFAULT_BUILD_TIMEOUT))
    command_timeout = int(test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT))
    cluster_timeout = int(test["run"].get("session_timeout", DEFAULT_CLUSTER_TIMEOUT))

    # Get the prepare command timeout, if any
    prepare_cmd = test["run"].get("prepare", None)
    if prepare_cmd:
        prepare_timeout = test["run"].get("prepare_timeout", command_timeout)
    else:
        prepare_timeout = 0

    # Base maximum uptime on the combined command and prepare timeouts
    command_and_prepare_timeout = command_timeout + prepare_timeout

    # Use default timeout = 0 here if wait_for_nodes is empty. This is to make
    # sure we don't inflate the maximum_uptime_minutes too much if we don't wait
    # for nodes at all.
    # The actual default will otherwise be loaded further down.
    wait_timeout = int(test["run"].get("wait_for_nodes", {}).get("timeout", 0))

    autosuspend_mins = test["cluster"].get("autosuspend_mins", None)
    if autosuspend_mins:
        cluster_manager.autosuspend_minutes = autosuspend_mins
        autosuspend_base = autosuspend_mins
    else:
        cluster_manager.autosuspend_minutes = min(
            DEFAULT_AUTOSUSPEND_MINS,
            int(command_and_prepare_timeout / 60) + TIMEOUT_BUFFER_MINUTES,
        )
        # Maximum uptime should be based on the command timeout, not the
        # DEFAULT_AUTOSUSPEND_MINS
        autosuspend_base = (
            int(command_and_prepare_timeout / 60) + TIMEOUT_BUFFER_MINUTES
        )

    maximum_uptime_minutes = test["cluster"].get("maximum_uptime_minutes", None)
    if maximum_uptime_minutes:
        cluster_manager.maximum_uptime_minutes = maximum_uptime_minutes
    else:
        cluster_manager.maximum_uptime_minutes = (
            autosuspend_base + wait_timeout + TIMEOUT_BUFFER_MINUTES
        )
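
    # Worked example with hypothetical numbers: a 2 h command timeout plus a
    # 30 min prepare timeout gives command_and_prepare_timeout = 9000 s, so
    # autosuspend_base = 9000 // 60 + 15 = 165 minutes, and if no explicit
    # maximum_uptime_minutes is set, it defaults to 165 + wait_timeout + 15.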

    # Set cluster compute here. Note that this may use timeouts provided
    # above.
    cluster_manager.set_cluster_compute(
        cluster_compute,
        extra_tags=result.extra_tags,
    )

    return (
        prepare_cmd,
        prepare_timeout,
        build_timeout,
        cluster_timeout,
        command_timeout,
    )


def _local_environment_information(
    result: Result,
    cluster_manager: ClusterManager,
    command_runner: CommandRunner,
    build_timeout: int,
    cluster_timeout: int,
    no_terminate: bool,
    cluster_id: Optional[str],
    cluster_env_id: Optional[str],
) -> None:
    if isinstance(cluster_manager, FullClusterManager):
        if not no_terminate:
            register_handler(
                lambda sig, frame: cluster_manager.terminate_cluster(wait=True)
            )

    # Start cluster
    if cluster_id:
        buildkite_group(":rocket: Using existing cluster")
        # Re-use existing cluster ID for development
        cluster_manager.cluster_id = cluster_id
        cluster_manager.cluster_name = get_cluster_name(cluster_id)
    else:
        buildkite_group(":gear: Building cluster environment")
        if cluster_env_id:
            cluster_manager.cluster_env_id = cluster_env_id
        cluster_manager.build_configs(timeout=build_timeout)

        if isinstance(cluster_manager, FullClusterManager):
            buildkite_group(":rocket: Starting up cluster")
            cluster_manager.start_cluster(timeout=cluster_timeout)
        elif isinstance(command_runner, AnyscaleJobRunner):
            command_runner.job_manager.cluster_startup_timeout = cluster_timeout

    result.cluster_url = cluster_manager.get_cluster_url()
    result.cluster_id = cluster_manager.cluster_id


def _prepare_remote_environment(
    test: Test,
    command_runner: CommandRunner,
    prepare_cmd: Optional[str],
    prepare_timeout: int,
) -> None:
    command_runner.prepare_remote_env()

    wait_for_nodes = test["run"].get("wait_for_nodes", None)
    if wait_for_nodes:
        buildkite_group(":stopwatch: Waiting for nodes to come up")
        # Overwrite the placeholder wait_timeout from above with the
        # proper default
        wait_timeout = int(
            wait_for_nodes.get("timeout", DEFAULT_WAIT_FOR_NODES_TIMEOUT)
        )
        num_nodes = test["run"]["wait_for_nodes"]["num_nodes"]
        command_runner.wait_for_nodes(num_nodes, wait_timeout)

    if prepare_cmd:
        try:
            command_runner.run_prepare_command(prepare_cmd, timeout=prepare_timeout)
        except CommandError as e:
            raise PrepareCommandError(e)
        except CommandTimeout as e:
            raise PrepareCommandTimeout(e)


def _running_test_script(
    test: Test,
    smoke_test: bool,
    command_runner: CommandRunner,
    command_timeout: int,
) -> None:
    command = test["run"]["script"]
    command_env = test.get_byod_runtime_env()

    if smoke_test:
        command = f"{command} --smoke-test"
        command_env["IS_SMOKE_TEST"] = "1"

    is_long_running = test["run"].get("long_running", False)

    try:
        command_runner.run_command(
            command,
            env=command_env,
            timeout=command_timeout,
            raise_on_timeout=not is_long_running,
            pip=test.get_byod_pips(),
        )
    except (
        TestCommandError,
        PrepareCommandError,
        TestCommandTimeout,
        PrepareCommandTimeout,
    ) as e:
        raise e
    except CommandError as e:
        raise TestCommandError(e)
    except CommandTimeout as e:
        if not is_long_running:
            # Only raise an error if the command is not long running
            raise TestCommandTimeout(e)


def _fetching_results(
    result: Result,
    command_runner: CommandRunner,
    artifact_path: Optional[str],
    smoke_test: bool,
    start_time_unix: int,
) -> Tuple[dict, Optional[Exception]]:
    fetch_result_exception = None
    try:
        command_results = command_runner.fetch_results()
    except Exception as e:
        logger.exception(f"Could not fetch results for test command: {e}")
        command_results = {}
        fetch_result_exception = e

    if artifact_path:
        try:
            command_runner.fetch_artifact()
        except Exception as e:
            logger.error("Could not fetch artifact for test command")
            logger.exception(e)

    # Postprocess result:
    if "last_update" in command_results:
        command_results["last_update_diff"] = time.time() - command_results.get(
            "last_update", 0.0
        )

    try:
        # Logic duplicated in ray_release/command_runner/_anyscale_job_wrapper.py
        # Timeout is the time the test took divided by 200
        # (~7 minutes for a 24h test) but no less than 30s
        # and no more than 900s
        metrics_timeout = max(30, min((time.time() - start_time_unix) / 200, 900))
        command_runner.save_metrics(start_time_unix, timeout=metrics_timeout)
        metrics = command_runner.fetch_metrics()
    except Exception as e:
        logger.exception(f"Could not fetch metrics for test command: {e}")
        metrics = {}

    if smoke_test:
        command_results["smoke_test"] = True

    result.results = command_results
    result.status = ResultStatus.SUCCESS.value

    return metrics, fetch_result_exception


def run_release_test(
    test: Test,
    anyscale_project: str,
    result: Result,
    reporters: Optional[List[Reporter]] = None,
    smoke_test: bool = False,
    cluster_id: Optional[str] = None,
    cluster_env_id: Optional[str] = None,
    no_terminate: bool = False,
    test_definition_root: Optional[str] = None,
) -> Result:
    old_wd = os.getcwd()
    start_time = time.monotonic()

    command_runner = None
    cluster_manager = None

    pipeline_exception = None
    # Fetching results is non-critical for some tests, so keep its
    # exception separate from the general pipeline exception.
    fetch_result_exception = None
    try:
        buildkite_group(":spiral_note_pad: Loading test configuration")
        cluster_manager, command_runner, artifact_path = _load_test_configuration(
            test,
            anyscale_project,
            result,
            smoke_test,
            no_terminate,
            test_definition_root,
        )

        buildkite_group(":nut_and_bolt: Setting up cluster environment")
        (
            prepare_cmd,
            prepare_timeout,
            build_timeout,
            cluster_timeout,
            command_timeout,
        ) = _setup_cluster_environment(
            test,
            result,
            cluster_manager,
            cluster_env_id,
            test_definition_root,
        )

        buildkite_group(":bulb: Local environment information")
        _local_environment_information(
            result,
            cluster_manager,
            command_runner,
            build_timeout,
            cluster_timeout,
            no_terminate,
            cluster_id,
            cluster_env_id,
        )

        # Upload files
        buildkite_group(":wrench: Preparing remote environment")
        _prepare_remote_environment(
            test,
            command_runner,
            prepare_cmd,
            prepare_timeout,
        )

        buildkite_group(":runner: Running test script")
        start_time_unix = time.time()
        _running_test_script(
            test,
            smoke_test,
            command_runner,
            command_timeout,
        )

        buildkite_group(":floppy_disk: Fetching results")
        metrics, fetch_result_exception = _fetching_results(
            result,
            command_runner,
            artifact_path,
            smoke_test,
            start_time_unix,
        )
    except Exception as e:
        logger.exception(e)
        buildkite_open_last()
        pipeline_exception = e
        metrics = {}

    # Obtain the cluster URL again, as in the case of Anyscale Jobs it is
    # only set after the command was run
    if isinstance(command_runner, AnyscaleJobRunner):
        result.cluster_url = cluster_manager.get_cluster_url()
        result.cluster_id = cluster_manager.cluster_id
        result.job_url = command_runner.job_manager.job_url
        result.job_id = command_runner.job_manager.job_id

    result.last_logs = command_runner.get_last_logs() if command_runner else None

    if not no_terminate and cluster_manager:
        buildkite_group(":earth_africa: Terminating cluster")
        cluster_manager.terminate_cluster(wait=False)

    if hasattr(command_runner, "cleanup"):
        command_runner.cleanup()

    reset_signal_handling()

    time_taken = time.monotonic() - start_time
    result.runtime = time_taken
    result.prometheus_metrics = metrics

    os.chdir(old_wd)

    if not pipeline_exception:
        if require_result(test) and fetch_result_exception:
            pipeline_exception = fetch_result_exception
        else:
            buildkite_group(":mag: Interpreting results")
            # Only handle results if we didn't run into issues earlier
            try:
                handle_result(test, result)
            except Exception as e:
                pipeline_exception = e

    if pipeline_exception:
        buildkite_group(":rotating_light: Handling errors")
        exit_code, result_status, runtime = handle_exception(
            pipeline_exception,
            result.runtime,
        )

        result.return_code = exit_code.value
        result.status = result_status.value
        if runtime is not None:
            result.runtime = runtime
        try:
            raise pipeline_exception
        except Exception:
            if not result.last_logs:
                result.last_logs = traceback.format_exc()

    buildkite_group(":memo: Reporting results", open=True)
    for reporter in reporters or []:
        reporter.report_result(test, result)

    if pipeline_exception:
        raise pipeline_exception

    return result
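

# A minimal, hypothetical usage sketch (not part of the original module).
# It assumes a Test can be built from a parsed release test YAML definition
# and that Result() takes no required arguments; both are assumptions here,
# not confirmed by this file.
#
#   from ray_release.result import Result
#   from ray_release.test import Test
#
#   test = Test(...)  # loaded from a release test YAML definition
#   result = run_release_test(
#       test,
#       anyscale_project="prj_...",  # hypothetical Anyscale project ID
#       result=Result(),
#       smoke_test=True,
#   )
#   print(result.status, result.runtime)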