Parallel Execution with Groups

Run independent task groups in parallel while keeping tasks within each group sequential. This is useful when you have multiple independent pipelines that share no state.

How it works

  1. Tasks that share a group_name run sequentially, and each task receives the context returned by the previous task in its group
  2. Different groups run in parallel, each in its own process
  3. Results from all groups are collected once every process has completed
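
The three steps above can be sketched with the standard library alone. This is only an illustration of the idea, not dotflow's actual implementation: the helper names run_group and run_groups, the TASKS list, and the plain-dict "context" are all hypothetical, and the steps take the previous result directly instead of a dotflow context object.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def fetch_users(previous):
    return {"users": ["alice", "bob"]}


def process_users(previous):
    return {"processed": len(previous["users"])}


def fetch_orders(previous):
    return {"orders": [101, 102, 103]}


def process_orders(previous):
    return {"total": len(previous["orders"])}


def run_group(steps):
    """Run one group's steps in order, feeding each result to the next step."""
    context = None
    for step in steps:
        context = step(context)
    return context


def run_groups(tasks, executor_cls=ProcessPoolExecutor):
    """Bucket (group_name, step) pairs by group, then run the buckets concurrently."""
    groups = {}
    for group_name, step in tasks:
        groups.setdefault(group_name, []).append(step)

    with executor_cls() as pool:
        # One submitted job per group: sequential inside, concurrent across groups.
        futures = {name: pool.submit(run_group, steps) for name, steps in groups.items()}
        # The dict is returned only once every group's future has resolved.
        return {name: future.result() for name, future in futures.items()}


# Hypothetical task list mirroring the groups used on this page.
TASKS = [
    ("users", fetch_users),
    ("users", process_users),
    ("orders", fetch_orders),
    ("orders", process_orders),
]

if __name__ == "__main__":
    print(run_groups(TASKS))
```

Because each group runs in its own process, the steps and their return values must be picklable, and groups cannot see each other's state. Passing executor_cls=ThreadPoolExecutor runs the same sketch with threads, which is handy for quick experiments.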

Example

from dotflow import DotFlow, action


@action
def fetch_users():
    return {"users": ["alice", "bob"]}


@action
def process_users(previous_context):
    return {"processed": len(previous_context.storage["users"])}


@action
def fetch_orders():
    return {"orders": [101, 102, 103]}


@action
def process_orders(previous_context):
    return {"total": len(previous_context.storage["orders"])}


def main():
    workflow = DotFlow()
    # Tasks that share a group_name form one sequential pipeline.
    workflow.task.add(step=fetch_users, group_name="users")
    workflow.task.add(step=process_users, group_name="users")
    # A different group_name starts a second, independent pipeline.
    workflow.task.add(step=fetch_orders, group_name="orders")
    workflow.task.add(step=process_orders, group_name="orders")
    workflow.start(mode="sequential")


if __name__ == "__main__":
    main()

Execution flow

flowchart TD
    A[Start] --> B[Group: users]
    A --> C[Group: orders]
    B --> D[fetch_users]
    D --> E[process_users]
    C --> F[fetch_orders]
    F --> G[process_orders]
    E --> H[Finish]
    G --> H
