123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import subprocess
- import sys
- import time
- from typing import List
- import ray
- from ray.streaming import StreamingContext
def test_word_count():
    """Run a streaming word-count job and check it checkpoints after a kill.

    Steps, in order:

    1. Start Ray and build a ``StreamingContext`` that uses the
       ``local_file`` context backend rooted at ``/tmp/ray/cp_files/`` with a
       3 second checkpoint timeout.
    2. Submit a word-count pipeline that reads this very file.
    3. Poll the session logs until the sink has printed at least one
       ``####result`` line (up to 3 minutes).
    4. Call ``kill_all_worker()``.
    5. Poll until the number of ``Finish checkpoint`` log lines grows beyond
       the count taken right after the kill (up to 5 minutes).

    ``ray.shutdown()`` is always called, whether the test passes or fails.

    Raises:
        RuntimeError: if no sink output appears within 3 minutes, or if no
            new checkpoint finishes within 5 minutes after the kill.
    """
    poll_interval_secs = 5
    try:
        ray.init(
            job_config=ray.job_config.JobConfig(code_search_path=sys.path))
        # time.sleep(10) # for gdb to attach
        ctx = StreamingContext.Builder() \
            .option("streaming.context-backend.type", "local_file") \
            .option(
                "streaming.context-backend.file-state.root",
                "/tmp/ray/cp_files/"
            ) \
            .option("streaming.checkpoint.timeout.secs", "3") \
            .build()

        print("-----------submit job-------------")

        ctx.read_text_file(__file__) \
            .set_parallelism(1) \
            .flat_map(lambda x: x.split()) \
            .map(lambda x: (x, 1)) \
            .key_by(lambda x: x[0]) \
            .reduce(lambda old_value, new_value:
                    (old_value[0], old_value[1] + new_value[1])) \
            .filter(lambda x: "ray" not in x) \
            .sink(lambda x: print("####result", x))
        ctx.submit("word_count")

        print("-----------checking output-------------")
        retry_count = 180 // poll_interval_secs  # wait for 3min
        while not has_sink_output():
            time.sleep(poll_interval_secs)
            retry_count -= 1
            if retry_count <= 0:
                raise RuntimeError("Can not find output")

        print("-----------killing worker-------------")
        # Short pause between the first observed sink output and the kill.
        time.sleep(5)
        kill_all_worker()

        print("-----------checking checkpoint-------------")
        # Baseline taken after the kill; a later, larger count means at least
        # one more checkpoint has finished since then.
        cp_ok_num = checkpoint_success_num()
        # 300 s / 5 s per poll = 60 polls. The previous value, 300000 / 5,
        # waited roughly 3.5 days instead of the intended 5 minutes.
        retry_count = 300 // poll_interval_secs  # wait for 5min
        while True:
            cur_cp_num = checkpoint_success_num()
            print("-----------checking checkpoint"
                  ", cur_cp_num={}, old_cp_num={}-------------".format(
                      cur_cp_num, cp_ok_num))
            if cur_cp_num > cp_ok_num:
                print("--------------TEST OK!------------------")
                break
            time.sleep(poll_interval_secs)
            retry_count -= 1
            if retry_count <= 0:
                raise RuntimeError(
                    "Checkpoint keeps failing after fail-over, test failed!")
    finally:
        ray.shutdown()
def run_cmd(cmd: List):
    """Run ``cmd`` and return its standard output decoded to ``str``.

    If the command exits with a non-zero status, nothing is raised: the
    string form of the ``CalledProcessError`` is returned instead of the
    command's output.
    """
    try:
        raw_output = subprocess.check_output(cmd)
    except subprocess.CalledProcessError as err:
        return str(err)
    return raw_output.decode()
def grep_log(keyword: str) -> str:
    """Recursively grep the latest Ray session logs for ``keyword``.

    Returns the decoded output of ``grep -r``. A non-zero grep exit status
    (for example, when nothing matches) surfaces as
    ``subprocess.CalledProcessError``.
    """
    log_dir = "/tmp/ray/session_latest/logs"
    grep_cmd = ["grep", "-r", keyword, log_dir]
    matched = subprocess.check_output(grep_cmd)
    return matched.decode()
def has_sink_output() -> bool:
    """Report whether the session logs contain a ``####result`` line.

    Any exception raised by ``grep_log`` is treated as "no output yet".
    """
    try:
        grep_log("####result")
    except Exception:
        return False
    return True
def checkpoint_success_num() -> int:
    """Count the ``Finish checkpoint`` lines found in the session logs.

    The count is the number of newlines in the grep output. Any exception
    raised by ``grep_log`` yields 0.
    """
    try:
        finished_lines = grep_log("Finish checkpoint")
    except Exception:
        return 0
    return finished_lines.count("\n")
def kill_all_worker():
    """Send ``kill -9`` to every pid found in the job-worker init log lines.

    The pids are pulled out of the log lines containing
    ``Initializing job worker, exe_vert`` by splitting on ``pid`` and then on
    ``=``. The command list is printed before it runs, and the
    ``subprocess.run`` result is returned.
    """
    # Adjacent literals are concatenated exactly as written, spacing included.
    pipeline = (
        "grep -r 'Initializing job worker, exe_vert' "
        " /tmp/ray/session_latest/logs | awk -F'pid' '{print $2}'"
        "| awk -F'=' '{print $2}'"
        "| xargs kill -9"
    )
    cmd = ["bash", "-c", pipeline]
    print(cmd)
    return subprocess.run(cmd)
# Allow running the fail-over test directly as a script.
if __name__ == "__main__":
    test_word_count()
|