Key Concepts
------------

.. note::

    Workflows is a library that provides strong durability for Ray task graphs.
    If you're brand new to Ray, we recommend starting with the :ref:`core walkthrough <core-walkthrough>` instead.

DAG API
~~~~~~~

Normally, Ray tasks are executed eagerly.
In order to provide durability, Ray Workflows uses the lazy :ref:`Ray DAG API <ray-dag-guide>`
to separate the definition and execution of task DAGs.

Switching from Ray tasks to the DAG API is simple: just replace all calls to ``.remote(...)``
(which return object references) with calls to ``.bind(...)`` (which return DAG nodes).
Ray DAG nodes can otherwise be composed like normal Ray tasks.

However, unlike Ray tasks, you are not allowed to call ``ray.get()`` or ``ray.wait()`` on
DAG nodes. Instead, the DAG needs to be *executed* in order to compute a result.
.. code-block:: python
    :caption: Composing functions together into a DAG:

    import ray

    @ray.remote
    def one() -> int:
        return 1

    @ray.remote
    def add(a: int, b: int) -> int:
        return a + b

    dag = add.bind(100, one.bind())
Workflow Execution
~~~~~~~~~~~~~~~~~~

To execute a DAG with workflows, use ``workflow.run``:

.. code-block:: python
    :caption: Executing a DAG with Ray Workflows.

    from ray import workflow

    # Run the workflow until it completes and returns the output.
    assert workflow.run(dag) == 101

    # Or you can run it asynchronously and fetch the output via 'ray.get'.
    output_ref = workflow.run_async(dag)
    assert ray.get(output_ref) == 101

Once started, a workflow's execution is durably logged to storage. On system
failure, the workflow can be resumed on any Ray cluster with access to the
storage.

When executing the workflow DAG, workflow tasks are retried on failure, but once
they finish successfully and the results are persisted by the workflow engine,
they will never be run again.
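The run-once guarantee can be illustrated with a small, Ray-free sketch. Here ``DurableLog`` is a hypothetical stand-in for the workflow engine's result log, not part of the Workflows API: a result is looked up in durable storage before a task runs, and written back after it succeeds, so a resumed workflow skips tasks that already completed.

```python
import json
import tempfile
from pathlib import Path


class DurableLog:
    """Toy stand-in for the workflow engine's result log (illustrative only)."""

    def __init__(self, path: Path):
        self.path = path

    def run_once(self, task_id: str, fn, *args):
        record = self.path / f"{task_id}.json"
        if record.exists():
            # The result was already persisted: never re-run the task.
            return json.loads(record.read_text())
        result = fn(*args)  # A real engine would also retry this on failure.
        record.write_text(json.dumps(result))  # Persist before returning.
        return result


calls = []

def add(a, b):
    calls.append((a, b))
    return a + b

log = DurableLog(Path(tempfile.mkdtemp()))
assert log.run_once("add_1", add, 100, 1) == 101
assert log.run_once("add_1", add, 100, 1) == 101  # Served from the log.
assert calls == [(100, 1)]  # The task body ran exactly once.
```

A second process pointed at the same directory would likewise skip ``add_1``, which is the essence of resuming a workflow on another cluster with access to the same storage.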
.. code-block:: python
    :caption: Getting the result of a workflow.

    # Configure the storage with "ray.init" or "ray start --head --storage=<STORAGE_URI>".
    # A default temporary storage is used by the workflow if Ray is started
    # without an explicit storage setting.
    ray.init(storage="/tmp/data")

    assert workflow.run(dag, workflow_id="run_1") == 101
    assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL
    assert workflow.get_output("run_1") == 101
    # workflow.get_output_async returns an ObjectRef.
    assert ray.get(workflow.get_output_async("run_1")) == 101
Objects
~~~~~~~

Workflows integrates seamlessly with Ray objects by allowing Ray object
references to be passed into and returned from tasks. Objects are checkpointed
when initially returned from a task. After checkpointing, the object can be
shared among any number of workflow tasks at memory-speed via the Ray object
store.
.. code-block:: python
    :caption: Using Ray objects in a workflow:

    from typing import List

    import ray
    from ray import workflow

    @ray.remote
    def hello() -> str:
        return "hello"

    @ray.remote
    def words() -> List[ray.ObjectRef]:
        # NOTE: Here it is ".remote()" instead of ".bind()", so
        # it creates an ObjectRef instead of a DAG.
        return [hello.remote(), ray.put("world")]

    @ray.remote
    def concat(words: List[ray.ObjectRef]) -> str:
        return " ".join([ray.get(w) for w in words])

    assert workflow.run(concat.bind(words.bind())) == "hello world"
Dynamic Workflows
~~~~~~~~~~~~~~~~~

Workflows can generate new tasks at runtime. This is achieved by returning a
continuation of a DAG. A continuation is something returned by a function and
executed after it returns. The continuation feature enables nesting, looping,
and recursion within workflows:

.. code-block:: python
    :caption: The Fibonacci recursive workflow:

    @ray.remote
    def add(a: int, b: int) -> int:
        return a + b

    @ray.remote
    def fib(n: int) -> int:
        if n <= 1:
            return n
        # Return a continuation of a DAG.
        return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))

    assert workflow.run(fib.bind(10)) == 55
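To see why returning a continuation differs from plain recursion, here is a Ray-free trampoline sketch (the ``Continuation`` class and ``run`` loop are hypothetical, not the Workflows API): the task returns a *description* of more work instead of calling itself, and a driver loop keeps executing continuations until a plain value comes back. The real engine works similarly, but additionally checkpoints each step.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass
class Continuation:
    """A deferred call: the engine runs it after the current task returns."""
    fn: Callable
    args: Tuple[Any, ...]


def run(fn: Callable, *args) -> Any:
    """Toy engine loop: keep executing continuations until a value appears."""
    result = fn(*args)
    while isinstance(result, Continuation):
        result = result.fn(*result.args)
    return result


def factorial(n: int, acc: int = 1):
    if n <= 1:
        return acc
    # Return more work instead of recursing; the Python stack never grows.
    return Continuation(factorial, (n - 1, acc * n))


assert run(factorial, 10) == 3628800
```

Because each continuation is a fresh top-level step rather than a nested call, loops and deep recursions can run indefinitely without exhausting the stack, and each step is a natural checkpoint boundary.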
Events
~~~~~~

Events are external signals sent to the workflow. Workflows can be efficiently
triggered by timers or external events using the event system.

.. code-block:: python
    :caption: Using events.

    # Sleep is a special type of event.
    sleep_task = workflow.sleep(100)

    # `wait_for_event` allows for pluggable event listeners.
    event_task = workflow.wait_for_event(MyEventListener)

    @ray.remote
    def gather(*args):
        return args

    # If a task's arguments include events, the task won't be executed until all
    # of the events have occurred.
    workflow.run(gather.bind(sleep_task, event_task, "hello world"))
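As a rough illustration of what a pluggable listener does, here is a Ray-free ``asyncio`` sketch. ``PollingListener`` and its ``poll_for_event`` coroutine are hypothetical names chosen to mirror the idea, not the exact Workflows listener interface: the listener polls an external source and only resolves once the event has occurred, at which point the waiting task can proceed.

```python
import asyncio


class PollingListener:
    """Hypothetical listener: resolves once a key appears in a shared dict."""

    def __init__(self, events: dict, key: str):
        self.events = events
        self.key = key

    async def poll_for_event(self):
        # Poll the external source until the event is observed.
        while self.key not in self.events:
            await asyncio.sleep(0.01)
        return self.events[self.key]


async def main():
    events = {}
    listener = PollingListener(events, "ready")

    async def fire():
        # Simulate the external event arriving a little later.
        await asyncio.sleep(0.05)
        events["ready"] = "signal payload"

    payload, _ = await asyncio.gather(listener.poll_for_event(), fire())
    return payload


assert asyncio.run(main()) == "signal payload"
```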