Scheduler Cron
Run workflows on a recurring schedule using cron expressions. No external tools are needed: everything stays in Python.
Note
Requires pip install dotflow[scheduler]
Example
from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron


@action
def extract():
    return {"data": "fetched"}


@action
def load(previous_context):
    # previous_context.storage carries the output of the previous step
    return {"saved": previous_context.storage}


def main():
    # Five-field cron expression: "*/1 * * * *" fires every minute
    config = Config(
        scheduler=SchedulerCron(cron="*/1 * * * *"),
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=load)
    # Hand the workflow to the scheduler, which runs it on every cron tick
    workflow.schedule()


if __name__ == "__main__":
    main()
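To make the five cron fields concrete, here is a minimal standard-library sketch of how an expression such as */1 * * * * can be checked against a timestamp. The helpers field_matches and cron_matches are hypothetical names, not part of dotflow, and the sketch makes no claim about how SchedulerCron parses expressions internally. It only handles *, */n, and comma-separated integers, and it simply ANDs all five fields.

```python
from datetime import datetime


def field_matches(field: str, value: int, start: int = 0) -> bool:
    """Check one cron field. Supports '*', '*/n', and comma-separated integers only."""
    if field == "*":
        return True
    if field.startswith("*/"):
        # Steps are counted from the first valid value of the field.
        return (value - start) % int(field[2:]) == 0
    return value in {int(part) for part in field.split(",")}


def cron_matches(expr: str, moment: datetime) -> bool:
    """Return True if `moment` falls on a tick of the five-field expression."""
    minute, hour, day, month, weekday = expr.split()
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day, start=1)
        and field_matches(month, moment.month, start=1)
        # cron counts Sunday as 0, while isoweekday() counts it as 7.
        and field_matches(weekday, moment.isoweekday() % 7)
    )


# "*/1 * * * *" matches any minute; "0 6 * * *" matches only 06:00.
print(cron_matches("*/1 * * * *", datetime(2024, 1, 1, 12, 34)))  # True
print(cron_matches("0 6 * * *", datetime(2024, 1, 1, 6, 1)))      # False
```

Reading the fields left to right (minute, hour, day of month, month, day of week) is usually enough to predict when a schedule will fire.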
With resume
Combine scheduling with checkpoint-based resume. If a run fails, the next scheduled run continues after the last completed step instead of starting over.
from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron, StorageFile


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"transformed": previous_context.storage}


@action
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    config = Config(
        # Persistent storage gives resume a checkpoint to read on the next run
        storage=StorageFile(),
        # "0 6 * * *" fires once a day at 06:00
        scheduler=SchedulerCron(cron="0 6 * * *"),
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.schedule(resume=True)


if __name__ == "__main__":
    main()
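The idea behind checkpoint-based resume can be sketched without dotflow at all. In the sketch below a plain dict stands in for the storage provider, and run_with_resume is a hypothetical helper; dotflow's real checkpoint format and resume logic are not described on this page, so treat this only as an illustration of the behavior.

```python
def run_with_resume(steps, checkpoint):
    """Run steps in order, skipping any step whose output is already checkpointed."""
    previous = None
    for step in steps:
        name = step.__name__
        if name in checkpoint:
            previous = checkpoint[name]  # reuse the saved output
            continue
        previous = step(previous)        # may raise and abort this run
        checkpoint[name] = previous      # record progress after each step
    checkpoint.clear()                   # a finished run starts fresh next time
    return previous


calls = []
state = {"fail_once": True}


def extract(_previous):
    calls.append("extract")
    return {"data": "fetched"}


def transform(previous):
    calls.append("transform")
    if state["fail_once"]:
        state["fail_once"] = False
        raise RuntimeError("transient failure")
    return {"transformed": previous}


def load(previous):
    calls.append("load")
    return {"saved": previous}


checkpoint = {}
try:
    run_with_resume([extract, transform, load], checkpoint)
except RuntimeError:
    pass  # the first scheduled run fails at transform

# The next scheduled run skips extract and retries transform.
result = run_with_resume([extract, transform, load], checkpoint)
print(calls)
```

The second run never calls extract again, which is the property that makes resume useful when early steps are slow or expensive.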
Overlap strategies
The overlap option controls what happens when a new execution is triggered while the previous one is still running.
| Strategy | Behavior |
|---|---|
| skip (default) | If the previous run is still active, skip this execution |
| queue | Queue the execution and run it when the previous one completes |
| parallel | Run regardless, even if the previous run is still active |
import time

from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron


@action
def heavy_task():
    # Stand-in for slow work (2 minutes); runs only overlap when one outlasts the cron interval
    time.sleep(120)
    return {"done": True}


def main():
    # One config per strategy; only config_skip is used below
    config_skip = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="skip"),
    )

    config_queue = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="queue"),
    )

    config_parallel = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="parallel"),
    )

    # Swap in config_queue or config_parallel to try the other strategies
    workflow = DotFlow(config=config_skip)
    workflow.task.add(step=heavy_task)
    workflow.schedule()


if __name__ == "__main__":
    main()
Execution flow
skip (cron every 5 min, task takes 7 min):
graph LR
    A["00:00 - run"] -->|running| B["00:05 - skip"]
    B -->|still running| C["00:10 - run"]
    style A fill:#4caf50,color:#fff
    style B fill:#9e9e9e,color:#fff
    style C fill:#4caf50,color:#fff
queue (cron every 5 min, task takes 7 min):
graph LR
    A["00:00 - run"] -->|running| B["00:05 - queued"]
    B -->|"run at ~00:07"| C["00:10 - queued"]
    style A fill:#4caf50,color:#fff
    style B fill:#ff9800,color:#fff
    style C fill:#ff9800,color:#fff
parallel (cron every 5 min, task takes 7 min):
graph LR
    A["00:00 - run"] -->|running| B["00:05 - run"]
    B -->|both running| C["00:10 - run"]
    style A fill:#4caf50,color:#fff
    style B fill:#4caf50,color:#fff
    style C fill:#4caf50,color:#fff
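The three timelines above can be reproduced with a small simulation. This is only a sketch of the decision each strategy makes at a cron tick, assuming runs are handled one at a time under queue; simulate is a hypothetical helper, not a dotflow function, and times are plain minutes.

```python
from typing import List, Optional, Tuple


def simulate(strategy: str, triggers: List[int], duration: int) -> List[Tuple[int, str, Optional[int]]]:
    """Return (trigger minute, outcome, start minute) for each cron tick."""
    timeline = []
    busy_until = 0  # minute at which the most recently started run finishes
    for tick in triggers:
        if strategy == "parallel":
            start = tick                       # never waits
        elif strategy == "queue":
            start = max(tick, busy_until)      # waits for the previous run
        elif strategy == "skip":
            start = tick if tick >= busy_until else None
        else:
            raise ValueError(f"unknown strategy: {strategy}")

        if start is None:
            timeline.append((tick, "skip", None))
            continue
        outcome = "run" if start == tick else "queued"
        timeline.append((tick, outcome, start))
        busy_until = start + duration
    return timeline


# Cron every 5 minutes, task takes 7 minutes.
for strategy in ("skip", "queue", "parallel"):
    print(strategy, simulate(strategy, [0, 5, 10], 7))
```

Under queue the start times drift later with every tick (0, 7, 14), so a task that always outlasts its interval builds an ever-growing backlog; skip and parallel avoid that at the cost of dropped or concurrent runs.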
Graceful shutdown
The scheduler listens for the SIGINT (Ctrl+C) and SIGTERM signals. When either one is received, the current execution is allowed to finish and the scheduler then stops cleanly.
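A common way to get this behavior in plain Python is to have the signal handler set a flag that the loop checks between executions. The sketch below uses only the standard library; GracefulLoop and its methods are hypothetical names chosen for illustration, and dotflow's own implementation may differ.

```python
import signal
import threading


class GracefulLoop:
    """Run a job repeatedly until a stop is requested, never interrupting a run."""

    def __init__(self, job, interval=0.01):
        self.job = job
        self.interval = interval
        self.stop_requested = threading.Event()
        self.completed_runs = 0

    def request_stop(self, signum=None, frame=None):
        # Signal handlers receive (signum, frame); only a flag is set here.
        self.stop_requested.set()

    def install_signal_handlers(self):
        # signal.signal() may only be called from the main thread.
        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)

    def run(self):
        while not self.stop_requested.is_set():
            self.job()                 # the current execution always finishes
            self.completed_runs += 1
            # Sleep until the next tick, waking early if a stop arrives.
            self.stop_requested.wait(self.interval)
```

Calling install_signal_handlers() from the main thread before run() routes Ctrl+C and SIGTERM to request_stop, so a signal that arrives in the middle of a job only takes effect once that job returns.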