Skip to content

Initial Context

Sending context

An initial context can be supplied when adding a task to the queue by passing the initial_context argument to workflow.task.add.

from dotflow import DotFlow, action


@action
def extract_task(initial_context):
    """Echo the received context storage and confirm it is ``{"foo": True}``.

    Args:
        initial_context: Object whose ``storage`` attribute holds the value
            being checked (presumably the value given as ``initial_context``
            when the task was added -- confirm against dotflow).

    Returns:
        The string ``"extract"``.

    Raises:
        AssertionError: If ``storage`` does not equal ``{"foo": True}``.
    """
    expected = {"foo": True}
    received = initial_context.storage
    print(received, "extract")
    assert received == expected

    return "extract"


@action
def transform_task(initial_context):
    """Echo the received context storage and confirm it is ``{"bar": True}``.

    Args:
        initial_context: Object whose ``storage`` attribute holds the value
            being checked (presumably the value given as ``initial_context``
            when the task was added -- confirm against dotflow).

    Returns:
        The string ``"transform"``.

    Raises:
        AssertionError: If ``storage`` does not equal ``{"bar": True}``.
    """
    expected = {"bar": True}
    received = initial_context.storage
    print(received, "transform")
    assert received == expected

    return "transform"


@action
def load_task():
    """Return the string ``"load"``; this task declares no context parameter."""
    result = "load"
    return result


def main():
    """Queue the three example tasks on a new ``DotFlow``, start it, return it.

    ``extract_task`` and ``transform_task`` are each added with their own
    ``initial_context`` dict; ``load_task`` is added without one.

    Returns:
        The ``DotFlow`` instance after ``start()`` has been called.
    """
    flow = DotFlow()

    # Keyword arguments for each ``task.add`` call, in registration order.
    task_specs = (
        {"step": extract_task, "initial_context": {"foo": True}},
        {"step": transform_task, "initial_context": {"bar": True}},
        {"step": load_task},
    )
    for spec in task_specs:
        flow.task.add(**spec)

    flow.start()

    return flow


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Receiving context

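In this example, each task function receives its initial context through the initial_context parameter declared in its signature; the value passed to workflow.task.add is then available as initial_context.storage.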

from dotflow import DotFlow, action


@action
def extract_task(initial_context):
    """Echo the received context storage and confirm it is ``{"foo": True}``.

    Args:
        initial_context: Object whose ``storage`` attribute holds the value
            being checked (presumably the value given as ``initial_context``
            when the task was added -- confirm against dotflow).

    Returns:
        The string ``"extract"``.

    Raises:
        AssertionError: If ``storage`` does not equal ``{"foo": True}``.
    """
    expected = {"foo": True}
    received = initial_context.storage
    print(received, "extract")
    assert received == expected

    return "extract"


@action
def transform_task(initial_context):
    """Echo the received context storage and confirm it is ``{"bar": True}``.

    Args:
        initial_context: Object whose ``storage`` attribute holds the value
            being checked (presumably the value given as ``initial_context``
            when the task was added -- confirm against dotflow).

    Returns:
        The string ``"transform"``.

    Raises:
        AssertionError: If ``storage`` does not equal ``{"bar": True}``.
    """
    expected = {"bar": True}
    received = initial_context.storage
    print(received, "transform")
    assert received == expected

    return "transform"


@action
def load_task():
    """Return the string ``"load"``; this task declares no context parameter."""
    result = "load"
    return result


def main():
    """Queue the three example tasks on a new ``DotFlow``, start it, return it.

    ``extract_task`` and ``transform_task`` are each added with their own
    ``initial_context`` dict; ``load_task`` is added without one.

    Returns:
        The ``DotFlow`` instance after ``start()`` has been called.
    """
    flow = DotFlow()

    # Keyword arguments for each ``task.add`` call, in registration order.
    task_specs = (
        {"step": extract_task, "initial_context": {"foo": True}},
        {"step": transform_task, "initial_context": {"bar": True}},
        {"step": load_task},
    )
    for spec in task_specs:
        flow.task.add(**spec)

    flow.start()

    return flow


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()