- import click
- import subprocess
- import os
- import json
- import time
- from typing import Dict, List, Set
- from ray_release.logger import logger
- from ray_release.buildkite.step import get_step
- from ray_release.byod.build import build_anyscale_byod_images
- from ray_release.config import (
- read_and_validate_release_test_collection,
- parse_python_version,
- DEFAULT_WHEEL_WAIT_TIMEOUT,
- )
- from ray_release.test import (
- Test,
- DEFAULT_PYTHON_VERSION,
- )
- from ray_release.test_automation.state_machine import TestStateMachine
- from ray_release.wheels import find_and_wait_for_ray_wheels_url
@click.command()
@click.argument("test_name", required=True, type=str)
@click.argument("passing_commit", required=True, type=str)
@click.argument("failing_commit", required=True, type=str)
@click.option(
    "--concurrency",
    default=3,
    type=int,
    help=(
        "Maximum number of concurrent test jobs to run. Higher number uses more "
        "capacity, but reduce the bisect duration"
    ),
)
@click.option(
    "--run-per-commit",
    default=1,
    type=int,
    help=(
        "The number of time we run test on the same commit, to account for test "
        "flakiness. Commit passes only when it passes on all runs"
    ),
)
def main(
    test_name: str,
    passing_commit: str,
    failing_commit: str,
    concurrency: int = 1,
    run_per_commit: int = 1,
) -> None:
    """Bisect a release-test regression between two commits.

    Runs a sanity check first (test passes on ``passing_commit`` and fails on
    ``failing_commit``), then bisects the commit range to find the blamed
    commit. Optionally updates the test state machine when the
    UPDATE_TEST_STATE_MACHINE env var is set.

    NOTE: the signature defaults (concurrency=1) are dead code — click always
    supplies the option defaults declared above (concurrency=3).
    """
    if concurrency <= 0:
        raise ValueError(
            f"Concurrency input need to be a positive number, received: {concurrency}"
        )
    test = _get_test(test_name)
    # Verify the endpoints behave as claimed before paying for a full bisect;
    # a flaky test or an environmental failure would make bisection meaningless.
    pre_sanity_check = _sanity_check(
        test, passing_commit, failing_commit, run_per_commit
    )
    if not pre_sanity_check:
        logger.info(
            "Failed pre-sanity check, the test might be flaky or fail due to"
            " external (not a code change) factors"
        )
        return
    commit_lists = _get_commit_lists(passing_commit, failing_commit)
    blamed_commit = _bisect(test, commit_lists, concurrency, run_per_commit)
    logger.info(f"Blamed commit found for test {test_name}: {blamed_commit}")
    # TODO(can): this env var is used as a feature flag, in case we need to turn this
    # off quickly. We should remove this when the new db reporter is stable.
    if os.environ.get("UPDATE_TEST_STATE_MACHINE", False):
        logger.info(f"Updating test state for test {test_name} to CONSISTENTLY_FAILING")
        _update_test_state(test, blamed_commit)
def _bisect(
    test: Test,
    commit_list: List[str],
    concurrency: int,
    run_per_commit: int,
) -> str:
    """Narrow ``commit_list`` down to the first failing commit and return it.

    ``commit_list`` is ordered oldest-first with a known-passing commit at
    index 0 and a known-failing commit at the end; that invariant is
    maintained across iterations.
    """
    while len(commit_list) > 2:
        logger.info(
            f"Bisecting between {len(commit_list)} commits: "
            f"{commit_list[0]} to {commit_list[-1]} with concurrency {concurrency}"
        )
        # Choose up to `concurrency` evenly spaced pivot commits, clamped
        # strictly inside the interval so the already-known endpoints are
        # never rerun.
        pivots = {}
        for slot in range(concurrency):
            position = len(commit_list) * (slot + 1) // (concurrency + 1)
            position = min(max(position, 1), len(commit_list) - 2)
            pivots[position] = commit_list[position]
        outcomes = _run_test(test, set(pivots.values()), run_per_commit)
        # Tighten the window: keep the latest pivot that passed every run and
        # the earliest pivot that failed any run.
        last_passing = 0
        first_failing = len(commit_list) - 1
        for position, commit in pivots.items():
            passed_all_runs = all(
                result == "passed" for result in outcomes[commit].values()
            )
            if passed_all_runs:
                last_passing = max(last_passing, position)
            else:
                first_failing = min(first_failing, position)
        commit_list = commit_list[last_passing : first_failing + 1]
    # Two commits left: [passing, failing]; the failing one is to blame.
    return commit_list[-1]
def _sanity_check(
    test: Test, passing_revision: str, failing_revision: str, run_per_commit: int
) -> bool:
    """Verify the test passes on ``passing_revision`` and fails on
    ``failing_revision``.

    Returns True only when every run on the passing revision passed AND at
    least one run on the failing revision did not pass.
    """
    logger.info(
        f"Sanity check passing revision: {passing_revision}"
        f" and failing revision: {failing_revision}"
    )
    outcomes = _run_test(test, [passing_revision, failing_revision], run_per_commit)
    # Every run on the supposedly-good revision must have passed.
    good_revision_clean = all(
        result == "passed" for result in outcomes[passing_revision].values()
    )
    if not good_revision_clean:
        return False
    # At least one run on the supposedly-bad revision must have failed.
    return not all(
        result == "passed" for result in outcomes[failing_revision].values()
    )
def _run_test(
    test: Test, commits: Set[str], run_per_commit: int
) -> Dict[str, Dict[int, str]]:
    """Trigger buildkite runs of ``test`` for each commit, then block until
    all outcomes are available.

    Returns a mapping of commit -> run index -> outcome string.
    """
    logger.info(f'Running test {test["name"]} on commits {commits}')
    for revision in commits:
        _trigger_test_run(test, revision, run_per_commit)
    return _obtain_test_result(commits, run_per_commit)
def _trigger_test_run(test: Test, commit: str, run_per_commit: int) -> None:
    """Upload ``run_per_commit`` buildkite steps that run ``test`` at ``commit``.

    For BYOD clusters the commit is baked into the image build instead of
    installing ray wheels; otherwise we wait for the wheels built for the
    commit to become available.
    """
    python_version = DEFAULT_PYTHON_VERSION
    if "python" in test:
        python_version = parse_python_version(test["python"])
    if test.is_byod_cluster():
        ray_wheels_url = None
        # COMMIT_TO_TEST is read by the BYOD image build.
        os.environ["COMMIT_TO_TEST"] = commit
        build_anyscale_byod_images([test])
    else:
        ray_wheels_url = find_and_wait_for_ray_wheels_url(
            commit, timeout=DEFAULT_WHEEL_WAIT_TIMEOUT, python_version=python_version
        )
    for run in range(run_per_commit):
        step = get_step(
            test,
            ray_wheels=ray_wheels_url,
            env={
                "RAY_COMMIT_OF_WHEEL": commit,
            },
        )
        step["label"] = f'{test["name"]}:{commit[:7]}-{run}'
        # The step key is polled later by _obtain_test_result; keep in sync.
        step["key"] = f"{commit}-{run}"
        # Feed the pipeline JSON directly on stdin rather than piping through
        # a separate `echo` process: the old Popen pipe was never closed when
        # check_output raised, leaking the child process and pipe fd.
        subprocess.check_output(
            ["buildkite-agent", "pipeline", "upload"],
            input=json.dumps({"steps": [step]}).encode("utf-8"),
        )
def _obtain_test_result(
    commits: Set[str], run_per_commit: int
) -> Dict[str, Dict[int, str]]:
    """Poll buildkite until every (commit, run) step reports an outcome.

    Returns a mapping of commit -> run index -> outcome string (e.g. "passed").

    NOTE(review): there is no upper bound on the wait — if a step never
    reports an outcome this loops forever; consider capping total_wait.
    """
    outcomes: Dict[str, Dict[int, str]] = {}
    wait = 5
    total_wait = 0
    while True:
        logger.info(f"... waiting for test result ...({total_wait} seconds)")
        for commit in commits:
            commit_outcomes = outcomes.setdefault(commit, {})
            if len(commit_outcomes) == run_per_commit:
                continue
            for run in range(run_per_commit):
                # Only poll runs we have not recorded yet; the original code
                # re-queried every run of an unfinished commit on each cycle,
                # spawning one redundant subprocess per already-done run.
                if run in commit_outcomes:
                    continue
                outcome = subprocess.check_output(
                    [
                        "buildkite-agent",
                        "step",
                        "get",
                        "outcome",
                        "--step",
                        f"{commit}-{run}",
                    ]
                ).decode("utf-8")
                # Empty output means the step has not finished yet.
                if not outcome:
                    continue
                commit_outcomes[run] = outcome
        all_commit_finished = len(outcomes) == len(commits)
        per_commit_finished = all(
            len(outcome) == run_per_commit for outcome in outcomes.values()
        )
        if all_commit_finished and per_commit_finished:
            break
        time.sleep(wait)
        total_wait = total_wait + wait
    logger.info(f"Final test outcomes: {outcomes}")
    return outcomes
def _get_test(test_name: str) -> Test:
    """Look up a release test by name in release_tests.yaml.

    Raises:
        ValueError: if no test with that name exists. (Previously this raised
        an opaque IndexError from indexing an empty list.)
    """
    test_collection = read_and_validate_release_test_collection(
        os.path.join(os.path.dirname(__file__), "..", "..", "release_tests.yaml")
    )
    matches = [test for test in test_collection if test["name"] == test_name]
    if not matches:
        raise ValueError(f"Test not found in release test collection: {test_name}")
    return matches[0]
def _get_commit_lists(passing_commit: str, failing_commit: str) -> List[str]:
    """Return all commits from passing to failing commit, inclusive, oldest first.

    ``^{passing}~ {failing}`` lists everything reachable from the failing
    commit but not from the passing commit's parent, i.e. the inclusive range.
    """
    # Use an argv list instead of shell=True: the commit strings were
    # interpolated into a shell command line, which is injection-prone and
    # breaks on any shell metacharacter in the arguments.
    return (
        subprocess.check_output(
            ["git", "rev-list", "--reverse", f"^{passing_commit}~", failing_commit],
        )
        .decode("utf-8")
        .strip()
        .split("\n")
    )
def _update_test_state(test: Test, blamed_commit: str) -> None:
    """Record the blamed commit on the test and advance its state machine.

    Loads the latest test state from S3, attaches the blamed commit, runs the
    state-machine transition (which also comments the blamed commit on the
    associated github issue), and persists the updated state back to S3.
    """
    test.update_from_s3()
    logger.info(f"Test object: {json.dumps(test)}")
    test[Test.KEY_BISECT_BLAMED_COMMIT] = blamed_commit
    # Compute and update the next test state, then comment blamed commit on github issue
    sm = TestStateMachine(test)
    sm.move()
    sm.comment_blamed_commit_on_github_issue()
    logger.info(f"Test object: {json.dumps(test)}")
    test.persist_to_s3()
# Script entry point; `main` is a click command and parses sys.argv itself.
if __name__ == "__main__":
    main()