Workflow Metadata
=================

Observability is important for workflows. Sometimes we want more than the
output: we also want insight into the internal state (e.g., to measure
performance or find bottlenecks). Workflow metadata provides several stats
that help you understand a workflow. These range from its basic running status
and task options to performance data and user-defined metadata.

Retrieving metadata
-------------------

Workflow metadata can be retrieved with ``workflow.get_metadata(workflow_id)``.
For example:

.. code-block:: python

    import ray
    from ray import workflow

    @ray.remote
    def add(left: int, right: int) -> int:
        return left + right

    workflow.run(add.bind(10, 20), workflow_id="add_example")

    workflow_metadata = workflow.get_metadata("add_example")

    assert workflow_metadata["status"] == "SUCCESSFUL"
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" in workflow_metadata["stats"]

You can also retrieve metadata for an individual workflow task by passing its
task ID:

.. code-block:: python

    workflow.run(
        add.options(
            **workflow.options(task_id="add_task")
        ).bind(10, 20),
        workflow_id="add_example_2",
    )

    task_metadata = workflow.get_metadata("add_example_2", task_id="add_task")

    # Check the task-level metadata, not the workflow-level metadata.
    assert "start_time" in task_metadata["stats"]
    assert "end_time" in task_metadata["stats"]

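Because ``stats`` records start and end times, you can derive how long a workflow or task ran. The helper below is a hypothetical sketch, not part of Ray. ``duration_seconds`` is an illustrative name, and the helper assumes both timestamps are numbers measured in seconds. It returns ``None`` while ``end_time`` is not yet present.

.. code-block:: python

    from typing import Any, Mapping, Optional


    def duration_seconds(metadata: Mapping[str, Any]) -> Optional[float]:
        """Return end_time - start_time from a metadata dict's "stats", if both exist.

        Hypothetical helper (not a Ray API); assumes numeric timestamps in seconds.
        """
        stats = metadata.get("stats", {})
        if "start_time" not in stats or "end_time" not in stats:
            # e.g. the workflow or task has not finished yet
            return None
        return float(stats["end_time"]) - float(stats["start_time"])


    # Usage with a hand-written dict shaped like get_metadata() output:
    example = {"status": "SUCCESSFUL", "stats": {"start_time": 100.0, "end_time": 102.5}}
    print(duration_seconds(example))  # 2.5

If the timestamp assumption holds, ``duration_seconds(task_metadata)`` would give the runtime of ``add_task`` above.
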
User-defined metadata
---------------------

Users can attach custom metadata to a workflow or to a workflow task. This is
useful for recording extra information alongside it.

- Workflow-level metadata can be added via ``workflow.run(..., metadata=metadata)``.
- Task-level metadata can be added via ``.options(**workflow.options(metadata=metadata))``
  or with the decorator ``@workflow.options(metadata=metadata)``.

.. code-block:: python

    workflow.run(
        add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
        workflow_id="add_example_3",
        metadata={"workflow_k": "workflow_v"},
    )

    assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
    assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"}

**Note: user-defined metadata must be a Python dictionary whose values are
JSON serializable.**

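To catch violations of this rule before starting a workflow, you could validate the dictionary yourself. The following is a minimal sketch. ``check_user_metadata`` is a hypothetical helper, not a Ray API. It checks only what the note above requires: a ``dict`` whose values ``json.dumps`` accepts.

.. code-block:: python

    import json
    from typing import Any, Dict


    def check_user_metadata(metadata: Any) -> Dict[str, Any]:
        """Raise early if metadata is not a dict of JSON-serializable values.

        Hypothetical helper (not a Ray API); returns the dict unchanged on success.
        """
        if not isinstance(metadata, dict):
            raise TypeError(f"metadata must be a dict, got {type(metadata).__name__}")
        for key, value in metadata.items():
            try:
                json.dumps(value)  # raises TypeError for e.g. sets or custom objects
            except (TypeError, ValueError) as exc:
                raise ValueError(f"value for {key!r} is not JSON serializable") from exc
        return metadata


    check_user_metadata({"owner": "alice", "tags": ["nightly"], "retries": 3})  # passes

A value such as a ``set`` fails the check, because ``json.dumps`` raises ``TypeError`` for it.
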
Available Metrics
-----------------

**Workflow level**

- ``status``: the workflow state, one of RUNNING, FAILED, RESUMABLE, CANCELED, or SUCCESSFUL.
- ``user_metadata``: a Python dictionary of custom metadata provided by the user via ``workflow.run()``.
- ``stats``: workflow running stats, including the workflow start time and end time.

**Task level**

- ``name``: name of the task, either provided by the user via ``task.options(**workflow.options(task_id=xxx))`` or generated by the system.
- ``task_options``: options of the task, either provided by the user via ``task.options()`` or set to the system defaults.
- ``user_metadata``: a Python dictionary of custom metadata provided by the user via ``task.options()``.
- ``stats``: task running stats, including the task start time and end time.

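Code that branches on ``status`` can convert the string into an enum. That way a misspelled state raises an error instead of silently never matching. This sketch assumes the five strings listed above are the complete set. ``WorkflowState`` and ``parse_status`` are hypothetical local names, not Ray's own status type.

.. code-block:: python

    from enum import Enum
    from typing import Any, Dict


    class WorkflowState(str, Enum):
        """Local mirror of the documented workflow status strings (hypothetical)."""

        RUNNING = "RUNNING"
        FAILED = "FAILED"
        RESUMABLE = "RESUMABLE"
        CANCELED = "CANCELED"
        SUCCESSFUL = "SUCCESSFUL"


    def parse_status(metadata: Dict[str, Any]) -> WorkflowState:
        """Convert metadata["status"] to a WorkflowState; unknown strings raise ValueError."""
        return WorkflowState(metadata["status"])


    state = parse_status({"status": "RESUMABLE"})
    print(state is WorkflowState.RESUMABLE)  # True

``parse_status({"status": "PAUSED"})`` would raise ``ValueError`` because that string is not one of the listed values.
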
Notes
-----

1. Unlike ``get_output()``, ``get_metadata()`` returns immediately with a snapshot
   of the metadata at the time it is called. As a result, some fields may be
   missing if the corresponding metadata is not yet available (e.g.,
   ``metadata["stats"]["end_time"]`` is not available until the workflow
   completes).

.. code-block:: python

    import pathlib
    import tempfile
    import time

    import ray
    from ray import workflow

    workflow_id = "simple_running"
    # Marker file that the task creates once it has started running.
    flag = pathlib.Path(tempfile.mkdtemp()) / "flag"

    @ray.remote
    def simple():
        flag.touch()  # signal that the task has started
        time.sleep(1000)
        return 0

    workflow.run_async(simple.bind(), workflow_id=workflow_id)

    # Make sure the workflow task has started running.
    while not flag.exists():
        time.sleep(1)

    workflow_metadata = workflow.get_metadata(workflow_id)
    assert workflow_metadata["status"] == "RUNNING"
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" not in workflow_metadata["stats"]

    workflow.cancel(workflow_id)

    workflow_metadata = workflow.get_metadata(workflow_id)
    assert workflow_metadata["status"] == "CANCELED"
    assert "start_time" in workflow_metadata["stats"]
    assert "end_time" not in workflow_metadata["stats"]

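Since ``get_metadata()`` does not block, waiting for a particular state means polling it. Here is a minimal sketch. It assumes the metadata getter is passed in as a zero-argument callable (for example ``lambda: workflow.get_metadata(workflow_id)``). ``wait_for_status`` and its ``timeout`` and ``interval`` parameters are hypothetical, not Ray APIs. A fake getter in the usage lines lets the sketch run without a Ray cluster.

.. code-block:: python

    import time
    from typing import Any, Callable, Dict, Iterable


    def wait_for_status(
        get_metadata: Callable[[], Dict[str, Any]],
        targets: Iterable[str],  # a collection of status strings, e.g. {"SUCCESSFUL"}
        timeout: float = 60.0,
        interval: float = 1.0,
    ) -> Dict[str, Any]:
        """Poll get_metadata() until its "status" is in targets; return that snapshot.

        Raises TimeoutError if no matching status is seen within `timeout` seconds.
        """
        wanted = set(targets)
        deadline = time.monotonic() + timeout
        while True:
            metadata = get_metadata()  # immediate snapshot; some fields may be missing
            if metadata.get("status") in wanted:
                return metadata
            if time.monotonic() >= deadline:
                raise TimeoutError(f"status still {metadata.get('status')!r} after {timeout}s")
            time.sleep(interval)


    # Offline usage: a fake getter that reports RUNNING twice, then SUCCESSFUL.
    snapshots = iter(["RUNNING", "RUNNING", "SUCCESSFUL"])
    result = wait_for_status(lambda: {"status": next(snapshots)}, {"SUCCESSFUL"}, interval=0.01)
    print(result["status"])  # SUCCESSFUL

Injecting the getter keeps the polling logic independent of Ray, so it can be tested with fakes like the one above.
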
2. For resumed workflows, ``stats`` is currently updated each time the workflow
   is resumed, so ``start_time`` and ``end_time`` reflect the most recent run.

.. code-block:: python

    import pathlib
    import tempfile

    import pytest
    import ray
    from ray import workflow

    workflow_id = "simple"
    # While this file exists, the task fails.
    error_flag = pathlib.Path(tempfile.mkdtemp()) / "error"
    error_flag.touch()

    @ray.remote
    def simple():
        if error_flag.exists():
            raise ValueError()
        return 0

    with pytest.raises(ray.exceptions.RaySystemError):
        workflow.run(simple.bind(), workflow_id=workflow_id)

    workflow_metadata_failed = workflow.get_metadata(workflow_id)
    assert workflow_metadata_failed["status"] == "FAILED"

    # Remove the flag so that the task succeeds when the workflow is resumed.
    error_flag.unlink()
    ref = workflow.resume_async(workflow_id)
    assert ray.get(ref) == 0

    workflow_metadata_resumed = workflow.get_metadata(workflow_id)
    assert workflow_metadata_resumed["status"] == "SUCCESSFUL"

    # Make sure resuming updated the running stats.
    assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
    assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]

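Because ``stats`` is updated on every resume, ``get_metadata()`` reflects only the latest run's timings. If you need earlier runs, one option is to keep your own copies before resuming, for example by recording ``workflow_metadata_failed`` above. The sketch below is hypothetical (``RunHistory`` is not a Ray API). It operates on plain dictionaries shaped like the metadata in these examples.

.. code-block:: python

    import copy
    from dataclasses import dataclass, field
    from typing import Any, Dict, List


    @dataclass
    class RunHistory:
        """Keeps copies of workflow metadata snapshots, one per observed run (hypothetical)."""

        snapshots: List[Dict[str, Any]] = field(default_factory=list)

        def record(self, metadata: Dict[str, Any]) -> None:
            # Deep-copy so later changes to the source dict don't alter the history.
            self.snapshots.append(copy.deepcopy(metadata))

        def start_times(self) -> List[Any]:
            return [m.get("stats", {}).get("start_time") for m in self.snapshots]


    history = RunHistory()
    history.record({"status": "FAILED", "stats": {"start_time": 10.0, "end_time": 11.0}})
    history.record({"status": "SUCCESSFUL", "stats": {"start_time": 20.0, "end_time": 21.0}})
    print(history.start_times())  # [10.0, 20.0]

Deep-copying each snapshot means mutating a dictionary after recording it does not rewrite the stored history.
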