job_head.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. import aiohttp.web
  2. from aiohttp.web import Request, Response
  3. import dataclasses
  4. from functools import wraps
  5. import logging
  6. from typing import Any, Callable
  7. import json
  8. import traceback
  9. from dataclasses import dataclass
  10. import ray
  11. import ray.dashboard.utils as dashboard_utils
  12. import ray.dashboard.optional_utils as dashboard_optional_utils
  13. from ray._private.gcs_utils import use_gcs_for_bootstrap
  14. from ray._private.runtime_env.packaging import package_exists, upload_package_to_gcs
  15. from ray.dashboard.modules.job.common import (
  16. CURRENT_VERSION,
  17. http_uri_components_to_uri,
  18. JobStatusInfo,
  19. JobSubmitRequest,
  20. JobSubmitResponse,
  21. JobStopResponse,
  22. JobStatusResponse,
  23. JobLogsResponse,
  24. VersionResponse,
  25. validate_request_type,
  26. )
  27. from ray.dashboard.modules.job.job_manager import JobManager
  28. logger = logging.getLogger(__name__)
  29. logger.setLevel(logging.INFO)
  30. routes = dashboard_optional_utils.ClassMethodRouteTable
  31. RAY_INTERNAL_JOBS_NAMESPACE = "_ray_internal_jobs"
  32. def _init_ray_and_catch_exceptions(f: Callable) -> Callable:
  33. @wraps(f)
  34. async def check(self, *args, **kwargs):
  35. try:
  36. if not ray.is_initialized():
  37. try:
  38. if use_gcs_for_bootstrap():
  39. address = self._dashboard_head.gcs_address
  40. redis_pw = None
  41. logger.info(f"Connecting to ray with address={address}")
  42. else:
  43. ip, port = self._dashboard_head.redis_address
  44. redis_pw = self._dashboard_head.redis_password
  45. address = f"{ip}:{port}"
  46. logger.info(
  47. f"Connecting to ray with address={address}, "
  48. f"redis_pw={redis_pw}"
  49. )
  50. ray.init(
  51. address=address,
  52. namespace=RAY_INTERNAL_JOBS_NAMESPACE,
  53. _redis_password=redis_pw,
  54. )
  55. except Exception as e:
  56. ray.shutdown()
  57. raise e from None
  58. return await f(self, *args, **kwargs)
  59. except Exception as e:
  60. logger.exception(f"Unexpected error in handler: {e}")
  61. return Response(
  62. text=traceback.format_exc(),
  63. status=aiohttp.web.HTTPInternalServerError.status_code,
  64. )
  65. return check
  66. class JobHead(dashboard_utils.DashboardHeadModule):
  67. def __init__(self, dashboard_head):
  68. super().__init__(dashboard_head)
  69. self._job_manager = None
  70. async def _parse_and_validate_request(
  71. self, req: Request, request_type: dataclass
  72. ) -> Any:
  73. """Parse request and cast to request type. If parsing failed, return a
  74. Response object with status 400 and stacktrace instead.
  75. """
  76. try:
  77. return validate_request_type(await req.json(), request_type)
  78. except Exception as e:
  79. logger.info(f"Got invalid request type: {e}")
  80. return Response(
  81. text=traceback.format_exc(),
  82. status=aiohttp.web.HTTPBadRequest.status_code,
  83. )
  84. def job_exists(self, job_id: str) -> bool:
  85. status = self._job_manager.get_job_status(job_id)
  86. return status is not None
  87. @routes.get("/api/version")
  88. async def get_version(self, req: Request) -> Response:
  89. # NOTE(edoakes): CURRENT_VERSION should be bumped and checked on the
  90. # client when we have backwards-incompatible changes.
  91. resp = VersionResponse(
  92. version=CURRENT_VERSION,
  93. ray_version=ray.__version__,
  94. ray_commit=ray.__commit__,
  95. )
  96. return Response(
  97. text=json.dumps(dataclasses.asdict(resp)),
  98. content_type="application/json",
  99. status=aiohttp.web.HTTPOk.status_code,
  100. )
  101. @routes.get("/api/packages/{protocol}/{package_name}")
  102. @_init_ray_and_catch_exceptions
  103. async def get_package(self, req: Request) -> Response:
  104. package_uri = http_uri_components_to_uri(
  105. protocol=req.match_info["protocol"],
  106. package_name=req.match_info["package_name"],
  107. )
  108. if not package_exists(package_uri):
  109. return Response(
  110. text=f"Package {package_uri} does not exist",
  111. status=aiohttp.web.HTTPNotFound.status_code,
  112. )
  113. return Response()
  114. @routes.put("/api/packages/{protocol}/{package_name}")
  115. @_init_ray_and_catch_exceptions
  116. async def upload_package(self, req: Request):
  117. package_uri = http_uri_components_to_uri(
  118. protocol=req.match_info["protocol"],
  119. package_name=req.match_info["package_name"],
  120. )
  121. logger.info(f"Uploading package {package_uri} to the GCS.")
  122. try:
  123. upload_package_to_gcs(package_uri, await req.read())
  124. except Exception:
  125. return Response(
  126. text=traceback.format_exc(),
  127. status=aiohttp.web.HTTPInternalServerError.status_code,
  128. )
  129. return Response(status=aiohttp.web.HTTPOk.status_code)
  130. @routes.post("/api/jobs/")
  131. @_init_ray_and_catch_exceptions
  132. async def submit_job(self, req: Request) -> Response:
  133. result = await self._parse_and_validate_request(req, JobSubmitRequest)
  134. # Request parsing failed, returned with Response object.
  135. if isinstance(result, Response):
  136. return result
  137. else:
  138. submit_request = result
  139. try:
  140. job_id = self._job_manager.submit_job(
  141. entrypoint=submit_request.entrypoint,
  142. job_id=submit_request.job_id,
  143. runtime_env=submit_request.runtime_env,
  144. metadata=submit_request.metadata,
  145. )
  146. resp = JobSubmitResponse(job_id=job_id)
  147. except (TypeError, ValueError):
  148. return Response(
  149. text=traceback.format_exc(),
  150. status=aiohttp.web.HTTPBadRequest.status_code,
  151. )
  152. except Exception:
  153. return Response(
  154. text=traceback.format_exc(),
  155. status=aiohttp.web.HTTPInternalServerError.status_code,
  156. )
  157. return Response(
  158. text=json.dumps(dataclasses.asdict(resp)),
  159. content_type="application/json",
  160. status=aiohttp.web.HTTPOk.status_code,
  161. )
  162. @routes.post("/api/jobs/{job_id}/stop")
  163. @_init_ray_and_catch_exceptions
  164. async def stop_job(self, req: Request) -> Response:
  165. job_id = req.match_info["job_id"]
  166. if not self.job_exists(job_id):
  167. return Response(
  168. text=f"Job {job_id} does not exist",
  169. status=aiohttp.web.HTTPNotFound.status_code,
  170. )
  171. try:
  172. stopped = self._job_manager.stop_job(job_id)
  173. resp = JobStopResponse(stopped=stopped)
  174. except Exception:
  175. return Response(
  176. text=traceback.format_exc(),
  177. status=aiohttp.web.HTTPInternalServerError.status_code,
  178. )
  179. return Response(
  180. text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
  181. )
  182. @routes.get("/api/jobs/{job_id}")
  183. @_init_ray_and_catch_exceptions
  184. async def get_job_status(self, req: Request) -> Response:
  185. job_id = req.match_info["job_id"]
  186. if not self.job_exists(job_id):
  187. return Response(
  188. text=f"Job {job_id} does not exist",
  189. status=aiohttp.web.HTTPNotFound.status_code,
  190. )
  191. status: JobStatusInfo = self._job_manager.get_job_status(job_id)
  192. resp = JobStatusResponse(status=status.status, message=status.message)
  193. return Response(
  194. text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
  195. )
  196. @routes.get("/api/jobs/{job_id}/logs")
  197. @_init_ray_and_catch_exceptions
  198. async def get_job_logs(self, req: Request) -> Response:
  199. job_id = req.match_info["job_id"]
  200. if not self.job_exists(job_id):
  201. return Response(
  202. text=f"Job {job_id} does not exist",
  203. status=aiohttp.web.HTTPNotFound.status_code,
  204. )
  205. resp = JobLogsResponse(logs=self._job_manager.get_job_logs(job_id))
  206. return Response(
  207. text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
  208. )
  209. @routes.get("/api/jobs/{job_id}/logs/tail")
  210. @_init_ray_and_catch_exceptions
  211. async def tail_job_logs(self, req: Request) -> Response:
  212. job_id = req.match_info["job_id"]
  213. if not self.job_exists(job_id):
  214. return Response(
  215. text=f"Job {job_id} does not exist",
  216. status=aiohttp.web.HTTPNotFound.status_code,
  217. )
  218. ws = aiohttp.web.WebSocketResponse()
  219. await ws.prepare(req)
  220. async for lines in self._job_manager.tail_job_logs(job_id):
  221. await ws.send_str(lines)
  222. async def run(self, server):
  223. if not self._job_manager:
  224. self._job_manager = JobManager()
  225. @staticmethod
  226. def is_minimal_module():
  227. return False