test_submit_cpp_job.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import os
  2. import shutil
  3. import sys
  4. import tempfile
  5. import pytest
  6. from ray._private.test_utils import (
  7. format_web_url,
  8. wait_for_condition,
  9. wait_until_server_available,
  10. )
  11. from ray.job_submission import JobStatus, JobSubmissionClient
  12. from ray.tests.conftest import _ray_start
  13. @pytest.fixture(scope="module")
  14. def headers():
  15. return {"Connection": "keep-alive", "Authorization": "TOK:<MY_TOKEN>"}
  16. @pytest.fixture(scope="module")
  17. def job_sdk_client(headers):
  18. with _ray_start(
  19. include_dashboard=True, num_cpus=1, _node_ip_address="0.0.0.0"
  20. ) as ctx:
  21. address = ctx.address_info["webui_url"]
  22. assert wait_until_server_available(address)
  23. yield JobSubmissionClient(format_web_url(address), headers=headers)
  24. def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
  25. status = client.get_job_status(job_id)
  26. if status == JobStatus.FAILED:
  27. logs = client.get_job_logs(job_id)
  28. raise RuntimeError(f"Job failed\nlogs:\n{logs}")
  29. return status == JobStatus.SUCCEEDED
  30. def test_submit_simple_cpp_job(job_sdk_client):
  31. client = job_sdk_client
  32. simple_job_so_path = os.environ["SIMPLE_DRIVER_SO_PATH"]
  33. simple_job_so_filename = os.path.basename(simple_job_so_path)
  34. simple_job_main_path = os.environ["SIMPLE_DRIVER_MAIN_PATH"]
  35. simple_job_main_filename = os.path.basename(simple_job_main_path)
  36. with tempfile.TemporaryDirectory() as tmp_dir:
  37. working_dir = os.path.join(tmp_dir, "cpp_worker")
  38. os.makedirs(working_dir)
  39. shutil.copy2(
  40. simple_job_so_path, os.path.join(working_dir, simple_job_so_filename)
  41. )
  42. shutil.copy2(
  43. simple_job_main_path,
  44. os.path.join(working_dir, simple_job_main_filename),
  45. )
  46. shutil.copymode(
  47. simple_job_main_path,
  48. os.path.join(working_dir, simple_job_main_filename),
  49. )
  50. entrypoint = (
  51. f"chmod +x {simple_job_main_filename} && ./{simple_job_main_filename}"
  52. )
  53. runtime_env = dict(
  54. working_dir=working_dir,
  55. env_vars={"TEST_KEY": "TEST_VALUE"},
  56. )
  57. job_id = client.submit_job(
  58. entrypoint=entrypoint,
  59. runtime_env=runtime_env,
  60. )
  61. wait_for_condition(
  62. _check_job_succeeded, client=client, job_id=job_id, timeout=120
  63. )
  64. logs = client.get_job_logs(job_id)
  65. print(f"================== logs ================== \n {logs}")
  66. assert "try to get TEST_KEY: TEST_VALUE" in logs
  67. if __name__ == "__main__":
  68. sys.exit(pytest.main(["-v", __file__]))