Output: result_task¶
workflow.result_task() returns the full Task objects for every task in the queue. Each Task carries identity (task_id), execution status (status), error history (errors), and the produced context (current_context).
Function step¶
from dotflow import DotFlow, action
@action
def simple_step():
return "ok"
def main():
workflow = DotFlow()
workflow.task.add(step=simple_step)
workflow.start()
for task in workflow.result_task():
print(task.task_id, task.status, task.current_context.storage)
assert task.current_context.storage == "ok"
return workflow
if __name__ == "__main__":
main()
Class step¶
For a class-based step, the outer Task.current_context.storage is a list of nested Context objects produced by the inner @action methods.
from dotflow import DotFlow, action
@action(retry=5)
class Step:
def auxiliary_function(self):
"""This function will not be executed, because
it does not have an 'action' decorator.
"""
@action
def first_function(self):
return {"foo": "bar"}
@action
def second_function(self):
return True
def main():
workflow = DotFlow()
workflow.task.add(step=Step)
workflow.start()
for task in workflow.result_task():
for current_context in task.current_context.storage:
print(task.task_id, task.status, current_context.storage)
assert current_context.storage
return workflow
if __name__ == "__main__":
main()
When to use¶
- Full post-mortem — task status, retry count, errors, durations
- Building dashboards / serializers (use
task.schema()ortask.result()) - Driving conditional logic on
task.statusafter the workflow completes
Related¶
- result_context — only the produced contexts
- result_storage — only the payloads
- Task reference