Checkpoint with input-change policy

resume=True skips tasks that already have a checkpoint. But what happens when the input changes between runs? Without an explicit policy, the old checkpoint silently wins and the new payload is ignored — a common footgun in queued/job-based deployments.

on_input_change makes that behavior explicit.

How it works

  1. On each start(), dotflow computes a fingerprint of initial_context and stores it next to the checkpoints
  2. On the next run, the stored fingerprint is compared with the current one
  3. If they differ, dotflow applies the policy you chose
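The comparison logic above can be sketched in plain Python. Note that dotflow's actual serialization and hashing scheme is internal and undocumented here; the canonical-JSON-plus-SHA-256 approach below is only an illustrative assumption:

```python
import json
from hashlib import sha256


def fingerprint(initial_context: dict) -> str:
    # Canonical serialization: sort keys so dict insertion order cannot
    # change the hash between runs. (Illustrative only; dotflow's
    # internal scheme may differ.)
    canonical = json.dumps(initial_context, sort_keys=True, separators=(",", ":"))
    return sha256(canonical.encode("utf-8")).hexdigest()


payload_v1 = {"s3_key": "uploads/data.zip", "v": 1}
payload_v2 = {"s3_key": "uploads/data.zip", "v": 2}

# The same payload always yields the same fingerprint, even when the
# dict was built in a different key order...
assert fingerprint(payload_v1) == fingerprint(dict(reversed(list(payload_v1.items()))))
# ...while any changed field yields a different one, triggering the policy.
assert fingerprint(payload_v1) != fingerprint(payload_v2)
```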

Policies

Policy   Behavior
reuse    Keep checkpoints and ignore the input change (legacy behavior; default)
reset    Invalidate checkpoints and re-run from scratch
raise    Raise InputChangedError and force the caller to decide

Example

from hashlib import sha256
from uuid import UUID

from dotflow import Config, DotFlow, action
from dotflow.providers import StorageFile


@action
def step_one(initial_context):
    return {"loaded": initial_context.storage}


@action
def step_two(previous_context):
    return {"transformed": previous_context.storage}


config = Config(storage=StorageFile())


def main():
    payload = {"s3_key": "uploads/data.zip", "v": 1}

    # A stable workflow_id is what lets a second run find the
    # checkpoints (and the stored fingerprint) of the first one.
    workflow = DotFlow(
        config=config,
        workflow_id=UUID("12345678-1234-5678-1234-567812345678"),
    )

    workflow.task.add(step=[step_one, step_two], initial_context=payload)

    # Explicit fingerprint: hash only the stable part of the payload
    # (see "Custom fingerprint" below).
    fp = sha256(payload["s3_key"].encode()).hexdigest()
    workflow.start(resume=True, on_input_change="reset", fingerprint=fp)

    return workflow


if __name__ == "__main__":
    main()

When to use each policy

  • reset — input drives the result. Same job key with new payload should re-process. Common in ETL where customer_id is stable but the dataset grows.
  • raise — operator must approve. Useful in financial / regulated pipelines where a silent re-run would be unsafe.
  • reuse — input is informational. Defaults preserve current behavior for existing pipelines.
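The three policies reduce to a small dispatch. The sketch below is a stand-alone illustration: resolve, its string return values, and the locally defined InputChangedError are hypothetical names (the real exception is exported by dotflow; its import path is not shown in this page):

```python
class InputChangedError(Exception):
    """Stand-in for dotflow's exception of the same name."""


def resolve(stored_fp: str, current_fp: str, policy: str = "reuse") -> str:
    # Returns which checkpoint set a run should use:
    #   "stored" -> keep the existing checkpoints
    #   "fresh"  -> invalidate them and re-run from scratch
    if stored_fp == current_fp or policy == "reuse":
        return "stored"
    if policy == "reset":
        return "fresh"
    # policy == "raise": push the decision back to the caller
    raise InputChangedError(f"input fingerprint changed: {stored_fp} -> {current_fp}")


# reuse: the old checkpoint wins even though the input changed
assert resolve("aaa", "bbb", "reuse") == "stored"
# reset: checkpoints are discarded and the workflow re-runs
assert resolve("aaa", "bbb", "reset") == "fresh"
```

With the raise policy, an operator-facing caller would wrap workflow.start(...) in a try/except on InputChangedError and decide whether to re-run with reset or keep the stored result.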

Custom fingerprint

Pass an explicit fingerprint= to override the automatic computation. Useful when the payload contains volatile fields (timestamps, request IDs) that should not invalidate the checkpoint.

fp = sha256(payload["s3_key"].encode()).hexdigest()
workflow.start(resume=True, on_input_change="reset", fingerprint=fp)
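A common pattern is to drop the volatile fields before hashing, so only the semantically meaningful part of the payload participates in the fingerprint. A minimal sketch — the VOLATILE set and the stable_fingerprint helper are illustrative names, not part of dotflow:

```python
import json
from hashlib import sha256

# Fields that change on every enqueue but must not bust the checkpoint.
VOLATILE = {"request_id", "timestamp"}


def stable_fingerprint(payload: dict) -> str:
    stable = {k: v for k, v in payload.items() if k not in VOLATILE}
    canonical = json.dumps(stable, sort_keys=True, separators=(",", ":"))
    return sha256(canonical.encode("utf-8")).hexdigest()


a = {"s3_key": "uploads/data.zip", "v": 1, "request_id": "r-1", "timestamp": 1700000000}
b = {"s3_key": "uploads/data.zip", "v": 1, "request_id": "r-2", "timestamp": 1700000999}

# Volatile churn does not invalidate the checkpoint...
assert stable_fingerprint(a) == stable_fingerprint(b)
# ...but a real input change still does.
assert stable_fingerprint(a) != stable_fingerprint({**a, "v": 2})
```

Pass the result as fingerprint= exactly as shown in the snippet above.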
