Getting Started
===============

.. note::
    Workflows is a library that provides strong durability for Ray task graphs.
    If you’re brand new to Ray, we recommend starting with the :ref:`core walkthrough <core-walkthrough>` instead.

Your first workflow
-------------------

Let's start by defining a simple workflow DAG, which we'll use for the example below.
Here is a three-node DAG (note the use of ``.bind(...)`` instead of
``.remote(...)``). The DAG will not be executed until further actions are
taken on it:

.. code-block:: python

    from typing import List
    import ray

    # Define Ray remote functions.
    @ray.remote
    def read_data(num: int):
        return [i for i in range(num)]

    @ray.remote
    def preprocessing(data: List[float]) -> List[float]:
        return [d**2 for d in data]

    @ray.remote
    def aggregate(data: List[float]) -> float:
        return sum(data)

    # Build the DAG:
    # data -> preprocessed_data -> aggregate
    data = read_data.bind(10)
    preprocessed_data = preprocessing.bind(data)
    output = aggregate.bind(preprocessed_data)

We can plot this DAG by using ``ray.dag.vis_utils.plot(output, "output.jpg")``:

.. image:: basic.png
    :width: 500px
    :align: center

Next, let's execute the DAG we defined and inspect the result:

.. code-block:: python

    # <follow the previous code>
    from ray import workflow

    # Execute the workflow and print the result.
    print(workflow.run(output))

    # You can also run the workflow asynchronously and fetch the output via
    # 'ray.get'
    output_ref = workflow.run_async(output)
    print(ray.get(output_ref))

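As a quick cross-check of what the workflow should print, here is a minimal sketch that evaluates the same three steps eagerly in plain Python, with no Ray and no durability involved. The function bodies mirror the remote functions above; only the ``@ray.remote`` decorators and the ``.bind`` calls are dropped.

.. code-block:: python

    from typing import List


    def read_data(num: int) -> List[int]:
        return [i for i in range(num)]


    def preprocessing(data: List[int]) -> List[int]:
        return [d**2 for d in data]


    def aggregate(data: List[int]) -> int:
        return sum(data)


    # Same chain as the DAG: data -> preprocessed_data -> aggregate
    result = aggregate(preprocessing(read_data(10)))
    print(result)  # 0**2 + 1**2 + ... + 9**2 = 285
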
Each node in the original DAG becomes a workflow task. You can think of workflow
tasks as wrappers around Ray tasks that insert *checkpointing logic* to
ensure intermediate results are durably persisted. This enables workflow DAGs to
always resume from the last successful task on failure.

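The checkpoint-and-resume idea can be illustrated with a toy model. The sketch below is *not* how Ray implements workflows. It assumes a plain dict stands in for durable storage, and ``run_chain`` and ``flaky`` are hypothetical names. Each task's result is written to the store before the next task runs, so a second run after a failure skips every task that already has a checkpoint.

.. code-block:: python

    from typing import Callable, Dict, List, Tuple

    # Hypothetical stand-in for durable storage: task name -> logged result.
    Storage = Dict[str, object]


    def run_chain(tasks: List[Tuple[str, Callable[[object], object]]],
                  initial: object, storage: Storage, calls: List[str]) -> object:
        """Run tasks in order, checkpointing each result and reusing checkpoints."""
        value = initial
        for name, fn in tasks:
            if name in storage:
                # Already logged: never re-execute, just reuse the checkpoint.
                value = storage[name]
                continue
            calls.append(name)     # record that the task body actually ran
            value = fn(value)
            storage[name] = value  # "log" the result before moving on
        return value


    attempts = {"preprocessing": 0}


    def flaky(data):
        # Fails on its first call only, simulating a crash mid-workflow.
        attempts["preprocessing"] += 1
        if attempts["preprocessing"] == 1:
            raise RuntimeError("simulated failure")
        return [d**2 for d in data]


    tasks = [
        ("read_data", lambda n: list(range(n))),
        ("preprocessing", flaky),
        ("aggregate", sum),
    ]

    storage: Storage = {}
    calls: List[str] = []
    try:
        run_chain(tasks, 10, storage, calls)
    except RuntimeError:
        pass  # the first run dies inside "preprocessing"

    result = run_chain(tasks, 10, storage, calls)  # "resume"
    print(result)  # 285
    print(calls)   # ['read_data', 'preprocessing', 'preprocessing', 'aggregate']

Note that ``read_data`` ran only once: on resume, its logged result was reused instead of recomputed.
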
Setting workflow options
------------------------

You can set Ray options on a workflow task directly, just like on a normal
Ray remote function. To set workflow-specific options, use ``workflow.options``
either as a decorator or as kwargs to ``<task>.options``:

.. code-block:: python

    import ray
    from ray import workflow

    @workflow.options(checkpoint=True)
    @ray.remote(num_cpus=2, num_gpus=3, max_retries=5)
    def read_data(num: int):
        return [i for i in range(num)]

    read_data_with_options = read_data.options(
        num_cpus=1, num_gpus=1, **workflow.options(checkpoint=True))

Retrieving Workflow Results
---------------------------

To retrieve a workflow result, assign ``workflow_id`` when running a workflow:

.. code-block:: python

    import ray
    from ray import workflow

    try:
        # Cleanup previous workflows
        # An exception will be raised if it doesn't exist.
        workflow.delete("add_example")
    except workflow.WorkflowNotFoundError:
        pass

    @ray.remote
    def add(left: int, right: int) -> int:
        return left + right

    @ray.remote
    def get_val() -> int:
        return 10

    ret = add.bind(get_val.bind(), 20)

    assert workflow.run(ret, workflow_id="add_example") == 30

The workflow results can be retrieved with
``workflow.get_output(workflow_id)``. If a workflow is not given a
``workflow_id``, a random string is set as the ``workflow_id``. To list all
workflow IDs, call ``ray.workflow.list_all()``.

.. code-block:: python

    assert workflow.get_output("add_example") == 30
    # "workflow.get_output_async" is an asynchronous version

Sub-Task Results
~~~~~~~~~~~~~~~~

We can also retrieve the results of individual workflow tasks by *task ID*. A task ID can be set with ``task_id``:

1) via ``.options(**workflow.options(task_id="task_name"))``
2) via decorator ``@workflow.options(task_id="task_name")``

If a task is not given a ``task_id``, the function name of the task is used as the ``task_id``.
If there are multiple tasks with the same ID, a counter suffix ``_n`` will be added.

Once a task ID is given, the result of the task will be retrievable via ``workflow.get_output(workflow_id, task_id="task_id")``.
If the task with the given ``task_id`` hasn't been executed before the workflow completes, an exception will be thrown. Here are some examples:

.. code-block:: python

    import ray
    from ray import workflow

    workflow_id = "double"
    try:
        # cleanup previous workflows
        workflow.delete(workflow_id)
    except workflow.WorkflowNotFoundError:
        pass

    @ray.remote
    def double(v):
        return 2 * v

    inner_task = double.options(**workflow.options(task_id="inner")).bind(1)
    outer_task = double.options(**workflow.options(task_id="outer")).bind(inner_task)
    result_ref = workflow.run_async(outer_task, workflow_id="double")

    inner = workflow.get_output_async(workflow_id, task_id="inner")
    outer = workflow.get_output_async(workflow_id, task_id="outer")

    assert ray.get(inner) == 2
    assert ray.get(outer) == 4
    assert ray.get(result_ref) == 4

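The de-duplication rule for repeated task IDs can be sketched as follows. This only illustrates the naming scheme described above. It assumes that the first occurrence keeps the bare name and that later ones get ``_1``, ``_2``, and so on. ``assign_task_ids`` is a hypothetical helper, not part of the workflow API, and Ray's exact suffix format may differ.

.. code-block:: python

    from collections import Counter
    from typing import List


    def assign_task_ids(names: List[str]) -> List[str]:
        """Return unique task IDs, adding a ``_n`` counter to repeated names."""
        seen: Counter = Counter()
        ids = []
        for name in names:
            n = seen[name]
            ids.append(name if n == 0 else f"{name}_{n}")
            seen[name] += 1
        return ids


    # Three unnamed ``double`` tasks default to the function name.
    print(assign_task_ids(["double", "double", "double", "inner"]))
    # ['double', 'double_1', 'double_2', 'inner']
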
Error handling
--------------

Workflow provides two ways to handle application-level exceptions: (1) automatic retry (as in normal Ray tasks), and (2) the ability to catch and handle exceptions.

- If ``max_retries`` is given, the task will be retried up to the given number of times if the workflow task fails.
- If ``retry_exceptions`` is True, then the workflow task retries both task crashes and application-level errors;
  if it is ``False``, then the workflow task only retries task crashes.
- If ``catch_exceptions`` is True, the return value of the function will be converted to ``Tuple[Optional[T], Optional[Exception]]``.
  It can be combined with ``max_retries`` to retry a given number of times before returning the result tuple.

``max_retries`` and ``retry_exceptions`` are also Ray task options,
so they should be used inside the Ray remote decorator. Here is how you could use them:

.. code-block:: python

    # specify in decorator
    @workflow.options(catch_exceptions=True)
    @ray.remote(max_retries=5, retry_exceptions=True)
    def faulty_function():
        pass

    # specify in .options()
    faulty_function.options(max_retries=3, retry_exceptions=False,
                            **workflow.options(catch_exceptions=False))

.. note:: By default ``retry_exceptions`` is ``False``, and ``max_retries`` is ``3``.

Here is one example:

.. code-block:: python

    from typing import Optional, Tuple
    import random

    import ray
    from ray import workflow

    @ray.remote
    def faulty_function() -> str:
        if random.random() > 0.5:
            raise RuntimeError("oops")
        return "OK"

    # Retries up to five times before giving up. ``retry_exceptions=True`` is
    # needed because ``RuntimeError`` is an application-level error, which is
    # not retried by default.
    r1 = faulty_function.options(max_retries=5, retry_exceptions=True).bind()
    workflow.run(r1)

    @ray.remote
    def handle_errors(result: Tuple[Optional[str], Optional[Exception]]):
        # The exception field will be None on success.
        err = result[1]
        if err:
            return "There was an error: {}".format(err)
        else:
            return "OK"

    # `handle_errors` receives a tuple of (result, exception).
    r2 = faulty_function.options(**workflow.options(catch_exceptions=True)).bind()
    workflow.run(handle_errors.bind(r2))

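The following plain-Python sketch makes the interaction between ``max_retries``, ``retry_exceptions``, and ``catch_exceptions`` concrete. ``run_task`` and ``make_flaky`` are hypothetical helpers, not the workflow implementation. The sketch models only application-level exceptions. It does not simulate real task crashes, which are retried regardless of ``retry_exceptions``.

.. code-block:: python

    from typing import Callable, Optional, TypeVar

    T = TypeVar("T")


    def run_task(fn: Callable[[], T], max_retries: int = 3,
                 retry_exceptions: bool = False,
                 catch_exceptions: bool = False):
        """Call fn, retrying application errors only if retry_exceptions is set."""
        attempts = 1 + (max_retries if retry_exceptions else 0)
        last_error: Optional[Exception] = None
        for _ in range(attempts):
            try:
                value = fn()
            except Exception as e:  # application-level error
                last_error = e
                continue
            return (value, None) if catch_exceptions else value
        if catch_exceptions:
            # Retries exhausted: surface the error as data instead of raising.
            return (None, last_error)
        raise last_error


    def make_flaky(failures: int) -> Callable[[], str]:
        """Build a function that raises `failures` times, then returns "OK"."""
        state = {"calls": 0}

        def f() -> str:
            state["calls"] += 1
            if state["calls"] <= failures:
                raise RuntimeError("oops")
            return "OK"
        return f


    print(run_task(make_flaky(2), max_retries=5, retry_exceptions=True))  # OK
    print(run_task(make_flaky(2), retry_exceptions=False, catch_exceptions=True))
    # (None, RuntimeError('oops'))

In the second call, the error is not retried because ``retry_exceptions`` is ``False``. ``catch_exceptions`` then turns the failure into a ``(None, exception)`` tuple instead of raising it.
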
Durability guarantees
---------------------

Workflow tasks provide *exactly-once* execution semantics. What this means is
that **once the result of a workflow task is logged to durable storage, Ray
guarantees the task will never be re-executed**. A task that receives the output
of another workflow task can be assured that its input tasks will never be
re-executed.

Failure model
~~~~~~~~~~~~~

- If the cluster fails, any workflows running on the cluster enter the ``RESUMABLE`` state. The workflows can be resumed on another cluster (see the management API section).
- The lifetime of the workflow is not coupled with the driver. If the driver exits, the workflow will continue running in the background of the cluster.

Note that tasks that have side effects still need to be idempotent. This is because a task could always fail after performing its side effect but before its result is logged, in which case it will be re-executed.

.. code-block:: python
    :caption: Non-idempotent workflow:

    # ``service`` and ``FlightTicket`` are placeholders for an external
    # booking service and its ticket type.
    @ray.remote
    def book_flight_unsafe() -> FlightTicket:
        ticket = service.book_flight()
        # Uh oh, what if we failed here?
        return ticket

    # UNSAFE: we could book multiple flight tickets
    workflow.run(book_flight_unsafe.bind())

.. code-block:: python
    :caption: Idempotent workflow:

    import uuid

    @ray.remote
    def generate_id() -> str:
        # Generate a unique idempotency token.
        return uuid.uuid4().hex

    @ray.remote
    def book_flight_idempotent(request_id: str) -> FlightTicket:
        if service.has_ticket(request_id):
            # Retrieve the previously created ticket.
            return service.get_ticket(request_id)
        return service.book_flight(request_id)

    # SAFE: book_flight is written to be idempotent. Because generate_id is a
    # workflow task, its output is checkpointed, so a retry of
    # book_flight_idempotent reuses the same token.
    request_id = generate_id.bind()
    workflow.run(book_flight_idempotent.bind(request_id))

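The idempotency-token pattern can be exercised without a real booking service. In this sketch, ``FakeFlightService`` is a hypothetical in-memory stand-in for ``service``, and the ticket is a plain string rather than a ``FlightTicket``. Calling the idempotent function twice with the same ``request_id``, as a re-execution after a crash would, books only one ticket.

.. code-block:: python

    import uuid
    from typing import Dict


    class FakeFlightService:
        """Hypothetical in-memory booking service keyed by request ID."""

        def __init__(self) -> None:
            self.tickets: Dict[str, str] = {}
            self.bookings = 0  # how many times a seat was actually booked

        def has_ticket(self, request_id: str) -> bool:
            return request_id in self.tickets

        def get_ticket(self, request_id: str) -> str:
            return self.tickets[request_id]

        def book_flight(self, request_id: str) -> str:
            self.bookings += 1
            ticket = f"ticket-{self.bookings}"
            self.tickets[request_id] = ticket
            return ticket


    service = FakeFlightService()


    def book_flight_idempotent(request_id: str) -> str:
        # Same logic as the workflow task above, minus the Ray decorator.
        if service.has_ticket(request_id):
            return service.get_ticket(request_id)
        return service.book_flight(request_id)


    request_id = uuid.uuid4().hex
    first = book_flight_idempotent(request_id)
    retry = book_flight_idempotent(request_id)  # e.g. re-executed after a crash
    print(first == retry, service.bookings)  # True 1
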
Dynamic workflows
-----------------

Workflow tasks can be created dynamically at runtime. A Ray DAG is static,
which means a DAG node cannot return another DAG node. For example, the
following code is invalid:

.. code-block:: python

    @ray.remote
    def bar(): ...

    @ray.remote
    def foo():
        return bar.bind()  # This is invalid since Ray DAG is static

    ray.get(foo.bind().execute())  # This will error

Workflow introduces a utility function called ``workflow.continuation``, which
lets a Ray DAG node return a DAG at runtime:

.. code-block:: python

    @ray.remote
    def bar():
        return 10

    @ray.remote
    def foo():
        # This will return a DAG to be executed
        # after this function is finished.
        return workflow.continuation(bar.bind())

    assert ray.get(foo.bind().execute()) == 10
    assert workflow.run(foo.bind()) == 10

The dynamic workflow enables nesting, looping, and recursion within workflows.
The following example shows how to implement the recursive ``factorial`` program
using a dynamic workflow:

.. code-block:: python

    @ray.remote
    def factorial(n: int) -> int:
        if n == 1:
            return 1
        else:
            # Here a DAG is passed to the continuation.
            # The DAG will continue to be executed after this task.
            return workflow.continuation(multiply.bind(n, factorial.bind(n - 1)))

    @ray.remote
    def multiply(a: int, b: int) -> int:
        return a * b

    assert workflow.run(factorial.bind(10)) == 3628800
    # You can also execute the code with Ray DAG engine.
    assert ray.get(factorial.bind(10).execute()) == 3628800

The key behavior to note is that when a task returns a DAG wrapped by
``workflow.continuation`` instead of a concrete value, that wrapped DAG will be
substituted for the task's return.

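The substitution rule can be modeled with a small runner. In this plain-Python sketch, ``Continuation`` and ``run`` are hypothetical stand-ins for ``workflow.continuation`` and the workflow engine, and each lazy "DAG" is just a zero-argument function. Whenever a task returns a ``Continuation``, the runner executes the wrapped computation and uses its result in place of the task's return value. This is why the recursive ``factorial`` above still produces a plain integer.

.. code-block:: python

    from dataclasses import dataclass
    from typing import Any, Callable


    @dataclass
    class Continuation:
        """Hypothetical marker: replace my return value with this computation."""
        dag: Callable[[], Any]


    def run(task: Callable[[], Any]) -> Any:
        result = task()
        # Keep substituting until a task returns a concrete value.
        while isinstance(result, Continuation):
            result = result.dag()
        return result


    def multiply(a: int, b: int) -> int:
        return a * b


    def factorial(n: int) -> Any:
        if n == 1:
            return 1
        # Return a lazy computation instead of a value. Its input
        # factorial(n - 1) is fully resolved by `run` before multiply executes.
        return Continuation(lambda: multiply(n, run(lambda: factorial(n - 1))))


    print(run(lambda: factorial(10)))  # 3628800
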
To better understand dynamic workflows, let's look at a more realistic example of booking a trip:

.. code-block:: python

    @ray.remote
    def book_flight(...) -> Flight: ...

    @ray.remote
    def book_hotel(...) -> Hotel: ...

    @ray.remote
    def finalize_or_cancel(
        flights: List[Flight],
        hotels: List[Hotel]) -> Receipt: ...

    @ray.remote
    def book_trip(origin: str, dest: str, dates) -> Receipt:
        # Note that the workflow engine will not begin executing
        # child workflows until the parent task returns.
        # This avoids task overlap and ensures recoverability.
        f1 = book_flight.bind(origin, dest, dates[0])
        f2 = book_flight.bind(dest, origin, dates[1])
        hotel = book_hotel.bind(dest, dates)
        return workflow.continuation(finalize_or_cancel.bind([f1, f2], [hotel]))

    receipt: Receipt = workflow.run(book_trip.bind("OAK", "SAN", ["6/12", "7/5"]))

Here the workflow initially just consists of the ``book_trip`` task. Once
executed, ``book_trip`` generates tasks to book flights and hotels in parallel,
which feed into a task to decide whether to cancel the trip or finalize it. The
DAG can be visualized as follows (note the dynamically generated nested
workflows within ``book_trip``):

.. image:: trip.png
    :width: 500px
    :align: center

The execution order here will be:

1. Run the ``book_trip`` task.
2. Run the two ``book_flight`` tasks and the ``book_hotel`` task in parallel.
3. Once all three booking tasks finish, ``finalize_or_cancel`` will be executed and its return will be the output of the workflow.

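The same dependency structure can be mimicked with ``concurrent.futures``. This is only an analogy with no durability. The booking functions are hypothetical stubs that return strings, and a thread pool plays the role of the scheduler that runs the three bookings in parallel before ``finalize_or_cancel``. Unlike the workflow version, this ``book_trip`` blocks until the receipt is ready instead of returning a continuation.

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor
    from typing import List


    def book_flight(origin: str, dest: str, date: str) -> str:
        return f"flight {origin}->{dest} on {date}"


    def book_hotel(dest: str, dates: List[str]) -> str:
        return f"hotel in {dest} {dates[0]}-{dates[1]}"


    def finalize_or_cancel(flights: List[str], hotels: List[str]) -> str:
        # Stub: always finalizes; a real version might cancel on failure.
        return "receipt: " + "; ".join(flights + hotels)


    def book_trip(origin: str, dest: str, dates: List[str]) -> str:
        with ThreadPoolExecutor() as pool:
            # Step 2: the three bookings are independent, so run them in parallel.
            f1 = pool.submit(book_flight, origin, dest, dates[0])
            f2 = pool.submit(book_flight, dest, origin, dates[1])
            hotel = pool.submit(book_hotel, dest, dates)
            # Step 3: finalize only after all three results are available.
            return finalize_or_cancel([f1.result(), f2.result()], [hotel.result()])


    print(book_trip("OAK", "SAN", ["6/12", "7/5"]))
    # receipt: flight OAK->SAN on 6/12; flight SAN->OAK on 7/5; hotel in SAN 6/12-7/5
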
Ray Integration
---------------

Mixing workflow tasks with Ray tasks and actors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Workflows are compatible with Ray tasks and actors. There are two methods of using them together:

1. Workflows can be launched from within a Ray task or actor. For example, you can launch a long-running workflow from Ray Serve in response to a user request. This is no different from launching a workflow from the driver program.
2. Workflow tasks can use Ray tasks or actors within a single task. For example, a task could use Ray Train internally to train a model. No durability guarantees apply to the tasks or actors used within the task; if the task fails, it will be re-executed from scratch.

Passing nested arguments
~~~~~~~~~~~~~~~~~~~~~~~~

Like Ray tasks, when you pass a list of task outputs to a task, the values are
not resolved. However, workflows ensure that all ancestors of a task are fully
executed before the task starts. This differs from passing them into a plain Ray
remote function, where it is not defined whether the referenced tasks have
finished executing.

.. code-block:: python

    from typing import List

    @ray.remote
    def add(values: List[ray.ObjectRef[int]]) -> int:
        # although those values are not resolved, they have been
        # *fully executed and checkpointed*. This guarantees exactly-once
        # execution semantics.
        return sum(ray.get(values))

    @ray.remote
    def get_val() -> int:
        return 10

    ret = add.bind([get_val.bind() for _ in range(3)])
    assert workflow.run(ret) == 30

Passing object references between tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Ray object references and data structures composed of them (e.g.,
``ray.Dataset``) can be passed into and returned from workflow tasks. To ensure
recoverability, their contents will be logged to durable storage before
executing. However, an object will not be checkpointed more than once, even if
it is passed to many different tasks.

.. code-block:: python

    @ray.remote
    def do_add(a, b):
        return a + b

    @ray.remote
    def add(a, b):
        return do_add.remote(a, b)

    workflow.run(add.bind(ray.put(10), ray.put(20))) == 30

Ray actor handles are not allowed to be passed between tasks.

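The "checkpoint each object at most once" behavior can be pictured as a store keyed by object ID. In the sketch below, a ``(ref_id, value)`` tuple stands in for a Ray object reference, and ``CheckpointStore`` is a hypothetical class, not Ray's storage layer. The shared reference is passed to three tasks but written only once.

.. code-block:: python

    from typing import Any, Dict


    class CheckpointStore:
        """Hypothetical store that persists each referenced object only once."""

        def __init__(self) -> None:
            self.saved: Dict[str, Any] = {}
            self.writes = 0

        def checkpoint(self, ref_id: str, value: Any) -> None:
            if ref_id in self.saved:
                return  # already durable; skip the duplicate write
            self.saved[ref_id] = value
            self.writes += 1


    store = CheckpointStore()
    shared_ref = ("ref-1", list(range(1000)))  # (object ID, contents)

    # The same reference is passed to three different tasks.
    task_inputs = {"task_a": [shared_ref], "task_b": [shared_ref], "task_c": [shared_ref]}
    for task_name, refs in task_inputs.items():
        for ref_id, value in refs:
            store.checkpoint(ref_id, value)

    print(store.writes)  # 1
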
Setting custom resources for tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can assign resources (e.g., CPUs, GPUs) to tasks via the same ``num_cpus``, ``num_gpus``, and ``resources`` arguments that Ray tasks take:

.. code-block:: python

    # ``Model`` is a placeholder for your model type.
    @ray.remote(num_gpus=1)
    def train_model() -> Model:
        pass  # This task is assigned to a GPU by Ray.

    workflow.run(train_model.bind())