# jobs_specify_num_gpus.py
"""Job submission test

This test checks that when using the Ray Jobs API with num_gpus
specified, the driver is run on a node that has a GPU.

Test owner: architkulkarni

Acceptance criteria: Should run through and print "PASSED"
"""
import argparse
import json
import os
import time
from typing import Optional

import torch

from ray.dashboard.modules.job.common import JobStatus
from ray.job_submission import JobSubmissionClient
  15. def wait_until_finish(
  16. client: JobSubmissionClient,
  17. job_id: str,
  18. timeout_s: int = 10 * 60,
  19. retry_interval_s: int = 1,
  20. ) -> Optional[JobStatus]:
  21. start_time_s = time.time()
  22. while time.time() - start_time_s <= timeout_s:
  23. status = client.get_job_status(job_id)
  24. print(f"status: {status}")
  25. if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
  26. return status
  27. time.sleep(retry_interval_s)
  28. return None
  29. if __name__ == "__main__":
  30. parser = argparse.ArgumentParser()
  31. parser.add_argument(
  32. "--smoke-test", action="store_true", help="Finish quickly for testing."
  33. )
  34. parser.add_argument(
  35. "--working-dir",
  36. required=True,
  37. help="working_dir to use for the job within this test.",
  38. )
  39. args = parser.parse_args()
  40. start = time.time()
  41. address = os.environ.get("RAY_ADDRESS")
  42. job_name = os.environ.get("RAY_JOB_NAME", "jobs_specify_num_gpus")
  43. if address is not None and address.startswith("anyscale://"):
  44. pass
  45. else:
  46. address = "http://127.0.0.1:8265"
  47. client = JobSubmissionClient(address)
  48. # This test script runs on the head node, which should not have a GPU.
  49. assert not torch.cuda.is_available()
  50. for num_gpus in [0, 0.1]:
  51. job_id = client.submit_job(
  52. entrypoint="python jobs_check_cuda_available.py",
  53. runtime_env={"working_dir": args.working_dir},
  54. entrypoint_num_gpus=num_gpus,
  55. )
  56. timeout_s = 10 * 60
  57. status = wait_until_finish(client=client, job_id=job_id, timeout_s=timeout_s)
  58. job_info = client.get_job_info(job_id)
  59. print("Status message: ", job_info.message)
  60. if num_gpus == 0:
  61. # We didn't specify any GPUs, so the driver should run on the head node.
  62. # The head node should not have a GPU, so the job should fail.
  63. assert status == JobStatus.FAILED
  64. assert "CUDA is not available in the driver script" in job_info.message
  65. else:
  66. # We specified a GPU, so the driver should run on the worker node
  67. # with a GPU, so the job should succeed.
  68. assert status == JobStatus.SUCCEEDED
  69. taken = time.time() - start
  70. result = {
  71. "time_taken": taken,
  72. }
  73. test_output_json = os.environ.get(
  74. "TEST_OUTPUT_JSON", "/tmp/jobs_specify_num_gpus.json"
  75. )
  76. with open(test_output_json, "wt") as f:
  77. json.dump(result, f)
  78. print("PASSED")