Checkpoint with input-change policy¶
`resume=True` skips tasks that already have a checkpoint. But what happens when the input changed between runs? Without an explicit policy, the old checkpoint silently wins and the new payload is ignored: a common footgun in queued/job-based deployments.
`on_input_change` makes that behavior explicit.
How it works¶
- On each `start()`, dotflow computes a fingerprint of `initial_context` and stores it next to the checkpoints
- On the next run, the stored fingerprint is compared with the current one
- If they differ, dotflow applies the policy you chose
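The steps above can be sketched in plain Python. Note this is an illustrative sketch, not dotflow's actual internals: the function names, the JSON-based fingerprint, and the `RuntimeError` stand-in for `InputChangedError` are all assumptions.

```python
import json
from hashlib import sha256


def compute_fingerprint(initial_context: dict) -> str:
    # Hash a canonical (key-sorted) JSON encoding of the input, so that
    # dict ordering does not change the fingerprint.
    canonical = json.dumps(initial_context, sort_keys=True, default=str)
    return sha256(canonical.encode()).hexdigest()


def should_invalidate(stored_fp, current_fp, policy="reuse"):
    # Return True when checkpoints must be dropped and the run restarted.
    if stored_fp is None or stored_fp == current_fp:
        return False  # first run, or input unchanged: resume normally
    if policy == "reuse":
        return False  # keep checkpoints, ignore the input change
    if policy == "reset":
        return True   # invalidate checkpoints, re-run from scratch
    if policy == "raise":
        # Stand-in for dotflow's InputChangedError.
        raise RuntimeError("input changed since last checkpoint")
    raise ValueError(f"unknown policy: {policy!r}")
```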
Policies¶
| Policy | Behavior |
|---|---|
| `reuse` | Keep checkpoints, ignore input change (legacy behavior, default) |
| `reset` | Invalidate checkpoints and re-run from scratch |
| `raise` | Raise `InputChangedError` and force the caller to decide |
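With `raise`, the caller owns the decision. A minimal sketch of that hand-off, using a locally defined stand-in for `InputChangedError` and a hypothetical `start` callable (both are assumptions for illustration):

```python
class InputChangedError(Exception):
    """Local stand-in for dotflow's exception, for illustration only."""


def start_with_approval(start, approved: bool):
    # Try the strict policy first; re-run only with explicit approval.
    try:
        return start(on_input_change="raise")
    except InputChangedError:
        if not approved:
            raise  # surface the conflict to the operator
        return start(on_input_change="reset")
```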
Example¶
```python
from uuid import UUID

from dotflow import Config, DotFlow, action
from dotflow.providers import StorageFile


@action
def step_one(initial_context):
    return {"loaded": initial_context.storage}


@action
def step_two(previous_context):
    return {"transformed": previous_context.storage}


config = Config(storage=StorageFile())


def main():
    payload = {"s3_key": "uploads/data.zip", "v": 1}
    workflow = DotFlow(
        config=config,
        workflow_id=UUID("12345678-1234-5678-1234-567812345678"),
    )
    workflow.task.add(step=[step_one, step_two], initial_context=payload)
    workflow.start(resume=True, on_input_change="reset")
    return workflow


if __name__ == "__main__":
    main()
```
When to use each policy¶
- `reset`: input drives the result. The same job key with a new payload should re-process. Common in ETL where `customer_id` is stable but the dataset grows.
- `raise`: an operator must approve. Useful in financial / regulated pipelines where a silent re-run would be unsafe.
- `reuse`: input is informational. The default preserves current behavior for existing pipelines.
Custom fingerprint¶
Pass an explicit `fingerprint=` to override the automatic computation. This is useful when the payload contains volatile fields (timestamps, request IDs) that should not invalidate the checkpoint.
```python
fp = sha256(payload["s3_key"].encode()).hexdigest()
workflow.start(resume=True, on_input_change="reset", fingerprint=fp)
```
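When more than one field matters, one way to build such a fingerprint is to hash only the stable fields. A sketch; the `volatile` field names here are illustrative assumptions, not part of dotflow's API:

```python
import json
from hashlib import sha256


def stable_fingerprint(payload: dict, volatile=("timestamp", "request_id")) -> str:
    # Hash a canonical JSON encoding of the non-volatile fields only,
    # so churn in timestamps or request IDs does not invalidate checkpoints.
    stable = {k: v for k, v in payload.items() if k not in volatile}
    return sha256(json.dumps(stable, sort_keys=True).encode()).hexdigest()
```

The result can then be passed as `fingerprint=` exactly as in the snippet above.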