test_cluster_manager.py 31 KB


  1. import os
  2. import sys
  3. import time
  4. import unittest
  5. from typing import Callable
  6. from unittest.mock import patch
  7. from uuid import uuid4
  8. from freezegun import freeze_time
  9. from ray_release.exception import (
  10. ClusterCreationError,
  11. ClusterStartupError,
  12. ClusterStartupTimeout,
  13. ClusterStartupFailed,
  14. ClusterEnvBuildError,
  15. ClusterEnvBuildTimeout,
  16. ClusterComputeCreateError,
  17. ClusterEnvCreateError,
  18. )
  19. from ray_release.cluster_manager.full import FullClusterManager
  20. from ray_release.cluster_manager.minimal import MinimalClusterManager
  21. from ray_release.tests.utils import (
  22. UNIT_TEST_PROJECT_ID,
  23. UNIT_TEST_CLOUD_ID,
  24. APIDict,
  25. fail_always,
  26. fail_once,
  27. MockSDK,
  28. )
  29. from ray_release.util import get_anyscale_sdk
  30. TEST_CLUSTER_ENV = {
  31. "base_image": "anyscale/ray:nightly-py37",
  32. "env_vars": {},
  33. "python": {
  34. "pip_packages": [],
  35. },
  36. "conda_packages": [],
  37. "post_build_cmds": [f"echo {uuid4().hex[:8]}"],
  38. }
  39. TEST_CLUSTER_COMPUTE = {
  40. "cloud_id": UNIT_TEST_CLOUD_ID,
  41. "region": "us-west-2",
  42. "max_workers": 0,
  43. "head_node_type": {"name": "head_node", "instance_type": "m5.4xlarge"},
  44. "worker_node_types": [
  45. {
  46. "name": "worker_node",
  47. "instance_type": "m5.xlarge",
  48. "min_workers": 0,
  49. "max_workers": 0,
  50. "use_spot": False,
  51. }
  52. ],
  53. }
  54. def _fail(*args, **kwargs):
  55. raise RuntimeError()
  56. class _DelayedResponse:
  57. def __init__(
  58. self,
  59. callback: Callable[[], None],
  60. finish_after: float,
  61. before: APIDict,
  62. after: APIDict,
  63. ):
  64. self.callback = callback
  65. self.finish_after = time.monotonic() + finish_after
  66. self.before = before
  67. self.after = after
  68. def __call__(self, *args, **kwargs):
  69. self.callback()
  70. if time.monotonic() > self.finish_after:
  71. return self.after
  72. else:
  73. return self.before
  74. class MinimalSessionManagerTest(unittest.TestCase):
  75. cls = MinimalClusterManager
  76. def setUp(self) -> None:
  77. self.sdk = MockSDK()
  78. self.sdk.returns["get_project"] = APIDict(
  79. result=APIDict(name="release_unit_tests")
  80. )
  81. self.cluster_env = TEST_CLUSTER_ENV
  82. self.cluster_compute = TEST_CLUSTER_COMPUTE
  83. self.cluster_manager = self.cls(
  84. project_id=UNIT_TEST_PROJECT_ID,
  85. sdk=self.sdk,
  86. test_name=f"unit_test__{self.__class__.__name__}",
  87. )
  88. self.sdk.reset()
  89. self.sdk.returns["get_cloud"] = APIDict(result=APIDict(provider="AWS"))
  90. def testClusterName(self):
  91. sdk = MockSDK()
  92. sdk.returns["get_project"] = APIDict(result=APIDict(name="release_unit_tests"))
  93. sdk.returns["get_cloud"] = APIDict(result=APIDict(provider="AWS"))
  94. cluster_manager = self.cls(
  95. test_name="test", project_id=UNIT_TEST_PROJECT_ID, smoke_test=False, sdk=sdk
  96. )
  97. self.assertRegex(cluster_manager.cluster_name, r"^test_\d+$")
  98. cluster_manager = self.cls(
  99. test_name="test", project_id=UNIT_TEST_PROJECT_ID, smoke_test=True, sdk=sdk
  100. )
  101. self.assertRegex(cluster_manager.cluster_name, r"^test-smoke-test_\d+$")
  102. def testSetClusterEnv(self):
  103. sdk = MockSDK()
  104. sdk.returns["get_project"] = APIDict(result=APIDict(name="release_unit_tests"))
  105. sdk.returns["get_cloud"] = APIDict(result=APIDict(provider="AWS"))
  106. cluster_manager = self.cls(
  107. test_name="test", project_id=UNIT_TEST_PROJECT_ID, smoke_test=False, sdk=sdk
  108. )
  109. cluster_manager.set_cluster_env({})
  110. self.assertEqual(
  111. cluster_manager.cluster_env["env_vars"]["RAY_USAGE_STATS_EXTRA_TAGS"],
  112. "test_name=test;smoke_test=False",
  113. )
  114. cluster_manager = self.cls(
  115. test_name="Test", project_id=UNIT_TEST_PROJECT_ID, smoke_test=True, sdk=sdk
  116. )
  117. cluster_manager.set_cluster_env({})
  118. self.assertEqual(
  119. cluster_manager.cluster_env["env_vars"]["RAY_USAGE_STATS_EXTRA_TAGS"],
  120. "test_name=Test;smoke_test=True",
  121. )
  122. @patch("time.sleep", lambda *a, **kw: None)
  123. def testFindCreateClusterComputeExisting(self):
  124. # Find existing compute and succeed
  125. self.cluster_manager.set_cluster_compute(self.cluster_compute)
  126. self.assertTrue(self.cluster_manager.cluster_compute_name)
  127. self.assertFalse(self.cluster_manager.cluster_compute_id)
  128. self.sdk.returns["search_cluster_computes"] = APIDict(
  129. metadata=APIDict(
  130. next_paging_token=None,
  131. ),
  132. results=[
  133. APIDict(
  134. name="no_match",
  135. id="wrong",
  136. ),
  137. APIDict(name=self.cluster_manager.cluster_compute_name, id="correct"),
  138. ],
  139. )
  140. self.cluster_manager.create_cluster_compute()
  141. self.assertEqual(self.cluster_manager.cluster_compute_id, "correct")
  142. self.assertEqual(self.sdk.call_counter["search_cluster_computes"], 1)
  143. self.assertEqual(len(self.sdk.call_counter), 2) # 1 extra for cloud provider
  144. @patch("time.sleep", lambda *a, **kw: None)
  145. def testFindCreateClusterComputeCreateFailFail(self):
  146. # No existing compute, create new, but fail both times
  147. self.cluster_manager.set_cluster_compute(self.cluster_compute)
  148. self.assertTrue(self.cluster_manager.cluster_compute_name)
  149. self.assertFalse(self.cluster_manager.cluster_compute_id)
  150. self.sdk.returns["search_cluster_computes"] = APIDict(
  151. metadata=APIDict(
  152. next_paging_token=None,
  153. ),
  154. results=[
  155. APIDict(
  156. name="no_match",
  157. id="wrong",
  158. ),
  159. ],
  160. )
  161. self.sdk.returns["create_cluster_compute"] = fail_always
  162. with self.assertRaises(ClusterComputeCreateError):
  163. self.cluster_manager.create_cluster_compute()
  164. # No cluster ID found or created
  165. self.assertFalse(self.cluster_manager.cluster_compute_id)
  166. # Both APIs were called twice (retry after fail)
  167. self.assertEqual(self.sdk.call_counter["search_cluster_computes"], 2)
  168. self.assertEqual(self.sdk.call_counter["create_cluster_compute"], 2)
  169. self.assertEqual(len(self.sdk.call_counter), 3) # 1 extra for cloud provider
  170. @patch("time.sleep", lambda *a, **kw: None)
  171. def testFindCreateClusterComputeCreateFailSucceed(self):
  172. # No existing compute, create new, fail once, succeed afterwards
  173. self.cluster_manager.set_cluster_compute(self.cluster_compute)
  174. self.assertTrue(self.cluster_manager.cluster_compute_name)
  175. self.assertFalse(self.cluster_manager.cluster_compute_id)
  176. self.sdk.returns["search_cluster_computes"] = APIDict(
  177. metadata=APIDict(
  178. next_paging_token=None,
  179. ),
  180. results=[
  181. APIDict(
  182. name="no_match",
  183. id="wrong",
  184. ),
  185. ],
  186. )
  187. self.sdk.returns["create_cluster_compute"] = fail_once(
  188. result=APIDict(
  189. result=APIDict(
  190. id="correct",
  191. )
  192. )
  193. )
  194. self.cluster_manager.create_cluster_compute()
  195. # Both APIs were called twice (retry after fail)
  196. self.assertEqual(self.cluster_manager.cluster_compute_id, "correct")
  197. self.assertEqual(self.sdk.call_counter["search_cluster_computes"], 2)
  198. self.assertEqual(self.sdk.call_counter["create_cluster_compute"], 2)
  199. self.assertEqual(len(self.sdk.call_counter), 3) # 1 extra for cloud provider
  200. @patch("time.sleep", lambda *a, **kw: None)
  201. def testFindCreateClusterComputeCreateSucceed(self):
  202. # No existing compute, create new, and succeed
  203. self.cluster_manager.set_cluster_compute(self.cluster_compute)
  204. self.assertTrue(self.cluster_manager.cluster_compute_name)
  205. self.assertFalse(self.cluster_manager.cluster_compute_id)
  206. self.sdk.returns["search_cluster_computes"] = APIDict(
  207. metadata=APIDict(
  208. next_paging_token=None,
  209. ),
  210. results=[
  211. APIDict(
  212. name="no_match",
  213. id="wrong",
  214. ),
  215. ],
  216. )
  217. self.sdk.returns["create_cluster_compute"] = APIDict(
  218. result=APIDict(
  219. id="correct",
  220. )
  221. )
  222. self.cluster_manager.create_cluster_compute()
  223. # Both APIs were called twice (retry after fail)
  224. self.assertEqual(self.cluster_manager.cluster_compute_id, "correct")
  225. self.assertEqual(self.sdk.call_counter["search_cluster_computes"], 1)
  226. self.assertEqual(self.sdk.call_counter["create_cluster_compute"], 1)
  227. self.assertEqual(len(self.sdk.call_counter), 3) # 1 extra for cloud provider
  228. # Test automatic fields
  229. self.assertEqual(
  230. self.cluster_manager.cluster_compute["idle_termination_minutes"],
  231. self.cluster_manager.autosuspend_minutes,
  232. )
  233. self.assertEqual(
  234. self.cluster_manager.cluster_compute["maximum_uptime_minutes"],
  235. self.cluster_manager.maximum_uptime_minutes,
  236. )
  237. def testClusterComputeExtraTags(self):
  238. self.cluster_manager.set_cluster_compute(self.cluster_compute)
  239. # No extra tags specified
  240. self.assertEqual(self.cluster_manager.cluster_compute, self.cluster_compute)
  241. # Extra tags specified
  242. self.cluster_manager.set_cluster_compute(
  243. self.cluster_compute, extra_tags={"foo": "bar"}
  244. )
  245. # All ResourceTypes as in
  246. # ray_release.aws.RELEASE_AWS_RESOURCE_TYPES_TO_TRACK_FOR_BILLING
  247. target_cluster_compute = TEST_CLUSTER_COMPUTE.copy()
  248. target_cluster_compute["aws"] = {
  249. "TagSpecifications": [
  250. {"ResourceType": "instance", "Tags": [{"Key": "foo", "Value": "bar"}]},
  251. {"ResourceType": "volume", "Tags": [{"Key": "foo", "Value": "bar"}]},
  252. ]
  253. }
  254. self.assertEqual(
  255. self.cluster_manager.cluster_compute["aws"], target_cluster_compute["aws"]
  256. )
  257. # Test merging with already existing tags
  258. cluster_compute_with_tags = TEST_CLUSTER_COMPUTE.copy()
  259. cluster_compute_with_tags["aws"] = {
  260. "TagSpecifications": [
  261. {"ResourceType": "fake", "Tags": []},
  262. {"ResourceType": "instance", "Tags": [{"Key": "key", "Value": "val"}]},
  263. ]
  264. }
  265. self.cluster_manager.set_cluster_compute(
  266. cluster_compute_with_tags, extra_tags={"foo": "bar"}
  267. )
  268. # All ResourceTypes as in RELEASE_AWS_RESOURCE_TYPES_TO_TRACK_FOR_BILLING
  269. target_cluster_compute = TEST_CLUSTER_COMPUTE.copy()
  270. target_cluster_compute["aws"] = {
  271. "TagSpecifications": [
  272. {"ResourceType": "fake", "Tags": []},
  273. {
  274. "ResourceType": "instance",
  275. "Tags": [
  276. {"Key": "key", "Value": "val"},
  277. {"Key": "foo", "Value": "bar"},
  278. ],
  279. },
  280. {"ResourceType": "volume", "Tags": [{"Key": "foo", "Value": "bar"}]},
  281. ]
  282. }
  283. self.assertEqual(
  284. self.cluster_manager.cluster_compute["aws"], target_cluster_compute["aws"]
  285. )
  286. @patch("time.sleep", lambda *a, **kw: None)
  287. def testFindCreateClusterEnvExisting(self):
  288. # Find existing env and succeed
  289. self.cluster_manager.set_cluster_env(self.cluster_env)
  290. self.assertTrue(self.cluster_manager.cluster_env_name)
  291. self.assertFalse(self.cluster_manager.cluster_env_id)
  292. self.sdk.returns["search_cluster_environments"] = APIDict(
  293. metadata=APIDict(
  294. next_paging_token=None,
  295. ),
  296. results=[
  297. APIDict(
  298. name="no_match",
  299. id="wrong",
  300. ),
  301. APIDict(name=self.cluster_manager.cluster_env_name, id="correct"),
  302. ],
  303. )
  304. self.cluster_manager.create_cluster_env()
  305. self.assertEqual(self.cluster_manager.cluster_env_id, "correct")
  306. self.assertEqual(self.sdk.call_counter["search_cluster_environments"], 1)
  307. self.assertEqual(len(self.sdk.call_counter), 1)
  308. @patch("time.sleep", lambda *a, **kw: None)
  309. def testFindCreateClusterEnvFailFail(self):
  310. # No existing compute, create new, but fail both times
  311. self.cluster_manager.set_cluster_env(self.cluster_env)
  312. self.assertTrue(self.cluster_manager.cluster_env_name)
  313. self.assertFalse(self.cluster_manager.cluster_env_id)
  314. self.sdk.returns["search_cluster_environments"] = APIDict(
  315. metadata=APIDict(
  316. next_paging_token=None,
  317. ),
  318. results=[
  319. APIDict(
  320. name="no_match",
  321. id="wrong",
  322. ),
  323. ],
  324. )
  325. self.sdk.returns["create_cluster_environment"] = fail_always
  326. with self.assertRaises(ClusterEnvCreateError):
  327. self.cluster_manager.create_cluster_env()
  328. # No cluster ID found or created
  329. self.assertFalse(self.cluster_manager.cluster_env_id)
  330. # Both APIs were called twice (retry after fail)
  331. self.assertEqual(self.sdk.call_counter["search_cluster_environments"], 2)
  332. self.assertEqual(self.sdk.call_counter["create_cluster_environment"], 2)
  333. self.assertEqual(len(self.sdk.call_counter), 2)
  334. @patch("time.sleep", lambda *a, **kw: None)
  335. def testFindCreateClusterEnvFailSucceed(self):
  336. # No existing compute, create new, fail once, succeed afterwards
  337. self.cluster_manager.set_cluster_env(self.cluster_env)
  338. self.assertTrue(self.cluster_manager.cluster_env_name)
  339. self.assertFalse(self.cluster_manager.cluster_env_id)
  340. self.cluster_manager.cluster_env_id = None
  341. self.sdk.reset()
  342. self.sdk.returns["search_cluster_environments"] = APIDict(
  343. metadata=APIDict(
  344. next_paging_token=None,
  345. ),
  346. results=[
  347. APIDict(
  348. name="no_match",
  349. id="wrong",
  350. ),
  351. ],
  352. )
  353. self.sdk.returns["create_cluster_environment"] = fail_once(
  354. result=APIDict(
  355. result=APIDict(
  356. id="correct",
  357. )
  358. )
  359. )
  360. self.cluster_manager.create_cluster_env()
  361. # Both APIs were called twice (retry after fail)
  362. self.assertEqual(self.cluster_manager.cluster_env_id, "correct")
  363. self.assertEqual(self.sdk.call_counter["search_cluster_environments"], 2)
  364. self.assertEqual(self.sdk.call_counter["create_cluster_environment"], 2)
  365. self.assertEqual(len(self.sdk.call_counter), 2)
  366. @patch("time.sleep", lambda *a, **kw: None)
  367. def testFindCreateClusterEnvSucceed(self):
  368. # No existing compute, create new, and succeed
  369. self.cluster_manager.set_cluster_env(self.cluster_env)
  370. self.assertTrue(self.cluster_manager.cluster_env_name)
  371. self.assertFalse(self.cluster_manager.cluster_env_id)
  372. self.sdk.returns["search_cluster_environments"] = APIDict(
  373. metadata=APIDict(
  374. next_paging_token=None,
  375. ),
  376. results=[
  377. APIDict(
  378. name="no_match",
  379. id="wrong",
  380. ),
  381. ],
  382. )
  383. self.sdk.returns["create_cluster_environment"] = APIDict(
  384. result=APIDict(
  385. id="correct",
  386. )
  387. )
  388. self.cluster_manager.create_cluster_env()
  389. # Both APIs were called twice (retry after fail)
  390. self.assertEqual(self.cluster_manager.cluster_env_id, "correct")
  391. self.assertEqual(self.sdk.call_counter["search_cluster_environments"], 1)
  392. self.assertEqual(self.sdk.call_counter["create_cluster_environment"], 1)
  393. self.assertEqual(len(self.sdk.call_counter), 2)
  394. @patch("time.sleep", lambda *a, **kw: None)
  395. def testBuildClusterEnvNotFound(self):
  396. self.cluster_manager.set_cluster_env(self.cluster_env)
  397. self.cluster_manager.cluster_env_id = "correct"
  398. # Environment build not found
  399. self.sdk.returns["list_cluster_environment_builds"] = APIDict(results=[])
  400. with self.assertRaisesRegex(ClusterEnvBuildError, "No build found"):
  401. self.cluster_manager.build_cluster_env(timeout=600)
  402. @patch("time.sleep", lambda *a, **kw: None)
  403. def testBuildClusterEnvPreBuildFailed(self):
  404. """Pre-build fails, but is kicked off again."""
  405. self.cluster_manager.set_cluster_env(self.cluster_env)
  406. self.cluster_manager.cluster_env_id = "correct"
  407. # Build failed on first lookup
  408. self.cluster_manager.cluster_env_build_id = None
  409. self.sdk.reset()
  410. self.sdk.returns["list_cluster_environment_builds"] = APIDict(
  411. results=[
  412. APIDict(
  413. id="build_failed",
  414. status="failed",
  415. created_at=0,
  416. error_message=None,
  417. config_json={},
  418. )
  419. ]
  420. )
  421. self.sdk.returns["create_cluster_environment_build"] = APIDict(
  422. result=APIDict(id="new_build_id")
  423. )
  424. self.sdk.returns["get_build"] = APIDict(
  425. result=APIDict(
  426. id="build_now_succeeded",
  427. status="failed",
  428. created_at=0,
  429. error_message=None,
  430. config_json={},
  431. )
  432. )
  433. with self.assertRaisesRegex(ClusterEnvBuildError, "Cluster env build failed"):
  434. self.cluster_manager.build_cluster_env(timeout=600)
  435. self.assertFalse(self.cluster_manager.cluster_env_build_id)
  436. self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
  437. self.assertEqual(self.sdk.call_counter["create_cluster_environment_build"], 1)
  438. self.assertEqual(len(self.sdk.call_counter), 3)
  439. @patch("time.sleep", lambda *a, **kw: None)
  440. def testBuildClusterEnvPreBuildSucceeded(self):
  441. self.cluster_manager.set_cluster_env(self.cluster_env)
  442. self.cluster_manager.cluster_env_id = "correct"
  443. # (Second) build succeeded
  444. self.cluster_manager.cluster_env_build_id = None
  445. self.sdk.reset()
  446. self.sdk.returns["list_cluster_environment_builds"] = APIDict(
  447. results=[
  448. APIDict(
  449. id="build_failed",
  450. status="failed",
  451. created_at=0,
  452. error_message=None,
  453. config_json={},
  454. ),
  455. APIDict(
  456. id="build_succeeded",
  457. status="succeeded",
  458. created_at=1,
  459. error_message=None,
  460. config_json={},
  461. ),
  462. ]
  463. )
  464. self.cluster_manager.build_cluster_env(timeout=600)
  465. self.assertTrue(self.cluster_manager.cluster_env_build_id)
  466. self.assertEqual(self.cluster_manager.cluster_env_build_id, "build_succeeded")
  467. self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
  468. self.assertEqual(len(self.sdk.call_counter), 1)
  469. @patch("time.sleep", lambda *a, **kw: None)
  470. def testBuildClusterEnvSelectLastBuild(self):
  471. self.cluster_manager.set_cluster_env(self.cluster_env)
  472. self.cluster_manager.cluster_env_id = "correct"
  473. # (Second) build succeeded
  474. self.cluster_manager.cluster_env_build_id = None
  475. self.sdk.reset()
  476. self.sdk.returns["list_cluster_environment_builds"] = APIDict(
  477. results=[
  478. APIDict(
  479. id="build_succeeded",
  480. status="succeeded",
  481. created_at=0,
  482. error_message=None,
  483. config_json={},
  484. ),
  485. APIDict(
  486. id="build_succeeded_2",
  487. status="succeeded",
  488. created_at=1,
  489. error_message=None,
  490. config_json={},
  491. ),
  492. ]
  493. )
  494. self.cluster_manager.build_cluster_env(timeout=600)
  495. self.assertTrue(self.cluster_manager.cluster_env_build_id)
  496. self.assertEqual(self.cluster_manager.cluster_env_build_id, "build_succeeded_2")
  497. self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
  498. self.assertEqual(len(self.sdk.call_counter), 1)
  499. @patch("time.sleep", lambda *a, **kw: None)
  500. def testBuildClusterBuildFails(self):
  501. self.cluster_manager.set_cluster_env(self.cluster_env)
  502. self.cluster_manager.cluster_env_id = "correct"
  503. # Build, but fails after 300 seconds
  504. self.cluster_manager.cluster_env_build_id = None
  505. self.sdk.reset()
  506. self.sdk.returns["list_cluster_environment_builds"] = APIDict(
  507. results=[
  508. APIDict(
  509. id="build_failed",
  510. status="failed",
  511. created_at=0,
  512. error_message=None,
  513. config_json={},
  514. ),
  515. APIDict(
  516. id="build_succeeded",
  517. status="pending",
  518. created_at=1,
  519. error_message=None,
  520. config_json={},
  521. ),
  522. ]
  523. )
  524. with freeze_time() as frozen_time, self.assertRaisesRegex(
  525. ClusterEnvBuildError, "Cluster env build failed"
  526. ):
  527. self.sdk.returns["get_build"] = _DelayedResponse(
  528. lambda: frozen_time.tick(delta=10),
  529. finish_after=300,
  530. before=APIDict(
  531. result=APIDict(
  532. status="in_progress", error_message=None, config_json={}
  533. )
  534. ),
  535. after=APIDict(
  536. result=APIDict(status="failed", error_message=None, config_json={})
  537. ),
  538. )
  539. self.cluster_manager.build_cluster_env(timeout=600)
  540. self.assertFalse(self.cluster_manager.cluster_env_build_id)
  541. self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
  542. self.assertGreaterEqual(self.sdk.call_counter["get_build"], 9)
  543. self.assertEqual(len(self.sdk.call_counter), 2)
  544. @patch("time.sleep", lambda *a, **kw: None)
  545. def testBuildClusterEnvBuildTimeout(self):
  546. self.cluster_manager.set_cluster_env(self.cluster_env)
  547. self.cluster_manager.cluster_env_id = "correct"
  548. # Build, but timeout after 100 seconds
  549. self.cluster_manager.cluster_env_build_id = None
  550. self.sdk.reset()
  551. self.sdk.returns["list_cluster_environment_builds"] = APIDict(
  552. results=[
  553. APIDict(
  554. id="build_failed",
  555. status="failed",
  556. created_at=0,
  557. error_message=None,
  558. config_json={},
  559. ),
  560. APIDict(
  561. id="build_succeeded",
  562. status="pending",
  563. created_at=1,
  564. error_message=None,
  565. config_json={},
  566. ),
  567. ]
  568. )
  569. with freeze_time() as frozen_time, self.assertRaisesRegex(
  570. ClusterEnvBuildTimeout, "Time out when building cluster env"
  571. ):
  572. self.sdk.returns["get_build"] = _DelayedResponse(
  573. lambda: frozen_time.tick(delta=10),
  574. finish_after=300,
  575. before=APIDict(
  576. result=APIDict(
  577. status="in_progress", error_message=None, config_json={}
  578. )
  579. ),
  580. after=APIDict(
  581. result=APIDict(
  582. status="succeeded", error_message=None, config_json={}
  583. )
  584. ),
  585. )
  586. self.cluster_manager.build_cluster_env(timeout=100)
  587. self.assertFalse(self.cluster_manager.cluster_env_build_id)
  588. self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
  589. self.assertGreaterEqual(self.sdk.call_counter["get_build"], 9)
  590. self.assertEqual(len(self.sdk.call_counter), 2)
  591. @patch("time.sleep", lambda *a, **kw: None)
  592. def testBuildClusterBuildSucceed(self):
  593. self.cluster_manager.set_cluster_env(self.cluster_env)
  594. self.cluster_manager.cluster_env_id = "correct"
  595. # Build, succeed after 300 seconds
  596. self.cluster_manager.cluster_env_build_id = None
  597. self.sdk.reset()
  598. self.sdk.returns["list_cluster_environment_builds"] = APIDict(
  599. results=[
  600. APIDict(
  601. id="build_failed",
  602. status="failed",
  603. created_at=0,
  604. error_message=None,
  605. config_json={},
  606. ),
  607. APIDict(
  608. id="build_succeeded",
  609. status="pending",
  610. created_at=1,
  611. error_message=None,
  612. config_json={},
  613. ),
  614. ]
  615. )
  616. with freeze_time() as frozen_time:
  617. self.sdk.returns["get_build"] = _DelayedResponse(
  618. lambda: frozen_time.tick(delta=10),
  619. finish_after=300,
  620. before=APIDict(
  621. result=APIDict(
  622. status="in_progress", error_message=None, config_json={}
  623. )
  624. ),
  625. after=APIDict(
  626. result=APIDict(
  627. status="succeeded", error_message=None, config_json={}
  628. )
  629. ),
  630. )
  631. self.cluster_manager.build_cluster_env(timeout=600)
  632. self.assertTrue(self.cluster_manager.cluster_env_build_id)
  633. self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
  634. self.assertGreaterEqual(self.sdk.call_counter["get_build"], 9)
  635. self.assertEqual(len(self.sdk.call_counter), 2)
  636. class FullSessionManagerTest(MinimalSessionManagerTest):
  637. cls = FullClusterManager
  638. def testSessionStartCreationError(self):
  639. self.cluster_manager.cluster_env_id = "correct"
  640. self.cluster_manager.cluster_compute_id = "correct"
  641. self.sdk.returns["create_cluster"] = _fail
  642. with self.assertRaises(ClusterCreationError):
  643. self.cluster_manager.start_cluster()
  644. def testSessionStartStartupError(self):
  645. self.cluster_manager.cluster_env_id = "correct"
  646. self.cluster_manager.cluster_compute_id = "correct"
  647. self.sdk.returns["create_cluster"] = APIDict(result=APIDict(id="success"))
  648. self.sdk.returns["start_cluster"] = _fail
  649. with self.assertRaises(ClusterStartupError):
  650. self.cluster_manager.start_cluster()
  651. @patch("time.sleep", lambda *a, **kw: None)
  652. def testSessionStartStartupTimeout(self):
  653. self.cluster_manager.cluster_env_id = "correct"
  654. self.cluster_manager.cluster_compute_id = "correct"
  655. self.sdk.returns["create_cluster"] = APIDict(result=APIDict(id="success"))
  656. self.sdk.returns["start_cluster"] = APIDict(
  657. result=APIDict(id="cop_id", completed=False)
  658. )
  659. with freeze_time() as frozen_time, self.assertRaises(ClusterStartupTimeout):
  660. self.sdk.returns["get_cluster_operation"] = _DelayedResponse(
  661. lambda: frozen_time.tick(delta=10),
  662. finish_after=300,
  663. before=APIDict(result=APIDict(completed=False)),
  664. after=APIDict(result=APIDict(completed=True)),
  665. )
  666. # Timeout before startup finishes
  667. self.cluster_manager.start_cluster(timeout=200)
  668. @patch("time.sleep", lambda *a, **kw: None)
  669. def testSessionStartStartupFailed(self):
  670. self.cluster_manager.cluster_env_id = "correct"
  671. self.cluster_manager.cluster_compute_id = "correct"
  672. self.sdk.returns["create_cluster"] = APIDict(result=APIDict(id="success"))
  673. self.sdk.returns["start_cluster"] = APIDict(
  674. result=APIDict(id="cop_id", completed=False)
  675. )
  676. with freeze_time() as frozen_time, self.assertRaises(ClusterStartupFailed):
  677. frozen_time.tick(delta=0.1)
  678. self.sdk.returns["get_cluster_operation"] = _DelayedResponse(
  679. lambda: frozen_time.tick(delta=10),
  680. finish_after=300,
  681. before=APIDict(result=APIDict(completed=False)),
  682. after=APIDict(result=APIDict(completed=True)),
  683. )
  684. self.sdk.returns["get_cluster"] = APIDict(
  685. result=APIDict(state="Terminated")
  686. )
  687. # Timeout is long enough
  688. self.cluster_manager.start_cluster(timeout=400)
  689. @patch("time.sleep", lambda *a, **kw: None)
  690. def testSessionStartStartupSuccess(self):
  691. self.cluster_manager.cluster_env_id = "correct"
  692. self.cluster_manager.cluster_compute_id = "correct"
  693. self.sdk.returns["create_cluster"] = APIDict(result=APIDict(id="success"))
  694. self.sdk.returns["start_cluster"] = APIDict(
  695. result=APIDict(id="cop_id", completed=False)
  696. )
  697. with freeze_time() as frozen_time:
  698. frozen_time.tick(delta=0.1)
  699. self.sdk.returns["get_cluster_operation"] = _DelayedResponse(
  700. lambda: frozen_time.tick(delta=10),
  701. finish_after=300,
  702. before=APIDict(result=APIDict(completed=False)),
  703. after=APIDict(result=APIDict(completed=True)),
  704. )
  705. self.sdk.returns["get_cluster"] = APIDict(result=APIDict(state="Running"))
  706. # Timeout is long enough
  707. self.cluster_manager.start_cluster(timeout=400)
  708. @unittest.skipUnless(
  709. os.environ.get("RELEASE_UNIT_TEST_NO_ANYSCALE", "0") == "1",
  710. reason="RELEASE_UNIT_TEST_NO_ANYSCALE is set to 1",
  711. )
  712. class LiveSessionManagerTest(unittest.TestCase):
  713. def setUp(self) -> None:
  714. self.sdk = get_anyscale_sdk()
  715. self.cluster_env = TEST_CLUSTER_ENV
  716. self.cluster_compute = TEST_CLUSTER_COMPUTE
  717. self.cluster_manager = FullClusterManager(
  718. project_id=UNIT_TEST_PROJECT_ID,
  719. sdk=self.sdk,
  720. test_name=f"unit_test__{self.__class__.__name__}__endToEnd",
  721. )
  722. def tearDown(self) -> None:
  723. self.cluster_manager.terminate_cluster()
  724. self.cluster_manager.delete_configs()
  725. def testSessionEndToEnd(self):
  726. self.cluster_manager.set_cluster_env(self.cluster_env)
  727. self.cluster_manager.set_cluster_compute(self.cluster_compute)
  728. self.cluster_manager.build_configs(timeout=1200)
  729. # Reset, so that we fetch them again and test that code path
  730. self.cluster_manager.cluster_compute_id = None
  731. self.cluster_manager.cluster_env_id = None
  732. self.cluster_manager.cluster_env_build_id = None
  733. self.cluster_manager.build_configs(timeout=1200)
  734. # Start cluster
  735. self.cluster_manager.start_cluster(timeout=1200)
  736. if __name__ == "__main__":
  737. import pytest
  738. sys.exit(pytest.main(["-v", __file__]))