123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import abc
- from typing import Dict, Any, Optional, List
- from ray_release.cluster_manager.cluster_manager import ClusterManager
- from ray_release.file_manager.file_manager import FileManager
- from ray_release.reporter.artifacts import DEFAULT_ARTIFACTS_DIR
- from ray_release.util import exponential_backoff_retry
- from ray_release.logger import logger
- from click.exceptions import ClickException
class CommandRunner(abc.ABC):
    """Base interface for running release-test commands against a cluster.

    This is run on Buildkite runners. Concrete runners override the hooks
    below; the base implementations of the hooks raise
    ``NotImplementedError``. Only ``command_env``,
    ``get_full_command_env``, ``run_prepare_command`` and ``get_last_logs``
    have working implementations here.
    """

    # The directory for runners to dump files to (on Buildkite runner
    # instances). Write to this directory: run_release_tests.sh ensures that
    # its content shows up under the Buildkite job's "Artifacts" UI tab.
    _DEFAULT_ARTIFACTS_DIR = DEFAULT_ARTIFACTS_DIR

    # The artifact file name placed under the s3 bucket root.
    # AnyscaleJobWrapper uploads the user-generated artifact to this path,
    # and AnyscaleJobRunner then downloads it from there.
    _USER_GENERATED_ARTIFACT = "user_generated_artifact"

    # The path the result json is written to on the head node. It is also
    # the relative path the result json is uploaded to on s3.
    _RESULT_OUTPUT_JSON = "/tmp/release_test_out.json"

    # The path the metrics json is written to on the head node. It is also
    # the relative path the metrics json is uploaded to on s3.
    _METRICS_OUTPUT_JSON = "/tmp/metrics_test_out.json"

    def __init__(
        self,
        cluster_manager: ClusterManager,
        file_manager: FileManager,
        working_dir: str,
        artifact_path: Optional[str] = None,
    ):
        """Store the collaborators shared by every runner.

        Args:
            cluster_manager: Cluster manager for the cluster the commands
                run against.
            file_manager: File manager handed to the runner.
            working_dir: Working directory of the test.
            artifact_path: Optional artifact path. It is kept on
                ``self.artifact_path`` so that subclasses can read it. The
                base class does not otherwise use it.
        """
        self.cluster_manager = cluster_manager
        self.file_manager = file_manager
        self.working_dir = working_dir
        # Keep the argument rather than silently discarding it.
        self.artifact_path = artifact_path

    @property
    def command_env(self) -> Dict[str, str]:
        """Environment variables every test command receives.

        These tell the test where to write its result json, its metrics
        json and its user-generated artifact.
        """
        return {
            "TEST_OUTPUT_JSON": self._RESULT_OUTPUT_JSON,
            "METRICS_OUTPUT_JSON": self._METRICS_OUTPUT_JSON,
            "USER_GENERATED_ARTIFACT": self._USER_GENERATED_ARTIFACT,
        }

    def get_full_command_env(self, env: Optional[Dict] = None) -> Dict:
        """Return ``command_env`` merged with ``env``.

        Args:
            env: Extra variables. On key collisions these override the
                defaults from ``command_env``.

        Returns:
            A new dict. ``command_env`` is copied before merging, so it is
            not mutated.
        """
        full_env = self.command_env.copy()
        if env:
            full_env.update(env)
        return full_env

    def prepare_local_env(self, ray_wheels_url: Optional[str] = None):
        """Prepare the local environment, e.g. install dependencies.

        Args:
            ray_wheels_url: Optional URL of the Ray wheels to use.
        """
        raise NotImplementedError

    def prepare_remote_env(self):
        """Prepare the remote environment, e.g. upload files."""
        raise NotImplementedError

    def wait_for_nodes(self, num_nodes: int, timeout: float = 900.0):
        """Wait for the cluster nodes to be up.

        Args:
            num_nodes: Number of nodes to wait for.
            timeout: Seconds to wait for the nodes before raising a
                ``PrepareCommandTimeoutError``.

        Returns:
            None

        Raises:
            PrepareCommandTimeoutError
        """
        raise NotImplementedError

    def save_metrics(self, start_time: float, timeout: float = 900.0):
        """Obtain Prometheus metrics from the head node and save them.

        NOTE(review): the destination is presumably the
        ``_METRICS_OUTPUT_JSON`` path. Confirm this in the concrete runners.

        Args:
            start_time: UNIX timestamp from which to start the query.
            timeout: Timeout in seconds.

        Returns:
            None
        """
        raise NotImplementedError

    def run_command(
        self,
        command: str,
        env: Optional[Dict] = None,
        timeout: float = 3600.0,
        raise_on_timeout: bool = True,
        pip: Optional[List[str]] = None,
    ) -> float:
        """Run the test command.

        Args:
            command: The command to run.
            env: Extra environment variables for the command.
            timeout: Timeout in seconds.
            raise_on_timeout: Whether a timeout should raise.
            pip: Optional list of pip packages.

        Returns:
            A float whose meaning is defined by the concrete runner.
            (Presumably the command duration; confirm in subclasses.)
        """
        raise NotImplementedError

    def run_prepare_command(
        self, command: str, env: Optional[Dict] = None, timeout: float = 3600.0
    ):
        """Run a prepare command, retrying on ``ClickException``.

        Command runners may choose to run this differently from the test
        command. By default, the call delegates to ``run_command`` through
        ``exponential_backoff_retry``, with an initial delay of 5 seconds
        and ``max_retries=3``. Only ``command``, ``env`` and ``timeout`` are
        forwarded, so ``run_command`` uses its own defaults for
        ``raise_on_timeout`` and ``pip``.

        Returns:
            Whatever ``exponential_backoff_retry`` returns. This is
            presumably the result of the successful ``run_command`` call.
        """
        return exponential_backoff_retry(
            lambda: self.run_command(command, env, timeout),
            ClickException,
            initial_retry_delay_s=5,
            max_retries=3,
        )

    def get_last_logs(self) -> Optional[str]:
        """Best-effort wrapper around ``get_last_logs_ex``.

        Any exception is logged with its traceback, and ``None`` is returned
        in that case. This way, a failure to fetch logs never propagates to
        the caller.
        """
        try:
            return self.get_last_logs_ex()
        except Exception as e:
            logger.exception(f"Error fetching logs: {e}")
            return None

    def get_last_logs_ex(self) -> Optional[str]:
        """Fetch the most recent logs. Errors are allowed to propagate."""
        raise NotImplementedError

    def fetch_results(self) -> Dict[str, Any]:
        """Fetch the test results."""
        raise NotImplementedError

    def fetch_metrics(self) -> Dict[str, Any]:
        """Fetch the collected metrics."""
        raise NotImplementedError

    def fetch_artifact(self, artifact_path):
        """Fetch the user-generated artifact.

        Args:
            artifact_path: Location of the artifact. Its exact meaning is
                defined by the concrete runner.
        """
        raise NotImplementedError
|