Scheduled Pipeline with Resume

Combine cron-based scheduling with checkpoint resume to build resilient pipelines that recover automatically from failures.

How it works

  1. SchedulerCron runs the workflow on a cron schedule
  2. StorageFile (or S3/GCS) persists task output after each step
  3. If a step fails, the next scheduled run skips completed steps and resumes from the failure point
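
The resume behaviour described above can be sketched in plain Python, independent of dotflow: persist each step's output to a checkpoint after it succeeds, and on the next run skip any step whose output is already recorded. The checkpoint file name and JSON format below are illustrative only, not dotflow's actual on-disk layout:

```python
import json
import os

CHECKPOINT = "checkpoint.json"


def load_checkpoint():
    """Return previously persisted step outputs, if any."""
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT) as f:
            return json.load(f)
    return {}


def run_with_resume(steps):
    """Run (name, fn) steps in order, checkpointing after each one.

    A step that raised on an earlier run is retried; steps that
    already completed are skipped.
    """
    done = load_checkpoint()
    for name, fn in steps:
        if name in done:
            continue  # completed on an earlier run, skip it
        done[name] = fn(done)  # may raise; nothing is recorded then
        with open(CHECKPOINT, "w") as f:
            json.dump(done, f)  # persist after every successful step
    return done
```

Because the checkpoint is written after each step rather than at the end, a crash mid-pipeline loses at most the step that was running.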

Example

from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron, StorageFile


@action
def extract():
    # fetch_from_api is a placeholder for your own data source
    return {"records": fetch_from_api()}


@action
def transform(previous_context):
    # previous_context.storage holds the previous step's output
    data = previous_context.storage["records"]
    return {"cleaned": clean(data)}


@action
def load(previous_context):
    save_to_database(previous_context.storage["cleaned"])
    return {"loaded": True}


def main():
    config = Config(
        storage=StorageFile(),  # persists each step's output to disk
        scheduler=SchedulerCron(cron="0 6 * * *", overlap="skip"),  # daily at 06:00
    )

    workflow = DotFlow(config=config, workflow_id="etl-daily")
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    # resume=True makes each scheduled run skip steps that already
    # completed under the same workflow_id
    workflow.schedule(mode="sequential", resume=True)


if __name__ == "__main__":
    main()
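
The expression 0 6 * * * means minute 0, hour 6, any day, month, and weekday: once per day at 06:00. A toy matcher that handles only literal numbers and wildcards (not a full cron parser, and not how SchedulerCron is implemented) shows which timestamps qualify:

```python
from datetime import datetime


def cron_matches(expr: str, ts: datetime) -> bool:
    """Check a timestamp against a 5-field cron expression.

    Supports only literal numbers and "*" per field — enough
    for "0 6 * * *". Fields: minute hour day month weekday,
    with weekday 0 = Sunday.
    """
    fields = expr.split()
    values = [ts.minute, ts.hour, ts.day, ts.month, ts.isoweekday() % 7]
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))
```

With overlap="skip", a trigger that fires while the previous run is still executing is presumably dropped rather than queued, so at most one instance of the pipeline runs at a time.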

Execution flow

Day 1 — load fails:

graph LR
    A[extract] -->|completed| B[transform]
    B -->|completed| C[load]
    C -->|failed| D((stop))
    style A fill:#4caf50,color:#fff
    style B fill:#4caf50,color:#fff
    style C fill:#f44336,color:#fff

Day 2 — automatic resume:

graph LR
    A[extract] -->|skipped| B[transform]
    B -->|skipped| C[load]
    C -->|completed| D((done))
    style A fill:#9e9e9e,color:#fff
    style B fill:#9e9e9e,color:#fff
    style C fill:#4caf50,color:#fff

Requirements

  • A fixed workflow_id — so checkpoints persist across scheduled runs
  • A persistent storage provider: StorageFile, StorageS3, or StorageGCS
  • resume=True passed to workflow.schedule()
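
Swapping the storage provider changes only the Config. A sketch of the same setup backed by S3; the StorageS3 constructor arguments shown here (bucket) are hypothetical, so check the provider's documentation for the actual signature:

```python
from dotflow import Config
from dotflow.providers import SchedulerCron, StorageS3

# bucket= is an assumed parameter name, not a confirmed API
config = Config(
    storage=StorageS3(bucket="etl-checkpoints"),
    scheduler=SchedulerCron(cron="0 6 * * *", overlap="skip"),
)
```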
