Workflow¶
The callback mechanism in a Workflow is global, meaning that it applies to all tasks added to the execution queue. There are two types of workflow callbacks:
Success Callback¶
This callback is triggered only when the entire workflow completes successfully, with all tasks executed without errors.
Including callback on success¶
# Code above omitted 👆
def main():
    """Build a one-task workflow and start it with a success callback.

    Returns:
        The ``DotFlow`` instance after ``start`` has been called.
    """
    workflow = DotFlow()
    workflow.task.add(step=simple_step)
    # Register ``callback`` as the workflow-level success handler.
    workflow.start(on_success=callback)
    return workflow
# Code below omitted 👇
👀 Full file preview
from typing import List
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus
def callback(tasks: List[Task]):
    """Check and print every task handed to the success callback.

    Args:
        tasks: Tasks passed in by the workflow; must be non-empty.

    Raises:
        AssertionError: If ``tasks`` is empty or any task's status is not
            ``TypeStatus.COMPLETED``.
    """
    # The callback must receive at least one task.
    assert tasks
    assert len(tasks)

    for task in tasks:
        assert task.status is TypeStatus.COMPLETED
        print(task.task_id, task.status, task.current_context.storage)
@action
def simple_step():
    """Return the fixed string ``"ok"``."""
    return "ok"
def main():
    """Build a one-task workflow and start it with a success callback.

    Returns:
        The ``DotFlow`` instance after ``start`` has been called.
    """
    workflow = DotFlow()
    workflow.task.add(step=simple_step)
    # Register ``callback`` as the workflow-level success handler.
    workflow.start(on_success=callback)
    return workflow
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
Receiving callback¶
In this example, the callback receives a list of the workflow's Task objects.
# Code above omitted 👆
def callback(tasks: List[Task]):
    """Check and print every task handed to the success callback.

    Args:
        tasks: Tasks passed in by the workflow; must be non-empty.

    Raises:
        AssertionError: If ``tasks`` is empty or any task's status is not
            ``TypeStatus.COMPLETED``.
    """
    # The callback must receive at least one task.
    assert tasks
    assert len(tasks)

    for task in tasks:
        assert task.status is TypeStatus.COMPLETED
        print(task.task_id, task.status, task.current_context.storage)
# Code below omitted 👇
👀 Full file preview
from typing import List
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus
def callback(tasks: List[Task]):
    """Check and print every task handed to the success callback.

    Args:
        tasks: Tasks passed in by the workflow; must be non-empty.

    Raises:
        AssertionError: If ``tasks`` is empty or any task's status is not
            ``TypeStatus.COMPLETED``.
    """
    # The callback must receive at least one task.
    assert tasks
    assert len(tasks)

    for task in tasks:
        assert task.status is TypeStatus.COMPLETED
        print(task.task_id, task.status, task.current_context.storage)
@action
def simple_step():
    """Return the fixed string ``"ok"``."""
    return "ok"
def main():
    """Build a one-task workflow and start it with a success callback.

    Returns:
        The ``DotFlow`` instance after ``start`` has been called.
    """
    workflow = DotFlow()
    workflow.task.add(step=simple_step)
    # Register ``callback`` as the workflow-level success handler.
    workflow.start(on_success=callback)
    return workflow
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
Failure Callback¶
This callback is triggered if any task in the workflow fails during execution.
These callbacks should be used to handle post-execution actions depending on the overall result of the workflow, such as logging, notifications, or cleanup operations.
Including callback on failure¶
# Code above omitted 👆
def main():
    """Build a one-task workflow and start it with a failure callback.

    Returns:
        The ``DotFlow`` instance after ``start`` has been called.
    """
    workflow = DotFlow()
    workflow.task.add(step=simple_step)
    # Register ``callback`` as the workflow-level failure handler.
    workflow.start(on_failure=callback)
    return workflow
# Code below omitted 👇
👀 Full file preview
from typing import List
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus
def callback(tasks: List[Task]):
    """Check and print every task handed to the failure callback.

    Args:
        tasks: Tasks passed in by the workflow; must be non-empty.

    Raises:
        AssertionError: If ``tasks`` is empty or any task's status is not
            ``TypeStatus.FAILED``.
    """
    # The callback must receive at least one task.
    assert tasks
    assert len(tasks)

    for task in tasks:
        assert task.status is TypeStatus.FAILED
        print(task.task_id, task.status, task.current_context.storage)
@action
def simple_step():
    """Raise ``Exception("Fail!")`` unconditionally."""
    raise Exception("Fail!")
def main():
    """Build a one-task workflow and start it with a failure callback.

    Returns:
        The ``DotFlow`` instance after ``start`` has been called.
    """
    workflow = DotFlow()
    workflow.task.add(step=simple_step)
    # Register ``callback`` as the workflow-level failure handler.
    workflow.start(on_failure=callback)
    return workflow
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
Receiving callback¶
In this example, the callback receives a list of the workflow's Task objects.
# Code above omitted 👆
def callback(tasks: List[Task]):
    """Check and print every task handed to the failure callback.

    Args:
        tasks: Tasks passed in by the workflow; must be non-empty.

    Raises:
        AssertionError: If ``tasks`` is empty or any task's status is not
            ``TypeStatus.FAILED``.
    """
    # The callback must receive at least one task.
    assert tasks
    assert len(tasks)

    for task in tasks:
        assert task.status is TypeStatus.FAILED
        print(task.task_id, task.status, task.current_context.storage)
# Code below omitted 👇
👀 Full file preview
from typing import List
from dotflow import DotFlow, action, Task
from dotflow.core.types.status import TypeStatus
def callback(tasks: List[Task]):
    """Check and print every task handed to the failure callback.

    Args:
        tasks: Tasks passed in by the workflow; must be non-empty.

    Raises:
        AssertionError: If ``tasks`` is empty or any task's status is not
            ``TypeStatus.FAILED``.
    """
    # The callback must receive at least one task.
    assert tasks
    assert len(tasks)

    for task in tasks:
        assert task.status is TypeStatus.FAILED
        print(task.task_id, task.status, task.current_context.storage)
@action
def simple_step():
    """Raise ``Exception("Fail!")`` unconditionally."""
    raise Exception("Fail!")
def main():
    """Build a one-task workflow and start it with a failure callback.

    Returns:
        The ``DotFlow`` instance after ``start`` has been called.
    """
    workflow = DotFlow()
    workflow.task.add(step=simple_step)
    # Register ``callback`` as the workflow-level failure handler.
    workflow.start(on_failure=callback)
    return workflow
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()