config.py 7.4 KB


  1. import copy
  2. import json
  3. import os
  4. import re
  5. from typing import Dict, List, Optional, Tuple, Any
  6. import jsonschema
  7. import yaml
  8. from ray_release.test import (
  9. Test,
  10. TestDefinition,
  11. )
  12. from ray_release.anyscale_util import find_cloud_by_name
  13. from ray_release.bazel import bazel_runfile
  14. from ray_release.exception import ReleaseTestCLIError, ReleaseTestConfigError
  15. from ray_release.logger import logger
  16. from ray_release.util import DeferredEnvVar, deep_update
# Default timeouts and limits used by the release test tooling.
# The "Two hours" note on the wheel timeout implies that value is in seconds;
# the other *_TIMEOUT values are presumably seconds as well and the *_MINS
# values presumably minutes -- TODO confirm against the code that consumes them.
DEFAULT_WHEEL_WAIT_TIMEOUT = 7200  # Two hours
DEFAULT_COMMAND_TIMEOUT = 1800
DEFAULT_BUILD_TIMEOUT = 3600
DEFAULT_CLUSTER_TIMEOUT = 1800
DEFAULT_AUTOSUSPEND_MINS = 120
DEFAULT_MAXIMUM_UPTIME_MINS = 3200
DEFAULT_WAIT_FOR_NODES_TIMEOUT = 3000

# Cloud used when a test's cluster config names neither `cloud_id` nor
# `cloud_name` (see get_test_cloud_id, which converts this value with str()).
# NOTE(review): presumably read from the RELEASE_DEFAULT_CLOUD_ID environment
# variable with the second argument as fallback -- confirm in DeferredEnvVar.
DEFAULT_CLOUD_ID = DeferredEnvVar(
    "RELEASE_DEFAULT_CLOUD_ID",
    "cld_kvedZWag2qA8i5BjxUevf5i7",  # anyscale_v2_default_cloud
)
# NOTE(review): presumably the default Anyscale project, overridable through
# the RELEASE_DEFAULT_PROJECT environment variable -- confirm in DeferredEnvVar.
DEFAULT_ANYSCALE_PROJECT = DeferredEnvVar(
    "RELEASE_DEFAULT_PROJECT",
    "prj_FKRmeV5pA6X72aVscFALNC32",
)

# Absolute path of the parent of the directory that contains this file.
RELEASE_PACKAGE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

# Schema path that load_schema_file falls back to when no path is given.
RELEASE_TEST_SCHEMA_FILE = bazel_runfile("release/ray_release/schema.json")
  34. def read_and_validate_release_test_collection(
  35. config_file: str, schema_file: Optional[str] = None
  36. ) -> List[Test]:
  37. """Read and validate test collection from config file"""
  38. with open(config_file, "rt") as fp:
  39. tests = parse_test_definition(yaml.safe_load(fp))
  40. validate_release_test_collection(tests, schema_file=schema_file)
  41. return tests
  42. def _test_definition_invariant(
  43. test_definition: TestDefinition,
  44. invariant: bool,
  45. message: str,
  46. ) -> None:
  47. if invariant:
  48. return
  49. raise ReleaseTestConfigError(
  50. f'{test_definition["name"]} has invalid definition: {message}',
  51. )
  52. def parse_test_definition(test_definitions: List[TestDefinition]) -> List[Test]:
  53. tests = []
  54. for test_definition in test_definitions:
  55. if "variations" not in test_definition:
  56. tests.append(Test(test_definition))
  57. continue
  58. variations = test_definition.pop("variations")
  59. _test_definition_invariant(
  60. test_definition,
  61. variations,
  62. "variations field cannot be empty in a test definition",
  63. )
  64. for variation in variations:
  65. _test_definition_invariant(
  66. test_definition,
  67. "__suffix__" in variation,
  68. "missing __suffix__ field in a variation",
  69. )
  70. test = copy.deepcopy(test_definition)
  71. test["name"] = f'{test["name"]}.{variation.pop("__suffix__")}'
  72. test = deep_update(test, variation)
  73. tests.append(Test(test))
  74. return tests
  75. def load_schema_file(path: Optional[str] = None) -> Dict:
  76. path = path or RELEASE_TEST_SCHEMA_FILE
  77. with open(path, "rt") as fp:
  78. return json.load(fp)
  79. def validate_release_test_collection(
  80. test_collection: List[Test], schema_file: Optional[str] = None
  81. ):
  82. try:
  83. schema = load_schema_file(schema_file)
  84. except Exception as e:
  85. raise ReleaseTestConfigError(
  86. f"Could not load release test validation schema: {e}"
  87. ) from e
  88. num_errors = 0
  89. for test in test_collection:
  90. error = validate_test(test, schema)
  91. if error:
  92. logger.error(
  93. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  94. )
  95. num_errors += 1
  96. error = validate_test_cluster_compute(test)
  97. if error:
  98. logger.error(
  99. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  100. )
  101. num_errors += 1
  102. error = validate_test_cluster_env(test)
  103. if error:
  104. logger.error(
  105. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  106. )
  107. num_errors += 1
  108. if num_errors > 0:
  109. raise ReleaseTestConfigError(
  110. f"Release test configuration error: Found {num_errors} test "
  111. f"validation errors."
  112. )
  113. def validate_test(test: Test, schema: Optional[Dict] = None) -> Optional[str]:
  114. schema = schema or load_schema_file()
  115. try:
  116. jsonschema.validate(test, schema=schema)
  117. except (jsonschema.ValidationError, jsonschema.SchemaError) as e:
  118. return str(e.message)
  119. except Exception as e:
  120. return str(e)
  121. def validate_test_cluster_compute(test: Test) -> Optional[str]:
  122. from ray_release.template import load_test_cluster_compute
  123. cluster_compute = load_test_cluster_compute(test)
  124. return validate_cluster_compute(cluster_compute)
  125. def validate_cluster_compute(cluster_compute: Dict[str, Any]) -> Optional[str]:
  126. aws = cluster_compute.get("aws", {})
  127. head_node_aws = cluster_compute.get("head_node_type", {}).get(
  128. "aws_advanced_configurations", {}
  129. )
  130. configs_to_check = [aws, head_node_aws]
  131. for worker_node in cluster_compute.get("worker_node_types", []):
  132. worker_node_aws = worker_node.get("aws_advanced_configurations", {})
  133. configs_to_check.append(worker_node_aws)
  134. for config in configs_to_check:
  135. error = validate_aws_config(config)
  136. if error:
  137. return error
  138. return None
  139. def validate_test_cluster_env(test: Test) -> Optional[str]:
  140. from ray_release.template import get_cluster_env_path
  141. cluster_env_path = get_cluster_env_path(test)
  142. if not os.path.exists(cluster_env_path):
  143. raise ReleaseTestConfigError(
  144. f"Cannot load yaml template from {cluster_env_path}: Path not found."
  145. )
  146. return None
  147. def validate_aws_config(aws_config: Dict[str, Any]) -> Optional[str]:
  148. for block_device_mapping in aws_config.get("BlockDeviceMappings", []):
  149. ebs = block_device_mapping.get("Ebs")
  150. if not ebs:
  151. continue
  152. if not ebs.get("DeleteOnTermination", False) is True:
  153. return "Ebs volume does not have `DeleteOnTermination: true` set"
  154. return None
  155. def find_test(test_collection: List[Test], test_name: str) -> Optional[Test]:
  156. """Find test with `test_name` in `test_collection`"""
  157. for test in test_collection:
  158. if test["name"] == test_name:
  159. return test
  160. return None
  161. def as_smoke_test(test: Test) -> Test:
  162. if "smoke_test" not in test:
  163. raise ReleaseTestCLIError(
  164. f"Requested smoke test, but test with name {test['name']} does "
  165. f"not have any smoke test configuration."
  166. )
  167. smoke_test_config = test.pop("smoke_test")
  168. new_test = deep_update(test, smoke_test_config)
  169. return new_test
  170. def parse_python_version(version: str) -> Tuple[int, int]:
  171. """From XY and X.Y to (X, Y)"""
  172. match = re.match(r"^([0-9])\.?([0-9]+)$", version)
  173. if not match:
  174. raise ReleaseTestConfigError(f"Invalid Python version string: {version}")
  175. return int(match.group(1)), int(match.group(2))
  176. def get_test_cloud_id(test: Test) -> str:
  177. cloud_id = test["cluster"].get("cloud_id", None)
  178. cloud_name = test["cluster"].get("cloud_name", None)
  179. if cloud_id and cloud_name:
  180. raise RuntimeError(
  181. f"You can't supply both a `cloud_name` ({cloud_name}) and a "
  182. f"`cloud_id` ({cloud_id}) in the test cluster configuration. "
  183. f"Please provide only one."
  184. )
  185. elif cloud_name and not cloud_id:
  186. cloud_id = find_cloud_by_name(cloud_name)
  187. if not cloud_id:
  188. raise RuntimeError(f"Couldn't find cloud with name `{cloud_name}`.")
  189. else:
  190. cloud_id = cloud_id or str(DEFAULT_CLOUD_ID)
  191. return cloud_id