Scheduled Pipeline with Resume¶
Combine cron-based scheduling with checkpoint resume to build resilient pipelines that recover automatically from failures.
How it works¶
- `SchedulerCron` runs the workflow on a cron schedule
- `StorageFile` (or S3/GCS) persists task output after each step
- If a step fails, the next scheduled run skips completed steps and resumes from the failure point
Example¶
```python
from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron, StorageFile


@action
def extract():
    return {"records": fetch_from_api()}


@action
def transform(previous_context):
    data = previous_context.storage["records"]
    return {"cleaned": clean(data)}


@action
def load(previous_context):
    save_to_database(previous_context.storage["cleaned"])
    return {"loaded": True}


def main():
    config = Config(
        storage=StorageFile(),
        scheduler=SchedulerCron(cron="0 6 * * *", overlap="skip"),
    )
    workflow = DotFlow(config=config, workflow_id="etl-daily")

    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)

    workflow.schedule(mode="sequential", resume=True)


if __name__ == "__main__":
    main()
```
Execution flow¶
Day 1 — load fails:
```mermaid
graph LR
    A[extract] -->|completed| B[transform]
    B -->|completed| C[load]
    C -->|failed| D((stop))
    style A fill:#4caf50,color:#fff
    style B fill:#4caf50,color:#fff
    style C fill:#f44336,color:#fff
```
Day 2 — automatic resume:
```mermaid
graph LR
    A[extract] -->|skipped| B[transform]
    B -->|skipped| C[load]
    C -->|completed| D((done))
    style A fill:#9e9e9e,color:#fff
    style B fill:#9e9e9e,color:#fff
    style C fill:#4caf50,color:#fff
```
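The two diagrams reduce to a per-step status trace: skip a step marked completed, otherwise run it, and stop the run at the first failure (sequential mode). A small simulation of that decision logic, with assumed status labels matching the diagrams rather than dotflow's internals:

```python
def simulate_run(statuses, steps):
    """Return what each step does in one scheduled run.

    statuses maps step name -> "completed" for steps checkpointed by
    earlier runs; steps is a list of (name, callable) pairs where the
    callable raises on failure. Stops at the first failing step.
    """
    trace = []
    for name, step in steps:
        if statuses.get(name) == "completed":
            trace.append((name, "skipped"))
            continue
        try:
            step()
        except Exception:
            trace.append((name, "failed"))
            return trace  # sequential mode: nothing after the failure runs
        statuses[name] = "completed"
        trace.append((name, "completed"))
    return trace
```

Running it twice with a database that recovers between runs reproduces the Day 1 and Day 2 traces shown above.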
Requirements¶
- A fixed `workflow_id`, so checkpoints persist across scheduled runs
- A persistent storage provider: `StorageFile`, `StorageS3`, or `StorageGCS`
- `resume=True` passed to `workflow.schedule()`