- """
- This script provides extra functionality for Anyscale Jobs tests.
- It will be ran on the cluster.
- We need to reimplement some utility functions here as it will not
- have access to the ray_release package.
- """
import argparse
import json
import logging
import multiprocessing
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import List, Optional, Tuple
from urllib.parse import urlparse

OUTPUT_JSON_FILENAME = "output.json"
AWS_CP_TIMEOUT = 300
TIMEOUT_RETURN_CODE = 124  # same as bash `timeout`

installed_pips = []

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(stream=sys.stderr)
formatter = logging.Formatter(
    fmt="[%(levelname)s %(asctime)s] %(filename)s: %(lineno)d %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)


def exponential_backoff_retry(
    f, retry_exceptions, initial_retry_delay_s: float, max_retries: int
):
    """Call ``f``, retrying on ``retry_exceptions`` with exponential backoff.

    Returns whatever ``f`` returns on the first successful call.
    """
    retry_cnt = 0
    retry_delay_s = initial_retry_delay_s
    while True:
        try:
            return f()
        except retry_exceptions as e:
            retry_cnt += 1
            if retry_cnt > max_retries:
                raise
            logger.warning(
                f"Function call failed due to {e}, "
                f"retrying in {retry_delay_s} seconds..."
            )
            time.sleep(retry_delay_s)
            retry_delay_s *= 2
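
# A minimal usage sketch for exponential_backoff_retry; `flaky_call` is a
# hypothetical callable that may raise ConnectionError:
#
#   result = exponential_backoff_retry(
#       lambda: flaky_call("https://example.com"),
#       retry_exceptions=(ConnectionError,),
#       initial_retry_delay_s=10,
#       max_retries=3,
#   )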


def install_pip(pip: str):
    """Install a pip package, skipping ones this script already installed."""
    if pip in installed_pips:
        return
    subprocess.run(["pip", "install", "-q", pip], check=True)
    installed_pips.append(pip)
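
# Example (a sketch; the package name is hypothetical):
#   install_pip("requests")  # no-op if this script already installed it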


def run_storage_cp(source: Optional[str], target: Optional[str]) -> bool:
    """Copy ``source`` to the cloud storage ``target``, retrying with backoff."""
    if not source or not target:
        return False
    if not Path(source).exists():
        logger.warning(f"Couldn't upload to cloud storage: '{source}' does not exist.")
        return False
    storage_service = urlparse(target).scheme
    if storage_service == "s3":
        cp_cmd_args = [
            "aws",
            "s3",
            "cp",
            source,
            target,
            "--acl",
            "bucket-owner-full-control",
        ]
    elif storage_service == "gs":
        cp_cmd_args = [
            "gcloud",
            "storage",
            "cp",
            source,
            target,
        ]
    else:
        raise ValueError(f"Unsupported storage service: {storage_service}")
    try:
        exponential_backoff_retry(
            lambda: subprocess.run(
                cp_cmd_args,
                timeout=AWS_CP_TIMEOUT,
                check=True,
            ),
            subprocess.SubprocessError,
            initial_retry_delay_s=10,
            max_retries=3,
        )
        return True
    except subprocess.SubprocessError:
        logger.exception("Couldn't upload to cloud storage.")
        return False
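
# A minimal usage sketch (the file path and bucket URI are hypothetical):
#   uploaded = run_storage_cp("/tmp/results.json", "s3://my-bucket/results.json")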


def collect_metrics(time_taken: float) -> bool:
    if "METRICS_OUTPUT_JSON" not in os.environ:
        return False

    # Timeout is the time the test took divided by 200
    # (~7 minutes for a 24h test), but no less than 90s
    # and no more than 900s.
    metrics_timeout = max(90, min(time_taken / 200, 900))
    try:
        subprocess.run(
            [
                "python",
                "prometheus_metrics.py",
                str(time_taken),
                "--path",
                os.environ["METRICS_OUTPUT_JSON"],
            ],
            timeout=metrics_timeout,
            check=True,
        )
        return True
    except subprocess.SubprocessError:
        logger.exception("Couldn't collect metrics.")
        return False
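
# Worked example of the timeout clamp above: a 24h test gives
# 86400 / 200 = 432s (~7 minutes); a 5-minute test gives 300 / 200 = 1.5s,
# which max() clamps up to 90s.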


# Has to be defined at module level so it can be pickled by multiprocessing.
def _run_bash_command_subprocess(command: List[str], timeout: Optional[float]):
    """Runs in a separate process spawned via multiprocessing."""
    try:
        subprocess.run(command, check=True, timeout=timeout)
        return_code = 0
    except subprocess.TimeoutExpired:
        return_code = TIMEOUT_RETURN_CODE
    except subprocess.CalledProcessError as e:
        return_code = e.returncode
    print(f"Subprocess return code: {return_code}", file=sys.stderr)
    # Exit so the return code is propagated to the outer process.
    sys.exit(return_code)


def run_bash_command(workload: str, timeout: float) -> int:
    timeout = timeout if timeout > 0 else None
    workload_path = (Path.cwd() / "workload.sh").resolve()
    with open(workload_path, "w") as fp:
        fp.write(workload)
    command = ["bash", "-x", str(workload_path)]
    logger.info(f"Running command {workload}")

    # Pop the job's runtime env to allow the workload's runtime env
    # to take precedence.
    # TODO: Confirm this is safe.
    os.environ.pop("RAY_JOB_CONFIG_JSON_ENV_VAR", None)

    # We use multiprocessing with the 'spawn' context to avoid
    # forking (as happens when using subprocess directly).
    # Forking messes up Ray interactions and causes deadlocks.
    return_code = None
    ctx = multiprocessing.get_context("spawn")
    p = ctx.Process(target=_run_bash_command_subprocess, args=(command, timeout))
    try:
        p.start()
        logger.info(f"Started process {p.pid}.")
        # Add a little extra to the timeout, as _run_bash_command_subprocess
        # also has a timeout internally and it's cleaner to use that.
        p.join(timeout=timeout + 10 if timeout is not None else None)
    except multiprocessing.ProcessError:
        pass
    finally:
        if p.is_alive():
            # join() timed out but the subprocess is still running.
            logger.warning(f"Terminating process {p.pid} forcefully.")
            p.terminate()
            p.join()
            return_code = TIMEOUT_RETURN_CODE
        if return_code is None:
            return_code = p.exitcode
        os.remove(str(workload_path))
    logger.info(f"Process {p.pid} exited with return code {return_code}.")
    assert return_code is not None
    return return_code
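
# A minimal usage sketch, mirroring the workload shown in the CLI help below:
#   return_code = run_bash_command("python workloads/script.py", timeout=3600)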


def run_prepare_commands(
    prepare_commands: List[str], prepare_commands_timeouts: List[float]
) -> Tuple[bool, List[int], Optional[float]]:
    """Run prepare commands. All commands must pass. Fails fast."""
    prepare_return_codes = []
    prepare_passed = True
    prepare_time_taken = None

    if not prepare_commands:
        return prepare_passed, prepare_return_codes, prepare_time_taken

    logger.info("### Starting prepare commands ###")
    for prepare_command, timeout in zip(prepare_commands, prepare_commands_timeouts):
        command_start_time = time.monotonic()
        prepare_return_codes.append(run_bash_command(prepare_command, timeout))
        prepare_time_taken = time.monotonic() - command_start_time
        return_code = prepare_return_codes[-1]
        if return_code == 0:
            continue
        timed_out = return_code == TIMEOUT_RETURN_CODE
        if timed_out:
            logger.error(f"Prepare command timed out. Time taken: {prepare_time_taken}")
        else:
            logger.info(
                f"Prepare command finished with return code {return_code}. "
                f"Time taken: {prepare_time_taken}"
            )
        logger.error("Prepare command failed.")
        prepare_passed = False
        break
    return prepare_passed, prepare_return_codes, prepare_time_taken
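
# A minimal usage sketch with a hypothetical prepare step:
#   passed, codes, last_time = run_prepare_commands(
#       ["pip install -r requirements.txt"], [300.0]
#   )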


def main(
    test_workload: str,
    test_workload_timeout: float,
    test_no_raise_on_timeout: bool,
    results_cloud_storage_uri: Optional[str],
    metrics_cloud_storage_uri: Optional[str],
    output_cloud_storage_uri: Optional[str],
    upload_cloud_storage_uri: Optional[str],
    artifact_path: Optional[str],
    prepare_commands: List[str],
    prepare_commands_timeouts: List[float],
):
    """
    This function provides extra functionality for an Anyscale Job.

    1. Runs prepare commands and handles their timeouts
    2. Runs the actual test workload and handles its timeout
    3. Uploads test results.json
    4. Gathers prometheus metrics
    5. Uploads prometheus metrics.json
    6. Uploads output.json
    """
    logger.info("### Starting ###")
    start_time = time.monotonic()

    if len(prepare_commands) != len(prepare_commands_timeouts):
        raise ValueError(
            "`prepare_commands` and `prepare_commands_timeouts` must "
            "have the same length."
        )

    # Run prepare commands. All prepare commands must pass.
    (
        prepare_passed,
        prepare_return_codes,
        last_prepare_time_taken,
    ) = run_prepare_commands(prepare_commands, prepare_commands_timeouts)

    # These are overwritten below if the workload actually runs.
    uploaded_results = False
    collected_metrics = False
    uploaded_metrics = False
    uploaded_artifact = False
    workload_time_taken = None

    # If all prepare commands passed, run the actual test workload.
    if prepare_passed:
        logger.info("### Starting entrypoint ###")
        command_start_time = time.monotonic()
        return_code = run_bash_command(test_workload, test_workload_timeout)
        workload_time_taken = time.monotonic() - command_start_time
        timed_out = return_code == TIMEOUT_RETURN_CODE

        if timed_out:
            msg = f"Timed out. Time taken: {workload_time_taken}"
            if test_no_raise_on_timeout:
                logger.info(msg)
            else:
                logger.error(msg)
        else:
            logger.info(
                f"Finished with return code {return_code}. "
                f"Time taken: {workload_time_taken}"
            )

        # Upload results.json
        uploaded_results = run_storage_cp(
            os.environ.get("TEST_OUTPUT_JSON", None), results_cloud_storage_uri
        )

        # Collect prometheus metrics
        collected_metrics = collect_metrics(workload_time_taken)
        if collected_metrics:
            # Upload prometheus metrics
            uploaded_metrics = run_storage_cp(
                os.environ.get("METRICS_OUTPUT_JSON", None), metrics_cloud_storage_uri
            )

        # Upload the user-generated artifact, if one was requested.
        uploaded_artifact = run_storage_cp(
            artifact_path,
            os.path.join(
                upload_cloud_storage_uri, os.environ["USER_GENERATED_ARTIFACT"]
            )
            if upload_cloud_storage_uri and "USER_GENERATED_ARTIFACT" in os.environ
            else None,
        )
    else:
        return_code = None

    total_time_taken = time.monotonic() - start_time

    output_json = {
        "return_code": return_code,
        "prepare_return_codes": prepare_return_codes,
        "last_prepare_time_taken": last_prepare_time_taken,
        "workload_time_taken": workload_time_taken,
        "total_time_taken": total_time_taken,
        "uploaded_results": uploaded_results,
        "collected_metrics": collected_metrics,
        "uploaded_metrics": uploaded_metrics,
        "uploaded_artifact": uploaded_artifact,
    }
    output_json = json.dumps(
        output_json, ensure_ascii=True, sort_keys=True, separators=(",", ":")
    )

    output_json_file = (Path.cwd() / OUTPUT_JSON_FILENAME).resolve()
    with open(output_json_file, "w") as fp:
        fp.write(output_json)

    # Upload output.json
    run_storage_cp(str(output_json_file), output_cloud_storage_uri)

    logger.info("### Finished ###")
    # This will be read by the AnyscaleJobRunner on the buildkite runner
    # if output.json cannot be obtained from cloud storage.
    logger.info(f"### JSON |{output_json}| ###")

    # Flush buffers
    logging.shutdown()
    print("", flush=True)
    print("", file=sys.stderr, flush=True)

    if return_code == TIMEOUT_RETURN_CODE and test_no_raise_on_timeout:
        return_code = 0
    elif return_code is None:
        # Prepare commands failed; treat that as a generic failure.
        return_code = 1

    # Give the flushed output a moment to be picked up before exiting.
    time.sleep(1)
    return return_code


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "test_workload",
        type=str,
        help="test workload, e.g. python workloads/script.py",
    )
    parser.add_argument(
        "--test-workload-timeout",
        default=3600,
        type=float,
        help="test workload timeout (set to <0 for infinite)",
    )
    parser.add_argument(
        "--test-no-raise-on-timeout",
        action="store_true",
        help="don't fail on timeout",
    )
    parser.add_argument(
        "--results-cloud-storage-uri",
        type=str,
        help="bucket address to upload results.json to",
        required=False,
    )
    parser.add_argument(
        "--metrics-cloud-storage-uri",
        type=str,
        help="bucket address to upload metrics.json to",
        required=False,
    )
    parser.add_argument(
        "--output-cloud-storage-uri",
        type=str,
        help="bucket address to upload output.json to",
        required=False,
    )
    parser.add_argument(
        "--upload-cloud-storage-uri",
        type=str,
        help="root cloud storage bucket address for artifact uploads",
        required=False,
    )
    parser.add_argument(
        "--artifact-path",
        type=str,
        help="user-provided artifact path (on the head node); must be a single file path",
        required=False,
    )
    parser.add_argument(
        "--prepare-commands",
        type=str,
        nargs="*",
        default=[],
        help="prepare commands to run",
    )
    parser.add_argument(
        "--prepare-commands-timeouts",
        type=float,
        nargs="*",
        default=[],
        help="timeouts for prepare commands (set to <0 for infinite)",
    )

    args = parser.parse_args()
    sys.exit(main(**vars(args)))
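
# Example invocation (a sketch; the script filename and bucket URI are
# hypothetical):
#   python anyscale_job_wrapper.py "python workloads/script.py" \
#       --test-workload-timeout 3600 \
#       --results-cloud-storage-uri s3://my-bucket/results.json \
#       --prepare-commands "sleep 1" --prepare-commands-timeouts 60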