Skip to content

Previous Context

Sending context

To send a context, just return some information in your task implementation. You can return any object, and it will become part of the context.

from dotflow import DotFlow, action, Context


@action
def extract_task():
    print("extract")
    return "extract"


@action
def transform_task(previous_context: Context):
    print(previous_context.storage, "transform")
    assert previous_context.storage == "extract"

    return "transform"


@action
def load_task(previous_context: Context):
    print(previous_context.storage, "load")
    assert previous_context.storage == "transform"

    return "load"


def main():
    workflow = DotFlow()

    workflow.task.add(step=extract_task)
    workflow.task.add(step=transform_task)
    workflow.task.add(step=load_task)

    workflow.start()

    return workflow


if __name__ == "__main__":
    main()

Receiving context

In this example, an previous context value is received from the previous_contexto attribute defined in the function.

from dotflow import DotFlow, action, Context


@action
def extract_task():
    print("extract")
    return "extract"


@action
def transform_task(previous_context: Context):
    print(previous_context.storage, "transform")
    assert previous_context.storage == "extract"

    return "transform"


@action
def load_task(previous_context: Context):
    print(previous_context.storage, "load")
    assert previous_context.storage == "transform"

    return "load"


def main():
    workflow = DotFlow()

    workflow.task.add(step=extract_task)
    workflow.task.add(step=transform_task)
    workflow.task.add(step=load_task)

    workflow.start()

    return workflow


if __name__ == "__main__":
    main()