# config.py
  1. import copy
  2. import json
  3. import os
  4. import re
  5. from typing import Dict, List, Optional, Tuple, Any
  6. import jsonschema
  7. import yaml
  8. from ray_release.test import (
  9. Test,
  10. TestDefinition,
  11. )
  12. from ray_release.anyscale_util import find_cloud_by_name
  13. from ray_release.bazel import bazel_runfile
  14. from ray_release.exception import ReleaseTestCLIError, ReleaseTestConfigError
  15. from ray_release.logger import logger
  16. from ray_release.util import DeferredEnvVar, deep_update
  17. DEFAULT_WHEEL_WAIT_TIMEOUT = 7200 # Two hours
  18. DEFAULT_COMMAND_TIMEOUT = 1800
  19. DEFAULT_BUILD_TIMEOUT = 3600
  20. DEFAULT_CLUSTER_TIMEOUT = 1800
  21. DEFAULT_AUTOSUSPEND_MINS = 120
  22. DEFAULT_MAXIMUM_UPTIME_MINS = 3200
  23. DEFAULT_WAIT_FOR_NODES_TIMEOUT = 3000
  24. DEFAULT_CLOUD_ID = DeferredEnvVar(
  25. "RELEASE_DEFAULT_CLOUD_ID",
  26. "cld_kvedZWag2qA8i5BjxUevf5i7", # anyscale_v2_default_cloud
  27. )
  28. DEFAULT_ANYSCALE_PROJECT = DeferredEnvVar(
  29. "RELEASE_DEFAULT_PROJECT",
  30. "prj_FKRmeV5pA6X72aVscFALNC32",
  31. )
  32. RELEASE_PACKAGE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
  33. RELEASE_TEST_SCHEMA_FILE = bazel_runfile("release/ray_release/schema.json")
  34. def read_and_validate_release_test_collection(
  35. config_file: str, schema_file: Optional[str] = None
  36. ) -> List[Test]:
  37. """Read and validate test collection from config file"""
  38. with open(config_file, "rt") as fp:
  39. tests = parse_test_definition(yaml.safe_load(fp))
  40. validate_release_test_collection(tests, schema_file=schema_file)
  41. return tests
  42. def _test_definition_invariant(
  43. test_definition: TestDefinition,
  44. invariant: bool,
  45. message: str,
  46. ) -> None:
  47. if invariant:
  48. return
  49. raise ReleaseTestConfigError(
  50. f'{test_definition["name"]} has invalid definition: {message}',
  51. )
  52. def parse_test_definition(test_definitions: List[TestDefinition]) -> List[Test]:
  53. tests = []
  54. for test_definition in test_definitions:
  55. if "variations" not in test_definition:
  56. tests.append(Test(test_definition))
  57. continue
  58. variations = test_definition.pop("variations")
  59. _test_definition_invariant(
  60. test_definition,
  61. variations,
  62. "variations field cannot be empty in a test definition",
  63. )
  64. for variation in variations:
  65. _test_definition_invariant(
  66. test_definition,
  67. "__suffix__" in variation,
  68. "missing __suffix__ field in a variation",
  69. )
  70. test = copy.deepcopy(test_definition)
  71. test["name"] = f'{test["name"]}.{variation.pop("__suffix__")}'
  72. test = deep_update(test, variation)
  73. tests.append(Test(test))
  74. return tests
  75. def load_schema_file(path: Optional[str] = None) -> Dict:
  76. path = path or RELEASE_TEST_SCHEMA_FILE
  77. with open(path, "rt") as fp:
  78. return json.load(fp)
  79. def validate_release_test_collection(
  80. test_collection: List[Test], schema_file: Optional[str] = None
  81. ):
  82. try:
  83. schema = load_schema_file(schema_file)
  84. except Exception as e:
  85. raise ReleaseTestConfigError(
  86. f"Could not load release test validation schema: {e}"
  87. ) from e
  88. num_errors = 0
  89. for test in test_collection:
  90. error = validate_test(test, schema)
  91. if error:
  92. logger.error(
  93. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  94. )
  95. num_errors += 1
  96. error = validate_test_cluster_compute(test)
  97. if error:
  98. logger.error(
  99. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  100. )
  101. num_errors += 1
  102. error = validate_test_cluster_env(test)
  103. if error:
  104. logger.error(
  105. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  106. )
  107. num_errors += 1
  108. if num_errors > 0:
  109. raise ReleaseTestConfigError(
  110. f"Release test configuration error: Found {num_errors} test "
  111. f"validation errors."
  112. )
  113. def validate_test(test: Test, schema: Optional[Dict] = None) -> Optional[str]:
  114. schema = schema or load_schema_file()
  115. try:
  116. jsonschema.validate(test, schema=schema)
  117. except (jsonschema.ValidationError, jsonschema.SchemaError) as e:
  118. return str(e.message)
  119. except Exception as e:
  120. return str(e)
  121. def validate_test_cluster_compute(test: Test) -> Optional[str]:
  122. from ray_release.template import load_test_cluster_compute
  123. cluster_compute = load_test_cluster_compute(test)
  124. return validate_cluster_compute(cluster_compute)
  125. def validate_cluster_compute(cluster_compute: Dict[str, Any]) -> Optional[str]:
  126. aws = cluster_compute.get("aws", {})
  127. head_node_aws = cluster_compute.get("head_node_type", {}).get(
  128. "aws_advanced_configurations", {}
  129. )
  130. configs_to_check = [aws, head_node_aws]
  131. for worker_node in cluster_compute.get("worker_node_types", []):
  132. worker_node_aws = worker_node.get("aws_advanced_configurations", {})
  133. configs_to_check.append(worker_node_aws)
  134. for config in configs_to_check:
  135. error = validate_aws_config(config)
  136. if error:
  137. return error
  138. return None
  139. def validate_test_cluster_env(test: Test) -> Optional[str]:
  140. from ray_release.template import get_cluster_env_path
  141. cluster_env_path = get_cluster_env_path(test)
  142. if not os.path.exists(cluster_env_path):
  143. raise ReleaseTestConfigError(
  144. f"Cannot load yaml template from {cluster_env_path}: Path not found."
  145. )
  146. return None
  147. def validate_aws_config(aws_config: Dict[str, Any]) -> Optional[str]:
  148. for block_device_mapping in aws_config.get("BlockDeviceMappings", []):
  149. ebs = block_device_mapping.get("Ebs")
  150. if not ebs:
  151. continue
  152. if not ebs.get("DeleteOnTermination", False) is True:
  153. return "Ebs volume does not have `DeleteOnTermination: true` set"
  154. return None
  155. def find_test(test_collection: List[Test], test_name: str) -> Optional[Test]:
  156. """Find test with `test_name` in `test_collection`"""
  157. for test in test_collection:
  158. if test["name"] == test_name:
  159. return test
  160. return None
  161. def as_smoke_test(test: Test) -> Test:
  162. if "smoke_test" not in test:
  163. raise ReleaseTestCLIError(
  164. f"Requested smoke test, but test with name {test['name']} does "
  165. f"not have any smoke test configuration."
  166. )
  167. smoke_test_config = test.pop("smoke_test")
  168. new_test = deep_update(test, smoke_test_config)
  169. return new_test
  170. def parse_python_version(version: str) -> Tuple[int, int]:
  171. """From XY and X.Y to (X, Y)"""
  172. match = re.match(r"^([0-9])\.?([0-9]+)$", version)
  173. if not match:
  174. raise ReleaseTestConfigError(f"Invalid Python version string: {version}")
  175. return int(match.group(1)), int(match.group(2))
  176. def get_test_cloud_id(test: Test) -> str:
  177. cloud_id = test["cluster"].get("cloud_id", None)
  178. cloud_name = test["cluster"].get("cloud_name", None)
  179. if cloud_id and cloud_name:
  180. raise RuntimeError(
  181. f"You can't supply both a `cloud_name` ({cloud_name}) and a "
  182. f"`cloud_id` ({cloud_id}) in the test cluster configuration. "
  183. f"Please provide only one."
  184. )
  185. elif cloud_name and not cloud_id:
  186. cloud_id = find_cloud_by_name(cloud_name)
  187. if not cloud_id:
  188. raise RuntimeError(f"Couldn't find cloud with name `{cloud_name}`.")
  189. else:
  190. cloud_id = cloud_id or str(DEFAULT_CLOUD_ID)
  191. return cloud_id