Previous Context
Sending context
To send a context, just return some information in your task implementation. You can return any object, and it will become part of the context.
from dotflow import DotFlow, action, Context
@action
def extract_task():
    """Print "extract" and return it as this task's result.

    The returned string is what the next task in the workflow reads
    from the ``storage`` attribute of the context it receives.
    """
    print("extract")
    return "extract"
@action
def transform_task(previous_context: Context):
    """Consume the extract step's result and return "transform".

    Args:
        previous_context: Context handed over from the preceding task;
            its ``storage`` attribute holds that task's return value.

    Returns:
        The string "transform".

    Raises:
        AssertionError: If ``previous_context.storage`` is not "extract".
    """
    print(previous_context.storage, "transform")
    # Demonstrates that the value returned by extract_task arrives here.
    assert previous_context.storage == "extract"
    return "transform"
@action
def load_task(previous_context: Context):
    """Consume the transform step's result and return "load".

    Args:
        previous_context: Context handed over from the preceding task;
            its ``storage`` attribute holds that task's return value.

    Returns:
        The string "load".

    Raises:
        AssertionError: If ``previous_context.storage`` is not "transform".
    """
    print(previous_context.storage, "load")
    # Demonstrates that the value returned by transform_task arrives here.
    assert previous_context.storage == "transform"
    return "load"
def main():
    """Build the three-step example workflow, start it, and return it.

    The tasks are registered in extract -> transform -> load order so
    that each step can read the value returned by the one before it.

    Returns:
        The ``DotFlow`` instance after ``start()`` has been called.
    """
    workflow = DotFlow()

    workflow.task.add(step=extract_task)
    workflow.task.add(step=transform_task)
    workflow.task.add(step=load_task)

    workflow.start()

    return workflow
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
Receiving context
In this example, the previous task's context is received through the previous_context
parameter declared in the function signature; the value that task returned is available as previous_context.storage.
from dotflow import DotFlow, action, Context
@action
def extract_task():
    """Print "extract" and return it as this task's result.

    The returned string is what the next task in the workflow reads
    from the ``storage`` attribute of the context it receives.
    """
    print("extract")
    return "extract"
@action
def transform_task(previous_context: Context):
    """Consume the extract step's result and return "transform".

    Args:
        previous_context: Context handed over from the preceding task;
            its ``storage`` attribute holds that task's return value.

    Returns:
        The string "transform".

    Raises:
        AssertionError: If ``previous_context.storage`` is not "extract".
    """
    print(previous_context.storage, "transform")
    # Demonstrates that the value returned by extract_task arrives here.
    assert previous_context.storage == "extract"
    return "transform"
@action
def load_task(previous_context: Context):
    """Consume the transform step's result and return "load".

    Args:
        previous_context: Context handed over from the preceding task;
            its ``storage`` attribute holds that task's return value.

    Returns:
        The string "load".

    Raises:
        AssertionError: If ``previous_context.storage`` is not "transform".
    """
    print(previous_context.storage, "load")
    # Demonstrates that the value returned by transform_task arrives here.
    assert previous_context.storage == "transform"
    return "load"
def main():
    """Build the three-step example workflow, start it, and return it.

    The tasks are registered in extract -> transform -> load order so
    that each step can read the value returned by the one before it.

    Returns:
        The ``DotFlow`` instance after ``start()`` has been called.
    """
    workflow = DotFlow()

    workflow.task.add(step=extract_task)
    workflow.task.add(step=transform_task)
    workflow.task.add(step=load_task)

    workflow.start()

    return workflow
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()