ray_bisect.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. import click
  2. import subprocess
  3. import os
  4. import json
  5. import time
  6. from typing import Dict, List, Set
  7. from ray_release.logger import logger
  8. from ray_release.buildkite.step import get_step
  9. from ray_release.byod.build import build_anyscale_byod_images
  10. from ray_release.config import (
  11. read_and_validate_release_test_collection,
  12. parse_python_version,
  13. DEFAULT_WHEEL_WAIT_TIMEOUT,
  14. )
  15. from ray_release.test import (
  16. Test,
  17. DEFAULT_PYTHON_VERSION,
  18. )
  19. from ray_release.test_automation.state_machine import TestStateMachine
  20. from ray_release.wheels import find_and_wait_for_ray_wheels_url
  21. @click.command()
  22. @click.argument("test_name", required=True, type=str)
  23. @click.argument("passing_commit", required=True, type=str)
  24. @click.argument("failing_commit", required=True, type=str)
  25. @click.option(
  26. "--concurrency",
  27. default=3,
  28. type=int,
  29. help=(
  30. "Maximum number of concurrent test jobs to run. Higher number uses more "
  31. "capacity, but reduce the bisect duration"
  32. ),
  33. )
  34. @click.option(
  35. "--run-per-commit",
  36. default=1,
  37. type=int,
  38. help=(
  39. "The number of time we run test on the same commit, to account for test "
  40. "flakiness. Commit passes only when it passes on all runs"
  41. ),
  42. )
  43. def main(
  44. test_name: str,
  45. passing_commit: str,
  46. failing_commit: str,
  47. concurrency: int = 1,
  48. run_per_commit: int = 1,
  49. ) -> None:
  50. if concurrency <= 0:
  51. raise ValueError(
  52. f"Concurrency input need to be a positive number, received: {concurrency}"
  53. )
  54. test = _get_test(test_name)
  55. pre_sanity_check = _sanity_check(
  56. test, passing_commit, failing_commit, run_per_commit
  57. )
  58. if not pre_sanity_check:
  59. logger.info(
  60. "Failed pre-saniy check, the test might be flaky or fail due to"
  61. " an external (not a code change) factors"
  62. )
  63. return
  64. commit_lists = _get_commit_lists(passing_commit, failing_commit)
  65. blamed_commit = _bisect(test, commit_lists, concurrency, run_per_commit)
  66. logger.info(f"Blamed commit found for test {test_name}: {blamed_commit}")
  67. # TODO(can): this env var is used as a feature flag, in case we need to turn this
  68. # off quickly. We should remove this when the new db reporter is stable.
  69. if os.environ.get("UPDATE_TEST_STATE_MACHINE", False):
  70. logger.info(f"Updating test state for test {test_name} to CONSISTENTLY_FAILING")
  71. _update_test_state(test, blamed_commit)
  72. def _bisect(
  73. test: Test,
  74. commit_list: List[str],
  75. concurrency: int,
  76. run_per_commit: int,
  77. ) -> str:
  78. while len(commit_list) > 2:
  79. logger.info(
  80. f"Bisecting between {len(commit_list)} commits: "
  81. f"{commit_list[0]} to {commit_list[-1]} with concurrency {concurrency}"
  82. )
  83. idx_to_commit = {}
  84. for i in range(concurrency):
  85. idx = len(commit_list) * (i + 1) // (concurrency + 1)
  86. # make sure that idx is not at the boundary; this avoids rerun bisect
  87. # on the previously run revision
  88. idx = min(max(idx, 1), len(commit_list) - 2)
  89. idx_to_commit[idx] = commit_list[idx]
  90. outcomes = _run_test(test, set(idx_to_commit.values()), run_per_commit)
  91. passing_idx = 0
  92. failing_idx = len(commit_list) - 1
  93. for idx, commit in idx_to_commit.items():
  94. is_passing = all(
  95. outcome == "passed" for outcome in outcomes[commit].values()
  96. )
  97. if is_passing and idx > passing_idx:
  98. passing_idx = idx
  99. if not is_passing and idx < failing_idx:
  100. failing_idx = idx
  101. commit_list = commit_list[passing_idx : failing_idx + 1]
  102. return commit_list[-1]
  103. def _sanity_check(
  104. test: Test, passing_revision: str, failing_revision: str, run_per_commit: int
  105. ) -> bool:
  106. """
  107. Sanity check that the test indeed passes on the passing revision, and fails on the
  108. failing revision
  109. """
  110. logger.info(
  111. f"Sanity check passing revision: {passing_revision}"
  112. f" and failing revision: {failing_revision}"
  113. )
  114. outcomes = _run_test(test, [passing_revision, failing_revision], run_per_commit)
  115. if any(map(lambda x: x != "passed", outcomes[passing_revision].values())):
  116. return False
  117. return any(map(lambda x: x != "passed", outcomes[failing_revision].values()))
  118. def _run_test(
  119. test: Test, commits: Set[str], run_per_commit: int
  120. ) -> Dict[str, Dict[int, str]]:
  121. logger.info(f'Running test {test["name"]} on commits {commits}')
  122. for commit in commits:
  123. _trigger_test_run(test, commit, run_per_commit)
  124. return _obtain_test_result(commits, run_per_commit)
  125. def _trigger_test_run(test: Test, commit: str, run_per_commit: int) -> None:
  126. python_version = DEFAULT_PYTHON_VERSION
  127. if "python" in test:
  128. python_version = parse_python_version(test["python"])
  129. if test.is_byod_cluster():
  130. ray_wheels_url = None
  131. os.environ["COMMIT_TO_TEST"] = commit
  132. build_anyscale_byod_images([test])
  133. else:
  134. ray_wheels_url = find_and_wait_for_ray_wheels_url(
  135. commit, timeout=DEFAULT_WHEEL_WAIT_TIMEOUT, python_version=python_version
  136. )
  137. for run in range(run_per_commit):
  138. step = get_step(
  139. test,
  140. ray_wheels=ray_wheels_url,
  141. env={
  142. "RAY_COMMIT_OF_WHEEL": commit,
  143. },
  144. )
  145. step["label"] = f'{test["name"]}:{commit[:7]}-{run}'
  146. step["key"] = f"{commit}-{run}"
  147. pipeline = subprocess.Popen(
  148. ["echo", json.dumps({"steps": [step]})], stdout=subprocess.PIPE
  149. )
  150. subprocess.check_output(
  151. ["buildkite-agent", "pipeline", "upload"], stdin=pipeline.stdout
  152. )
  153. pipeline.stdout.close()
  154. def _obtain_test_result(
  155. commits: Set[str], run_per_commit: int
  156. ) -> Dict[str, Dict[int, str]]:
  157. outcomes = {}
  158. wait = 5
  159. total_wait = 0
  160. while True:
  161. logger.info(f"... waiting for test result ...({total_wait} seconds)")
  162. for commit in commits:
  163. if commit in outcomes and len(outcomes[commit]) == run_per_commit:
  164. continue
  165. for run in range(run_per_commit):
  166. outcome = subprocess.check_output(
  167. [
  168. "buildkite-agent",
  169. "step",
  170. "get",
  171. "outcome",
  172. "--step",
  173. f"{commit}-{run}",
  174. ]
  175. ).decode("utf-8")
  176. if not outcome:
  177. continue
  178. if commit not in outcomes:
  179. outcomes[commit] = {}
  180. outcomes[commit][run] = outcome
  181. all_commit_finished = len(outcomes) == len(commits)
  182. per_commit_finished = all(
  183. len(outcome) == run_per_commit for outcome in outcomes.values()
  184. )
  185. if all_commit_finished and per_commit_finished:
  186. break
  187. time.sleep(wait)
  188. total_wait = total_wait + wait
  189. logger.info(f"Final test outcomes: {outcomes}")
  190. return outcomes
  191. def _get_test(test_name: str) -> Test:
  192. test_collection = read_and_validate_release_test_collection(
  193. os.path.join(os.path.dirname(__file__), "..", "..", "release_tests.yaml")
  194. )
  195. return [test for test in test_collection if test["name"] == test_name][0]
  196. def _get_commit_lists(passing_commit: str, failing_commit: str) -> List[str]:
  197. # This command obtains all commits between inclusively
  198. return (
  199. subprocess.check_output(
  200. f"git rev-list --reverse ^{passing_commit}~ {failing_commit}",
  201. shell=True,
  202. )
  203. .decode("utf-8")
  204. .strip()
  205. .split("\n")
  206. )
  207. def _update_test_state(test: Test, blamed_commit: str) -> None:
  208. test.update_from_s3()
  209. logger.info(f"Test object: {json.dumps(test)}")
  210. test[Test.KEY_BISECT_BLAMED_COMMIT] = blamed_commit
  211. # Compute and update the next test state, then comment blamed commit on github issue
  212. sm = TestStateMachine(test)
  213. sm.move()
  214. sm.comment_blamed_commit_on_github_issue()
  215. logger.info(f"Test object: {json.dumps(test)}")
  216. test.persist_to_s3()
  217. if __name__ == "__main__":
  218. main()