common.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. from dataclasses import dataclass
  2. from enum import Enum
  3. from typing import Any, Dict, Optional, Tuple, Union
  4. import pickle
  5. from ray import ray_constants
  6. from ray.experimental.internal_kv import (
  7. _internal_kv_initialized,
  8. _internal_kv_get,
  9. _internal_kv_put,
  10. )
  11. from ray._private.runtime_env.packaging import parse_uri
  # NOTE(edoakes): the two metadata keys below should be considered a public
  # API because they're exposed in the snapshot API.
  JOB_ID_METADATA_KEY: str = "job_submission_id"
  JOB_NAME_METADATA_KEY: str = "job_name"

  # Version identifier of this job API. History of bumps:
  #   0 -> 1: Added log streaming and changed behavior of job logs cli.
  CURRENT_VERSION: str = "1"
  class JobStatus(str, Enum):
      """Lifecycle states of a job.

      Mixes in ``str`` so each member compares equal to its plain string value
      (e.g. ``JobStatus.RUNNING == "RUNNING"``).
      """

      PENDING = "PENDING"
      RUNNING = "RUNNING"
      STOPPED = "STOPPED"
      SUCCEEDED = "SUCCEEDED"
      FAILED = "FAILED"

      def __str__(self):
          # Render as the bare value (e.g. "RUNNING") instead of Enum's default
          # "ClassName.MEMBER" form.
          return str(self.value)
  @dataclass
  class JobStatusInfo:
      """A job status paired with a human-readable message.

      When ``message`` is not supplied, ``__post_init__`` fills in a default
      message describing ``status``.
      """

      status: JobStatus
      message: Optional[str] = None

      def __post_init__(self):
          if self.message is not None:
              return

          # Candidates are compared with ``==`` in this order, stopping at the
          # first match; a status matching none of them leaves ``message`` None.
          defaults = (
              (
                  JobStatus.PENDING,
                  "Job has not started yet, likely waiting "
                  "for the runtime_env to be set up.",
              ),
              (JobStatus.RUNNING, "Job is currently running."),
              (JobStatus.STOPPED, "Job was intentionally stopped."),
              (JobStatus.SUCCEEDED, "Job finished successfully."),
              (JobStatus.FAILED, "Job failed."),
          )
          for candidate, text in defaults:
              if self.status == candidate:
                  self.message = text
                  break
  class JobStatusStorageClient:
      """
      Handles formatting of status storage key given job id.

      Status values are stored as pickled ``JobStatusInfo`` objects in the
      internal KV store under ``ray_constants.KV_NAMESPACE_JOB``.
      """

      # Per-job key template; ``{job_id}`` is substituted on every access.
      JOB_STATUS_KEY = "_ray_internal_job_status_{job_id}"

      def __init__(self):
          # The internal KV must already be initialized; this client performs
          # no setup of its own.
          assert _internal_kv_initialized()

      def put_status(self, job_id: str, status: Union[JobStatus, JobStatusInfo]):
          """Store ``status`` under the status key for ``job_id``.

          Args:
              job_id: ID of the job whose status is being recorded.
              status: Either a bare ``JobStatus``, which is wrapped in a
                  ``JobStatusInfo`` carrying its default message, or a full
                  ``JobStatusInfo``.

          Raises:
              TypeError: If ``status`` is neither a ``JobStatus`` nor a
                  ``JobStatusInfo``.
          """
          if isinstance(status, JobStatus):
              status = JobStatusInfo(status=status)
          elif not isinstance(status, JobStatusInfo):
              # An explicit raise, not ``assert``: asserts are stripped under
              # ``python -O``, which would let an arbitrary object be pickled
              # into the KV store.
              raise TypeError("status must be JobStatus or JobStatusInfo.")

          _internal_kv_put(
              self.JOB_STATUS_KEY.format(job_id=job_id),
              pickle.dumps(status),
              namespace=ray_constants.KV_NAMESPACE_JOB,
          )

      def get_status(self, job_id: str) -> Optional[JobStatusInfo]:
          """Return the stored status for ``job_id``, or None if nothing is stored.

          Args:
              job_id: ID of the job to look up.

          Returns:
              The unpickled status object, or None when the key is absent.
          """
          pickled_status = _internal_kv_get(
              self.JOB_STATUS_KEY.format(job_id=job_id),
              namespace=ray_constants.KV_NAMESPACE_JOB,
          )
          if pickled_status is None:
              return None
          # NOTE(review): pickle.loads can execute arbitrary code on crafted
          # input. This assumes only trusted cluster components (e.g.
          # put_status above) write this key -- confirm.
          return pickle.loads(pickled_status)
  def uri_to_http_components(package_uri: str) -> Tuple[str, str]:
      """Split a ``.zip`` package URI into HTTP-safe ``(protocol, name)`` parts.

      The ``.zip`` suffix is removed from the returned name, and the scheme
      prefix (e.g. ``gcs://``) is separated out by ``parse_uri``, so that the
      package can be referenced over HTTP.

      Args:
          package_uri: Package URI; must end in ``.zip``.

      Returns:
          ``(protocol.value, package_name_without_zip)``, where ``protocol`` is
          the first element returned by ``parse_uri``.

      Raises:
          ValueError: If ``package_uri`` does not end in ``.zip``.
      """
      zip_suffix = ".zip"
      if not package_uri.endswith(zip_suffix):
          raise ValueError(f"package_uri ({package_uri}) does not end in .zip")

      protocol, package_name = parse_uri(package_uri)
      stem = package_name[: -len(zip_suffix)]
      return protocol.value, stem
  def http_uri_components_to_uri(protocol: str, package_name: str) -> str:
      """Rebuild a package URI from its HTTP components.

      This is the inverse of ``uri_to_http_components``: the ``.zip`` suffix
      is appended again.

      Args:
          protocol: URI scheme, e.g. ``"gcs"``.
          package_name: Package name without the ``.zip`` suffix.

      Returns:
          ``"<protocol>://<package_name>.zip"``.

      Raises:
          ValueError: If ``package_name`` already ends in ``.zip``.
      """
      zip_suffix = ".zip"
      if package_name.endswith(zip_suffix):
          raise ValueError(f"package_name ({package_name}) should not end in .zip")
      return "{}://{}{}".format(protocol, package_name, zip_suffix)
  def validate_request_type(json_data: Dict[str, Any], request_type: type) -> Any:
      """Construct a ``request_type`` instance from decoded JSON data.

      Validation is delegated to ``request_type``'s constructor. For a
      dataclass, unexpected or missing fields make the generated ``__init__``
      raise ``TypeError``, and any ``__post_init__`` checks also run.

      Args:
          json_data: Mapping of field names to values, passed as keyword
              arguments.
          request_type: The class to instantiate, e.g. a request dataclass such
              as ``JobSubmitRequest``. (Annotated as ``type``; ``dataclass`` is
              the decorator function, not a type.)

      Returns:
          The constructed ``request_type`` instance.
      """
      return request_type(**json_data)
  @dataclass()
  class VersionResponse:
      """Response payload carrying version information."""

      # Version string of this API; presumably filled from CURRENT_VERSION --
      # confirm at the call site.
      version: str
      # Ray version string.
      ray_version: str
      # Ray commit identifier.
      ray_commit: str
  @dataclass()
  class JobSubmitRequest:
      """Request body for submitting a job.

      ``__post_init__`` type-checks every field and raises ``TypeError`` for
      the first violation found, checking fields in declaration order.
      """

      # Command that starts execution, e.g. "python script.py".
      entrypoint: str
      # Optional caller-chosen job ID. When omitted, one is generated; a
      # submission whose job_id matches an existing job is rejected.
      job_id: Optional[str] = None
      # Dict describing the job's execution environment.
      runtime_env: Optional[Dict[str, Any]] = None
      # Metadata forwarded to the JobConfig.
      metadata: Optional[Dict[str, str]] = None

      def __post_init__(self):
          # Sentinel meaning "no offending element"; ``None`` cannot serve here
          # because ``None`` itself may be the offending element.
          all_str = object()

          def first_non_str(items):
              """Return the first element of ``items`` that is not a str, else ``all_str``."""
              return next((item for item in items if not isinstance(item, str)), all_str)

          if not isinstance(self.entrypoint, str):
              raise TypeError(f"entrypoint must be a string, got {type(self.entrypoint)}")

          job_id = self.job_id
          if job_id is not None and not isinstance(job_id, str):
              raise TypeError(
                  f"job_id must be a string if provided, got {type(job_id)}"
              )

          env = self.runtime_env
          if env is not None:
              if not isinstance(env, dict):
                  raise TypeError(
                      f"runtime_env must be a dict, got {type(env)}"
                  )
              bad_key = first_non_str(env.keys())
              if bad_key is not all_str:
                  raise TypeError(
                      f"runtime_env keys must be strings, got {type(bad_key)}"
                  )

          meta = self.metadata
          if meta is not None:
              if not isinstance(meta, dict):
                  raise TypeError(f"metadata must be a dict, got {type(meta)}")
              # All keys are checked before any value is inspected.
              bad_key = first_non_str(meta.keys())
              if bad_key is not all_str:
                  raise TypeError(f"metadata keys must be strings, got {type(bad_key)}")
              bad_value = first_non_str(meta.values())
              if bad_value is not all_str:
                  raise TypeError(
                      f"metadata values must be strings, got {type(bad_value)}"
                  )
  @dataclass()
  class JobSubmitResponse:
      """Response payload returned for a job submission."""

      # ID of the submitted job.
      job_id: str
  @dataclass()
  class JobStopResponse:
      """Response payload for a stop request."""

      # Whether the job was stopped by this request.
      stopped: bool
  @dataclass()
  class JobStatusResponse:
      """Response payload carrying a job's status and accompanying message."""

      # Current status of the job.
      status: JobStatus
      # Human-readable description of the status; required, but may be None.
      message: Optional[str]
  # TODO(jiaodong): Support log streaming #19415
  @dataclass()
  class JobLogsResponse:
      """Response payload carrying a job's logs as a single string."""

      # Log contents.
      logs: str