config.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. import copy
  2. import json
  3. import os
  4. import re
  5. from typing import Dict, List, Optional, Tuple, Any
  6. import jsonschema
  7. import yaml
  8. from ray_release.test import (
  9. Test,
  10. TestDefinition,
  11. )
  12. from ray_release.anyscale_util import find_cloud_by_name
  13. from ray_release.bazel import bazel_runfile
  14. from ray_release.exception import ReleaseTestCLIError, ReleaseTestConfigError
  15. from ray_release.logger import logger
  16. from ray_release.util import DeferredEnvVar, deep_update
# Default timeouts. The wheel wait timeout is annotated as two hours, so it is
# expressed in seconds; the other *_TIMEOUT values are presumably seconds as
# well -- TODO confirm against the code that consumes them.
DEFAULT_WHEEL_WAIT_TIMEOUT = 7200  # Two hours
DEFAULT_COMMAND_TIMEOUT = 1800
DEFAULT_BUILD_TIMEOUT = 3600
DEFAULT_CLUSTER_TIMEOUT = 1800

# Cluster lifetime defaults, in minutes (per the *_MINS suffix).
DEFAULT_AUTOSUSPEND_MINS = 120
DEFAULT_MAXIMUM_UPTIME_MINS = 3200

DEFAULT_WAIT_FOR_NODES_TIMEOUT = 3000

# Cloud used when a test does not name one (see `get_test_cloud_id`).
# NOTE(review): DeferredEnvVar presumably reads the named environment variable
# lazily and falls back to the second argument -- confirm in ray_release.util.
DEFAULT_CLOUD_ID = DeferredEnvVar(
    "RELEASE_DEFAULT_CLOUD_ID",
    "cld_kvedZWag2qA8i5BjxUevf5i7",  # anyscale_v2_default_cloud
)
# Default Anyscale project, declared the same way as the default cloud above.
DEFAULT_ANYSCALE_PROJECT = DeferredEnvVar(
    "RELEASE_DEFAULT_PROJECT",
    "prj_FKRmeV5pA6X72aVscFALNC32",
)

# Absolute path of the parent of the directory that contains this file.
RELEASE_PACKAGE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

# Location of the JSON schema used to validate tests, resolved via bazel_runfile.
RELEASE_TEST_SCHEMA_FILE = bazel_runfile("release/ray_release/schema.json")
  34. def read_and_validate_release_test_collection(
  35. config_files: List[str],
  36. test_definition_root: str = None,
  37. schema_file: Optional[str] = None,
  38. ) -> List[Test]:
  39. """Read and validate test collection from config file"""
  40. tests = []
  41. for config_file in config_files:
  42. path = (
  43. os.path.join(test_definition_root, config_file)
  44. if test_definition_root
  45. else bazel_runfile(config_file)
  46. )
  47. with open(path, "rt") as fp:
  48. tests += parse_test_definition(yaml.safe_load(fp))
  49. validate_release_test_collection(
  50. tests,
  51. schema_file=schema_file,
  52. test_definition_root=test_definition_root,
  53. )
  54. return tests
  55. def _test_definition_invariant(
  56. test_definition: TestDefinition,
  57. invariant: bool,
  58. message: str,
  59. ) -> None:
  60. if invariant:
  61. return
  62. raise ReleaseTestConfigError(
  63. f'{test_definition["name"]} has invalid definition: {message}',
  64. )
  65. def parse_test_definition(test_definitions: List[TestDefinition]) -> List[Test]:
  66. tests = []
  67. for test_definition in test_definitions:
  68. if "variations" not in test_definition:
  69. tests.append(Test(test_definition))
  70. continue
  71. variations = test_definition.pop("variations")
  72. _test_definition_invariant(
  73. test_definition,
  74. variations,
  75. "variations field cannot be empty in a test definition",
  76. )
  77. for variation in variations:
  78. _test_definition_invariant(
  79. test_definition,
  80. "__suffix__" in variation,
  81. "missing __suffix__ field in a variation",
  82. )
  83. test = copy.deepcopy(test_definition)
  84. test["name"] = f'{test["name"]}.{variation.pop("__suffix__")}'
  85. test = deep_update(test, variation)
  86. tests.append(Test(test))
  87. return tests
  88. def load_schema_file(path: Optional[str] = None) -> Dict:
  89. path = path or RELEASE_TEST_SCHEMA_FILE
  90. with open(path, "rt") as fp:
  91. return json.load(fp)
  92. def validate_release_test_collection(
  93. test_collection: List[Test],
  94. schema_file: Optional[str] = None,
  95. test_definition_root: Optional[str] = None,
  96. ):
  97. try:
  98. schema = load_schema_file(schema_file)
  99. except Exception as e:
  100. raise ReleaseTestConfigError(
  101. f"Could not load release test validation schema: {e}"
  102. ) from e
  103. num_errors = 0
  104. for test in test_collection:
  105. error = validate_test(test, schema)
  106. if error:
  107. logger.error(
  108. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  109. )
  110. num_errors += 1
  111. error = validate_test_cluster_compute(test, test_definition_root)
  112. if error:
  113. logger.error(
  114. f"Failed to validate test {test.get('name', '(unnamed)')}: {error}"
  115. )
  116. num_errors += 1
  117. if num_errors > 0:
  118. raise ReleaseTestConfigError(
  119. f"Release test configuration error: Found {num_errors} test "
  120. f"validation errors."
  121. )
  122. def validate_test(test: Test, schema: Optional[Dict] = None) -> Optional[str]:
  123. schema = schema or load_schema_file()
  124. try:
  125. jsonschema.validate(test, schema=schema)
  126. except (jsonschema.ValidationError, jsonschema.SchemaError) as e:
  127. return str(e.message)
  128. except Exception as e:
  129. return str(e)
  130. def validate_test_cluster_compute(
  131. test: Test, test_definition_root: Optional[str] = None
  132. ) -> Optional[str]:
  133. from ray_release.template import load_test_cluster_compute
  134. cluster_compute = load_test_cluster_compute(test, test_definition_root)
  135. return validate_cluster_compute(cluster_compute)
  136. def validate_cluster_compute(cluster_compute: Dict[str, Any]) -> Optional[str]:
  137. aws = cluster_compute.get("aws", {})
  138. head_node_aws = cluster_compute.get("head_node_type", {}).get(
  139. "aws_advanced_configurations", {}
  140. )
  141. configs_to_check = [aws, head_node_aws]
  142. for worker_node in cluster_compute.get("worker_node_types", []):
  143. worker_node_aws = worker_node.get("aws_advanced_configurations", {})
  144. configs_to_check.append(worker_node_aws)
  145. for config in configs_to_check:
  146. error = validate_aws_config(config)
  147. if error:
  148. return error
  149. return None
  150. def validate_aws_config(aws_config: Dict[str, Any]) -> Optional[str]:
  151. for block_device_mapping in aws_config.get("BlockDeviceMappings", []):
  152. ebs = block_device_mapping.get("Ebs")
  153. if not ebs:
  154. continue
  155. if not ebs.get("DeleteOnTermination", False) is True:
  156. return "Ebs volume does not have `DeleteOnTermination: true` set"
  157. return None
  158. def find_test(test_collection: List[Test], test_name: str) -> Optional[Test]:
  159. """Find test with `test_name` in `test_collection`"""
  160. for test in test_collection:
  161. if test["name"] == test_name:
  162. return test
  163. return None
  164. def as_smoke_test(test: Test) -> Test:
  165. if "smoke_test" not in test:
  166. raise ReleaseTestCLIError(
  167. f"Requested smoke test, but test with name {test['name']} does "
  168. f"not have any smoke test configuration."
  169. )
  170. smoke_test_config = test.pop("smoke_test")
  171. new_test = deep_update(test, smoke_test_config)
  172. return new_test
  173. def parse_python_version(version: str) -> Tuple[int, int]:
  174. """From XY and X.Y to (X, Y)"""
  175. match = re.match(r"^([0-9])\.?([0-9]+)$", version)
  176. if not match:
  177. raise ReleaseTestConfigError(f"Invalid Python version string: {version}")
  178. return int(match.group(1)), int(match.group(2))
  179. def get_test_cloud_id(test: Test) -> str:
  180. cloud_id = test["cluster"].get("cloud_id", None)
  181. cloud_name = test["cluster"].get("cloud_name", None)
  182. if cloud_id and cloud_name:
  183. raise RuntimeError(
  184. f"You can't supply both a `cloud_name` ({cloud_name}) and a "
  185. f"`cloud_id` ({cloud_id}) in the test cluster configuration. "
  186. f"Please provide only one."
  187. )
  188. elif cloud_name and not cloud_id:
  189. cloud_id = find_cloud_by_name(cloud_name)
  190. if not cloud_id:
  191. raise RuntimeError(f"Couldn't find cloud with name `{cloud_name}`.")
  192. else:
  193. cloud_id = cloud_id or str(DEFAULT_CLOUD_ID)
  194. return cloud_id