# test_glue.py

import os
import pytest
import shutil
import sys
import tempfile
import time
from typing import Type, Callable, Optional
import unittest
from unittest.mock import patch

from ray_release.alerts.handle import result_to_handle_map
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cluster_manager.full import FullClusterManager
from ray_release.command_runner.command_runner import CommandRunner
from ray_release.config import (
    Test,
    DEFAULT_COMMAND_TIMEOUT,
    DEFAULT_WAIT_FOR_NODES_TIMEOUT,
)
from ray_release.exception import (
    ReleaseTestConfigError,
    LocalEnvSetupError,
    ClusterComputeCreateError,
    ClusterEnvBuildError,
    ClusterEnvBuildTimeout,
    ClusterEnvCreateError,
    ClusterCreationError,
    ClusterStartupError,
    ClusterStartupTimeout,
    RemoteEnvSetupError,
    CommandError,
    PrepareCommandError,
    CommandTimeout,
    PrepareCommandTimeout,
    TestCommandError,
    TestCommandTimeout,
    FetchResultError,
    LogsError,
    ResultsAlert,
    ClusterNodesWaitTimeout,
)
from ray_release.file_manager.file_manager import FileManager
from ray_release.glue import (
    run_release_test,
    type_str_to_command_runner,
    command_runner_to_cluster_manager,
    TIMEOUT_BUFFER_MINUTES,
)
from ray_release.logger import logger
from ray_release.reporter.reporter import Reporter
from ray_release.result import Result, ExitCode
from ray_release.tests.utils import MockSDK, APIDict


def _fail_on_call(error_type: Type[Exception] = RuntimeError, message: str = "Fail"):
    def _fail(*args, **kwargs):
        raise error_type(message)

    return _fail


class MockReturn:
    return_dict = {}

    def __getattribute__(self, item):
        return_dict = object.__getattribute__(self, "return_dict")
        if item in return_dict:
            mocked = return_dict[item]
            if isinstance(mocked, Callable):
                return mocked()
            else:
                return lambda *a, **kw: mocked
        return object.__getattribute__(self, item)
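

# How the mocks below are driven: plain values stored in ``return_dict`` are
# returned from attribute access (wrapped in a lambda), while callables such as
# ``_fail_on_call(...)`` are invoked on access so they can raise, e.g.
#   self.command_runner_return["fetch_results"] = {"time_taken": 50}
#   self.command_runner_return["run_command"] = _fail_on_call(CommandError)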


@patch("ray_release.glue.reinstall_anyscale_dependencies", lambda: None)
@patch("ray_release.glue.get_pip_packages", lambda: ["pip-packages"])
class GlueTest(unittest.TestCase):
    def writeClusterEnv(self, content: str):
        with open(os.path.join(self.tempdir, "cluster_env.yaml"), "wt") as fp:
            fp.write(content)

    def writeClusterCompute(self, content: str):
        with open(os.path.join(self.tempdir, "cluster_compute.yaml"), "wt") as fp:
            fp.write(content)

    def setUp(self) -> None:
        self.tempdir = tempfile.mkdtemp()

        self.sdk = MockSDK()
        self.sdk.returns["get_project"] = APIDict(
            result=APIDict(name="unit_test_project")
        )
        self.sdk.returns["get_cloud"] = APIDict(result=APIDict(provider="AWS"))

        self.writeClusterEnv("{'env': true}")
        self.writeClusterCompute("{'compute': true}")

        with open(os.path.join(self.tempdir, "driver_fail.sh"), "wt") as f:
            f.write("exit 1\n")

        with open(os.path.join(self.tempdir, "driver_succeed.sh"), "wt") as f:
            f.write("exit 0\n")

        this_sdk = self.sdk
        this_tempdir = self.tempdir

        self.instances = {}

        self.cluster_manager_return = {}
        self.command_runner_return = {}
        self.file_manager_return = {}

        this_instances = self.instances
        this_cluster_manager_return = self.cluster_manager_return
        this_command_runner_return = self.command_runner_return
        this_file_manager_return = self.file_manager_return

        class MockClusterManager(MockReturn, FullClusterManager):
            def __init__(
                self,
                test_name: str,
                project_id: str,
                sdk=None,
                smoke_test: bool = False,
            ):
                super(MockClusterManager, self).__init__(
                    test_name, project_id, this_sdk, smoke_test=smoke_test
                )
                self.return_dict = this_cluster_manager_return
                this_instances["cluster_manager"] = self

        class MockCommandRunner(MockReturn, CommandRunner):
            return_dict = self.cluster_manager_return

            def __init__(
                self,
                cluster_manager: ClusterManager,
                file_manager: FileManager,
                working_dir,
                sdk=None,
                artifact_path: Optional[str] = None,
            ):
                super(MockCommandRunner, self).__init__(
                    cluster_manager, file_manager, this_tempdir
                )
                self.return_dict = this_command_runner_return

        class MockFileManager(MockReturn, FileManager):
            def __init__(self, cluster_manager: ClusterManager):
                super(MockFileManager, self).__init__(cluster_manager)
                self.return_dict = this_file_manager_return

        self.mock_alert_return = None

        def mock_alerter(test: Test, result: Result):
            return self.mock_alert_return

        result_to_handle_map["unit_test_alerter"] = (mock_alerter, False)

        type_str_to_command_runner["unit_test"] = MockCommandRunner
        command_runner_to_cluster_manager[MockCommandRunner] = MockClusterManager

        self.test = Test(
            name="unit_test_end_to_end",
            run=dict(
                type="unit_test",
                prepare="prepare_cmd",
                script="test_cmd",
                wait_for_nodes=dict(num_nodes=4, timeout=40),
            ),
            working_dir=self.tempdir,
            cluster=dict(
                cluster_env="cluster_env.yaml", cluster_compute="cluster_compute.yaml"
            ),
            alert="unit_test_alerter",
            driver_setup="driver_fail.sh",
        )
        self.anyscale_project = "prj_unit12345678"
        self.ray_wheels_url = "http://mock.wheels/"

    def tearDown(self) -> None:
        shutil.rmtree(self.tempdir)

    def _succeed_until(self, until: str):
        # These commands should succeed
        self.command_runner_return["prepare_local_env"] = None

        if until == "local_env":
            return

        self.test["driver_setup"] = "driver_succeed.sh"

        if until == "driver_setup":
            return

        self.cluster_manager_return["cluster_compute_id"] = "valid"
        self.cluster_manager_return["create_cluster_compute"] = None

        if until == "cluster_compute":
            return

        self.cluster_manager_return["cluster_env_id"] = "valid"
        self.cluster_manager_return["create_cluster_env"] = None
        self.cluster_manager_return["cluster_env_build_id"] = "valid"
        self.cluster_manager_return["build_cluster_env"] = None

        if until == "cluster_env":
            return

        self.cluster_manager_return["cluster_id"] = "valid"
        self.cluster_manager_return["start_cluster"] = None

        if until == "cluster_start":
            return

        self.command_runner_return["prepare_remote_env"] = None

        if until == "remote_env":
            return

        self.command_runner_return["wait_for_nodes"] = None

        if until == "wait_for_nodes":
            return

        self.command_runner_return["run_prepare_command"] = None

        if until == "prepare_command":
            return

        self.command_runner_return["run_command"] = None

        if until == "test_command":
            return

        self.command_runner_return["fetch_results"] = {
            "time_taken": 50,
            "last_update": time.time() - 60,
        }

        if until == "fetch_results":
            return

        self.command_runner_return["get_last_logs_ex"] = "Lorem ipsum"

        if until == "get_last_logs":
            return

        self.mock_alert_return = None
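
    # Typical pattern in the tests below: make every step up to (and including)
    # the given stage succeed, then override the following step with a failure:
    #   self._succeed_until("cluster_start")
    #   self.command_runner_return["prepare_remote_env"] = _fail_on_call(
    #       RemoteEnvSetupError
    #   )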

    def _run(self, result: Result, **kwargs):
        run_release_test(
            test=self.test,
            anyscale_project=self.anyscale_project,
            result=result,
            ray_wheels_url=self.ray_wheels_url,
            **kwargs,
        )

    def testInvalidClusterEnv(self):
        result = Result()

        # Any ReleaseTestConfigError
        with patch(
            "ray_release.glue.load_test_cluster_env",
            _fail_on_call(ReleaseTestConfigError),
        ), self.assertRaises(ReleaseTestConfigError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

        # Fails because file not found
        os.unlink(os.path.join(self.tempdir, "cluster_env.yaml"))
        with self.assertRaisesRegex(ReleaseTestConfigError, "Path not found"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

        # Fails because invalid jinja template
        self.writeClusterEnv("{{ INVALID")
        with self.assertRaisesRegex(ReleaseTestConfigError, "yaml template"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

        # Fails because invalid json
        self.writeClusterEnv("{'test': true, 'fail}")
        with self.assertRaisesRegex(ReleaseTestConfigError, "quoted scalar"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

    def testInvalidClusterCompute(self):
        result = Result()

        with patch(
            "ray_release.glue.load_test_cluster_compute",
            _fail_on_call(ReleaseTestConfigError),
        ), self.assertRaises(ReleaseTestConfigError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

        # Fails because file not found
        os.unlink(os.path.join(self.tempdir, "cluster_compute.yaml"))
        with self.assertRaisesRegex(ReleaseTestConfigError, "Path not found"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

        # Fails because invalid jinja template
        self.writeClusterCompute("{{ INVALID")
        with self.assertRaisesRegex(ReleaseTestConfigError, "yaml template"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

        # Fails because invalid json
        self.writeClusterCompute("{'test': true, 'fail}")
        with self.assertRaisesRegex(ReleaseTestConfigError, "quoted scalar"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

    def testAutomaticClusterEnvVariables(self):
        result = Result()

        self._succeed_until("local_env")

        with self.assertRaises(LocalEnvSetupError):
            self._run(result)

        cluster_manager = self.instances["cluster_manager"]

        command_timeout = self.test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT)
        prepare_cmd = self.test["run"].get("prepare", None)
        if prepare_cmd:
            prepare_timeout = self.test["run"].get("prepare_timeout", command_timeout)
        else:
            prepare_timeout = 0
        command_and_prepare_timeout = command_timeout + prepare_timeout

        wait_timeout = self.test["run"]["wait_for_nodes"].get(
            "timeout", DEFAULT_WAIT_FOR_NODES_TIMEOUT
        )

        expected_idle_termination_minutes = int(
            command_and_prepare_timeout / 60 + TIMEOUT_BUFFER_MINUTES
        )
        expected_maximum_uptime_minutes = int(
            expected_idle_termination_minutes + wait_timeout + TIMEOUT_BUFFER_MINUTES
        )

        self.assertEqual(
            cluster_manager.cluster_compute["idle_termination_minutes"],
            expected_idle_termination_minutes,
        )
        self.assertEqual(
            cluster_manager.cluster_compute["maximum_uptime_minutes"],
            expected_maximum_uptime_minutes,
        )
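
    # The expected values above recompute what run_release_test is expected to
    # write into the cluster compute config: idle termination covers the
    # prepare + test command timeouts (in minutes) plus TIMEOUT_BUFFER_MINUTES,
    # and maximum uptime additionally covers the wait_for_nodes timeout.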

    def testInvalidPrepareLocalEnv(self):
        result = Result()

        self.command_runner_return["prepare_local_env"] = _fail_on_call(
            LocalEnvSetupError
        )
        with self.assertRaises(LocalEnvSetupError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.LOCAL_ENV_SETUP_ERROR.value)

    def testDriverSetupFails(self):
        result = Result()

        self._succeed_until("local_env")

        with self.assertRaises(LocalEnvSetupError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.LOCAL_ENV_SETUP_ERROR.value)

    def testInvalidClusterIdOverride(self):
        result = Result()

        self._succeed_until("driver_setup")

        self.sdk.returns["get_cluster_environment"] = None

        with self.assertRaises(ClusterEnvCreateError):
            self._run(result, cluster_env_id="existing")

        self.sdk.returns["get_cluster_environment"] = APIDict(
            result=APIDict(config_json={"overridden": True})
        )

        with self.assertRaises(Exception) as cm:  # Fail somewhere else
            self._run(result, cluster_env_id="existing")
        self.assertNotIsInstance(cm.exception, ClusterEnvCreateError)

    def testBuildConfigFailsClusterCompute(self):
        result = Result()

        self._succeed_until("driver_setup")

        # These commands should succeed
        self.command_runner_return["prepare_local_env"] = None

        # Fails because API response faulty
        with self.assertRaisesRegex(ClusterComputeCreateError, "Unexpected"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_RESOURCE_ERROR.value)

        # Fails for random cluster compute reason
        self.cluster_manager_return["create_cluster_compute"] = _fail_on_call(
            ClusterComputeCreateError, "Known"
        )
        with self.assertRaisesRegex(ClusterComputeCreateError, "Known"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_RESOURCE_ERROR.value)

    def testBuildConfigFailsClusterEnv(self):
        result = Result()

        self._succeed_until("cluster_compute")

        # Fails because API response faulty
        with self.assertRaisesRegex(ClusterEnvCreateError, "Unexpected"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_RESOURCE_ERROR.value)

        # Fails for random cluster env create reason
        self.cluster_manager_return["create_cluster_env"] = _fail_on_call(
            ClusterEnvCreateError, "Known"
        )
        with self.assertRaisesRegex(ClusterEnvCreateError, "Known"):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_RESOURCE_ERROR.value)

        # Now, succeed creation but fail on cluster env build
        self.cluster_manager_return["cluster_env_id"] = "valid"
        self.cluster_manager_return["create_cluster_env"] = None
        self.cluster_manager_return["build_cluster_env"] = _fail_on_call(
            ClusterEnvBuildError
        )
        with self.assertRaises(ClusterEnvBuildError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_ENV_BUILD_ERROR.value)

        # Now, fail on cluster env timeout
        self.cluster_manager_return["build_cluster_env"] = _fail_on_call(
            ClusterEnvBuildTimeout
        )
        with self.assertRaises(ClusterEnvBuildTimeout):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_ENV_BUILD_TIMEOUT.value)

    def testStartClusterFails(self):
        result = Result()

        self._succeed_until("cluster_env")

        # Fails because API response faulty
        with self.assertRaises(ClusterCreationError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_RESOURCE_ERROR.value)

        self.cluster_manager_return["cluster_id"] = "valid"

        # Fail for random cluster startup reason
        self.cluster_manager_return["start_cluster"] = _fail_on_call(
            ClusterStartupError
        )
        with self.assertRaises(ClusterStartupError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_STARTUP_ERROR.value)

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

        # Fail for cluster startup timeout
        self.cluster_manager_return["start_cluster"] = _fail_on_call(
            ClusterStartupTimeout
        )
        with self.assertRaises(ClusterStartupTimeout):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_STARTUP_TIMEOUT.value)

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testPrepareRemoteEnvFails(self):
        result = Result()

        self._succeed_until("cluster_start")

        self.command_runner_return["prepare_remote_env"] = _fail_on_call(
            RemoteEnvSetupError
        )
        with self.assertRaises(RemoteEnvSetupError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.REMOTE_ENV_SETUP_ERROR.value)

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testWaitForNodesFails(self):
        result = Result()

        self._succeed_until("remote_env")

        # Wait for nodes command fails
        self.command_runner_return["wait_for_nodes"] = _fail_on_call(
            ClusterNodesWaitTimeout
        )
        with self.assertRaises(ClusterNodesWaitTimeout):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_WAIT_TIMEOUT.value)

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testPrepareCommandFails(self):
        result = Result()

        self._succeed_until("wait_for_nodes")

        # Prepare command fails
        self.command_runner_return["run_prepare_command"] = _fail_on_call(CommandError)
        with self.assertRaises(PrepareCommandError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.PREPARE_ERROR.value)

        # Prepare command times out
        self.command_runner_return["run_prepare_command"] = _fail_on_call(
            CommandTimeout
        )
        with self.assertRaises(PrepareCommandTimeout):
            self._run(result)
        # Special case: Prepare commands are usually waiting for nodes
        # (this may change in the future!)
        self.assertEqual(result.return_code, ExitCode.CLUSTER_WAIT_TIMEOUT.value)

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testTestCommandFails(self):
        result = Result()

        self._succeed_until("prepare_command")

        # Test command fails
        self.command_runner_return["run_command"] = _fail_on_call(CommandError)
        with self.assertRaises(TestCommandError):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.COMMAND_ERROR.value)

        # Test command times out
        self.command_runner_return["run_command"] = _fail_on_call(CommandTimeout)
        with self.assertRaises(TestCommandTimeout):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.COMMAND_TIMEOUT.value)

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testTestCommandTimeoutLongRunning(self):
        result = Result()

        self._succeed_until("fetch_results")

        # Test command times out
        self.command_runner_return["run_command"] = _fail_on_call(CommandTimeout)
        with self.assertRaises(TestCommandTimeout):
            self._run(result)
        self.assertEqual(result.return_code, ExitCode.COMMAND_TIMEOUT.value)

        # But now set test to long running
        self.test["run"]["long_running"] = True
        self._run(result)  # Will not fail this time

        self.assertGreaterEqual(result.results["last_update_diff"], 60.0)

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testSmokeUnstableTest(self):
        result = Result()

        self._succeed_until("complete")

        self.test["stable"] = False
        self._run(result, smoke_test=True)

        # Ensure stable and smoke_test are set correctly.
        assert not result.stable
        assert result.smoke_test

    def testFetchResultFails(self):
        result = Result()

        self._succeed_until("test_command")

        self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError)

        with self.assertLogs(logger, "ERROR") as cm:
            self._run(result)
            self.assertTrue(any("Could not fetch results" in o for o in cm.output))
        self.assertEqual(result.return_code, ExitCode.SUCCESS.value)
        self.assertEqual(result.status, "finished")

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testFetchResultFailsReqNonEmptyResult(self):
        # Set the `require_result` bit while keeping the registered handler.
        new_handler = (result_to_handle_map["unit_test_alerter"][0], True)
        result_to_handle_map["unit_test_alerter"] = new_handler

        result = Result()

        self._succeed_until("test_command")

        self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError)

        with self.assertRaisesRegex(FetchResultError, "Fail"):
            with self.assertLogs(logger, "ERROR") as cm:
                self._run(result)
                self.assertTrue(
                    any("Could not fetch results" in o for o in cm.output)
                )
        self.assertEqual(result.return_code, ExitCode.FETCH_RESULT_ERROR.value)
        self.assertEqual(result.status, "infra_error")

        # Ensure cluster was terminated, no matter what
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testLastLogsFails(self):
        result = Result()

        self._succeed_until("fetch_results")

        self.command_runner_return["get_last_logs_ex"] = _fail_on_call(LogsError)

        with self.assertLogs(logger, "ERROR") as cm:
            self._run(result)
            self.assertTrue(any("Error fetching logs" in o for o in cm.output))
        self.assertEqual(result.return_code, ExitCode.SUCCESS.value)
        self.assertEqual(result.status, "finished")

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testAlertFails(self):
        result = Result()

        self._succeed_until("get_last_logs")

        self.mock_alert_return = "Alert raised"

        with self.assertRaises(ResultsAlert):
            self._run(result)

        self.assertEqual(result.return_code, ExitCode.COMMAND_ALERT.value)
        self.assertEqual(result.status, "error")

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

    def testReportFails(self):
        result = Result()

        self._succeed_until("complete")

        class FailReporter(Reporter):
            def report_result_ex(self, test: Test, result: Result):
                raise RuntimeError

        with self.assertLogs(logger, "ERROR") as cm:
            self._run(result, reporters=[FailReporter()])
            self.assertTrue(any("Error reporting results" in o for o in cm.output))
        self.assertEqual(result.return_code, ExitCode.SUCCESS.value)
        self.assertEqual(result.status, "finished")

        # Ensure cluster was terminated
        self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)


if __name__ == "__main__":
    sys.exit(pytest.main(["-v", __file__]))