Skip to content

Scheduler Cron

Run workflows on a recurring schedule using cron expressions. No external tools needed — everything stays in Python.

Note

Requires pip install dotflow[scheduler]

Example

from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron


@action
def extract():
    """Return the fixed sample payload used by this example."""
    payload = {"data": "fetched"}
    return payload


@action
def load(previous_context):
    """Return ``previous_context.storage`` wrapped under the ``"saved"`` key."""
    upstream = previous_context.storage
    return {"saved": upstream}


def main():
    """Build the two-step workflow and start it on a cron schedule.

    The cron expression ``*/1 * * * *`` matches every minute.
    """
    every_minute = SchedulerCron(cron="*/1 * * * *")
    workflow = DotFlow(config=Config(scheduler=every_minute))

    # Registration order matters: extract is added before load.
    for step in (extract, load):
        workflow.task.add(step=step)

    workflow.schedule()


# Start the scheduled workflow only when this file is run as a script.
if __name__ == "__main__":
    main()

With resume

Combine scheduling with checkpoint-based resume. If a run fails, the next scheduled run picks up from the last completed step.

from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron, StorageFile


@action
def extract():
    """Return the fixed sample payload used by this example."""
    payload = {"data": "fetched"}
    return payload


@action
def transform(previous_context):
    """Return ``previous_context.storage`` wrapped under the ``"transformed"`` key."""
    upstream = previous_context.storage
    return {"transformed": upstream}


@action
def load(previous_context):
    """Return ``previous_context.storage`` wrapped under the ``"saved"`` key."""
    upstream = previous_context.storage
    return {"saved": upstream}


def main():
    """Build the three-step workflow and schedule it with resume enabled.

    The cron expression ``0 6 * * *`` matches 06:00 every day. A
    ``StorageFile`` storage provider is configured alongside the scheduler,
    and ``schedule`` is called with ``resume=True``.
    """
    file_storage = StorageFile()
    daily_at_six = SchedulerCron(cron="0 6 * * *")
    config = Config(storage=file_storage, scheduler=daily_at_six)

    workflow = DotFlow(config=config)

    # Registration order matters: extract, then transform, then load.
    for step in (extract, transform, load):
        workflow.task.add(step=step)

    workflow.schedule(resume=True)


# Start the scheduled workflow only when this file is run as a script.
if __name__ == "__main__":
    main()

Overlap strategies

The overlap strategy controls what happens when a new execution is triggered while the previous one is still running.

Strategy | Behavior
skip (default) | If the previous run is still active, skip this execution
queue | Queue the execution and run it when the previous run completes
parallel | Run regardless, even if the previous run is still active
# Code above omitted 👆

def main():
    """Build one config per overlap strategy and schedule with ``skip``.

    All three configs use the same cron expression (``*/5 * * * *``) and
    differ only in the ``overlap`` argument. Only ``config_skip`` is passed
    to ``DotFlow``; ``config_queue`` and ``config_parallel`` are constructed
    but not otherwise used in this function.
    """
    # overlap="skip"
    config_skip = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="skip"),
    )

    # overlap="queue"
    config_queue = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="queue"),
    )

    # overlap="parallel"
    config_parallel = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="parallel"),
    )

    # Swap config_skip for one of the other configs to try another strategy.
    workflow = DotFlow(config=config_skip)
    workflow.task.add(step=heavy_task)
    workflow.schedule()

# Code below omitted 👇
👀 Full file preview
from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron


@action
def heavy_task():
    """Stand in for a slow step by sleeping 120 seconds, then report done.

    Note: 120 seconds is shorter than the 5-minute cron interval used in
    ``main``, so with this duration consecutive runs do not overlap.
    """
    from time import sleep

    sleep(120)
    return {"done": True}


def main():
    """Build one config per overlap strategy and schedule with ``skip``.

    All three configs use the same cron expression (``*/5 * * * *``) and
    differ only in the ``overlap`` argument. Only ``config_skip`` is passed
    to ``DotFlow``; ``config_queue`` and ``config_parallel`` are constructed
    but not otherwise used in this function.
    """
    # overlap="skip"
    config_skip = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="skip"),
    )

    # overlap="queue"
    config_queue = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="queue"),
    )

    # overlap="parallel"
    config_parallel = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="parallel"),
    )

    # Swap config_skip for one of the other configs to try another strategy.
    workflow = DotFlow(config=config_skip)
    workflow.task.add(step=heavy_task)
    workflow.schedule()


# Start the scheduled workflow only when this file is run as a script.
if __name__ == "__main__":
    main()

Execution flow

skip — cron every 5 min, task takes 7 min:

graph LR
    A["00:00 — run"] -->|running| B["00:05 — skip"]
    B -->|still running| C["00:10 — run"]
    style A fill:#4caf50,color:#fff
    style B fill:#9e9e9e,color:#fff
    style C fill:#4caf50,color:#fff

queue — cron every 5 min, task takes 7 min:

graph LR
    A["00:00 — run"] -->|running| B["00:05 — queued"]
    B -->|"run at ~00:07"| C["00:10 — queued"]
    style A fill:#4caf50,color:#fff
    style B fill:#ff9800,color:#fff
    style C fill:#ff9800,color:#fff

parallel — cron every 5 min, task takes 7 min:

graph LR
    A["00:00 — run"] -->|running| B["00:05 — run"]
    B -->|both running| C["00:10 — run"]
    style A fill:#4caf50,color:#fff
    style B fill:#4caf50,color:#fff
    style C fill:#4caf50,color:#fff

Graceful shutdown

The scheduler listens for SIGINT (Ctrl+C) and SIGTERM signals. When received, the current execution finishes and the scheduler stops cleanly.

References