Skip to content

Task

Each task can also be given its own callback, which runs once that task finishes. Setting one up is simple.

Receiving callback

In this example, each callback receives the Task object of the task it was attached to.

# Code above omitted 👆

def callback_one(task: Task):
    """Assert that the task completed, then print its id, status and context storage."""
    assert task.status is TypeStatus.COMPLETED
    # Collect the fields in the order they are printed.
    fields = (task.task_id, task.status, task.current_context.storage)
    print(*fields)


def callback_two(task: Task):
    """Assert that the task failed, then print its id, status and context storage."""
    assert task.status is TypeStatus.FAILED
    # Collect the fields in the order they are printed.
    fields = (task.task_id, task.status, task.current_context.storage)
    print(*fields)

# Code below omitted 👇
👀 Full file preview
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus


def callback_one(task: Task):
    """Assert that the task completed, then print its id, status and context storage."""
    assert task.status is TypeStatus.COMPLETED
    # Collect the fields in the order they are printed.
    fields = (task.task_id, task.status, task.current_context.storage)
    print(*fields)


def callback_two(task: Task):
    """Assert that the task failed, then print its id, status and context storage."""
    assert task.status is TypeStatus.FAILED
    # Collect the fields in the order they are printed.
    fields = (task.task_id, task.status, task.current_context.storage)
    print(*fields)


@action
def simple_one():
    """Example step that always succeeds and returns the string "ok"."""
    outcome = "ok"
    return outcome


@action
def simple_two():
    """Example step that always fails by raising Exception("Fail!")."""
    failure = Exception("Fail!")
    raise failure


def main():
    """Queue both example steps with their callbacks, run the workflow, and return it."""
    workflow = DotFlow()

    # Pair each step with the callback that runs once that task finishes.
    for step, callback in (
        (simple_one, callback_one),
        (simple_two, callback_two),
    ):
        workflow.task.add(step=step, callback=callback)

    # keep_going=True keeps the workflow running even if a task fails.
    workflow.start(keep_going=True)
    return workflow


# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

Including callback in the tasks

To add a callback to a task, simply include it when adding the task to the queue — and that’s it, the callback will be executed once the task finishes.

# Code above omitted 👆

def main():
    """Queue both example steps with their callbacks, run the workflow, and return it."""
    workflow = DotFlow()

    # Pair each step with the callback that runs once that task finishes.
    for step, callback in (
        (simple_one, callback_one),
        (simple_two, callback_two),
    ):
        workflow.task.add(step=step, callback=callback)

    # keep_going=True keeps the workflow running even if a task fails.
    workflow.start(keep_going=True)
    return workflow

# Code below omitted 👇
👀 Full file preview
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus


def callback_one(task: Task):
    """Assert that the task completed, then print its id, status and context storage."""
    assert task.status is TypeStatus.COMPLETED
    # Collect the fields in the order they are printed.
    fields = (task.task_id, task.status, task.current_context.storage)
    print(*fields)


def callback_two(task: Task):
    """Assert that the task failed, then print its id, status and context storage."""
    assert task.status is TypeStatus.FAILED
    # Collect the fields in the order they are printed.
    fields = (task.task_id, task.status, task.current_context.storage)
    print(*fields)


@action
def simple_one():
    """Example step that always succeeds and returns the string "ok"."""
    outcome = "ok"
    return outcome


@action
def simple_two():
    """Example step that always fails by raising Exception("Fail!")."""
    failure = Exception("Fail!")
    raise failure


def main():
    """Queue both example steps with their callbacks, run the workflow, and return it."""
    workflow = DotFlow()

    # Pair each step with the callback that runs once that task finishes.
    for step, callback in (
        (simple_one, callback_one),
        (simple_two, callback_two),
    ):
        workflow.task.add(step=step, callback=callback)

    # keep_going=True keeps the workflow running even if a task fails.
    workflow.start(keep_going=True)
    return workflow


# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

Warning

The keep_going=True argument is passed to workflow.start() so that the workflow keeps running even if a task fails.

# Code above omitted 👆

def main():
    """Queue both example steps with their callbacks, run the workflow, and return it."""
    workflow = DotFlow()

    # Pair each step with the callback that runs once that task finishes.
    for step, callback in (
        (simple_one, callback_one),
        (simple_two, callback_two),
    ):
        workflow.task.add(step=step, callback=callback)

    # keep_going=True keeps the workflow running even if a task fails.
    workflow.start(keep_going=True)
    return workflow

# Code below omitted 👇
👀 Full file preview
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus


def callback_one(task: Task):
    """Assert that the task completed, then print its id, status and context storage."""
    assert task.status is TypeStatus.COMPLETED
    # Collect the fields in the order they are printed.
    fields = (task.task_id, task.status, task.current_context.storage)
    print(*fields)


def callback_two(task: Task):
    """Assert that the task failed, then print its id, status and context storage."""
    assert task.status is TypeStatus.FAILED
    # Collect the fields in the order they are printed.
    fields = (task.task_id, task.status, task.current_context.storage)
    print(*fields)


@action
def simple_one():
    """Example step that always succeeds and returns the string "ok"."""
    outcome = "ok"
    return outcome


@action
def simple_two():
    """Example step that always fails by raising Exception("Fail!")."""
    failure = Exception("Fail!")
    raise failure


def main():
    """Queue both example steps with their callbacks, run the workflow, and return it."""
    workflow = DotFlow()

    # Pair each step with the callback that runs once that task finishes.
    for step, callback in (
        (simple_one, callback_one),
        (simple_two, callback_two),
    ):
        workflow.task.add(step=step, callback=callback)

    # keep_going=True keeps the workflow running even if a task fails.
    workflow.start(keep_going=True)
    return workflow


# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()