command_runner.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import abc
  2. from typing import Dict, Any, Optional, List
  3. from ray_release.cluster_manager.cluster_manager import ClusterManager
  4. from ray_release.file_manager.file_manager import FileManager
  5. from ray_release.reporter.artifacts import DEFAULT_ARTIFACTS_DIR
  6. from ray_release.util import exponential_backoff_retry
  7. from ray_release.logger import logger
  8. from click.exceptions import ClickException
  9. class CommandRunner(abc.ABC):
  10. """This is run on Buildkite runners."""
  11. # the directory for runners to dump files to (on buildkite runner instances).
  12. # Write to this directory. run_release_tests.sh will ensure that the content
  13. # shows up under buildkite job's "Artifacts" UI tab.
  14. _DEFAULT_ARTIFACTS_DIR = DEFAULT_ARTIFACTS_DIR
  15. # the artifact file name put under s3 bucket root.
  16. # AnyscalejobWrapper will upload user generated artifact to this path
  17. # and AnyscaleJobRunner will then download from there.
  18. _USER_GENERATED_ARTIFACT = "user_generated_artifact"
  19. # the path where result json will be written to on both head node
  20. # as well as the relative path where result json will be uploaded to on s3.
  21. _RESULT_OUTPUT_JSON = "/tmp/release_test_out.json"
  22. # the path where output json will be written to on both head node
  23. # as well as the relative path where metrics json will be uploaded to on s3.
  24. _METRICS_OUTPUT_JSON = "/tmp/metrics_test_out.json"
  25. def __init__(
  26. self,
  27. cluster_manager: ClusterManager,
  28. file_manager: FileManager,
  29. working_dir: str,
  30. artifact_path: Optional[str] = None,
  31. ):
  32. self.cluster_manager = cluster_manager
  33. self.file_manager = file_manager
  34. self.working_dir = working_dir
  35. @property
  36. def command_env(self):
  37. return {
  38. "TEST_OUTPUT_JSON": self._RESULT_OUTPUT_JSON,
  39. "METRICS_OUTPUT_JSON": self._METRICS_OUTPUT_JSON,
  40. "USER_GENERATED_ARTIFACT": self._USER_GENERATED_ARTIFACT,
  41. }
  42. def get_full_command_env(self, env: Optional[Dict] = None):
  43. full_env = self.command_env.copy()
  44. if env:
  45. full_env.update(env)
  46. return full_env
  47. def prepare_local_env(self, ray_wheels_url: Optional[str] = None):
  48. """Prepare local environment, e.g. install dependencies."""
  49. raise NotImplementedError
  50. def prepare_remote_env(self):
  51. """Prepare remote environment, e.g. upload files."""
  52. raise NotImplementedError
  53. def wait_for_nodes(self, num_nodes: int, timeout: float = 900.0):
  54. """Wait for cluster nodes to be up.
  55. Args:
  56. num_nodes: Number of nodes to wait for.
  57. timeout: Timeout in seconds to wait for nodes before
  58. raising a ``PrepareCommandTimeoutError``.
  59. Returns:
  60. None
  61. Raises:
  62. PrepareCommandTimeoutError
  63. """
  64. raise NotImplementedError
  65. def save_metrics(self, start_time: float, timeout: float = 900.0):
  66. """Obtains Prometheus metrics from head node and saves them
  67. to ``self.metrics_output_json``.
  68. Args:
  69. start_time: From which UNIX timestamp to start the query.
  70. timeout: Timeout in seconds.
  71. Returns:
  72. None
  73. """
  74. raise NotImplementedError
  75. def run_command(
  76. self,
  77. command: str,
  78. env: Optional[Dict] = None,
  79. timeout: float = 3600.0,
  80. raise_on_timeout: bool = True,
  81. pip: Optional[List[str]] = None,
  82. ) -> float:
  83. """Run command."""
  84. raise NotImplementedError
  85. def run_prepare_command(
  86. self, command: str, env: Optional[Dict] = None, timeout: float = 3600.0
  87. ):
  88. """Run prepare command.
  89. Command runners may choose to run this differently than the
  90. test command.
  91. """
  92. return exponential_backoff_retry(
  93. lambda: self.run_command(command, env, timeout),
  94. ClickException,
  95. initial_retry_delay_s=5,
  96. max_retries=3,
  97. )
  98. def get_last_logs(self) -> Optional[str]:
  99. try:
  100. return self.get_last_logs_ex()
  101. except Exception as e:
  102. logger.exception(f"Error fetching logs: {e}")
  103. return None
  104. def get_last_logs_ex(self):
  105. raise NotImplementedError
  106. def fetch_results(self) -> Dict[str, Any]:
  107. raise NotImplementedError
  108. def fetch_metrics(self) -> Dict[str, Any]:
  109. raise NotImplementedError
  110. def fetch_artifact(self, artifact_path):
  111. raise NotImplementedError