Output: result_task

workflow.result_task() returns the full Task object for each task in the queue. Each Task carries its identity (task_id), execution status (status), error history (errors), and the context it produced (current_context).

Function step

from dotflow import DotFlow, action


@action
def simple_step():
    return "ok"


def main():
    workflow = DotFlow()

    workflow.task.add(step=simple_step)
    workflow.start()

    for task in workflow.result_task():
        print(task.task_id, task.status, task.current_context.storage)
        assert task.current_context.storage == "ok"

    return workflow


if __name__ == "__main__":
    main()

Class step

For a class-based step, the outer Task.current_context.storage is a list of nested Context objects produced by the inner @action methods.

from dotflow import DotFlow, action


@action(retry=5)
class Step:
    def auxiliary_function(self):
        """This function will not be executed, because
        it does not have an 'action' decorator.
        """

    @action
    def first_function(self):
        return {"foo": "bar"}

    @action
    def second_function(self):
        return True


def main():
    workflow = DotFlow()

    workflow.task.add(step=Step)
    workflow.start()

    for task in workflow.result_task():
        for current_context in task.current_context.storage:
            print(task.task_id, task.status, current_context.storage)
            assert current_context.storage

    return workflow


if __name__ == "__main__":
    main()
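
Because a function step stores a plain value and a class step stores a list of nested contexts, code that consumes both shapes may want to normalize them first. The following is a minimal sketch of one way to do that. It does not import dotflow: FakeContext is a hypothetical stand-in that models only the storage attribute, and flatten_storage is an illustrative helper, not part of the library.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class FakeContext:
    """Hypothetical stand-in for a dotflow Context; models only 'storage'."""

    storage: Any = None


def flatten_storage(context: Any) -> List[Any]:
    """Return the leaf values held by a context as a flat list.

    A function step stores a plain value, so it is wrapped in a list.
    A class step stores a list of nested context objects, so each
    nested object's own storage is extracted.
    """
    storage = context.storage
    is_nested = (
        isinstance(storage, list)
        and bool(storage)
        and all(hasattr(item, "storage") for item in storage)
    )
    if is_nested:
        return [item.storage for item in storage]
    return [storage]


# Shapes mirroring the two examples above.
function_result = FakeContext(storage="ok")
class_result = FakeContext(
    storage=[FakeContext({"foo": "bar"}), FakeContext(True)]
)

print(flatten_storage(function_result))  # ['ok']
print(flatten_storage(class_result))     # [{'foo': 'bar'}, True]
```

With real tasks, the same helper would be called as flatten_storage(task.current_context) inside the loop over workflow.result_task(), assuming the nested objects expose storage as described above.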

When to use

  • Full post-mortem: task status, retry count, errors, durations
  • Building dashboards / serializers (use task.schema() or task.result())
  • Driving conditional logic on task.status after the workflow completes
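
The last point, branching on task.status once the workflow has finished, could look roughly like the sketch below. It uses SimpleNamespace objects as stand-ins for the Task objects returned by workflow.result_task(), so it runs without dotflow installed. The helper name split_by_status and the status strings "Completed" and "Failed" are assumptions made for illustration; check the values your dotflow version actually reports before relying on them.

```python
from types import SimpleNamespace


def split_by_status(tasks, ok_status="Completed"):
    """Partition tasks into (succeeded, other) by comparing task.status.

    'ok_status' is an assumed value; pass whatever your workflow
    reports for a successfully finished task.
    """
    succeeded, other = [], []
    for task in tasks:
        bucket = succeeded if task.status == ok_status else other
        bucket.append(task)
    return succeeded, other


# Stand-in objects; with dotflow these would come from
# workflow.result_task() after workflow.start() returns.
tasks = [
    SimpleNamespace(task_id=0, status="Completed", errors=[]),
    SimpleNamespace(task_id=1, status="Failed", errors=["ValueError: boom"]),
]

succeeded, other = split_by_status(tasks)

# Report every task that did not finish successfully.
for task in other:
    print(f"task {task.task_id} ended as {task.status}: {task.errors}")
```

Keeping the success value as a parameter avoids hard-coding a status string that may differ between library versions.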