# long_running_many_jobs.py
"""Job submission long running test

Submits many simple jobs on a long running cluster.

Test owner: architkulkarni

Acceptance criteria: Should run through and print "PASSED"
"""
import argparse
import os
import random
import time
from typing import List, Optional

import ray
from ray._private.test_utils import safe_write_to_results_json
from ray.dashboard.modules.job.common import JobStatus
from ray.dashboard.modules.job.pydantic_models import JobDetails
from ray.job_submission import JobSubmissionClient
  16. NUM_CLIENTS = 4
  17. NUM_JOBS_PER_BATCH = 4
  18. SMOKE_TEST_TIMEOUT = 10 * 60 # 10 minutes
  19. FULL_TEST_TIMEOUT = 8 * 60 * 60 # 8 hours
  20. def wait_until_finish(
  21. client: JobSubmissionClient,
  22. job_id: str,
  23. timeout_s: int = 10 * 60,
  24. retry_interval_s: int = 10,
  25. ) -> Optional[JobStatus]:
  26. start_time_s = time.time()
  27. while time.time() - start_time_s <= timeout_s:
  28. # Test calling list_jobs
  29. client.list_jobs()
  30. status = client.get_job_status(job_id)
  31. if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
  32. return status
  33. time.sleep(retry_interval_s)
  34. return None
  35. def submit_batch_jobs(
  36. clients: List[JobSubmissionClient],
  37. num_jobs: int,
  38. timeout_s: int = 10 * 60,
  39. retry_interval_s: int = 1,
  40. ) -> bool:
  41. job_ids = []
  42. for i in range(num_jobs):
  43. # Cycle through clients arbitrarily
  44. client = clients[i % len(clients)]
  45. job_id = client.submit_job(
  46. entrypoint="echo hello",
  47. )
  48. job_ids.append(job_id)
  49. print(f"submitted job: {job_id}")
  50. for job_id in job_ids:
  51. client = clients[job_ids.index(job_id) % len(clients)]
  52. status = wait_until_finish(client, job_id, timeout_s, retry_interval_s)
  53. if status != JobStatus.SUCCEEDED:
  54. print(
  55. f"Info for failed/timed-out job {job_id}: {client.get_job_info(job_id)}"
  56. )
  57. print(
  58. f"Logs for failed/timed-out job {job_id}: {client.get_job_logs(job_id)}"
  59. )
  60. print(
  61. f"Job {job_id} failed with status {status} (`None` indicates timeout)"
  62. )
  63. return False
  64. return True
  65. if __name__ == "__main__":
  66. parser = argparse.ArgumentParser()
  67. parser.add_argument(
  68. "--smoke-test", action="store_true", help="Finish quickly for testing."
  69. )
  70. parser.add_argument("--num-clients", type=int, default=NUM_CLIENTS)
  71. parser.add_argument("--num-jobs-per-batch", type=int, default=NUM_JOBS_PER_BATCH)
  72. args = parser.parse_args()
  73. if args.smoke_test:
  74. print(f"Running smoke test with timeout {SMOKE_TEST_TIMEOUT} seconds")
  75. timeout = SMOKE_TEST_TIMEOUT
  76. else:
  77. print(f"Running full test (timeout: {FULL_TEST_TIMEOUT}s)")
  78. timeout = FULL_TEST_TIMEOUT
  79. start = time.time()
  80. ray.init()
  81. address = os.environ.get("RAY_ADDRESS")
  82. job_name = os.environ.get("RAY_JOB_NAME", "jobs_basic")
  83. if address is None or not address.startswith("anyscale://"):
  84. address = "http://127.0.0.1:8265"
  85. clients = [JobSubmissionClient(address) for i in range(NUM_CLIENTS)]
  86. batch_counter = 0
  87. while time.time() - start < timeout:
  88. batch_counter += 1
  89. print(f"Submitting batch {batch_counter}...")
  90. # Submit a batch of jobs
  91. if not submit_batch_jobs(clients, NUM_JOBS_PER_BATCH):
  92. print("FAILED")
  93. exit(1)
  94. # Test list jobs
  95. jobs: List[JobDetails] = clients[0].list_jobs()
  96. print(f"Total jobs submitted so far: {len(jobs)}")
  97. # Get job logs from random submission job
  98. is_submission_job = False
  99. while not is_submission_job:
  100. job_details = random.choice(jobs)
  101. is_submission_job = job_details.type == "SUBMISSION"
  102. job_id = job_details.submission_id
  103. print(f"Getting logs for randomly chosen job {job_id}...")
  104. logs = clients[0].get_job_logs(job_id)
  105. print(logs)
  106. time_taken = time.time() - start
  107. result = {
  108. "time_taken": time_taken,
  109. }
  110. safe_write_to_results_json(result, "/tmp/jobs_basic.json")
  111. print("PASSED")