# test_failover.py (3.2 KB)
  1. import subprocess
  2. import sys
  3. import time
  4. from typing import List
  5. import ray
  6. from ray.streaming import StreamingContext
  7. def test_word_count():
  8. try:
  9. ray.init(
  10. job_config=ray.job_config.JobConfig(code_search_path=sys.path))
  11. # time.sleep(10) # for gdb to attach
  12. ctx = StreamingContext.Builder() \
  13. .option("streaming.context-backend.type", "local_file") \
  14. .option(
  15. "streaming.context-backend.file-state.root",
  16. "/tmp/ray/cp_files/"
  17. ) \
  18. .option("streaming.checkpoint.timeout.secs", "3") \
  19. .build()
  20. print("-----------submit job-------------")
  21. ctx.read_text_file(__file__) \
  22. .set_parallelism(1) \
  23. .flat_map(lambda x: x.split()) \
  24. .map(lambda x: (x, 1)) \
  25. .key_by(lambda x: x[0]) \
  26. .reduce(lambda old_value, new_value:
  27. (old_value[0], old_value[1] + new_value[1])) \
  28. .filter(lambda x: "ray" not in x) \
  29. .sink(lambda x: print("####result", x))
  30. ctx.submit("word_count")
  31. print("-----------checking output-------------")
  32. retry_count = 180 / 5 # wait for 3min
  33. while not has_sink_output():
  34. time.sleep(5)
  35. retry_count -= 1
  36. if retry_count <= 0:
  37. raise RuntimeError("Can not find output")
  38. print("-----------killing worker-------------")
  39. time.sleep(5)
  40. kill_all_worker()
  41. print("-----------checking checkpoint-------------")
  42. cp_ok_num = checkpoint_success_num()
  43. retry_count = 300000 / 5 # wait for 5min
  44. while True:
  45. cur_cp_num = checkpoint_success_num()
  46. print("-----------checking checkpoint"
  47. ", cur_cp_num={}, old_cp_num={}-------------".format(
  48. cur_cp_num, cp_ok_num))
  49. if cur_cp_num > cp_ok_num:
  50. print("--------------TEST OK!------------------")
  51. break
  52. time.sleep(5)
  53. retry_count -= 1
  54. if retry_count <= 0:
  55. raise RuntimeError(
  56. "Checkpoint keeps failing after fail-over, test failed!")
  57. finally:
  58. ray.shutdown()
  59. def run_cmd(cmd: List):
  60. try:
  61. out = subprocess.check_output(cmd).decode()
  62. except subprocess.CalledProcessError as e:
  63. out = str(e)
  64. return out
  65. def grep_log(keyword: str) -> str:
  66. out = subprocess.check_output(
  67. ["grep", "-r", keyword, "/tmp/ray/session_latest/logs"])
  68. return out.decode()
  69. def has_sink_output() -> bool:
  70. try:
  71. grep_log("####result")
  72. return True
  73. except Exception:
  74. return False
  75. def checkpoint_success_num() -> int:
  76. try:
  77. return grep_log("Finish checkpoint").count("\n")
  78. except Exception:
  79. return 0
  80. def kill_all_worker():
  81. cmd = [
  82. "bash", "-c", "grep -r \'Initializing job worker, exe_vert\' "
  83. " /tmp/ray/session_latest/logs | awk -F\'pid\' \'{print $2}\'"
  84. "| awk -F\'=\' \'{print $2}\'" + "| xargs kill -9"
  85. ]
  86. print(cmd)
  87. return subprocess.run(cmd)
  88. if __name__ == "__main__":
  89. test_word_count()