# ray_bisect.py
import copy
import json
import os
import subprocess
import time
from pathlib import Path
from typing import Dict, List, Set, Tuple

import click

from ray_release.bazel import bazel_runfile
from ray_release.buildkite.step import get_step
from ray_release.byod.build import (
    build_anyscale_base_byod_images,
    build_anyscale_custom_byod_image,
)
from ray_release.config import read_and_validate_release_test_collection
from ray_release.configs.global_config import init_global_config
from ray_release.logger import logger
from ray_release.test import Test
from ray_release.test_automation.release_state_machine import ReleaseTestStateMachine
  20. @click.command()
  21. @click.argument("test_name", required=True, type=str)
  22. @click.argument("passing_commit", required=True, type=str)
  23. @click.argument("failing_commit", required=True, type=str)
  24. @click.option(
  25. "--test-collection-file",
  26. type=str,
  27. multiple=True,
  28. help="Test collection file, relative path to ray repo.",
  29. )
  30. @click.option(
  31. "--concurrency",
  32. default=3,
  33. type=int,
  34. help=(
  35. "Maximum number of concurrent test jobs to run. Higher number uses more "
  36. "capacity, but reduce the bisect duration"
  37. ),
  38. )
  39. @click.option(
  40. "--run-per-commit",
  41. default=1,
  42. type=int,
  43. help=(
  44. "The number of time we run test on the same commit, to account for test "
  45. "flakiness. Commit passes only when it passes on all runs"
  46. ),
  47. )
  48. @click.option(
  49. "--is-full-test",
  50. is_flag=True,
  51. show_default=True,
  52. default=False,
  53. help=("Use the full, non-smoke version of the test"),
  54. )
  55. @click.option(
  56. "--global-config",
  57. default="oss_config.yaml",
  58. type=click.Choice(
  59. [x.name for x in (Path(__file__).parent.parent / "configs").glob("*.yaml")]
  60. ),
  61. help="Global config to use for test execution.",
  62. )
  63. def main(
  64. test_name: str,
  65. passing_commit: str,
  66. failing_commit: str,
  67. test_collection_file: Tuple[str],
  68. concurrency: int = 1,
  69. run_per_commit: int = 1,
  70. is_full_test: bool = False,
  71. global_config: str = "oss_config.yaml",
  72. ) -> None:
  73. init_global_config(
  74. bazel_runfile("release/ray_release/configs", global_config),
  75. )
  76. if concurrency <= 0:
  77. raise ValueError(
  78. f"Concurrency input need to be a positive number, received: {concurrency}"
  79. )
  80. test = _get_test(test_name, test_collection_file)
  81. pre_sanity_check = _sanity_check(
  82. test, passing_commit, failing_commit, run_per_commit, is_full_test
  83. )
  84. if not pre_sanity_check:
  85. logger.info(
  86. "Failed pre-saniy check, the test might be flaky or fail due to"
  87. " an external (not a code change) factors"
  88. )
  89. return
  90. commit_lists = _get_commit_lists(passing_commit, failing_commit)
  91. blamed_commit = _bisect(
  92. test, commit_lists, concurrency, run_per_commit, is_full_test
  93. )
  94. logger.info(f"Blamed commit found for test {test_name}: {blamed_commit}")
  95. # TODO(can): this env var is used as a feature flag, in case we need to turn this
  96. # off quickly. We should remove this when the new db reporter is stable.
  97. if os.environ.get("UPDATE_TEST_STATE_MACHINE", False):
  98. logger.info(f"Updating test state for test {test_name} to CONSISTENTLY_FAILING")
  99. _update_test_state(test, blamed_commit)
  100. def _bisect(
  101. test: Test,
  102. commit_list: List[str],
  103. concurrency: int,
  104. run_per_commit: int,
  105. is_full_test: bool = False,
  106. ) -> str:
  107. while len(commit_list) > 2:
  108. logger.info(
  109. f"Bisecting between {len(commit_list)} commits: "
  110. f"{commit_list[0]} to {commit_list[-1]} with concurrency {concurrency}"
  111. )
  112. idx_to_commit = {}
  113. for i in range(concurrency):
  114. idx = len(commit_list) * (i + 1) // (concurrency + 1)
  115. # make sure that idx is not at the boundary; this avoids rerun bisect
  116. # on the previously run revision
  117. idx = min(max(idx, 1), len(commit_list) - 2)
  118. idx_to_commit[idx] = commit_list[idx]
  119. outcomes = _run_test(
  120. test, set(idx_to_commit.values()), run_per_commit, is_full_test
  121. )
  122. passing_idx = 0
  123. failing_idx = len(commit_list) - 1
  124. for idx, commit in idx_to_commit.items():
  125. is_passing = all(
  126. outcome == "passed" for outcome in outcomes[commit].values()
  127. )
  128. if is_passing and idx > passing_idx:
  129. passing_idx = idx
  130. if not is_passing and idx < failing_idx:
  131. failing_idx = idx
  132. commit_list = commit_list[passing_idx : failing_idx + 1]
  133. return commit_list[-1]
  134. def _sanity_check(
  135. test: Test,
  136. passing_revision: str,
  137. failing_revision: str,
  138. run_per_commit: int,
  139. is_full_test: bool = False,
  140. ) -> bool:
  141. """
  142. Sanity check that the test indeed passes on the passing revision, and fails on the
  143. failing revision
  144. """
  145. logger.info(
  146. f"Sanity check passing revision: {passing_revision}"
  147. f" and failing revision: {failing_revision}"
  148. )
  149. outcomes = _run_test(
  150. test, [passing_revision, failing_revision], run_per_commit, is_full_test
  151. )
  152. if any(map(lambda x: x != "passed", outcomes[passing_revision].values())):
  153. return False
  154. return any(map(lambda x: x != "passed", outcomes[failing_revision].values()))
  155. def _run_test(
  156. test: Test, commits: Set[str], run_per_commit: int, is_full_test: bool
  157. ) -> Dict[str, Dict[int, str]]:
  158. logger.info(f'Running test {test["name"]} on commits {commits}')
  159. for commit in commits:
  160. _trigger_test_run(test, commit, run_per_commit, is_full_test)
  161. return _obtain_test_result(commits, run_per_commit)
  162. def _trigger_test_run(
  163. test: Test, commit: str, run_per_commit: int, is_full_test: bool
  164. ) -> None:
  165. os.environ["COMMIT_TO_TEST"] = commit
  166. build_anyscale_base_byod_images([test])
  167. build_anyscale_custom_byod_image(test)
  168. for run in range(run_per_commit):
  169. step = get_step(
  170. copy.deepcopy(test), # avoid mutating the original test
  171. smoke_test=test.get("smoke_test", False) and not is_full_test,
  172. env={
  173. "RAY_COMMIT_OF_WHEEL": commit,
  174. "COMMIT_TO_TEST": commit,
  175. },
  176. )
  177. step["label"] = f'{test["name"]}:{commit[:7]}-{run}'
  178. step["key"] = f"{commit}-{run}"
  179. pipeline = subprocess.Popen(
  180. ["echo", json.dumps({"steps": [step]})], stdout=subprocess.PIPE
  181. )
  182. subprocess.check_output(
  183. ["buildkite-agent", "pipeline", "upload"], stdin=pipeline.stdout
  184. )
  185. pipeline.stdout.close()
  186. def _obtain_test_result(
  187. commits: Set[str], run_per_commit: int
  188. ) -> Dict[str, Dict[int, str]]:
  189. outcomes = {}
  190. wait = 5
  191. total_wait = 0
  192. while True:
  193. logger.info(f"... waiting for test result ...({total_wait} seconds)")
  194. for commit in commits:
  195. if commit in outcomes and len(outcomes[commit]) == run_per_commit:
  196. continue
  197. for run in range(run_per_commit):
  198. outcome = (
  199. subprocess.check_output(
  200. [
  201. "buildkite-agent",
  202. "step",
  203. "get",
  204. "outcome",
  205. "--step",
  206. f"{commit}-{run}",
  207. ]
  208. )
  209. .decode("utf-8")
  210. .strip()
  211. )
  212. if not outcome:
  213. continue
  214. if commit not in outcomes:
  215. outcomes[commit] = {}
  216. outcomes[commit][run] = outcome
  217. all_commit_finished = len(outcomes) == len(commits)
  218. per_commit_finished = all(
  219. len(outcome) == run_per_commit for outcome in outcomes.values()
  220. )
  221. if all_commit_finished and per_commit_finished:
  222. break
  223. time.sleep(wait)
  224. total_wait = total_wait + wait
  225. logger.info(f"Final test outcomes: {outcomes}")
  226. return outcomes
  227. def _get_test(test_name: str, test_collection_file: Tuple[str]) -> Test:
  228. test_collection = read_and_validate_release_test_collection(
  229. test_collection_file or ["release/release_tests.yaml"],
  230. )
  231. return [test for test in test_collection if test["name"] == test_name][0]
  232. def _get_commit_lists(passing_commit: str, failing_commit: str) -> List[str]:
  233. # This command obtains all commits between inclusively
  234. return (
  235. subprocess.check_output(
  236. f"git rev-list --reverse ^{passing_commit}~ {failing_commit}",
  237. shell=True,
  238. )
  239. .decode("utf-8")
  240. .strip()
  241. .split("\n")
  242. )
  243. def _update_test_state(test: Test, blamed_commit: str) -> None:
  244. test.update_from_s3()
  245. logger.info(f"Test object: {json.dumps(test)}")
  246. test[Test.KEY_BISECT_BLAMED_COMMIT] = blamed_commit
  247. # Compute and update the next test state, then comment blamed commit on github issue
  248. sm = ReleaseTestStateMachine(test)
  249. sm.move()
  250. sm.comment_blamed_commit_on_github_issue()
  251. logger.info(f"Test object: {json.dumps(test)}")
  252. test.persist_to_s3()
# CLI entry point: delegate to the click command.
if __name__ == "__main__":
    main()