"""Job submission test

This test runs a basic Tune job on a remote cluster.

Test owner: architkulkarni

Acceptance criteria: Should run through and print "PASSED"
"""
import argparse
import json
import os
import time
from typing import Optional

from ray.dashboard.modules.job.common import JobStatus
from ray.job_submission import JobSubmissionClient
  13. def wait_until_finish(
  14. client: JobSubmissionClient,
  15. job_id: str,
  16. timeout_s: int = 10 * 60,
  17. retry_interval_s: int = 1,
  18. ) -> Optional[JobStatus]:
  19. start_time_s = time.time()
  20. while time.time() - start_time_s <= timeout_s:
  21. status = client.get_job_status(job_id)
  22. print(f"status: {status}")
  23. if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
  24. return status
  25. time.sleep(retry_interval_s)
  26. return None
  27. if __name__ == "__main__":
  28. parser = argparse.ArgumentParser()
  29. parser.add_argument(
  30. "--smoke-test", action="store_true", help="Finish quickly for testing."
  31. )
  32. parser.add_argument(
  33. "--working-dir",
  34. required=True,
  35. help="working_dir to use for the job within this test.",
  36. )
  37. args = parser.parse_args()
  38. start = time.time()
  39. address = os.environ.get("RAY_ADDRESS")
  40. job_name = os.environ.get("RAY_JOB_NAME", "jobs_basic")
  41. if address is not None and address.startswith("anyscale://"):
  42. pass
  43. else:
  44. address = "http://127.0.0.1:8265"
  45. client = JobSubmissionClient(address)
  46. job_id = client.submit_job(
  47. entrypoint="python run_simple_tune_job.py",
  48. runtime_env={"pip": ["ray[tune]"], "working_dir": args.working_dir},
  49. )
  50. timeout_s = 10 * 60
  51. status = wait_until_finish(client=client, job_id=job_id, timeout_s=timeout_s)
  52. print("Status message: ", client.get_job_info(job_id=job_id).message)
  53. assert status == JobStatus.SUCCEEDED
  54. taken = time.time() - start
  55. result = {
  56. "time_taken": taken,
  57. }
  58. test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/jobs_basic.json")
  59. with open(test_output_json, "wt") as f:
  60. json.dump(result, f)
  61. logs = client.get_job_logs(job_id)
  62. assert "Starting Ray Tune job" in logs
  63. assert "Best config:" in logs
  64. print("PASSED")