# setup_chaos.py
  1. import argparse
  2. from ray.util.state.api import StateApiClient
  3. from ray.util.state.common import ListApiOptions, StateResource
  4. import ray
  5. from ray._private.test_utils import (
  6. get_and_run_resource_killer,
  7. RayletKiller,
  8. WorkerKillerActor,
  9. EC2InstanceTerminator,
  10. )
  11. def parse_script_args():
  12. parser = argparse.ArgumentParser()
  13. # '--kill-workers' to be deprecated in favor of '--chaos'
  14. parser.add_argument("--kill-workers", action="store_true", default=False)
  15. parser.add_argument(
  16. "--chaos",
  17. type=str,
  18. default="",
  19. help=(
  20. "Chaos to inject into the test environment. "
  21. "Options: KillRaylet, KillWorker, TerminateEC2Instance."
  22. ),
  23. )
  24. parser.add_argument("--kill-interval", type=int, default=60)
  25. parser.add_argument("--max-to-kill", type=int, default=2)
  26. parser.add_argument(
  27. "--no-start",
  28. action="store_true",
  29. default=False,
  30. help=(
  31. "If set, resource killer won't be starting to kill resources when "
  32. "the script is done. Driver needs to manually "
  33. "obtain the resource killer handle and invoke run method to "
  34. "start killing nodes. If not set, as soon as "
  35. "the script is done, resources will be killed every "
  36. "--kill-interval seconds."
  37. ),
  38. )
  39. parser.add_argument(
  40. "--kill-delay",
  41. type=int,
  42. default=0,
  43. help=(
  44. "Seconds to wait before node killer starts killing nodes. No-op if "
  45. "'no-start' is set."
  46. ),
  47. )
  48. parser.add_argument(
  49. "--task-names",
  50. nargs="*",
  51. default=[],
  52. )
  53. return parser.parse_known_args()
  54. def task_filter(task_names):
  55. def _task_filter():
  56. if not task_names:
  57. return lambda _: True
  58. def _filter_fn(task):
  59. return task.name in task_names
  60. return _filter_fn
  61. return _task_filter
  62. def task_node_filter(task_names):
  63. def _task_node_filter():
  64. if not task_names:
  65. return lambda _: True
  66. tasks = StateApiClient().list(
  67. StateResource.TASKS, options=ListApiOptions(), raise_on_missing_output=False
  68. )
  69. filtered_tasks = list(filter(lambda task: task.name in task_names, tasks))
  70. nodes_with_filtered_tasks = {task.node_id for task in filtered_tasks}
  71. def _filter_fn(node):
  72. return node["NodeID"] in nodes_with_filtered_tasks
  73. return _filter_fn
  74. return _task_node_filter
  75. def get_chaos_killer(args):
  76. if args.chaos != "":
  77. chaos_type = args.chaos
  78. elif args.kill_workers:
  79. chaos_type = "KillWorker"
  80. else:
  81. chaos_type = "KillRaylet" # default
  82. if chaos_type == "KillRaylet":
  83. return RayletKiller, task_node_filter(args.task_names)
  84. elif chaos_type == "KillWorker":
  85. return WorkerKillerActor, task_filter(args.task_names)
  86. elif chaos_type == "TerminateEC2Instance":
  87. return EC2InstanceTerminator, task_node_filter(args.task_names)
  88. else:
  89. raise ValueError(f"Chaos type {chaos_type} not supported.")
  90. def main():
  91. """Start the chaos testing.
  92. Currently, chaos testing only covers random node failures.
  93. """
  94. args, _ = parse_script_args()
  95. ray.init(address="auto")
  96. resource_killer_cls, kill_filter_fn = get_chaos_killer(args)
  97. get_and_run_resource_killer(
  98. resource_killer_cls,
  99. args.kill_interval,
  100. namespace="release_test_namespace",
  101. lifetime="detached",
  102. no_start=args.no_start,
  103. max_to_kill=args.max_to_kill,
  104. kill_delay_s=args.kill_delay,
  105. kill_filter_fn=kill_filter_fn,
  106. )
  107. print(
  108. f"Successfully deployed a {'worker' if args.kill_workers else 'node'} killer."
  109. )
  110. main()