12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824 |
- import os
- import sys
- import time
- import unittest
- from typing import Callable
- from unittest.mock import patch
- from freezegun import freeze_time
- from ray_release.exception import (
- ClusterCreationError,
- ClusterStartupError,
- ClusterStartupTimeout,
- ClusterStartupFailed,
- ClusterEnvBuildError,
- ClusterEnvBuildTimeout,
- ClusterComputeCreateError,
- ClusterEnvCreateError,
- )
- from ray_release.cluster_manager.full import FullClusterManager
- from ray_release.cluster_manager.minimal import MinimalClusterManager
- from ray_release.tests.utils import (
- UNIT_TEST_PROJECT_ID,
- UNIT_TEST_CLOUD_ID,
- APIDict,
- fail_always,
- fail_once,
- MockSDK,
- )
- from ray_release.util import get_anyscale_sdk
- from ray_release.test import Test
# Cluster compute config shared by the tests in this module: one head node and
# one worker node type, with the worker count pinned to zero.
TEST_CLUSTER_COMPUTE = {
    "cloud_id": UNIT_TEST_CLOUD_ID,
    "region": "us-west-2",
    "max_workers": 0,
    "head_node_type": dict(name="head_node", instance_type="m5.4xlarge"),
    "worker_node_types": [
        dict(
            name="worker_node",
            instance_type="m5.xlarge",
            min_workers=0,
            max_workers=0,
            use_spot=False,
        ),
    ],
}
- def _fail(*args, **kwargs):
- raise RuntimeError()
class MockTest(Test):
    """``Test`` subclass whose BYOD image lookup returns a fixed placeholder."""

    def get_anyscale_byod_image(self) -> str:
        """Return the constant image name used throughout these unit tests."""
        image_name = "anyscale"
        return image_name
class _DelayedResponse:
    """Callable mock return value that changes once a deadline has passed.

    Every call first runs ``callback`` and then compares ``time.monotonic()``
    with a deadline fixed at construction time (``finish_after`` seconds after
    the instance was created). Until the deadline is exceeded the ``before``
    payload is returned; afterwards the ``after`` payload is returned.
    """

    def __init__(
        self,
        callback: Callable[[], None],
        finish_after: float,
        before: APIDict,
        after: APIDict,
    ):
        self.callback = callback
        # Absolute deadline on the monotonic clock, not a relative duration.
        self.finish_after = time.monotonic() + finish_after
        self.before = before
        self.after = after

    def __call__(self, *args, **kwargs):
        # The callback runs before the clock is read, so any time it advances
        # counts towards the deadline check of this very call.
        self.callback()
        deadline_passed = time.monotonic() > self.finish_after
        return self.after if deadline_passed else self.before
class MinimalSessionManagerTest(unittest.TestCase):
    """Unit tests for the cluster manager class stored in ``cls``.

    All SDK traffic goes through ``MockSDK``: tests program canned responses
    via ``self.sdk.returns`` and verify the calls made via
    ``self.sdk.call_counter``. Subclasses can override ``cls`` to re-run the
    same tests against another manager implementation.
    """

    cls = MinimalClusterManager

    def setUp(self) -> None:
        self.sdk = MockSDK()
        self.sdk.returns["get_project"] = APIDict(
            result=APIDict(name="release_unit_tests")
        )

        self.cluster_compute = TEST_CLUSTER_COMPUTE

        self.cluster_manager = self.cls(
            project_id=UNIT_TEST_PROJECT_ID,
            sdk=self.sdk,
            test=MockTest(
                {
                    "name": f"unit_test__{self.__class__.__name__}",
                    "cluster": {"byod": {}},
                }
            ),
        )
        # Start every test with clean mock state, then provide the cloud
        # lookup response.
        self.sdk.reset()
        self.sdk.returns["get_cloud"] = APIDict(result=APIDict(provider="AWS"))

    # ------------------------------------------------------------------
    # Fixture helpers (names do not start with "test", so they are not
    # collected as test cases).
    # ------------------------------------------------------------------

    @staticmethod
    def _standalone_sdk():
        """Return a fresh ``MockSDK`` with project and cloud lookups stubbed."""
        sdk = MockSDK()
        sdk.returns["get_project"] = APIDict(result=APIDict(name="release_unit_tests"))
        sdk.returns["get_cloud"] = APIDict(result=APIDict(provider="AWS"))
        return sdk

    @staticmethod
    def _search_page(*matches):
        """Build a single-page search response.

        The page always starts with one non-matching entry; ``matches`` are
        appended after it.
        """
        return APIDict(
            metadata=APIDict(next_paging_token=None),
            results=[APIDict(name="no_match", id="wrong"), *matches],
        )

    @staticmethod
    def _build_record(build_id, status, created_at):
        """Build one entry of a ``list_cluster_environment_builds`` response."""
        return APIDict(
            id=build_id,
            status=status,
            created_at=created_at,
            error_message=None,
            config_json={},
        )

    @staticmethod
    def _build_status(status):
        """Build a ``get_build`` response that reports ``status``."""
        return APIDict(
            result=APIDict(status=status, error_message=None, config_json={})
        )

    def _delayed_build_status(self, frozen_time, final_status):
        """Return a ``get_build`` stub that advances the frozen clock.

        Each poll ticks ``frozen_time`` by 10 seconds. The stub reports
        ``in_progress`` until 300 seconds have passed and ``final_status``
        afterwards. Must be called inside the ``freeze_time`` context so the
        deadline is computed from the frozen clock.
        """
        return _DelayedResponse(
            lambda: frozen_time.tick(delta=10),
            finish_after=300,
            before=self._build_status("in_progress"),
            after=self._build_status(final_status),
        )

    def _select_compute(self):
        """Set the shared compute config and check only its name is known."""
        self.cluster_manager.set_cluster_compute(self.cluster_compute)
        self.assertTrue(self.cluster_manager.cluster_compute_name)
        self.assertFalse(self.cluster_manager.cluster_compute_id)

    def _select_env(self):
        """Set the cluster env and check only its name is known."""
        self.cluster_manager.set_cluster_env()
        self.assertTrue(self.cluster_manager.cluster_env_name)
        self.assertFalse(self.cluster_manager.cluster_env_id)

    def _prepare_env_build(self):
        """Pretend the env already exists, clear any build id, reset the SDK."""
        self.cluster_manager.set_cluster_env()
        self.cluster_manager.cluster_env_id = "correct"
        self.cluster_manager.cluster_env_build_id = None
        self.sdk.reset()

    # ------------------------------------------------------------------
    # Naming
    # ------------------------------------------------------------------

    def testClusterName(self):
        """Cluster names are ``<test name>[-smoke-test]_<digits>``."""
        sdk = self._standalone_sdk()

        expectations = [
            (False, r"^test_\d+$"),
            (True, r"^test-smoke-test_\d+$"),
        ]
        for smoke_test, pattern in expectations:
            with self.subTest(smoke_test=smoke_test):
                cluster_manager = self.cls(
                    test=MockTest({"name": "test"}),
                    project_id=UNIT_TEST_PROJECT_ID,
                    smoke_test=smoke_test,
                    sdk=sdk,
                )
                self.assertRegex(cluster_manager.cluster_name, pattern)

    def testSetClusterEnv(self):
        """``set_cluster_env`` derives the expected env name for the mock image."""
        cluster_manager = self.cls(
            test=MockTest({"name": "test", "cluster": {"byod": {}}}),
            project_id=UNIT_TEST_PROJECT_ID,
            smoke_test=False,
            sdk=self._standalone_sdk(),
        )
        cluster_manager.set_cluster_env()
        self.assertEqual(
            cluster_manager.cluster_env_name,
            "anyscale__env__"
            "a93b7dec6c1b606a9814ceb96ace13e116d04cc8ce3a2bdea1b0f279c34ff692",
        )

    # ------------------------------------------------------------------
    # Cluster compute: find or create
    # ------------------------------------------------------------------

    @patch("time.sleep", lambda *a, **kw: None)
    def testFindCreateClusterComputeExisting(self):
        # Find existing compute and succeed
        self._select_compute()
        self.sdk.returns["search_cluster_computes"] = self._search_page(
            APIDict(name=self.cluster_manager.cluster_compute_name, id="correct")
        )
        self.cluster_manager.create_cluster_compute()
        self.assertEqual(self.cluster_manager.cluster_compute_id, "correct")
        self.assertEqual(self.sdk.call_counter["search_cluster_computes"], 1)
        self.assertEqual(len(self.sdk.call_counter), 2)  # 1 extra for cloud provider

    @patch("time.sleep", lambda *a, **kw: None)
    def testFindCreateClusterComputeCreateFailFail(self):
        # No existing compute, create new, but fail both times
        self._select_compute()
        self.sdk.returns["search_cluster_computes"] = self._search_page()
        self.sdk.returns["create_cluster_compute"] = fail_always
        with self.assertRaises(ClusterComputeCreateError):
            self.cluster_manager.create_cluster_compute()
        # No compute ID found or created
        self.assertFalse(self.cluster_manager.cluster_compute_id)
        # Both APIs were called twice (retry after fail)
        self.assertEqual(self.sdk.call_counter["search_cluster_computes"], 2)
        self.assertEqual(self.sdk.call_counter["create_cluster_compute"], 2)
        self.assertEqual(len(self.sdk.call_counter), 3)  # 1 extra for cloud provider

    @patch("time.sleep", lambda *a, **kw: None)
    def testFindCreateClusterComputeCreateFailSucceed(self):
        # No existing compute, create new, fail once, succeed afterwards
        self._select_compute()
        self.sdk.returns["search_cluster_computes"] = self._search_page()
        self.sdk.returns["create_cluster_compute"] = fail_once(
            result=APIDict(result=APIDict(id="correct"))
        )
        self.cluster_manager.create_cluster_compute()
        # Both APIs were called twice (retry after fail)
        self.assertEqual(self.cluster_manager.cluster_compute_id, "correct")
        self.assertEqual(self.sdk.call_counter["search_cluster_computes"], 2)
        self.assertEqual(self.sdk.call_counter["create_cluster_compute"], 2)
        self.assertEqual(len(self.sdk.call_counter), 3)  # 1 extra for cloud provider

    @patch("time.sleep", lambda *a, **kw: None)
    def testFindCreateClusterComputeCreateSucceed(self):
        # No existing compute, create new, and succeed
        self._select_compute()
        self.sdk.returns["search_cluster_computes"] = self._search_page()
        self.sdk.returns["create_cluster_compute"] = APIDict(
            result=APIDict(id="correct")
        )
        self.cluster_manager.create_cluster_compute()
        # Both APIs were called exactly once (no retry needed)
        self.assertEqual(self.cluster_manager.cluster_compute_id, "correct")
        self.assertEqual(self.sdk.call_counter["search_cluster_computes"], 1)
        self.assertEqual(self.sdk.call_counter["create_cluster_compute"], 1)
        self.assertEqual(len(self.sdk.call_counter), 3)  # 1 extra for cloud provider
        # Test automatic fields
        self.assertEqual(
            self.cluster_manager.cluster_compute["idle_termination_minutes"],
            self.cluster_manager.autosuspend_minutes,
        )
        self.assertEqual(
            self.cluster_manager.cluster_compute["maximum_uptime_minutes"],
            self.cluster_manager.maximum_uptime_minutes,
        )

    def testClusterComputeExtraTags(self):
        """Extra tags land in the ``aws`` section and merge with existing tags."""
        foo_tag = {"Key": "foo", "Value": "bar"}

        self.cluster_manager.set_cluster_compute(self.cluster_compute)
        # No extra tags specified
        self.assertEqual(self.cluster_manager.cluster_compute, self.cluster_compute)

        # Extra tags specified
        self.cluster_manager.set_cluster_compute(
            self.cluster_compute, extra_tags={"foo": "bar"}
        )
        # All ResourceTypes as in
        # ray_release.aws.RELEASE_AWS_RESOURCE_TYPES_TO_TRACK_FOR_BILLING
        expected_aws = {
            "TagSpecifications": [
                {"ResourceType": "instance", "Tags": [dict(foo_tag)]},
                {"ResourceType": "volume", "Tags": [dict(foo_tag)]},
            ]
        }
        self.assertEqual(self.cluster_manager.cluster_compute["aws"], expected_aws)

        # Test merging with already existing tags
        cluster_compute_with_tags = TEST_CLUSTER_COMPUTE.copy()
        cluster_compute_with_tags["aws"] = {
            "TagSpecifications": [
                {"ResourceType": "fake", "Tags": []},
                {"ResourceType": "instance", "Tags": [{"Key": "key", "Value": "val"}]},
            ]
        }
        self.cluster_manager.set_cluster_compute(
            cluster_compute_with_tags, extra_tags={"foo": "bar"}
        )
        # All ResourceTypes as in RELEASE_AWS_RESOURCE_TYPES_TO_TRACK_FOR_BILLING
        expected_merged_aws = {
            "TagSpecifications": [
                {"ResourceType": "fake", "Tags": []},
                {
                    "ResourceType": "instance",
                    "Tags": [
                        {"Key": "key", "Value": "val"},
                        dict(foo_tag),
                    ],
                },
                {"ResourceType": "volume", "Tags": [dict(foo_tag)]},
            ]
        }
        self.assertEqual(
            self.cluster_manager.cluster_compute["aws"], expected_merged_aws
        )

    # ------------------------------------------------------------------
    # Cluster env: find or create
    # ------------------------------------------------------------------

    @patch("time.sleep", lambda *a, **kw: None)
    def testFindCreateClusterEnvExisting(self):
        # Find existing env and succeed
        self._select_env()
        self.sdk.returns["search_cluster_environments"] = self._search_page(
            APIDict(name=self.cluster_manager.cluster_env_name, id="correct")
        )
        self.cluster_manager.create_cluster_env()
        self.assertEqual(self.cluster_manager.cluster_env_id, "correct")
        self.assertEqual(self.sdk.call_counter["search_cluster_environments"], 1)
        self.assertEqual(len(self.sdk.call_counter), 1)

    @patch("time.sleep", lambda *a, **kw: None)
    def testFindCreateClusterEnvFailFail(self):
        # No existing env, create new, but fail both times
        self._select_env()
        self.sdk.returns["search_cluster_environments"] = self._search_page()
        self.sdk.returns["create_byod_cluster_environment"] = fail_always
        with self.assertRaises(ClusterEnvCreateError):
            self.cluster_manager.create_cluster_env()
        # No env ID found or created
        self.assertFalse(self.cluster_manager.cluster_env_id)
        # Both APIs were called twice (retry after fail)
        self.assertEqual(self.sdk.call_counter["search_cluster_environments"], 2)
        self.assertEqual(self.sdk.call_counter["create_byod_cluster_environment"], 2)
        self.assertEqual(len(self.sdk.call_counter), 2)

    @patch("time.sleep", lambda *a, **kw: None)
    def testFindCreateClusterEnvFailSucceed(self):
        # No existing env, create new, fail once, succeed afterwards
        self._select_env()
        self.cluster_manager.cluster_env_id = None
        self.sdk.reset()
        self.sdk.returns["search_cluster_environments"] = self._search_page()
        self.sdk.returns["create_byod_cluster_environment"] = fail_once(
            result=APIDict(result=APIDict(id="correct"))
        )
        self.cluster_manager.create_cluster_env()
        # Both APIs were called twice (retry after fail)
        self.assertEqual(self.cluster_manager.cluster_env_id, "correct")
        self.assertEqual(self.sdk.call_counter["search_cluster_environments"], 2)
        self.assertEqual(self.sdk.call_counter["create_byod_cluster_environment"], 2)
        self.assertEqual(len(self.sdk.call_counter), 2)

    @patch("time.sleep", lambda *a, **kw: None)
    def testFindCreateClusterEnvSucceed(self):
        # No existing env, create new, and succeed
        self._select_env()
        self.sdk.returns["search_cluster_environments"] = self._search_page()
        self.sdk.returns["create_byod_cluster_environment"] = APIDict(
            result=APIDict(id="correct")
        )
        self.cluster_manager.create_cluster_env()
        # Both APIs were called exactly once (no retry needed)
        self.assertEqual(self.cluster_manager.cluster_env_id, "correct")
        self.assertEqual(self.sdk.call_counter["search_cluster_environments"], 1)
        self.assertEqual(self.sdk.call_counter["create_byod_cluster_environment"], 1)
        self.assertEqual(len(self.sdk.call_counter), 2)

    # ------------------------------------------------------------------
    # Cluster env: build
    # ------------------------------------------------------------------

    @patch("time.sleep", lambda *a, **kw: None)
    def testBuildClusterEnvNotFound(self):
        self.cluster_manager.set_cluster_env()
        self.cluster_manager.cluster_env_id = "correct"
        # Environment build not found
        self.sdk.returns["list_cluster_environment_builds"] = APIDict(results=[])
        with self.assertRaisesRegex(ClusterEnvBuildError, "No build found"):
            self.cluster_manager.build_cluster_env(timeout=600)

    @patch("time.sleep", lambda *a, **kw: None)
    def testBuildClusterEnvPreBuildFailed(self):
        """Pre-build fails, but is kicked off again.

        The re-triggered build is stubbed to report ``failed`` as well, so the
        call is expected to raise ``ClusterEnvBuildError``.
        """
        # Build failed on first lookup
        self._prepare_env_build()
        self.sdk.returns["list_cluster_environment_builds"] = APIDict(
            results=[self._build_record("build_failed", "failed", 0)]
        )
        self.sdk.returns["create_cluster_environment_build"] = APIDict(
            result=APIDict(id="new_build_id")
        )
        self.sdk.returns["get_build"] = APIDict(
            result=self._build_record("build_now_succeeded", "failed", 0)
        )
        with self.assertRaisesRegex(ClusterEnvBuildError, "Cluster env build failed"):
            self.cluster_manager.build_cluster_env(timeout=600)
        self.assertFalse(self.cluster_manager.cluster_env_build_id)
        self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
        self.assertEqual(self.sdk.call_counter["create_cluster_environment_build"], 1)
        self.assertEqual(len(self.sdk.call_counter), 3)

    @patch("time.sleep", lambda *a, **kw: None)
    def testBuildClusterEnvPreBuildSucceeded(self):
        # (Second) build succeeded
        self._prepare_env_build()
        self.sdk.returns["list_cluster_environment_builds"] = APIDict(
            results=[
                self._build_record("build_failed", "failed", 0),
                self._build_record("build_succeeded", "succeeded", 1),
            ]
        )
        self.cluster_manager.build_cluster_env(timeout=600)
        self.assertTrue(self.cluster_manager.cluster_env_build_id)
        self.assertEqual(self.cluster_manager.cluster_env_build_id, "build_succeeded")
        self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
        self.assertEqual(len(self.sdk.call_counter), 1)

    @patch("time.sleep", lambda *a, **kw: None)
    def testBuildClusterEnvSelectLastBuild(self):
        # Two succeeded builds: the one with the later created_at is selected
        self._prepare_env_build()
        self.sdk.returns["list_cluster_environment_builds"] = APIDict(
            results=[
                self._build_record("build_succeeded", "succeeded", 0),
                self._build_record("build_succeeded_2", "succeeded", 1),
            ]
        )
        self.cluster_manager.build_cluster_env(timeout=600)
        self.assertTrue(self.cluster_manager.cluster_env_build_id)
        self.assertEqual(self.cluster_manager.cluster_env_build_id, "build_succeeded_2")
        self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
        self.assertEqual(len(self.sdk.call_counter), 1)

    @patch("time.sleep", lambda *a, **kw: None)
    def testBuildClusterBuildFails(self):
        # Build, but fails after 300 seconds
        self._prepare_env_build()
        self.sdk.returns["list_cluster_environment_builds"] = APIDict(
            results=[
                self._build_record("build_failed", "failed", 0),
                self._build_record("build_succeeded", "pending", 1),
            ]
        )
        with freeze_time() as frozen_time, self.assertRaisesRegex(
            ClusterEnvBuildError, "Cluster env build failed"
        ):
            self.sdk.returns["get_build"] = self._delayed_build_status(
                frozen_time, "failed"
            )
            self.cluster_manager.build_cluster_env(timeout=600)

        self.assertFalse(self.cluster_manager.cluster_env_build_id)
        self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
        self.assertGreaterEqual(self.sdk.call_counter["get_build"], 9)
        self.assertEqual(len(self.sdk.call_counter), 2)

    @patch("time.sleep", lambda *a, **kw: None)
    def testBuildClusterEnvBuildTimeout(self):
        # Build would succeed after 300 seconds, but the timeout is 100 seconds
        self._prepare_env_build()
        self.sdk.returns["list_cluster_environment_builds"] = APIDict(
            results=[
                self._build_record("build_failed", "failed", 0),
                self._build_record("build_succeeded", "pending", 1),
            ]
        )
        with freeze_time() as frozen_time, self.assertRaisesRegex(
            ClusterEnvBuildTimeout, "Time out when building cluster env"
        ):
            self.sdk.returns["get_build"] = self._delayed_build_status(
                frozen_time, "succeeded"
            )
            self.cluster_manager.build_cluster_env(timeout=100)

        self.assertFalse(self.cluster_manager.cluster_env_build_id)
        self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
        self.assertGreaterEqual(self.sdk.call_counter["get_build"], 9)
        self.assertEqual(len(self.sdk.call_counter), 2)

    @patch("time.sleep", lambda *a, **kw: None)
    def testBuildClusterBuildSucceed(self):
        # Build, succeed after 300 seconds
        self._prepare_env_build()
        self.sdk.returns["list_cluster_environment_builds"] = APIDict(
            results=[
                self._build_record("build_failed", "failed", 0),
                self._build_record("build_succeeded", "pending", 1),
            ]
        )
        with freeze_time() as frozen_time:
            self.sdk.returns["get_build"] = self._delayed_build_status(
                frozen_time, "succeeded"
            )
            self.cluster_manager.build_cluster_env(timeout=600)

        self.assertTrue(self.cluster_manager.cluster_env_build_id)
        self.assertEqual(self.sdk.call_counter["list_cluster_environment_builds"], 1)
        self.assertGreaterEqual(self.sdk.call_counter["get_build"], 9)
        self.assertEqual(len(self.sdk.call_counter), 2)
class FullSessionManagerTest(MinimalSessionManagerTest):
    """Re-runs the inherited tests against ``FullClusterManager``.

    Adds tests for ``start_cluster``: creation errors, startup errors,
    startup timeouts, and terminal cluster states.
    """

    cls = FullClusterManager

    def _mark_configs_ready(self):
        """Pretend cluster env and compute were already created."""
        self.cluster_manager.cluster_env_id = "correct"
        self.cluster_manager.cluster_compute_id = "correct"

    def _stub_create_and_start(self):
        """Stub a successful cluster creation and a still-pending start."""
        self.sdk.returns["create_cluster"] = APIDict(result=APIDict(id="success"))
        self.sdk.returns["start_cluster"] = APIDict(
            result=APIDict(id="cop_id", completed=False)
        )

    @staticmethod
    def _operation_poller(frozen_time):
        """Return a cluster-operation stub that completes after 300 seconds.

        Each poll advances ``frozen_time`` by 10 seconds. Must be called inside
        the ``freeze_time`` context so the deadline uses the frozen clock.
        """
        return _DelayedResponse(
            lambda: frozen_time.tick(delta=10),
            finish_after=300,
            before=APIDict(result=APIDict(completed=False)),
            after=APIDict(result=APIDict(completed=True)),
        )

    def testSessionStartCreationError(self):
        self._mark_configs_ready()
        self.sdk.returns["create_cluster"] = _fail
        with self.assertRaises(ClusterCreationError):
            self.cluster_manager.start_cluster()

    def testSessionStartStartupError(self):
        self._mark_configs_ready()
        self.sdk.returns["create_cluster"] = APIDict(result=APIDict(id="success"))
        self.sdk.returns["start_cluster"] = _fail
        with self.assertRaises(ClusterStartupError):
            self.cluster_manager.start_cluster()

    @patch("time.sleep", lambda *a, **kw: None)
    def testSessionStartStartupTimeout(self):
        self._mark_configs_ready()
        self._stub_create_and_start()
        with freeze_time() as frozen_time, self.assertRaises(ClusterStartupTimeout):
            self.sdk.returns["get_cluster_operation"] = self._operation_poller(
                frozen_time
            )
            # Timeout before startup finishes
            self.cluster_manager.start_cluster(timeout=200)

    @patch("time.sleep", lambda *a, **kw: None)
    def testSessionStartStartupFailed(self):
        self._mark_configs_ready()
        self._stub_create_and_start()
        with freeze_time() as frozen_time, self.assertRaises(ClusterStartupFailed):
            frozen_time.tick(delta=0.1)
            self.sdk.returns["get_cluster_operation"] = self._operation_poller(
                frozen_time
            )
            self.sdk.returns["get_cluster"] = APIDict(
                result=APIDict(state="Terminated")
            )
            # Timeout is long enough
            self.cluster_manager.start_cluster(timeout=400)

    @patch("time.sleep", lambda *a, **kw: None)
    def testSessionStartStartupSuccess(self):
        self._mark_configs_ready()
        self._stub_create_and_start()
        with freeze_time() as frozen_time:
            frozen_time.tick(delta=0.1)
            self.sdk.returns["get_cluster_operation"] = self._operation_poller(
                frozen_time
            )
            self.sdk.returns["get_cluster"] = APIDict(result=APIDict(state="Running"))
            # Timeout is long enough
            self.cluster_manager.start_cluster(timeout=400)
# NOTE(review): ``skipUnless`` runs this class only when
# RELEASE_UNIT_TEST_NO_ANYSCALE == "1", whereas the reason text (and the
# variable name) suggest it should be skipped in that case. The condition is
# left unchanged here because flipping it would start hitting the live service
# wherever the variable is unset -- confirm the intent before switching to
# ``skipIf``.
@unittest.skipUnless(
    os.environ.get("RELEASE_UNIT_TEST_NO_ANYSCALE", "0") == "1",
    reason="RELEASE_UNIT_TEST_NO_ANYSCALE is set to 1",
)
class LiveSessionManagerTest(unittest.TestCase):
    """End-to-end test using the SDK returned by ``get_anyscale_sdk()``.

    Unlike the other test classes in this module, no ``MockSDK`` is involved.
    """

    def setUp(self) -> None:
        self.sdk = get_anyscale_sdk()
        self.cluster_compute = TEST_CLUSTER_COMPUTE
        # NOTE(review): every other manager in this module is constructed with
        # ``test=...`` rather than ``test_name=...`` -- confirm that this
        # keyword is still accepted by FullClusterManager.
        self.cluster_manager = FullClusterManager(
            project_id=UNIT_TEST_PROJECT_ID,
            sdk=self.sdk,
            test_name=f"unit_test__{self.__class__.__name__}__endToEnd",
        )

    def tearDown(self) -> None:
        self.cluster_manager.terminate_cluster()
        self.cluster_manager.delete_configs()

    def testSessionEndToEnd(self):
        """Build configs twice (create, then re-fetch) and start the cluster."""
        # ``self.cluster_env`` was never assigned in this class, so passing it
        # raised AttributeError. Call ``set_cluster_env()`` without arguments,
        # as every other call site in this module does.
        self.cluster_manager.set_cluster_env()
        self.cluster_manager.set_cluster_compute(self.cluster_compute)
        self.cluster_manager.build_configs(timeout=1200)

        # Reset, so that we fetch them again and test that code path
        self.cluster_manager.cluster_compute_id = None
        self.cluster_manager.cluster_env_id = None
        self.cluster_manager.cluster_env_build_id = None
        self.cluster_manager.build_configs(timeout=1200)

        # Start cluster
        self.cluster_manager.start_cluster(timeout=1200)
if __name__ == "__main__":
    # Imported lazily so pytest is only required when run as a script.
    import pytest

    # Run this file verbosely and use pytest's result as the exit status.
    exit_status = pytest.main(["-v", __file__])
    sys.exit(exit_status)
|