Skip to content

Workflow

The callback mechanism in a Workflow is global, meaning that it applies to all tasks added to the execution queue. There are two types of workflow callbacks:

Success Callback

This callback is triggered only when the entire workflow completes successfully, with all tasks executed without errors.

Including callback on success

# Code above omitted 👆

def main():
    """Build a one-task workflow and start it with a success callback.

    Returns the ``DotFlow`` instance after ``start`` has been called.
    """
    flow = DotFlow()
    flow.task.add(step=simple_step)

    # ``callback`` is registered for the success case only.
    flow.start(on_success=callback)
    return flow

# Code below omitted 👇
👀 Full file preview
from typing import List

from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus


def callback(tasks: List[Task]):
    """Check and print every task handed to the success callback.

    Asserts that ``tasks`` is non-empty and that each task's status is
    ``TypeStatus.COMPLETED``, then prints the task's id, its status and the
    storage of its current context.
    """
    assert tasks
    assert len(tasks)

    for item in tasks:
        assert item.status is TypeStatus.COMPLETED
        report = (item.task_id, item.status, item.current_context.storage)
        print(*report)


@action
def simple_step():
    """Return the fixed string ``"ok"``."""
    result = "ok"
    return result


def main():
    """Build a one-task workflow and start it with a success callback.

    Returns the ``DotFlow`` instance after ``start`` has been called.
    """
    flow = DotFlow()
    flow.task.add(step=simple_step)

    # ``callback`` is registered for the success case only.
    flow.start(on_success=callback)
    return flow


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()

Receiving callback

In this example, the callback receives a list of the workflow's `Task` objects.

# Code above omitted 👆

def callback(tasks: List[Task]):
    """Check and print every task handed to the success callback.

    Asserts that ``tasks`` is non-empty and that each task's status is
    ``TypeStatus.COMPLETED``, then prints the task's id, its status and the
    storage of its current context.
    """
    assert tasks
    assert len(tasks)

    for item in tasks:
        assert item.status is TypeStatus.COMPLETED
        report = (item.task_id, item.status, item.current_context.storage)
        print(*report)

# Code below omitted 👇
👀 Full file preview
from typing import List

from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus


def callback(tasks: List[Task]):
    """Check and print every task handed to the success callback.

    Asserts that ``tasks`` is non-empty and that each task's status is
    ``TypeStatus.COMPLETED``, then prints the task's id, its status and the
    storage of its current context.
    """
    assert tasks
    assert len(tasks)

    for item in tasks:
        assert item.status is TypeStatus.COMPLETED
        report = (item.task_id, item.status, item.current_context.storage)
        print(*report)


@action
def simple_step():
    """Return the fixed string ``"ok"``."""
    result = "ok"
    return result


def main():
    """Build a one-task workflow and start it with a success callback.

    Returns the ``DotFlow`` instance after ``start`` has been called.
    """
    flow = DotFlow()
    flow.task.add(step=simple_step)

    # ``callback`` is registered for the success case only.
    flow.start(on_success=callback)
    return flow


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()

Failure Callback

This callback is triggered if any task in the workflow fails during execution.

Both callbacks (success and failure) are intended for post-execution actions that depend on the overall result of the workflow, such as logging, notifications, or cleanup operations.

Including callback on failure

# Code above omitted 👆

def main():
    """Build a one-task workflow and start it with a failure callback.

    Returns the ``DotFlow`` instance after ``start`` has been called.
    """
    flow = DotFlow()
    flow.task.add(step=simple_step)

    # ``callback`` is registered for the failure case only.
    flow.start(on_failure=callback)
    return flow

# Code below omitted 👇
👀 Full file preview
from typing import List

from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus


def callback(tasks: List[Task]):
    """Check and print every task handed to the failure callback.

    Asserts that ``tasks`` is non-empty and that each task's status is
    ``TypeStatus.FAILED``, then prints the task's id, its status and the
    storage of its current context.
    """
    assert tasks
    assert len(tasks)

    for item in tasks:
        assert item.status is TypeStatus.FAILED
        report = (item.task_id, item.status, item.current_context.storage)
        print(*report)


@action
def simple_step():
    """Always raise ``Exception("Fail!")`` so the step never succeeds."""
    failure = Exception("Fail!")
    raise failure


def main():
    """Build a one-task workflow and start it with a failure callback.

    Returns the ``DotFlow`` instance after ``start`` has been called.
    """
    flow = DotFlow()
    flow.task.add(step=simple_step)

    # ``callback`` is registered for the failure case only.
    flow.start(on_failure=callback)
    return flow


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()

Receiving callback

In this example, the callback receives a list of the workflow's `Task` objects.

# Code above omitted 👆

def callback(tasks: List[Task]):
    """Check and print every task handed to the failure callback.

    Asserts that ``tasks`` is non-empty and that each task's status is
    ``TypeStatus.FAILED``, then prints the task's id, its status and the
    storage of its current context.
    """
    assert tasks
    assert len(tasks)

    for item in tasks:
        assert item.status is TypeStatus.FAILED
        report = (item.task_id, item.status, item.current_context.storage)
        print(*report)

# Code below omitted 👇
👀 Full file preview
from typing import List

from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus


def callback(tasks: List[Task]):
    """Check and print every task handed to the failure callback.

    Asserts that ``tasks`` is non-empty and that each task's status is
    ``TypeStatus.FAILED``, then prints the task's id, its status and the
    storage of its current context.
    """
    assert tasks
    assert len(tasks)

    for item in tasks:
        assert item.status is TypeStatus.FAILED
        report = (item.task_id, item.status, item.current_context.storage)
        print(*report)


@action
def simple_step():
    """Always raise ``Exception("Fail!")`` so the step never succeeds."""
    failure = Exception("Fail!")
    raise failure


def main():
    """Build a one-task workflow and start it with a failure callback.

    Returns the ``DotFlow`` instance after ``start`` has been called.
    """
    flow = DotFlow()
    flow.task.add(step=simple_step)

    # ``callback`` is registered for the failure case only.
    flow.start(on_failure=callback)
    return flow


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()