glue.py

import os
import time
import traceback
from typing import Optional, List, Tuple

from ray_release.alerts.handle import handle_result, require_result
from ray_release.anyscale_util import get_cluster_name, LAST_LOGS_LENGTH
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cluster_manager.full import FullClusterManager
from ray_release.cluster_manager.minimal import MinimalClusterManager
from ray_release.command_runner.job_runner import JobRunner
from ray_release.command_runner.command_runner import CommandRunner
from ray_release.command_runner.anyscale_job_runner import AnyscaleJobRunner
from ray_release.test import Test
from ray_release.config import (
    DEFAULT_BUILD_TIMEOUT,
    DEFAULT_CLUSTER_TIMEOUT,
    DEFAULT_COMMAND_TIMEOUT,
    DEFAULT_WAIT_FOR_NODES_TIMEOUT,
    RELEASE_PACKAGE_DIR,
    DEFAULT_AUTOSUSPEND_MINS,
)
from ray_release.template import load_test_cluster_compute
from ray_release.exception import (
    ReleaseTestConfigError,
    ReleaseTestSetupError,
    CommandError,
    PrepareCommandError,
    CommandTimeout,
    PrepareCommandTimeout,
    TestCommandError,
    TestCommandTimeout,
    ClusterEnvCreateError,
)
from ray_release.file_manager.job_file_manager import JobFileManager
from ray_release.logger import logger
from ray_release.reporter.reporter import Reporter
from ray_release.result import Result, ResultStatus, handle_exception
from ray_release.signal_handling import (
    setup_signal_handling,
    reset_signal_handling,
    register_handler,
)

type_str_to_command_runner = {
    "job": JobRunner,
    "anyscale_job": AnyscaleJobRunner,
}

command_runner_to_cluster_manager = {
    JobRunner: FullClusterManager,
    AnyscaleJobRunner: MinimalClusterManager,
}

DEFAULT_RUN_TYPE = "anyscale_job"
TIMEOUT_BUFFER_MINUTES = 15


def _get_extra_tags_from_env() -> dict:
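    """Collect extra resource tags from Buildkite and release environment variables."""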
    env_vars = (
        "BUILDKITE_JOB_ID",
        "BUILDKITE_PULL_REQUEST",
        "BUILDKITE_PIPELINE_SLUG",
        "BUILDKITE_SOURCE",
        "RELEASE_FREQUENCY",
    )
    return {key.lower(): os.getenv(key, "") for key in env_vars}


def _load_test_configuration(
    test: Test,
    anyscale_project: str,
    result: Result,
    smoke_test: bool = False,
    no_terminate: bool = False,
    test_definition_root: Optional[str] = None,
    log_streaming_limit: int = LAST_LOGS_LENGTH,
) -> Tuple[ClusterManager, CommandRunner, str]:
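    """Resolve the test definition into a cluster manager and command runner.

    Also populates result metadata (Buildkite URLs, extra tags) and switches
    the working directory to the test's ``working_dir``.
    """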
    logger.info(f"Test config: {test}")

    # Populate result parameters
    result.stable = test.get("stable", True)
    result.smoke_test = smoke_test
    buildkite_url = os.getenv("BUILDKITE_BUILD_URL", "")
    buildkite_job_id = os.getenv("BUILDKITE_JOB_ID", "")
    if buildkite_url:
        buildkite_url += "#" + buildkite_job_id
    result.buildkite_url = buildkite_url
    result.buildkite_job_id = buildkite_job_id

    # Set up the working directory
    working_dir = test["working_dir"]
    new_wd = os.path.join(test_definition_root or RELEASE_PACKAGE_DIR, working_dir)
    os.chdir(new_wd)

    run_type = test["run"].get("type", DEFAULT_RUN_TYPE)

    # Workaround while Anyscale Jobs don't support leaving the cluster alive
    # after the job has finished.
    # TODO: Remove once we have support in Anyscale
    if no_terminate and run_type == "anyscale_job":
        logger.warning(
            "anyscale_job run type does not support --no-terminate. "
            "Switching to job (Ray Job) run type."
        )
        run_type = "job"

    command_runner_cls = type_str_to_command_runner.get(run_type)
    if not command_runner_cls:
        raise ReleaseTestConfigError(
            f"Unknown command runner type: {run_type}. Must be one of "
            f"{list(type_str_to_command_runner.keys())}"
        )

    cluster_manager_cls = command_runner_to_cluster_manager[command_runner_cls]
    logger.info(f"Got command runner cls: {command_runner_cls}")

    # Extra tags to be set on resources on the cloud provider's side
    extra_tags = _get_extra_tags_from_env()
    # We don't need other attributes as they can be derived from the name
    extra_tags["test_name"] = str(test["name"])
    extra_tags["test_smoke_test"] = str(result.smoke_test)
    result.extra_tags = extra_tags

    artifact_path = test["run"].get("artifact_path", None)

    # Instantiate managers and command runner
    try:
        cluster_manager = cluster_manager_cls(
            test,
            anyscale_project,
            smoke_test=smoke_test,
            log_streaming_limit=log_streaming_limit,
        )
        command_runner = command_runner_cls(
            cluster_manager,
            JobFileManager(cluster_manager=cluster_manager),
            working_dir,
            artifact_path=artifact_path,
        )
    except Exception as e:
        raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e

    return cluster_manager, command_runner, artifact_path


def _setup_cluster_environment(
    test: Test,
    result: Result,
    cluster_manager: ClusterManager,
    cluster_env_id: Optional[str],
    test_definition_root: Optional[str] = None,
) -> Tuple[Optional[str], int, int, int, int]:
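    """Configure the cluster environment, compute config, autosuspend, and uptime.

    Returns the prepare command (if any) along with the prepare, build,
    cluster, and command timeouts.
    """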
    setup_signal_handling()

    # Load configs
    cluster_compute = load_test_cluster_compute(test, test_definition_root)

    if cluster_env_id:
        try:
            cluster_manager.cluster_env_id = cluster_env_id
            cluster_manager.build_cluster_env()
            cluster_manager.fetch_build_info()
            logger.info(
                "Using overridden cluster environment with ID "
                f"{cluster_env_id} and build ID "
                f"{cluster_manager.cluster_env_build_id}"
            )
        except Exception as e:
            raise ClusterEnvCreateError(
                f"Could not get existing overridden cluster environment "
                f"{cluster_env_id}: {e}"
            ) from e
    else:
        cluster_manager.set_cluster_env()

    # Load some timeouts
    build_timeout = int(test["run"].get("build_timeout", DEFAULT_BUILD_TIMEOUT))
    command_timeout = int(test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT))
    cluster_timeout = int(test["run"].get("session_timeout", DEFAULT_CLUSTER_TIMEOUT))

    # Get prepare command timeout, if any
    prepare_cmd = test["run"].get("prepare", None)
    if prepare_cmd:
        prepare_timeout = test["run"].get("prepare_timeout", command_timeout)
    else:
        prepare_timeout = 0

    # Base maximum uptime on the combined command and prepare timeouts
    command_and_prepare_timeout = command_timeout + prepare_timeout

    # Use default timeout = 0 here if wait_for_nodes is empty. This is to make
    # sure we don't inflate the maximum_uptime_minutes too much if we don't wait
    # for nodes at all.
    # The actual default will be otherwise loaded further down.
    wait_timeout = int(test["run"].get("wait_for_nodes", {}).get("timeout", 0))

    autosuspend_mins = test["cluster"].get("autosuspend_mins", None)
    if autosuspend_mins:
        cluster_manager.autosuspend_minutes = autosuspend_mins
        autosuspend_base = autosuspend_mins
    else:
        cluster_manager.autosuspend_minutes = min(
            DEFAULT_AUTOSUSPEND_MINS,
            int(command_and_prepare_timeout / 60) + TIMEOUT_BUFFER_MINUTES,
        )
        # Maximum uptime should be based on the command timeout, not the
        # DEFAULT_AUTOSUSPEND_MINS
        autosuspend_base = (
            int(command_and_prepare_timeout / 60) + TIMEOUT_BUFFER_MINUTES
        )

    maximum_uptime_minutes = test["cluster"].get("maximum_uptime_minutes", None)
    if maximum_uptime_minutes:
        cluster_manager.maximum_uptime_minutes = maximum_uptime_minutes
    else:
        cluster_manager.maximum_uptime_minutes = (
            autosuspend_base + wait_timeout + TIMEOUT_BUFFER_MINUTES
        )

    # Set cluster compute here. Note that this may use timeouts provided
    # above.
    cluster_manager.set_cluster_compute(
        cluster_compute,
        extra_tags=result.extra_tags,
    )

    return prepare_cmd, prepare_timeout, build_timeout, cluster_timeout, command_timeout


def _local_environment_information(
    result: Result,
    cluster_manager: ClusterManager,
    command_runner: CommandRunner,
    build_timeout: int,
    cluster_timeout: int,
    no_terminate: bool,
    cluster_id: Optional[str],
    cluster_env_id: Optional[str],
) -> None:
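    """Build cluster configs and start, or reuse, the cluster.

    For full cluster managers this starts the cluster directly; for Anyscale
    job runs only the cluster startup timeout is set on the job manager.
    Records the cluster URL and ID on the result.
    """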
    if isinstance(cluster_manager, FullClusterManager):
        if not no_terminate:
            register_handler(
                lambda sig, frame: cluster_manager.terminate_cluster(wait=True)
            )

    # Start cluster
    if cluster_id:
        buildkite_group(":rocket: Using existing cluster")
        # Re-use existing cluster ID for development
        cluster_manager.cluster_id = cluster_id
        cluster_manager.cluster_name = get_cluster_name(cluster_id)
    else:
        buildkite_group(":gear: Building cluster environment")

        if cluster_env_id:
            cluster_manager.cluster_env_id = cluster_env_id

        cluster_manager.build_configs(timeout=build_timeout)

        if isinstance(cluster_manager, FullClusterManager):
            buildkite_group(":rocket: Starting up cluster")
            cluster_manager.start_cluster(timeout=cluster_timeout)
        elif isinstance(command_runner, AnyscaleJobRunner):
            command_runner.job_manager.cluster_startup_timeout = cluster_timeout

    result.cluster_url = cluster_manager.get_cluster_url()
    result.cluster_id = cluster_manager.cluster_id


def _prepare_remote_environment(
    test: Test,
    command_runner: CommandRunner,
    prepare_cmd: Optional[str],
    prepare_timeout: int,
) -> None:
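    """Prepare the remote environment, wait for nodes, and run the prepare command."""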
    command_runner.prepare_remote_env()

    wait_for_nodes = test["run"].get("wait_for_nodes", None)

    if wait_for_nodes:
        buildkite_group(":stopwatch: Waiting for nodes to come up")
        # Overwrite the wait_timeout from above with the proper default
        wait_timeout = int(
            wait_for_nodes.get("timeout", DEFAULT_WAIT_FOR_NODES_TIMEOUT)
        )
        num_nodes = test["run"]["wait_for_nodes"]["num_nodes"]
        command_runner.wait_for_nodes(num_nodes, wait_timeout)

    if prepare_cmd:
        try:
            command_runner.run_prepare_command(prepare_cmd, timeout=prepare_timeout)
        except CommandError as e:
            raise PrepareCommandError(e)
        except CommandTimeout as e:
            raise PrepareCommandTimeout(e)


def _running_test_script(
    test: Test,
    smoke_test: bool,
    command_runner: CommandRunner,
    command_timeout: int,
) -> None:
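    """Run the test script through the command runner, wrapping command errors."""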
    command = test["run"]["script"]
    command_env = test.get_byod_runtime_env()

    if smoke_test:
        command = f"{command} --smoke-test"
        command_env["IS_SMOKE_TEST"] = "1"

    is_long_running = test["run"].get("long_running", False)

    try:
        command_runner.run_command(
            command,
            env=command_env,
            timeout=command_timeout,
            raise_on_timeout=not is_long_running,
            pip=test.get_byod_pips(),
        )
    except (
        TestCommandError,
        PrepareCommandError,
        TestCommandTimeout,
        PrepareCommandTimeout,
    ) as e:
        raise e
    except CommandError as e:
        raise TestCommandError(e)
    except CommandTimeout as e:
        if not is_long_running:
            # Only raise an error if the command is not long running
            raise TestCommandTimeout(e)


def _fetching_results(
    result: Result,
    command_runner: CommandRunner,
    artifact_path: Optional[str],
    smoke_test: bool,
    start_time_unix: int,
) -> Tuple[dict, Optional[Exception]]:
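    """Fetch results, artifacts, and metrics after the test command has run.

    Returns the collected metrics and any exception raised while fetching
    results, which is non-fatal for tests that do not require a result.
    """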
    fetch_result_exception = None
    try:
        command_results = command_runner.fetch_results()
    except Exception as e:
        logger.exception(f"Could not fetch results for test command: {e}")
        command_results = {}
        fetch_result_exception = e

    if artifact_path:
        try:
            command_runner.fetch_artifact()
        except Exception as e:
            logger.error("Could not fetch artifact for test command")
            logger.exception(e)

    # Postprocess result:
    if "last_update" in command_results:
        command_results["last_update_diff"] = time.time() - command_results.get(
            "last_update", 0.0
        )

    try:
        # Logic duplicated in ray_release/command_runner/_anyscale_job_wrapper.py
        # Timeout is the time the test took divided by 200
        # (~7 minutes for a 24h test) but no less than 30s
        # and no more than 900s
        metrics_timeout = max(30, min((time.time() - start_time_unix) / 200, 900))
        command_runner.save_metrics(start_time_unix, timeout=metrics_timeout)
        metrics = command_runner.fetch_metrics()
    except Exception as e:
        logger.exception(f"Could not fetch metrics for test command: {e}")
        metrics = {}

    if smoke_test:
        command_results["smoke_test"] = True

    result.results = command_results
    result.status = ResultStatus.SUCCESS.value

    return metrics, fetch_result_exception


def run_release_test(
    test: Test,
    anyscale_project: str,
    result: Result,
    reporters: Optional[List[Reporter]] = None,
    smoke_test: bool = False,
    cluster_id: Optional[str] = None,
    cluster_env_id: Optional[str] = None,
    no_terminate: bool = False,
    test_definition_root: Optional[str] = None,
    log_streaming_limit: int = LAST_LOGS_LENGTH,
) -> Result:
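    """Run a single release test end to end and report its result.

    This is the main entry point of the module: it loads the test
    configuration, sets up and starts the cluster, runs the test script,
    fetches results and metrics, terminates the cluster (unless no_terminate
    is set), and passes the populated Result to the configured reporters.
    Any pipeline exception is re-raised after reporting.
    """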
    old_wd = os.getcwd()
    start_time = time.monotonic()

    command_runner = None
    cluster_manager = None
    pipeline_exception = None
    # Fetching results is non-critical for some tests, so track its exception
    # separately from the general pipeline exception.
    fetch_result_exception = None
    try:
        buildkite_group(":spiral_note_pad: Loading test configuration")
        cluster_manager, command_runner, artifact_path = _load_test_configuration(
            test,
            anyscale_project,
            result,
            smoke_test,
            no_terminate,
            test_definition_root,
            log_streaming_limit,
        )

        buildkite_group(":nut_and_bolt: Setting up cluster environment")
        (
            prepare_cmd,
            prepare_timeout,
            build_timeout,
            cluster_timeout,
            command_timeout,
        ) = _setup_cluster_environment(
            test,
            result,
            cluster_manager,
            cluster_env_id,
            test_definition_root,
        )

        buildkite_group(":bulb: Local environment information")
        _local_environment_information(
            result,
            cluster_manager,
            command_runner,
            build_timeout,
            cluster_timeout,
            no_terminate,
            cluster_id,
            cluster_env_id,
        )

        # Upload files
        buildkite_group(":wrench: Preparing remote environment")
        _prepare_remote_environment(
            test,
            command_runner,
            prepare_cmd,
            prepare_timeout,
        )

        buildkite_group(":runner: Running test script")
        start_time_unix = time.time()
        _running_test_script(
            test,
            smoke_test,
            command_runner,
            command_timeout,
        )

        buildkite_group(":floppy_disk: Fetching results")
        metrics, fetch_result_exception = _fetching_results(
            result,
            command_runner,
            artifact_path,
            smoke_test,
            start_time_unix,
        )
    except Exception as e:
        logger.exception(e)
        buildkite_open_last()
        pipeline_exception = e
        metrics = {}

    # Obtain the cluster URL again, as for Anyscale jobs it is only set after
    # the command has run
    if isinstance(command_runner, AnyscaleJobRunner):
        result.cluster_url = cluster_manager.get_cluster_url()
        result.cluster_id = cluster_manager.cluster_id
        result.job_url = command_runner.job_manager.job_url
        result.job_id = command_runner.job_manager.job_id

    result.last_logs = command_runner.get_last_logs() if command_runner else None

    if not no_terminate and cluster_manager:
        buildkite_group(":earth_africa: Terminating cluster")
        cluster_manager.terminate_cluster(wait=False)

    if hasattr(command_runner, "cleanup"):
        command_runner.cleanup()

    reset_signal_handling()

    time_taken = time.monotonic() - start_time
    result.runtime = time_taken
    result.prometheus_metrics = metrics

    os.chdir(old_wd)

    if not pipeline_exception:
        if require_result(test) and fetch_result_exception:
            pipeline_exception = fetch_result_exception
        else:
            buildkite_group(":mag: Interpreting results")
            # Only handle results if we didn't run into issues earlier
            try:
                handle_result(test, result)
            except Exception as e:
                pipeline_exception = e

    if pipeline_exception:
        buildkite_group(":rotating_light: Handling errors")
        exit_code, result_status, runtime = handle_exception(
            pipeline_exception,
            result.runtime,
        )

        result.return_code = exit_code.value
        result.status = result_status.value
        if runtime is not None:
            result.runtime = runtime
        try:
            raise pipeline_exception
        except Exception:
            if not result.last_logs:
                result.last_logs = traceback.format_exc()

    buildkite_group(":memo: Reporting results", open=True)
    for reporter in reporters or []:
        reporter.report_result(test, result)

    if pipeline_exception:
        raise pipeline_exception

    return result
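

# A minimal usage sketch (hypothetical): the release tooling wrapping this
# module is expected to load a Test definition and construct a Result before
# calling run_release_test(). The helper name, test name, and project ID below
# are illustrative placeholders, not part of this module's API.
#
#     test = load_release_test("release_tests.yaml", name="my_release_test")  # hypothetical helper
#     result = Result()
#     try:
#         run_release_test(
#             test,
#             anyscale_project="prj_xxxx",
#             result=result,
#             smoke_test=True,
#         )
#     finally:
#         print(result.status, result.runtime)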