123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import aiohttp.web
- from aiohttp.web import Request, Response
- import dataclasses
- from functools import wraps
- import logging
- from typing import Any, Callable
- import json
- import traceback
- from dataclasses import dataclass
- import ray
- import ray.dashboard.utils as dashboard_utils
- import ray.dashboard.optional_utils as dashboard_optional_utils
- from ray._private.gcs_utils import use_gcs_for_bootstrap
- from ray._private.runtime_env.packaging import package_exists, upload_package_to_gcs
- from ray.dashboard.modules.job.common import (
- CURRENT_VERSION,
- http_uri_components_to_uri,
- JobStatusInfo,
- JobSubmitRequest,
- JobSubmitResponse,
- JobStopResponse,
- JobStatusResponse,
- JobLogsResponse,
- VersionResponse,
- validate_request_type,
- )
- from ray.dashboard.modules.job.job_manager import JobManager
# Ray namespace passed to ray.init() when this module connects to the cluster.
RAY_INTERNAL_JOBS_NAMESPACE = "_ray_internal_jobs"

# Route table whose ``routes.get`` / ``routes.put`` / ``routes.post``
# decorators register the JobHead HTTP handlers.
routes = dashboard_optional_utils.ClassMethodRouteTable

# Module logger, forced to INFO so connection and upload messages are emitted.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def _init_ray_and_catch_exceptions(f: Callable) -> Callable:
    """Wrap an async route handler so it lazily connects to Ray and never raises.

    Before running the handler, if ``ray.is_initialized()`` is false, the
    wrapper connects this process to Ray in the
    ``RAY_INTERNAL_JOBS_NAMESPACE`` namespace. It uses the dashboard head's
    GCS address when ``use_gcs_for_bootstrap()`` is true. Otherwise it uses
    the Redis ``ip:port`` and the Redis password. If connecting fails,
    ``ray.shutdown()`` is called before the error propagates.

    Any exception raised while connecting or by the wrapped handler is logged
    and turned into an HTTP 500 ``Response`` whose body is the traceback.

    Args:
        f: An async method ``f(self, *args, **kwargs)`` of a dashboard module
            whose ``self._dashboard_head`` exposes the addresses used above.

    Returns:
        The wrapped coroutine function (metadata preserved via ``wraps``).
    """

    @wraps(f)
    async def check(self, *args, **kwargs):
        try:
            if not ray.is_initialized():
                try:
                    if use_gcs_for_bootstrap():
                        address = self._dashboard_head.gcs_address
                        redis_pw = None
                        logger.info(f"Connecting to ray with address={address}")
                    else:
                        ip, port = self._dashboard_head.redis_address
                        redis_pw = self._dashboard_head.redis_password
                        address = f"{ip}:{port}"
                        # Never write the Redis password itself to the log;
                        # only show whether one is configured.
                        logged_pw = "<redacted>" if redis_pw else redis_pw
                        logger.info(
                            f"Connecting to ray with address={address}, "
                            f"redis_pw={logged_pw}"
                        )
                    ray.init(
                        address=address,
                        namespace=RAY_INTERNAL_JOBS_NAMESPACE,
                        _redis_password=redis_pw,
                    )
                except Exception as e:
                    # Reset Ray's state so a later request re-attempts the
                    # connection instead of seeing a half-initialized client.
                    ray.shutdown()
                    raise e from None
            return await f(self, *args, **kwargs)
        except Exception as e:
            # Top-level boundary for the HTTP handler: log and report a 500
            # instead of letting the exception escape into aiohttp.
            logger.exception(f"Unexpected error in handler: {e}")
            return Response(
                text=traceback.format_exc(),
                status=aiohttp.web.HTTPInternalServerError.status_code,
            )

    return check
class JobHead(dashboard_utils.DashboardHeadModule):
    """Dashboard head module serving the HTTP job-submission API.

    Routes registered through ``routes``:

    * ``GET  /api/version`` returns API and Ray version information.
    * ``GET  /api/packages/{protocol}/{package_name}`` returns 200 if the
      package exists, 404 otherwise.
    * ``PUT  /api/packages/{protocol}/{package_name}`` uploads the request
      body as a package to the GCS.
    * ``POST /api/jobs/`` submits a job.
    * ``POST /api/jobs/{job_id}/stop`` stops a job.
    * ``GET  /api/jobs/{job_id}`` returns a job's status.
    * ``GET  /api/jobs/{job_id}/logs`` returns a job's logs.
    * ``GET  /api/jobs/{job_id}/logs/tail`` streams a job's logs over a
      websocket.

    Every route except ``/api/version`` is wrapped by
    ``_init_ray_and_catch_exceptions``.
    """

    def __init__(self, dashboard_head):
        super().__init__(dashboard_head)
        # Created in run(); None until the module has been started.
        self._job_manager = None

    async def _parse_and_validate_request(
        self, req: Request, request_type: type
    ) -> Any:
        """Parse the JSON body of ``req`` and validate it as ``request_type``.

        Returns:
            The value produced by ``validate_request_type`` on success. If
            reading, decoding or validating the body raises, a ``Response``
            with status 400 and the traceback as body is returned instead.
            Callers distinguish the two with ``isinstance(result, Response)``.
        """
        try:
            return validate_request_type(await req.json(), request_type)
        except Exception as e:
            logger.info(f"Got invalid request type: {e}")
            return Response(
                text=traceback.format_exc(),
                status=aiohttp.web.HTTPBadRequest.status_code,
            )

    @staticmethod
    def _job_not_found(job_id: str) -> Response:
        """Build the 404 response returned for an unknown ``job_id``."""
        return Response(
            text=f"Job {job_id} does not exist",
            status=aiohttp.web.HTTPNotFound.status_code,
        )

    def job_exists(self, job_id: str) -> bool:
        """Return True if the job manager reports a status for ``job_id``."""
        return self._job_manager.get_job_status(job_id) is not None

    @routes.get("/api/version")
    async def get_version(self, req: Request) -> Response:
        """Return the job API version plus Ray's version and commit as JSON."""
        # NOTE(edoakes): CURRENT_VERSION should be bumped and checked on the
        # client when we have backwards-incompatible changes.
        resp = VersionResponse(
            version=CURRENT_VERSION,
            ray_version=ray.__version__,
            ray_commit=ray.__commit__,
        )
        return Response(
            text=json.dumps(dataclasses.asdict(resp)),
            content_type="application/json",
            status=aiohttp.web.HTTPOk.status_code,
        )

    @routes.get("/api/packages/{protocol}/{package_name}")
    @_init_ray_and_catch_exceptions
    async def get_package(self, req: Request) -> Response:
        """Return 200 if the package URI from the path exists, else 404."""
        package_uri = http_uri_components_to_uri(
            protocol=req.match_info["protocol"],
            package_name=req.match_info["package_name"],
        )
        if not package_exists(package_uri):
            return Response(
                text=f"Package {package_uri} does not exist",
                status=aiohttp.web.HTTPNotFound.status_code,
            )
        return Response()

    @routes.put("/api/packages/{protocol}/{package_name}")
    @_init_ray_and_catch_exceptions
    async def upload_package(self, req: Request) -> Response:
        """Store the raw request body in the GCS under the URI from the path.

        Returns 200 on success, or 500 with the traceback if the upload fails.
        """
        package_uri = http_uri_components_to_uri(
            protocol=req.match_info["protocol"],
            package_name=req.match_info["package_name"],
        )
        logger.info(f"Uploading package {package_uri} to the GCS.")
        try:
            upload_package_to_gcs(package_uri, await req.read())
        except Exception:
            return Response(
                text=traceback.format_exc(),
                status=aiohttp.web.HTTPInternalServerError.status_code,
            )
        return Response(status=aiohttp.web.HTTPOk.status_code)

    @routes.post("/api/jobs/")
    @_init_ray_and_catch_exceptions
    async def submit_job(self, req: Request) -> Response:
        """Submit the job described by a ``JobSubmitRequest`` JSON body.

        Returns a ``JobSubmitResponse`` as JSON with status 200. Returns 400
        if the body is invalid or the job manager raises TypeError or
        ValueError, and 500 for any other job-manager error.
        """
        result = await self._parse_and_validate_request(req, JobSubmitRequest)
        # Parsing/validation failed: result is already the 400 Response.
        if isinstance(result, Response):
            return result
        submit_request = result

        try:
            job_id = self._job_manager.submit_job(
                entrypoint=submit_request.entrypoint,
                job_id=submit_request.job_id,
                runtime_env=submit_request.runtime_env,
                metadata=submit_request.metadata,
            )
            resp = JobSubmitResponse(job_id=job_id)
        except (TypeError, ValueError):
            # Treated as a problem with the client's submission.
            return Response(
                text=traceback.format_exc(),
                status=aiohttp.web.HTTPBadRequest.status_code,
            )
        except Exception:
            return Response(
                text=traceback.format_exc(),
                status=aiohttp.web.HTTPInternalServerError.status_code,
            )

        return Response(
            text=json.dumps(dataclasses.asdict(resp)),
            content_type="application/json",
            status=aiohttp.web.HTTPOk.status_code,
        )

    @routes.post("/api/jobs/{job_id}/stop")
    @_init_ray_and_catch_exceptions
    async def stop_job(self, req: Request) -> Response:
        """Stop the job; respond with a ``JobStopResponse`` as JSON.

        Returns 404 for an unknown job and 500 if the job manager raises.
        """
        job_id = req.match_info["job_id"]
        if not self.job_exists(job_id):
            return self._job_not_found(job_id)

        try:
            stopped = self._job_manager.stop_job(job_id)
            resp = JobStopResponse(stopped=stopped)
        except Exception:
            return Response(
                text=traceback.format_exc(),
                status=aiohttp.web.HTTPInternalServerError.status_code,
            )

        return Response(
            text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
        )

    @routes.get("/api/jobs/{job_id}")
    @_init_ray_and_catch_exceptions
    async def get_job_status(self, req: Request) -> Response:
        """Return the job's status and message as a ``JobStatusResponse``.

        Returns 404 for an unknown job.
        """
        job_id = req.match_info["job_id"]
        # A single lookup serves as both the existence check and the data
        # source (job_exists() would fetch the same status a second time).
        # The result is a JobStatusInfo, or None for an unknown job.
        status = self._job_manager.get_job_status(job_id)
        if status is None:
            return self._job_not_found(job_id)

        resp = JobStatusResponse(status=status.status, message=status.message)
        return Response(
            text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
        )

    @routes.get("/api/jobs/{job_id}/logs")
    @_init_ray_and_catch_exceptions
    async def get_job_logs(self, req: Request) -> Response:
        """Return the job's logs as a ``JobLogsResponse``; 404 if unknown."""
        job_id = req.match_info["job_id"]
        if not self.job_exists(job_id):
            return self._job_not_found(job_id)

        resp = JobLogsResponse(logs=self._job_manager.get_job_logs(job_id))
        return Response(
            text=json.dumps(dataclasses.asdict(resp)), content_type="application/json"
        )

    @routes.get("/api/jobs/{job_id}/logs/tail")
    @_init_ray_and_catch_exceptions
    async def tail_job_logs(self, req: Request) -> aiohttp.web.StreamResponse:
        """Stream the job's logs to the client over a websocket.

        Each chunk yielded by ``JobManager.tail_job_logs`` is sent as one text
        message. Returns 404 (plain ``Response``) for an unknown job, otherwise
        the prepared ``WebSocketResponse``.
        """
        job_id = req.match_info["job_id"]
        if not self.job_exists(job_id):
            return self._job_not_found(job_id)

        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(req)

        async for lines in self._job_manager.tail_job_logs(job_id):
            await ws.send_str(lines)

        # aiohttp handlers must return their response object; for websockets
        # that is the prepared WebSocketResponse.
        return ws

    async def run(self, server):
        """Create the JobManager the first time the module is run."""
        if not self._job_manager:
            self._job_manager = JobManager()

    @staticmethod
    def is_minimal_module():
        """Return False: this module is not part of the minimal dashboard."""
        return False
|