Workflow Metadata
=================

Observability is important for workflows: sometimes we want not only the
output, but also insight into the internal state (e.g., to measure
performance or find bottlenecks).

Workflow metadata provides several stats that help you understand
a workflow, from basic running status and task options to performance
and user-imposed metadata.

Retrieving metadata
-------------------

Workflow metadata can be retrieved with ``workflow.get_metadata(workflow_id)``.
For example:

.. code-block:: python

    import ray
    from ray import workflow

    @ray.remote
    def add(left: int, right: int) -> int:
        return left + right

    workflow.run(add.bind(10, 20), workflow_id="add_example")

    workflow_metadata = workflow.get_metadata("add_example")

    assert workflow_metadata["status"] == "SUCCESSFUL"
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" in workflow_metadata["stats"]

You can also retrieve metadata for an individual workflow task by
providing its task ID:

.. code-block:: python

    workflow.run(
        add.options(
            **workflow.options(task_id="add_task")
        ).bind(10, 20), workflow_id="add_example_2")

    task_metadata = workflow.get_metadata("add_example_2", task_id="add_task")

    # Check the task-level metadata returned above.
    assert "start_time" in task_metadata["stats"]
    assert "end_time" in task_metadata["stats"]

User-defined metadata
---------------------

You can add custom metadata to a workflow or a workflow task, which is
useful when you want to attach extra information to it.

- Workflow-level metadata can be added via ``.run(metadata=metadata)``.
- Task-level metadata can be added via ``.options(**workflow.options(metadata=metadata))`` or in the decorator ``@workflow.options(metadata=metadata)``.

.. code-block:: python

    workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
        workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})

    assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
    assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"}

**Note: user-defined metadata must be a Python dictionary with values that are
JSON serializable.**
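
One way to catch invalid metadata early is to try serializing it before
passing it to ``workflow.run()`` or ``workflow.options()``. The sketch below
uses only the standard ``json`` module. ``is_valid_user_metadata`` is a
hypothetical helper name, not part of the workflow API, and the library's own
validation may differ in detail.

.. code-block:: python

    import json
    from typing import Any


    def is_valid_user_metadata(metadata: Any) -> bool:
        """Return True if metadata is a dict that json.dumps can serialize.

        Hypothetical helper for illustration; not part of the workflow API.
        """
        if not isinstance(metadata, dict):
            return False
        try:
            json.dumps(metadata)
        except (TypeError, ValueError):
            # TypeError: unsupported value type; ValueError: circular reference.
            return False
        return True


    assert is_valid_user_metadata({"owner": "alice", "retries": 3})
    assert not is_valid_user_metadata(["not", "a", "dict"])
    assert not is_valid_user_metadata({"handle": object()})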

Available Metrics
-----------------

**Workflow level**

- status: the workflow state, one of RUNNING, FAILED, RESUMABLE, CANCELED, or SUCCESSFUL.
- user_metadata: a Python dictionary of custom metadata provided by the user via ``workflow.run()``.
- stats: workflow running stats, including the workflow start time and end time.

**Task level**

- name: the name of the task, either provided by the user via ``task.options(**workflow.options(task_id=xxx))`` or generated by the system.
- task_options: the options of the task, either provided by the user via ``task.options()`` or set to the system defaults.
- user_metadata: a Python dictionary of custom metadata provided by the user via ``task.options()``.
- stats: task running stats, including the task start time and end time.
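
As a small illustration of reading these fields, the sketch below computes the
elapsed time from a ``stats`` dictionary. It operates on plain dictionaries
shaped like the result of ``workflow.get_metadata()``. ``elapsed_seconds`` is
a hypothetical helper, and it assumes that ``start_time`` and ``end_time`` are
numeric timestamps in seconds, which this page does not specify.

.. code-block:: python

    from typing import Any, Dict, Optional


    def elapsed_seconds(metadata: Dict[str, Any]) -> Optional[float]:
        """Return end_time - start_time, or None if either field is missing.

        Hypothetical helper; assumes both timestamps are numbers in seconds.
        """
        stats = metadata.get("stats", {})
        start = stats.get("start_time")
        end = stats.get("end_time")
        if start is None or end is None:
            return None
        return end - start


    # Hand-written sample data, not real workflow output.
    finished = {"status": "SUCCESSFUL", "stats": {"start_time": 100.0, "end_time": 102.5}}
    running = {"status": "RUNNING", "stats": {"start_time": 100.0}}

    assert elapsed_seconds(finished) == 2.5
    assert elapsed_seconds(running) is None

Returning ``None`` rather than raising keeps the helper usable on workflows
that are still running or were canceled, where ``end_time`` may be absent.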

Notes
-----

1. Unlike ``get_output()``, ``get_metadata()`` returns immediately with
   the metadata available at the time of the call. This means that some
   fields will be missing if the corresponding metadata is not yet available
   (e.g., ``metadata["stats"]["end_time"]`` is not available until the workflow
   completes).

.. code-block:: python

    import time

    # Assumed setup, not shown in the original snippet: tmp_path is a
    # pathlib.Path to a scratch directory (e.g., pytest's tmp_path fixture).
    workflow_id = "simple"
    flag = tmp_path / "flag"

    @ray.remote
    def simple():
        flag.touch()  # signal that the task has started
        time.sleep(1000)
        return 0

    workflow.run_async(simple.bind(), workflow_id=workflow_id)

    # Wait until the workflow task has started running.
    while not flag.exists():
        time.sleep(1)

    workflow_metadata = workflow.get_metadata(workflow_id)
    assert workflow_metadata["status"] == "RUNNING"
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" not in workflow_metadata["stats"]

    workflow.cancel(workflow_id)

    workflow_metadata = workflow.get_metadata(workflow_id)
    assert workflow_metadata["status"] == "CANCELED"
    assert "task_options" in workflow_metadata
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" not in workflow_metadata["stats"]

2. For resumed workflows, the current behavior is that ``stats`` is
   updated whenever a workflow is resumed.

.. code-block:: python

    import pytest

    # tmp_path is a pathlib.Path to a scratch directory
    # (e.g., pytest's tmp_path fixture).
    workflow_id = "simple"
    error_flag = tmp_path / "error"
    error_flag.touch()

    @ray.remote
    def simple():
        if error_flag.exists():
            raise ValueError()
        return 0

    with pytest.raises(ray.exceptions.RaySystemError):
        workflow.run(simple.bind(), workflow_id=workflow_id)

    workflow_metadata_failed = workflow.get_metadata(workflow_id)
    assert workflow_metadata_failed["status"] == "FAILED"

    # Remove the flag so that the task succeeds on resume.
    error_flag.unlink()
    ref = workflow.resume_async(workflow_id)
    assert ray.get(ref) == 0

    workflow_metadata_resumed = workflow.get_metadata(workflow_id)
    assert workflow_metadata_resumed["status"] == "SUCCESSFUL"

    # Make sure that resuming updated the running stats.
    assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
    assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]
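
Because ``get_metadata()`` does not wait for the workflow, code that needs the
final stats has to poll. The sketch below shows one possible polling loop.
``wait_until_not_running`` and its parameters are hypothetical names, and the
metadata getter is passed in as a callable so the example runs without a Ray
cluster.

.. code-block:: python

    import time
    from typing import Any, Callable, Dict


    def wait_until_not_running(
        get_metadata: Callable[[], Dict[str, Any]],
        timeout_s: float = 10.0,
        poll_interval_s: float = 0.1,
    ) -> Dict[str, Any]:
        """Poll get_metadata() until the status is no longer RUNNING.

        Hypothetical helper; raises TimeoutError if the deadline passes first.
        """
        deadline = time.monotonic() + timeout_s
        while True:
            metadata = get_metadata()
            if metadata.get("status") != "RUNNING":
                return metadata
            if time.monotonic() >= deadline:
                raise TimeoutError("workflow is still RUNNING")
            time.sleep(poll_interval_s)


    # Canned snapshots standing in for successive get_metadata() results.
    snapshots = iter([
        {"status": "RUNNING", "stats": {"start_time": 1.0}},
        {"status": "SUCCESSFUL", "stats": {"start_time": 1.0, "end_time": 2.0}},
    ])
    final = wait_until_not_running(lambda: next(snapshots), poll_interval_s=0.01)

    assert final["status"] == "SUCCESSFUL"
    assert "end_time" in final["stats"]

With a real workflow, the getter would be
``lambda: workflow.get_metadata(workflow_id)``.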