123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- """Job submission test
- This test checks that when using the Ray Jobs API with num_gpus
- specified, the driver is run on a node that has a GPU.
- Test owner: architkulkarni
- Acceptance criteria: Should run through and print "PASSED"
- """
- import argparse
- import json
- import os
- import time
- import torch
- from typing import Optional
- from ray.dashboard.modules.job.common import JobStatus
- from ray.job_submission import JobSubmissionClient
def wait_until_finish(
    client: JobSubmissionClient,
    job_id: str,
    timeout_s: int = 10 * 60,
    retry_interval_s: int = 1,
) -> Optional[JobStatus]:
    """Poll a job's status until it is terminal or the timeout elapses.

    Args:
        client: Job submission client used to query the job status.
        job_id: Identifier of the job to poll.
        timeout_s: Maximum number of seconds to keep polling.
        retry_interval_s: Seconds to sleep between consecutive polls.

    Returns:
        The status once it is one of SUCCEEDED, STOPPED or FAILED, or
        ``None`` if none of those was observed within ``timeout_s``.
    """
    terminal_statuses = {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}

    # Use a monotonic clock for the elapsed-time check: wall-clock time can
    # jump (NTP sync, manual adjustment) and would distort the timeout.
    start_time_s = time.monotonic()
    while time.monotonic() - start_time_s <= timeout_s:
        status = client.get_job_status(job_id)
        print(f"status: {status}")
        if status in terminal_statuses:
            return status
        time.sleep(retry_interval_s)
    return None
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # NOTE: --smoke-test is accepted on the command line but is not read
    # anywhere in this script body.
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing."
    )
    parser.add_argument(
        "--working-dir",
        required=True,
        help="working_dir to use for the job within this test.",
    )
    args = parser.parse_args()

    start = time.time()

    # Keep RAY_ADDRESS only when it is an anyscale:// address; in every other
    # case (unset or any other scheme) talk to the local jobs endpoint.
    address = os.environ.get("RAY_ADDRESS")
    if address is None or not address.startswith("anyscale://"):
        address = "http://127.0.0.1:8265"

    client = JobSubmissionClient(address)

    # This test script runs on the head node, which should not have a GPU.
    assert (
        not torch.cuda.is_available()
    ), "Expected no CUDA device on the node running this test script."

    # Per-job wait budget; identical for every submission below.
    timeout_s = 10 * 60

    for num_gpus in [0, 0.1]:
        job_id = client.submit_job(
            entrypoint="python jobs_check_cuda_available.py",
            runtime_env={"working_dir": args.working_dir},
            entrypoint_num_gpus=num_gpus,
        )

        # `status` is None when the job did not reach a terminal state within
        # the timeout; the assertions below then fail and report that value.
        status = wait_until_finish(client=client, job_id=job_id, timeout_s=timeout_s)
        job_info = client.get_job_info(job_id)
        print("Status message: ", job_info.message)

        if num_gpus == 0:
            # We didn't specify any GPUs, so the driver should run on the head node.
            # The head node should not have a GPU, so the job should fail.
            assert (
                status == JobStatus.FAILED
            ), f"Expected FAILED for num_gpus={num_gpus}, got {status}"
            assert (
                "CUDA is not available in the driver script" in job_info.message
            ), f"Unexpected job message: {job_info.message!r}"
        else:
            # We specified a GPU, so the driver should run on the worker node
            # with a GPU, so the job should succeed.
            assert (
                status == JobStatus.SUCCEEDED
            ), f"Expected SUCCEEDED for num_gpus={num_gpus}, got {status}"

    taken = time.time() - start
    result = {
        "time_taken": taken,
    }

    # Write the timing result to the path given by TEST_OUTPUT_JSON, falling
    # back to a fixed path under /tmp when the variable is unset.
    test_output_json = os.environ.get(
        "TEST_OUTPUT_JSON", "/tmp/jobs_specify_num_gpus.json"
    )
    with open(test_output_json, "wt") as f:
        json.dump(result, f)

    print("PASSED")
|