glue.py

import os
import time
import traceback
from typing import Optional, List, Tuple

from ray_release.alerts.handle import handle_result, require_result
from ray_release.anyscale_util import get_cluster_name
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cluster_manager.full import FullClusterManager
from ray_release.cluster_manager.minimal import MinimalClusterManager
from ray_release.command_runner.job_runner import JobRunner
from ray_release.command_runner.command_runner import CommandRunner
from ray_release.command_runner.anyscale_job_runner import AnyscaleJobRunner
from ray_release.config import (
    Test,
    DEFAULT_BUILD_TIMEOUT,
    DEFAULT_CLUSTER_TIMEOUT,
    DEFAULT_COMMAND_TIMEOUT,
    DEFAULT_WAIT_FOR_NODES_TIMEOUT,
    RELEASE_PACKAGE_DIR,
    DEFAULT_AUTOSUSPEND_MINS,
    validate_test,
)
from ray_release.template import load_test_cluster_env, load_test_cluster_compute
from ray_release.exception import (
    ReleaseTestConfigError,
    ReleaseTestSetupError,
    CommandError,
    PrepareCommandError,
    CommandTimeout,
    PrepareCommandTimeout,
    TestCommandError,
    TestCommandTimeout,
    LocalEnvSetupError,
    ClusterEnvCreateError,
)
from ray_release.file_manager.job_file_manager import JobFileManager
from ray_release.logger import logger
from ray_release.reporter.reporter import Reporter
from ray_release.result import Result, handle_exception
from ray_release.signal_handling import (
    setup_signal_handling,
    reset_signal_handling,
    register_handler,
)
from ray_release.util import (
    run_bash_script,
    get_pip_packages,
    reinstall_anyscale_dependencies,
)
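
# Map a test's `run.type` string to its command runner implementation, and
# each command runner to the cluster manager it requires. Ray Jobs ("job")
# manage the full cluster lifecycle, while Anyscale Jobs only need a
# minimal cluster manager.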
type_str_to_command_runner = {
    "job": JobRunner,
    "anyscale_job": AnyscaleJobRunner,
}

command_runner_to_cluster_manager = {
    JobRunner: FullClusterManager,
    AnyscaleJobRunner: MinimalClusterManager,
}

DEFAULT_RUN_TYPE = "anyscale_job"
TIMEOUT_BUFFER_MINUTES = 15


def _get_extra_tags_from_env() -> dict:
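    """Collect Buildkite and release metadata from the environment.

    Returns a dict mapping each variable name (lower-cased) to its value,
    or to an empty string if the variable is unset.
    """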
    env_vars = (
        "BUILDKITE_JOB_ID",
        "BUILDKITE_PULL_REQUEST",
        "BUILDKITE_PIPELINE_SLUG",
        "BUILDKITE_SOURCE",
        "RELEASE_FREQUENCY",
    )
    return {key.lower(): os.getenv(key, "") for key in env_vars}


def _load_test_configuration(
    test: Test,
    anyscale_project: str,
    result: Result,
    ray_wheels_url: str,
    smoke_test: bool = False,
    no_terminate: bool = False,
) -> Tuple[ClusterManager, CommandRunner, str]:
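    """Validate the test config and instantiate the managers it needs.

    Populates `result` with wheel, stability, and Buildkite metadata, then
    returns (cluster_manager, command_runner, artifact_path).
    """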
    validate_test(test)

    logger.info(f"Test config: {test}")

    # Populate result parameters
    result.wheels_url = ray_wheels_url
    result.stable = test.get("stable", True)
    result.smoke_test = smoke_test

    buildkite_url = os.getenv("BUILDKITE_BUILD_URL", "")
    buildkite_job_id = os.getenv("BUILDKITE_JOB_ID", "")
    if buildkite_url:
        buildkite_url += "#" + buildkite_job_id
    result.buildkite_url = buildkite_url
    result.buildkite_job_id = buildkite_job_id

    # Set up the working directory
    working_dir = test["working_dir"]
    new_wd = os.path.join(RELEASE_PACKAGE_DIR, working_dir)
    os.chdir(new_wd)

    run_type = test["run"].get("type", DEFAULT_RUN_TYPE)

    # Workaround while Anyscale Jobs don't support leaving the cluster alive
    # after the job has finished.
    # TODO: Remove once we have support in Anyscale
    if no_terminate and run_type == "anyscale_job":
        logger.warning(
            "anyscale_job run type does not support --no-terminate. "
            "Switching to job (Ray Job) run type."
        )
        run_type = "job"

    command_runner_cls = type_str_to_command_runner.get(run_type)
    if not command_runner_cls:
        raise ReleaseTestConfigError(
            f"Unknown command runner type: {run_type}. Must be one of "
            f"{list(type_str_to_command_runner.keys())}"
        )

    cluster_manager_cls = command_runner_to_cluster_manager[command_runner_cls]
    logger.info(f"Got command runner cls: {command_runner_cls}")

    # Extra tags to be set on resources on the cloud provider's side
    extra_tags = _get_extra_tags_from_env()
    # We don't need other attributes as they can be derived from the name
    extra_tags["test_name"] = str(test["name"])
    extra_tags["test_smoke_test"] = str(result.smoke_test)
    result.extra_tags = extra_tags

    artifact_path = test["run"].get("artifact_path", None)

    # Instantiate managers and command runner
    try:
        cluster_manager = cluster_manager_cls(
            test["name"],
            anyscale_project,
            smoke_test=smoke_test,
        )
        command_runner = command_runner_cls(
            cluster_manager,
            JobFileManager(cluster_manager=cluster_manager),
            working_dir,
            artifact_path=artifact_path,
        )
    except Exception as e:
        raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e

    return cluster_manager, command_runner, artifact_path


def _setup_cluster_environment(
    test: Test,
    result: Result,
    cluster_manager: ClusterManager,
    ray_wheels_url: str,
    cluster_env_id: Optional[str],
) -> Tuple[Optional[str], int, int, int, int]:
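    """Set up the cluster environment, compute config, and timeouts.

    If `cluster_env_id` is given, that existing cluster environment is used;
    otherwise one is derived from the test definition. Returns
    (prepare_cmd, prepare_timeout, build_timeout, cluster_timeout,
    command_timeout).
    """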
    setup_signal_handling()

    # Load configs
    cluster_env = load_test_cluster_env(test, ray_wheels_url=ray_wheels_url)
    cluster_compute = load_test_cluster_compute(test)

    if cluster_env_id:
        try:
            cluster_manager.cluster_env_id = cluster_env_id
            cluster_manager.build_cluster_env()
            cluster_manager.fetch_build_info()
            logger.info(
                "Using overridden cluster environment with ID "
                f"{cluster_env_id} and build ID "
                f"{cluster_manager.cluster_env_build_id}"
            )
        except Exception as e:
            raise ClusterEnvCreateError(
                f"Could not get existing overridden cluster environment "
                f"{cluster_env_id}: {e}"
            ) from e
    else:
        cluster_manager.set_cluster_env(cluster_env)

    # Load some timeouts
    build_timeout = int(test["run"].get("build_timeout", DEFAULT_BUILD_TIMEOUT))
    command_timeout = int(test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT))
    cluster_timeout = int(test["run"].get("session_timeout", DEFAULT_CLUSTER_TIMEOUT))

    # Get the prepare command timeout, if any
    prepare_cmd = test["run"].get("prepare", None)
    if prepare_cmd:
        prepare_timeout = test["run"].get("prepare_timeout", command_timeout)
    else:
        prepare_timeout = 0

    # Base maximum uptime on the combined command and prepare timeouts
    command_and_prepare_timeout = command_timeout + prepare_timeout

    # Use a default timeout of 0 here if wait_for_nodes is empty. This is to
    # make sure we don't inflate maximum_uptime_minutes too much if we don't
    # wait for nodes at all.
    # The actual default is otherwise applied further down,
    # in _prepare_remote_environment.
    wait_timeout = int(test["run"].get("wait_for_nodes", {}).get("timeout", 0))

    autosuspend_mins = test["cluster"].get("autosuspend_mins", None)
    if autosuspend_mins:
        cluster_manager.autosuspend_minutes = autosuspend_mins
        autosuspend_base = autosuspend_mins
    else:
        cluster_manager.autosuspend_minutes = min(
            DEFAULT_AUTOSUSPEND_MINS,
            int(command_and_prepare_timeout / 60) + TIMEOUT_BUFFER_MINUTES,
        )
        # Maximum uptime should be based on the command timeout, not on
        # DEFAULT_AUTOSUSPEND_MINS
        autosuspend_base = (
            int(command_and_prepare_timeout / 60) + TIMEOUT_BUFFER_MINUTES
        )

    maximum_uptime_minutes = test["cluster"].get("maximum_uptime_minutes", None)
    if maximum_uptime_minutes:
        cluster_manager.maximum_uptime_minutes = maximum_uptime_minutes
    else:
        cluster_manager.maximum_uptime_minutes = (
            autosuspend_base + wait_timeout + TIMEOUT_BUFFER_MINUTES
        )

    # Set the cluster compute config here. Note that this may use the
    # timeouts provided above.
    cluster_manager.set_cluster_compute(
        cluster_compute,
        extra_tags=result.extra_tags,
    )

    return (
        prepare_cmd,
        prepare_timeout,
        build_timeout,
        cluster_timeout,
        command_timeout,
    )


def _setup_local_environment(
    test: Test,
    command_runner: CommandRunner,
    ray_wheels_url: str,
) -> None:
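    """Run the test's driver setup script and install local dependencies."""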
    driver_setup_script = test.get("driver_setup", None)
    if driver_setup_script:
        try:
            run_bash_script(driver_setup_script)
        except Exception as e:
            raise LocalEnvSetupError(f"Driver setup script failed: {e}") from e

    # Install local dependencies
    command_runner.prepare_local_env(ray_wheels_url)

    # Re-install anyscale package as local dependencies might have changed
    # from local env setup
    reinstall_anyscale_dependencies()


def _local_environment_information(
    result: Result,
    cluster_manager: ClusterManager,
    command_runner: CommandRunner,
    build_timeout: int,
    cluster_timeout: int,
    no_terminate: bool,
    cluster_id: Optional[str],
    cluster_env_id: Optional[str],
) -> None:
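    """Log the local environment and bring up the cluster.

    Despite the name, this also builds the cluster configs and, for full
    cluster managers, starts the cluster (re-using `cluster_id` if given).
    """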
    pip_packages = get_pip_packages()
    pip_package_string = "\n".join(pip_packages)
    logger.info(f"Installed python packages:\n{pip_package_string}")

    if isinstance(cluster_manager, FullClusterManager):
        if not no_terminate:
            register_handler(
                lambda sig, frame: cluster_manager.terminate_cluster(wait=True)
            )

    # Start cluster
    if cluster_id:
        buildkite_group(":rocket: Using existing cluster")
        # Re-use existing cluster ID for development
        cluster_manager.cluster_id = cluster_id
        cluster_manager.cluster_name = get_cluster_name(cluster_id)
    else:
        buildkite_group(":gear: Building cluster environment")

        if cluster_env_id:
            cluster_manager.cluster_env_id = cluster_env_id

        cluster_manager.build_configs(timeout=build_timeout)

        if isinstance(cluster_manager, FullClusterManager):
            buildkite_group(":rocket: Starting up cluster")
            cluster_manager.start_cluster(timeout=cluster_timeout)
        elif isinstance(command_runner, AnyscaleJobRunner):
            command_runner.job_manager.cluster_startup_timeout = cluster_timeout

    result.cluster_url = cluster_manager.get_cluster_url()
    result.cluster_id = cluster_manager.cluster_id


def _prepare_remote_environment(
    test: Test,
    command_runner: CommandRunner,
    prepare_cmd: Optional[str],
    prepare_timeout: int,
) -> None:
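    """Prepare the remote environment: upload files, optionally wait for
    nodes, and run the prepare command, if any.
    """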
    command_runner.prepare_remote_env()

    wait_for_nodes = test["run"].get("wait_for_nodes", None)

    if wait_for_nodes:
        buildkite_group(":stopwatch: Waiting for nodes to come up")
        # Overwrite wait_timeout from above to account for the better default
        wait_timeout = int(
            wait_for_nodes.get("timeout", DEFAULT_WAIT_FOR_NODES_TIMEOUT)
        )
        num_nodes = test["run"]["wait_for_nodes"]["num_nodes"]
        command_runner.wait_for_nodes(num_nodes, wait_timeout)

    if prepare_cmd:
        try:
            command_runner.run_prepare_command(prepare_cmd, timeout=prepare_timeout)
        except CommandError as e:
            raise PrepareCommandError(e) from e
        except CommandTimeout as e:
            raise PrepareCommandTimeout(e) from e


def _running_test_script(
    test: Test,
    smoke_test: bool,
    command_runner: CommandRunner,
    command_timeout: int,
) -> None:
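    """Run the test script itself.

    Command errors and timeouts are re-raised as their test-specific
    counterparts; long-running tests do not fail on timeout.
    """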
    command = test["run"]["script"]
    command_env = {}

    if smoke_test:
        command = f"{command} --smoke-test"
        command_env["IS_SMOKE_TEST"] = "1"

    is_long_running = test["run"].get("long_running", False)

    try:
        command_runner.run_command(
            command,
            env=command_env,
            timeout=command_timeout,
            raise_on_timeout=not is_long_running,
        )
    except (
        TestCommandError,
        PrepareCommandError,
        TestCommandTimeout,
        PrepareCommandTimeout,
    ) as e:
        raise e
    except CommandError as e:
        raise TestCommandError(e) from e
    except CommandTimeout as e:
        if not is_long_running:
            # Only raise an error if the command is not long running
            raise TestCommandTimeout(e) from e


def _fetching_results(
    result: Result,
    command_runner: CommandRunner,
    artifact_path: Optional[str],
    smoke_test: bool,
    start_time_unix: int,
) -> Tuple[dict, Optional[Exception]]:
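    """Fetch results, artifacts, and metrics after the command has run.

    Returns the fetched metrics and the exception raised while fetching
    results, if any (fetching results is non-critical for some tests).
    """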
    fetch_result_exception = None
    try:
        command_results = command_runner.fetch_results()
    except Exception as e:
        logger.exception(f"Could not fetch results for test command: {e}")
        command_results = {}
        fetch_result_exception = e

    if artifact_path:
        try:
            command_runner.fetch_artifact()
        except Exception as e:
            logger.error("Could not fetch artifact for test command")
            logger.exception(e)

    # Postprocess result:
    if "last_update" in command_results:
        command_results["last_update_diff"] = time.time() - command_results.get(
            "last_update", 0.0
        )

    try:
        # Logic duplicated in ray_release/command_runner/_anyscale_job_wrapper.py
        # Timeout is the time the test took divided by 200
        # (~7 minutes for a 24h test), but no less than 30s
        # and no more than 900s
        metrics_timeout = max(30, min((time.time() - start_time_unix) / 200, 900))
        command_runner.save_metrics(start_time_unix, timeout=metrics_timeout)
        metrics = command_runner.fetch_metrics()
    except Exception as e:
        logger.exception(f"Could not fetch metrics for test command: {e}")
        metrics = {}

    if smoke_test:
        command_results["smoke_test"] = True

    result.results = command_results
    result.status = "finished"

    return metrics, fetch_result_exception


def run_release_test(
    test: Test,
    anyscale_project: str,
    result: Result,
    ray_wheels_url: str,
    reporters: Optional[List[Reporter]] = None,
    smoke_test: bool = False,
    cluster_id: Optional[str] = None,
    cluster_env_id: Optional[str] = None,
    no_terminate: bool = False,
) -> Result:
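    """Run a single release test end to end.

    Loads the test configuration, sets up the cluster and the local and
    remote environments, runs the test script, fetches results, and reports
    them via `reporters`. The cluster is terminated afterwards unless
    `no_terminate` is set. Any pipeline exception is re-raised after the
    results have been reported.
    """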
    old_wd = os.getcwd()
    start_time = time.monotonic()

    command_runner = None
    cluster_manager = None

    pipeline_exception = None
    # Fetching results may fail, which is non-critical for some tests.
    # So keep this exception separate from the general pipeline exception.
    fetch_result_exception = None
    try:
        buildkite_group(":spiral_note_pad: Loading test configuration")
        cluster_manager, command_runner, artifact_path = _load_test_configuration(
            test,
            anyscale_project,
            result,
            ray_wheels_url,
            smoke_test,
            no_terminate,
        )

        buildkite_group(":nut_and_bolt: Setting up cluster environment")
        (
            prepare_cmd,
            prepare_timeout,
            build_timeout,
            cluster_timeout,
            command_timeout,
        ) = _setup_cluster_environment(
            test,
            result,
            cluster_manager,
            ray_wheels_url,
            cluster_env_id,
        )

        buildkite_group(":nut_and_bolt: Setting up local environment")
        _setup_local_environment(test, command_runner, ray_wheels_url)

        # Print installed pip packages
        buildkite_group(":bulb: Local environment information")
        _local_environment_information(
            result,
            cluster_manager,
            command_runner,
            build_timeout,
            cluster_timeout,
            no_terminate,
            cluster_id,
            cluster_env_id,
        )

        # Upload files
        buildkite_group(":wrench: Preparing remote environment")
        _prepare_remote_environment(
            test,
            command_runner,
            prepare_cmd,
            prepare_timeout,
        )

        buildkite_group(":runner: Running test script")
        start_time_unix = time.time()
        _running_test_script(
            test,
            smoke_test,
            command_runner,
            command_timeout,
        )

        buildkite_group(":floppy_disk: Fetching results")
        metrics, fetch_result_exception = _fetching_results(
            result,
            command_runner,
            artifact_path,
            smoke_test,
            start_time_unix,
        )
    except Exception as e:
        logger.exception(e)
        buildkite_open_last()
        pipeline_exception = e
        metrics = {}

    # Obtain the cluster URL again, as for Anyscale jobs it is only set
    # after the command has run
    if isinstance(command_runner, AnyscaleJobRunner):
        result.cluster_url = cluster_manager.get_cluster_url()
        result.cluster_id = cluster_manager.cluster_id
        result.job_url = command_runner.job_manager.job_url
        result.job_id = command_runner.job_manager.job_id

    result.last_logs = command_runner.get_last_logs() if command_runner else None

    if not no_terminate and cluster_manager:
        buildkite_group(":earth_africa: Terminating cluster")
        cluster_manager.terminate_cluster(wait=False)

    if hasattr(command_runner, "cleanup"):
        command_runner.cleanup()

    reset_signal_handling()

    time_taken = time.monotonic() - start_time
    result.runtime = time_taken
    result.prometheus_metrics = metrics

    os.chdir(old_wd)

    if not pipeline_exception:
        if require_result(test) and fetch_result_exception:
            pipeline_exception = fetch_result_exception
        else:
            buildkite_group(":mag: Interpreting results")
            # Only handle results if we didn't run into issues earlier
            try:
                handle_result(test, result)
            except Exception as e:
                pipeline_exception = e

    if pipeline_exception:
        buildkite_group(":rotating_light: Handling errors")
        exit_code, result_status, runtime = handle_exception(
            pipeline_exception,
            result.runtime,
        )

        result.return_code = exit_code.value
        result.status = result_status.value
        if runtime is not None:
            result.runtime = runtime

        try:
            raise pipeline_exception
        except Exception:
            if not result.last_logs:
                result.last_logs = traceback.format_exc()

    buildkite_group(":memo: Reporting results", open=True)
    for reporter in reporters or []:
        reporter.report_result(test, result)

    if pipeline_exception:
        raise pipeline_exception

    return result