Task¶
Tasks can also receive a specific callback, and it's quite simple.
Receiving callback¶
In this example, we receive a Task object.
# Code above omitted 👆
def callback_one(task: Task):
    """Callback for a task that is expected to succeed.

    Asserts that the task's status is ``TypeStatus.COMPLETED`` and prints
    its id, its status, and the storage of its current context.
    """
    assert task.status is TypeStatus.COMPLETED
    summary = (task.task_id, task.status, task.current_context.storage)
    print(*summary)
def callback_two(task: Task):
    """Callback for a task that is expected to fail.

    Asserts that the task's status is ``TypeStatus.FAILED`` and prints
    its id, its status, and the storage of its current context.
    """
    assert task.status is TypeStatus.FAILED
    summary = (task.task_id, task.status, task.current_context.storage)
    print(*summary)
# Code below omitted 👇
👀 Full file preview
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus
def callback_one(task: Task):
    """Callback for a task that is expected to succeed.

    Asserts that the task's status is ``TypeStatus.COMPLETED`` and prints
    its id, its status, and the storage of its current context.
    """
    assert task.status is TypeStatus.COMPLETED
    summary = (task.task_id, task.status, task.current_context.storage)
    print(*summary)
def callback_two(task: Task):
    """Callback for a task that is expected to fail.

    Asserts that the task's status is ``TypeStatus.FAILED`` and prints
    its id, its status, and the storage of its current context.
    """
    assert task.status is TypeStatus.FAILED
    summary = (task.task_id, task.status, task.current_context.storage)
    print(*summary)
@action
def simple_one():
    """Step that finishes normally and returns the string ``"ok"``."""
    outcome = "ok"
    return outcome
@action
def simple_two():
    """Step that always raises ``Exception("Fail!")``."""
    failure = Exception("Fail!")
    raise failure
def main():
    """Register both tasks with callbacks, start the workflow, return it."""
    flow = DotFlow()

    # Each task is registered together with the callback that goes with it.
    flow.task.add(step=simple_one, callback=callback_one)
    flow.task.add(step=simple_two, callback=callback_two)

    # keep_going=True lets the run continue even though simple_two raises.
    flow.start(keep_going=True)
    return flow


if __name__ == "__main__":
    main()
Including callback in the tasks¶
To add a callback to a task, simply include it when adding the task to the queue — and that's it, the callback will be executed once the task finishes.
# Code above omitted 👆
def main():
    """Register both tasks with callbacks, start the workflow, return it."""
    flow = DotFlow()

    # Each task is registered together with the callback that goes with it.
    flow.task.add(step=simple_one, callback=callback_one)
    flow.task.add(step=simple_two, callback=callback_two)

    # keep_going=True lets the run continue even though simple_two raises.
    flow.start(keep_going=True)
    return flow
# Code below omitted 👇
👀 Full file preview
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus
def callback_one(task: Task):
    """Callback for a task that is expected to succeed.

    Asserts that the task's status is ``TypeStatus.COMPLETED`` and prints
    its id, its status, and the storage of its current context.
    """
    assert task.status is TypeStatus.COMPLETED
    summary = (task.task_id, task.status, task.current_context.storage)
    print(*summary)
def callback_two(task: Task):
    """Callback for a task that is expected to fail.

    Asserts that the task's status is ``TypeStatus.FAILED`` and prints
    its id, its status, and the storage of its current context.
    """
    assert task.status is TypeStatus.FAILED
    summary = (task.task_id, task.status, task.current_context.storage)
    print(*summary)
@action
def simple_one():
    """Step that finishes normally and returns the string ``"ok"``."""
    outcome = "ok"
    return outcome
@action
def simple_two():
    """Step that always raises ``Exception("Fail!")``."""
    failure = Exception("Fail!")
    raise failure
def main():
    """Register both tasks with callbacks, start the workflow, return it."""
    flow = DotFlow()

    # Each task is registered together with the callback that goes with it.
    flow.task.add(step=simple_one, callback=callback_one)
    flow.task.add(step=simple_two, callback=callback_two)

    # keep_going=True lets the run continue even though simple_two raises.
    flow.start(keep_going=True)
    return flow


if __name__ == "__main__":
    main()
Warning
The `keep_going` parameter was included to keep the workflow running even if a task fails.
# Code above omitted 👆
def main():
    """Register both tasks with callbacks, start the workflow, return it."""
    flow = DotFlow()

    # Each task is registered together with the callback that goes with it.
    flow.task.add(step=simple_one, callback=callback_one)
    flow.task.add(step=simple_two, callback=callback_two)

    # keep_going=True lets the run continue even though simple_two raises.
    flow.start(keep_going=True)
    return flow
# Code below omitted 👇
👀 Full file preview
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus
def callback_one(task: Task):
    """Callback for a task that is expected to succeed.

    Asserts that the task's status is ``TypeStatus.COMPLETED`` and prints
    its id, its status, and the storage of its current context.
    """
    assert task.status is TypeStatus.COMPLETED
    summary = (task.task_id, task.status, task.current_context.storage)
    print(*summary)
def callback_two(task: Task):
    """Callback for a task that is expected to fail.

    Asserts that the task's status is ``TypeStatus.FAILED`` and prints
    its id, its status, and the storage of its current context.
    """
    assert task.status is TypeStatus.FAILED
    summary = (task.task_id, task.status, task.current_context.storage)
    print(*summary)
@action
def simple_one():
    """Step that finishes normally and returns the string ``"ok"``."""
    outcome = "ok"
    return outcome
@action
def simple_two():
    """Step that always raises ``Exception("Fail!")``."""
    failure = Exception("Fail!")
    raise failure
def main():
    """Register both tasks with callbacks, start the workflow, return it."""
    flow = DotFlow()

    # Each task is registered together with the callback that goes with it.
    flow.task.add(step=simple_one, callback=callback_one)
    flow.task.add(step=simple_two, callback=callback_two)

    # keep_going=True lets the run continue even though simple_two raises.
    flow.start(keep_going=True)
    return flow


if __name__ == "__main__":
    main()