config.py 7.3 KB


  1. import copy
  2. import json
  3. import os
  4. import re
  5. from typing import Dict, List, Optional, Tuple, Any
  6. import jsonschema
  7. import yaml
  8. from ray_release.test import (
  9. Test,
  10. TestDefinition,
  11. )
  12. from ray_release.anyscale_util import find_cloud_by_name
  13. from ray_release.bazel import bazel_runfile
  14. from ray_release.exception import ReleaseTestCLIError, ReleaseTestConfigError
  15. from ray_release.logger import logger
  16. from ray_release.util import DeferredEnvVar, deep_update
# Default timeouts and limits applied to release tests.
# NOTE(review): the wheel wait timeout is annotated as two hours, so that value
# is in seconds; the other *_TIMEOUT values are presumably seconds too - confirm.
DEFAULT_WHEEL_WAIT_TIMEOUT = 7200  # Two hours
DEFAULT_COMMAND_TIMEOUT = 1800
DEFAULT_BUILD_TIMEOUT = 3600
DEFAULT_CLUSTER_TIMEOUT = 1800
# Going by their names, the next two values are expressed in minutes.
DEFAULT_AUTOSUSPEND_MINS = 120
DEFAULT_MAXIMUM_UPTIME_MINS = 3200
DEFAULT_WAIT_FOR_NODES_TIMEOUT = 3000

# NOTE(review): DeferredEnvVar presumably reads the named environment variable
# lazily and falls back to the second argument - confirm in ray_release.util.
DEFAULT_CLOUD_ID = DeferredEnvVar(
    "RELEASE_DEFAULT_CLOUD_ID",
    "cld_kvedZWag2qA8i5BjxUevf5i7",  # anyscale_v2_default_cloud
)
DEFAULT_ANYSCALE_PROJECT = DeferredEnvVar(
    "RELEASE_DEFAULT_PROJECT",
    "prj_FKRmeV5pA6X72aVscFALNC32",
)

# Absolute path of the parent of the directory that contains this file.
RELEASE_PACKAGE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

# Path of the JSON schema for release tests, resolved through bazel_runfile;
# load_schema_file falls back to it when no explicit path is given.
RELEASE_TEST_SCHEMA_FILE = bazel_runfile("release/ray_release/schema.json")
  34. def read_and_validate_release_test_collection(
  35. config_files: List[str],
  36. test_definition_root: str = None,
  37. schema_file: Optional[str] = None,
  38. ) -> List[Test]:
  39. """Read and validate test collection from config file"""
  40. tests = []
  41. for config_file in config_files:
  42. path = (
  43. os.path.join(test_definition_root, config_file)
  44. if test_definition_root
  45. else bazel_runfile(config_file)
  46. )
  47. with open(path, "rt") as fp:
  48. tests += parse_test_definition(yaml.safe_load(fp))
  49. validate_release_test_collection(
  50. tests,
  51. schema_file=schema_file,
  52. test_definition_root=test_definition_root,
  53. )
  54. return tests
  55. def _test_definition_invariant(
  56. test_definition: TestDefinition,
  57. invariant: bool,
  58. message: str,
  59. ) -> None:
  60. if invariant:
  61. return
  62. raise ReleaseTestConfigError(
  63. f'{test_definition["name"]} has invalid definition: {message}',
  64. )
  65. def parse_test_definition(test_definitions: List[TestDefinition]) -> List[Test]:
  66. tests = []
  67. for test_definition in test_definitions:
  68. if "variations" not in test_definition:
  69. tests.append(Test(test_definition))
  70. continue
  71. variations = test_definition.pop("variations")
  72. _test_definition_invariant(
  73. test_definition,
  74. variations,
  75. "variations field cannot be empty in a test definition",
  76. )
  77. for variation in variations:
  78. _test_definition_invariant(
  79. test_definition,
  80. "__suffix__" in variation,
  81. "missing __suffix__ field in a variation",
  82. )
  83. test = copy.deepcopy(test_definition)
  84. test["name"] = f'{test["name"]}.{variation.pop("__suffix__")}'
  85. test = deep_update(test, variation)
  86. tests.append(Test(test))
  87. return tests
  88. def load_schema_file(path: Optional[str] = None) -> Dict:
  89. path = path or RELEASE_TEST_SCHEMA_FILE
  90. with open(path, "rt") as fp:
  91. return json.load(fp)
  92. def validate_release_test_collection(
  93. test_collection: List[Test],
  94. schema_file: Optional[str] = None,
  95. test_definition_root: Optional[str] = None,
  96. ):
  97. try:
  98. schema = load_schema_file(schema_file)
  99. except Exception as e:
  100. raise ReleaseTestConfigError(
  101. f"Could not load release test validation schema: {e}"
  102. ) from e
  103. num_errors = 0
  104. for test in test_collection:
  105. error = validate_test(test, schema)
  106. if error:
  107. logger.error(
  108. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  109. )
  110. num_errors += 1
  111. error = validate_test_cluster_compute(test, test_definition_root)
  112. if error:
  113. logger.error(
  114. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  115. )
  116. num_errors += 1
  117. if num_errors > 0:
  118. raise ReleaseTestConfigError(
  119. f"Release test configuration error: Found {num_errors} test "
  120. f"validation errors."
  121. )
  122. def validate_test(test: Test, schema: Optional[Dict] = None) -> Optional[str]:
  123. schema = schema or load_schema_file()
  124. try:
  125. jsonschema.validate(test, schema=schema)
  126. except (jsonschema.ValidationError, jsonschema.SchemaError) as e:
  127. return str(e.message)
  128. except Exception as e:
  129. return str(e)
  130. def validate_test_cluster_compute(
  131. test: Test, test_definition_root: Optional[str] = None
  132. ) -> Optional[str]:
  133. from ray_release.template import load_test_cluster_compute
  134. cluster_compute = load_test_cluster_compute(test, test_definition_root)
  135. return validate_cluster_compute(cluster_compute)
  136. def validate_cluster_compute(cluster_compute: Dict[str, Any]) -> Optional[str]:
  137. aws = cluster_compute.get("aws", {})
  138. head_node_aws = cluster_compute.get("head_node_type", {}).get(
  139. "aws_advanced_configurations", {}
  140. )
  141. configs_to_check = [aws, head_node_aws]
  142. for worker_node in cluster_compute.get("worker_node_types", []):
  143. worker_node_aws = worker_node.get("aws_advanced_configurations", {})
  144. configs_to_check.append(worker_node_aws)
  145. for config in configs_to_check:
  146. error = validate_aws_config(config)
  147. if error:
  148. return error
  149. return None
  150. def validate_aws_config(aws_config: Dict[str, Any]) -> Optional[str]:
  151. for block_device_mapping in aws_config.get("BlockDeviceMappings", []):
  152. ebs = block_device_mapping.get("Ebs")
  153. if not ebs:
  154. continue
  155. if not ebs.get("DeleteOnTermination", False) is True:
  156. return "Ebs volume does not have `DeleteOnTermination: true` set"
  157. return None
  158. def find_test(test_collection: List[Test], test_name: str) -> Optional[Test]:
  159. """Find test with `test_name` in `test_collection`"""
  160. for test in test_collection:
  161. if test["name"] == test_name:
  162. return test
  163. return None
  164. def as_smoke_test(test: Test) -> Test:
  165. if "smoke_test" not in test:
  166. raise ReleaseTestCLIError(
  167. f"Requested smoke test, but test with name {test['name']} does "
  168. f"not have any smoke test configuration."
  169. )
  170. smoke_test_config = test.pop("smoke_test")
  171. new_test = deep_update(test, smoke_test_config)
  172. return new_test
  173. def parse_python_version(version: str) -> Tuple[int, int]:
  174. """From XY and X.Y to (X, Y)"""
  175. match = re.match(r"^([0-9])\.?([0-9]+)$", version)
  176. if not match:
  177. raise ReleaseTestConfigError(f"Invalid Python version string: {version}")
  178. return int(match.group(1)), int(match.group(2))
  179. def get_test_cloud_id(test: Test) -> str:
  180. cloud_id = test["cluster"].get("cloud_id", None)
  181. cloud_name = test["cluster"].get("cloud_name", None)
  182. if cloud_id and cloud_name:
  183. raise RuntimeError(
  184. f"You can't supply both a `cloud_name` ({cloud_name}) and a "
  185. f"`cloud_id` ({cloud_id}) in the test cluster configuration. "
  186. f"Please provide only one."
  187. )
  188. elif cloud_name and not cloud_id:
  189. cloud_id = find_cloud_by_name(cloud_name)
  190. if not cloud_id:
  191. raise RuntimeError(f"Couldn't find cloud with name `{cloud_name}`.")
  192. else:
  193. cloud_id = cloud_id or str(DEFAULT_CLOUD_ID)
  194. return cloud_id