# run_gcs_ft_on_k8s.py
  1. import enum
  2. import json
  3. import subprocess
  4. from kubernetes import client, config, watch
  5. import requests
  6. import random
  7. import uuid
  8. import pathlib
  9. import time
  10. import ray
  11. import os
  12. # global variables for the cluster informations
  13. CLUSTER_ID = None
  14. RAY_CLUSTER_NAME = None
  15. RAY_SERVICE_NAME = None
  16. LOCUST_ID = None
  17. # Kill node type
  18. class TestScenario(enum.Enum):
  19. KILL_WORKER_NODE = "kill_worker_node"
  20. KILL_HEAD_NODE = "kill_head_node"
  21. if os.environ.get("RAY_IMAGE") is not None:
  22. ray_image = os.environ.get("RAY_IMAGE")
  23. elif ray.__version__ != "3.0.0.dev0":
  24. ray_image = f"rayproject/ray:{ray.__version__}"
  25. elif ray.__commit__ == "{{RAY_COMMIT_SHA}}":
  26. ray_image = "rayproject/ray:nightly"
  27. else:
  28. ray_image = f"rayproject/ray:{ray.__commit__[:6]}"
  29. config.load_kube_config()
  30. cli = client.CoreV1Api()
  31. yaml_path = pathlib.Path("/tmp/ray_v1alpha1_rayservice.yaml")
  32. def generate_cluster_variable():
  33. global CLUSTER_ID
  34. global RAY_CLUSTER_NAME
  35. global RAY_SERVICE_NAME
  36. global LOCUST_ID
  37. CLUSTER_ID = str(uuid.uuid4()).split("-")[0]
  38. RAY_CLUSTER_NAME = "cluster-" + CLUSTER_ID
  39. RAY_SERVICE_NAME = "service-" + CLUSTER_ID
  40. LOCUST_ID = "ray-locust-" + CLUSTER_ID
  41. def check_kuberay_installed():
  42. # Make sure the ray namespace exists
  43. KUBERAY_VERSION = "v1.2.2"
  44. uri = (
  45. "github.com/ray-project/kuberay/manifests"
  46. f"/base?ref={KUBERAY_VERSION}&timeout=90s"
  47. )
  48. print(
  49. subprocess.check_output(
  50. [
  51. "kubectl",
  52. "apply",
  53. "-k",
  54. uri,
  55. ]
  56. ).decode()
  57. )
  58. pods = subprocess.check_output(
  59. ["kubectl", "get", "pods", "--namespace", "ray-system", "--no-headers"]
  60. ).decode()
  61. assert pods.split("\n") != 0
def start_rayservice():
    # Render the RayService YAML from its template, submit it with kubectl,
    # wait for the head pod to reach Running, and finally probe the Serve
    # endpoint from inside the head pod until it answers correctly.
    # step-1: generate the yaml file
    print(f"Using ray image: {ray_image}")
    # Indent the embedded scripts so they nest under the YAML template keys.
    solution = "\n".join(
        [
            f" {line}"
            for line in pathlib.Path("./solution.py").read_text().splitlines()
        ]
    )
    locustfile = "\n".join(
        [
            f" {line}"
            for line in pathlib.Path("./locustfile.py").read_text().splitlines()
        ]
    )
    template = (
        pathlib.Path("ray_v1alpha1_rayservice_template.yaml")
        .read_text()
        .format(
            cluster_id=CLUSTER_ID,
            ray_image=ray_image,
            solution=solution,
            locustfile=locustfile,
        )
    )
    print("=== YamlFile ===")
    print(template)
    tmp_yaml = pathlib.Path("/tmp/ray_v1alpha1_rayservice.yaml")
    tmp_yaml.write_text(template)
    print("=== Get Pods from ray-system ===")
    print(
        subprocess.check_output(
            ["kubectl", "get", "pods", "--namespace", "ray-system", "--no-headers"]
        ).decode()
    )
    # step-2: create the cluster
    print(f"Creating cluster with id: {CLUSTER_ID}")
    print(subprocess.check_output(["kubectl", "create", "-f", str(tmp_yaml)]).decode())
    # step-3: make sure the ray cluster is up
    # Watch head pods of this cluster (60s budget); stop as soon as one
    # reaches the Running phase.
    w = watch.Watch()
    start_time = time.time()
    head_pod_name = None
    for event in w.stream(
        func=cli.list_namespaced_pod,
        namespace="default",
        label_selector=f"rayCluster={RAY_CLUSTER_NAME},ray.io/node-type=head",
        timeout_seconds=60,
    ):
        if event["object"].status.phase == "Running":
            assert event["object"].kind == "Pod"
            head_pod_name = event["object"].metadata.name
            end_time = time.time()
            print(f"{CLUSTER_ID} started in {end_time-start_time} sec")
            print(f"head pod {head_pod_name}")
            break
    assert head_pod_name is not None
    # step-4: e2e check it's alive
    # Run a tiny requests script inside the head pod; the service is
    # considered up once it returns "375" for val=123 (presumably the
    # deployed solution.py computes this — confirm against that file).
    # NOTE(review): this loop has no timeout; it retries every 2s forever
    # until the expected response arrives.
    cmd = """
import requests
print(requests.get('http://localhost:8000/?val=123').text)
"""
    while True:
        try:
            resp = (
                subprocess.check_output(
                    f'kubectl exec {head_pod_name} -- python -c "{cmd}"', shell=True
                )
                .decode()
                .strip()
            )
            if resp == "375":
                print("Service is up now!")
                break
            else:
                print(f"Failed with msg {resp}")
        except Exception as e:
            print("Error", e)
        time.sleep(2)
  140. def start_port_forward():
  141. proc = None
  142. proc = subprocess.Popen(
  143. [
  144. "kubectl",
  145. "port-forward",
  146. f"svc/{RAY_SERVICE_NAME}-serve-svc",
  147. "8000:8000",
  148. "--address=0.0.0.0",
  149. ]
  150. )
  151. while True:
  152. try:
  153. resp = requests.get(
  154. "http://localhost:8000/",
  155. timeout=1,
  156. params={
  157. "val": 10,
  158. },
  159. )
  160. if resp.status_code == 200:
  161. print("The ray service is ready!!!")
  162. break
  163. except requests.exceptions.Timeout:
  164. pass
  165. except requests.exceptions.ConnectionError:
  166. pass
  167. print("Waiting for the proxy to be alive")
  168. time.sleep(1)
  169. return proc
  170. def warmup_cluster(num_reqs):
  171. for _ in range(num_reqs):
  172. resp = requests.get(
  173. "http://localhost:8000/",
  174. timeout=1,
  175. params={
  176. "val": 10,
  177. },
  178. )
  179. assert resp.status_code == 200
def start_sending_traffics(duration, users):
    # Deploy a locust load generator via helm and port-forward its web UI.
    #
    # duration: how long locust should drive traffic (seconds); rendered
    #   into the locust run config as an int.
    # users: number of simulated locust users.
    # Returns the port-forward subprocess.Popen; the caller must kill it.
    print("=== Install locust by helm ===")
    yaml_config = (
        pathlib.Path("locust-run.yaml")
        .read_text()
        .format(users=users, cluster_id=CLUSTER_ID, duration=int(duration))
    )
    print("=== Locust YAML ===")
    print(yaml_config)
    pathlib.Path("/tmp/locust-run-config.yaml").write_text(yaml_config)
    helm_install_logs = subprocess.check_output(
        [
            "helm",
            "install",
            LOCUST_ID,
            "deliveryhero/locust",
            "-f",
            "/tmp/locust-run-config.yaml",
        ]
    )
    print(helm_install_logs)
    # Wait up to 5 minutes for the locust master pod to leave Pending,
    # polling every 30 seconds.
    timeout_wait_for_locust_s = 300
    while timeout_wait_for_locust_s > 0:
        labels = [
            f"app.kubernetes.io/instance=ray-locust-{CLUSTER_ID}",
            "app.kubernetes.io/name=locust,component=master",
        ]
        pods = cli.list_namespaced_pod("default", label_selector=",".join(labels))
        assert len(pods.items) == 1
        if pods.items[0].status.phase == "Pending":
            print("Waiting for the locust pod to be ready...")
            time.sleep(30)
            timeout_wait_for_locust_s -= 30
        else:
            break
    # Expose the locust web UI (container port 8089) on localhost:8080.
    proc = subprocess.Popen(
        [
            "kubectl",
            "port-forward",
            f"svc/ray-locust-{CLUSTER_ID}",
            "8080:8089",
            "--address=0.0.0.0",
        ]
    )
    return proc
  225. def dump_pods_actors(pod_name):
  226. print(
  227. subprocess.run(
  228. f"kubectl exec {pod_name} -- ps -ef | grep ::",
  229. shell=True,
  230. capture_output=True,
  231. ).stdout.decode()
  232. )
  233. def kill_head():
  234. pods = cli.list_namespaced_pod(
  235. "default",
  236. label_selector=f"rayCluster={RAY_CLUSTER_NAME},ray.io/node-type=head",
  237. )
  238. if pods.items[0].status.phase == "Running":
  239. print(f"Killing header {pods.items[0].metadata.name}")
  240. dump_pods_actors(pods.items[0].metadata.name)
  241. cli.delete_namespaced_pod(pods.items[0].metadata.name, "default")
  242. def kill_worker():
  243. pods = cli.list_namespaced_pod(
  244. "default",
  245. label_selector=f"rayCluster={RAY_CLUSTER_NAME},ray.io/node-type=worker",
  246. )
  247. alive_pods = [
  248. (p.status.start_time, p.metadata.name)
  249. for p in pods.items
  250. if p.status.phase == "Running"
  251. ]
  252. # sorted(alive_pods)
  253. # We kill the oldest nodes for now given the memory leak in serve.
  254. # to_be_killed = alive_pods[-1][1]
  255. to_be_killed = random.choice(alive_pods)[1]
  256. print(f"Killing worker {to_be_killed}")
  257. dump_pods_actors(pods.items[0].metadata.name)
  258. cli.delete_namespaced_pod(to_be_killed, "default")
  259. def start_killing_nodes(duration, kill_interval, kill_node_type):
  260. """Kill the nodes in ray cluster.
  261. duration: How long does we run the test (seconds)
  262. kill_interval: The interval between two kills (seconds)
  263. kill_head_every_n: For every n kills, we kill a head node
  264. kill_node_type: kill either worker node or head node
  265. """
  266. for kill_idx in range(1, int(duration / kill_interval)):
  267. while True:
  268. try:
  269. # kill
  270. if kill_node_type == TestScenario.KILL_HEAD_NODE:
  271. kill_head()
  272. elif kill_node_type == TestScenario.KILL_WORKER_NODE:
  273. kill_worker()
  274. break
  275. except Exception as e:
  276. from time import sleep
  277. print(f"Fail to kill node, retry in 5 seconds: {e}")
  278. sleep(5)
  279. time.sleep(kill_interval)
def get_stats():
    # Copy the locust master's stats-history CSV out of its pod and compute
    # service availability and throughput over the measurement window.
    #
    # Returns a 3-tuple:
    #   (success_rate, requests_per_second, raw_csv_rows)
    labels = [
        f"app.kubernetes.io/instance=ray-locust-{CLUSTER_ID}",
        "app.kubernetes.io/name=locust,component=master",
    ]
    pods = cli.list_namespaced_pod("default", label_selector=",".join(labels))
    assert len(pods.items) == 1
    pod_name = pods.items[0].metadata.name
    subprocess.check_output(
        [
            "kubectl",
            "cp",
            f"{pod_name}:/home/locust/test_result_{CLUSTER_ID}_stats_history.csv",
            "./stats_history.csv",
        ]
    )
    data = []
    with open("stats_history.csv") as f:
        import csv

        reader = csv.reader(f)
        for d in reader:
            data.append(d)
    # The first 5mins is for warming up
    # NOTE(review): this assumes one CSV row per second so that row 300 marks
    # the end of the warm-up window — confirm against the locust CSV settings.
    offset = 300
    # Column 0 holds an epoch timestamp (seconds).
    start_time = int(data[offset][0])
    end_time = int(data[-1][0])
    # 17 is the index for total requests
    # 18 is the index for total failed requests
    total = float(data[-1][17]) - float(data[offset][17])
    failures = float(data[-1][18]) - float(data[offset][18])
    # Available, through put
    return (total - failures) / total, total / (end_time - start_time), data
  312. def main():
  313. result = {
  314. TestScenario.KILL_WORKER_NODE.value: {"rate": None},
  315. TestScenario.KILL_HEAD_NODE.value: {"rate": None},
  316. }
  317. expected_result = {
  318. TestScenario.KILL_HEAD_NODE: 0.99,
  319. TestScenario.KILL_HEAD_NODE: 0.99,
  320. }
  321. check_kuberay_installed()
  322. users = 60
  323. exception = None
  324. for kill_node_type, kill_interval, test_duration in [
  325. (TestScenario.KILL_WORKER_NODE, 60, 600),
  326. (TestScenario.KILL_HEAD_NODE, 300, 1200),
  327. ]:
  328. try:
  329. generate_cluster_variable()
  330. procs = []
  331. start_rayservice()
  332. procs.append(start_port_forward())
  333. warmup_cluster(200)
  334. procs.append(start_sending_traffics(test_duration * 1.1, users))
  335. start_killing_nodes(test_duration, kill_interval, kill_node_type)
  336. rate, qps, data = get_stats()
  337. print("Raw Data", data, qps)
  338. result[kill_node_type.value]["rate"] = rate
  339. assert expected_result[kill_node_type] <= rate
  340. assert qps > users * 10 * 0.8
  341. except Exception as e:
  342. print(f"{kill_node_type} HA test failed, {e}")
  343. exception = e
  344. finally:
  345. print("=== Cleanup ===")
  346. subprocess.run(
  347. ["kubectl", "delete", "-f", str(yaml_path)],
  348. capture_output=True,
  349. )
  350. subprocess.run(
  351. ["helm", "uninstall", LOCUST_ID],
  352. capture_output=True,
  353. )
  354. for p in procs:
  355. p.kill()
  356. print("==== Cleanup done ===")
  357. if exception:
  358. raise exception
  359. print("Result:", result)
  360. test_output_json_path = os.environ.get(
  361. "TEST_OUTPUT_JSON", "/tmp/release_test_output.json"
  362. )
  363. with open(test_output_json_path, "wt") as f:
  364. json.dump(result, f)
  365. if __name__ == "__main__":
  366. try:
  367. # Connect to ray so that the auto suspense
  368. # will not start.
  369. ray.init("auto")
  370. except Exception:
  371. # It doesnt' matter if it failed.
  372. pass
  373. main()