Scheduler Cron
Run workflows on a recurring schedule using cron expressions. No external tools are required; everything stays in Python.
Note
Requires pip install dotflow[scheduler]
Example
from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron


@action
def extract():
    return {"data": "fetched"}


@action
def load(previous_context):
    # previous_context.storage holds the value returned by the previous step
    return {"saved": previous_context.storage}


def main():
    config = Config(
        scheduler=SchedulerCron(cron="*/1 * * * *"),  # every minute
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=load)
    workflow.schedule()


if __name__ == "__main__":
    main()
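The expressions used on this page have five space-separated fields, which matches the standard cron layout (minute, hour, day of month, month, day of week). As a reading aid, the sketch below labels each field. `describe_cron` is a hypothetical helper written for this page, not part of dotflow, and it only splits the string; it does not validate the field values.

```python
# Hypothetical helper for illustration only; it is not part of dotflow.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Map each field of a five-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron("*/1 * * * *"))  # "*/1" in the minute field: every minute
print(describe_cron("0 6 * * *"))    # minute 0 of hour 6: 06:00 every day
```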
With resume
Combine scheduling with checkpointing. If an execution fails, the next scheduled execution continues from the last completed step.
from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron, StorageFile


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"transformed": previous_context.storage}


@action
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    config = Config(
        storage=StorageFile(),
        scheduler=SchedulerCron(cron="0 6 * * *"),  # every day at 06:00
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.schedule(resume=True)


if __name__ == "__main__":
    main()
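To make the resume idea concrete, here is a minimal standard-library sketch of checkpointed execution. It is not how dotflow implements `resume=True`; `run_with_resume`, the JSON checkpoint file, and its layout are all assumptions made for illustration. The first run fails in the second step, and the second run picks up from there without repeating the first step.

```python
import json
import tempfile
from pathlib import Path


def run_with_resume(steps, checkpoint: Path):
    """Run steps in order, skipping those a previous run already completed."""
    state = {"done": 0, "context": None}
    if checkpoint.exists():
        state = json.loads(checkpoint.read_text())

    for index in range(state["done"], len(steps)):
        # If a step raises, the file still records the last successful step.
        state["context"] = steps[index](state["context"])
        state["done"] = index + 1
        checkpoint.write_text(json.dumps(state))

    checkpoint.unlink(missing_ok=True)  # finished; the next run starts fresh
    return state["context"]


def demo():
    calls = []

    def extract(_previous):
        calls.append("extract")
        return {"data": "fetched"}

    def transform(previous):
        calls.append("transform")
        if calls.count("transform") == 1:
            raise RuntimeError("simulated failure")
        return {"transformed": previous}

    def load(previous):
        calls.append("load")
        return {"saved": previous}

    steps = [extract, transform, load]
    with tempfile.TemporaryDirectory() as folder:
        checkpoint = Path(folder) / "checkpoint.json"
        try:
            run_with_resume(steps, checkpoint)  # first scheduled run fails
        except RuntimeError:
            pass
        result = run_with_resume(steps, checkpoint)  # next run resumes
    return result, calls


print(demo())
```

In this sketch `extract` is called once and `transform` twice, because the second run starts at the step that failed.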
Overlap strategies
The overlap strategy controls what happens when a new execution is triggered while the previous one is still running.
| Strategy | Behavior |
|---|---|
| skip (default) | If the previous execution is still active, this execution is skipped |
| queue | The execution is queued and runs when the previous one finishes |
| parallel | The execution runs independently, even if the previous one is still active |
import time

from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron


@action
def heavy_task():
    # Takes 2 minutes, so it can still be active at the next trigger
    # if earlier runs have piled up.
    time.sleep(120)
    return {"done": True}


def main():
    config_skip = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="skip"),
    )

    config_queue = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="queue"),
    )

    config_parallel = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="parallel"),
    )

    # Swap in config_queue or config_parallel to try the other strategies.
    workflow = DotFlow(config=config_skip)
    workflow.task.add(step=heavy_task)
    workflow.schedule()


if __name__ == "__main__":
    main()
Execution flow
skip: cron every 5 min, task takes 7 min:
graph LR
    A["00:00 - runs"] -->|running| B["00:05 - skipped"]
    B -->|"finishes ~00:07"| C["00:10 - runs"]
    style A fill:#4caf50,color:#fff
    style B fill:#9e9e9e,color:#fff
    style C fill:#4caf50,color:#fff
queue: cron every 5 min, task takes 7 min:
graph LR
    A["00:00 - runs"] -->|running| B["00:05 - queued"]
    B -->|"runs ~00:07"| C["00:10 - queued"]
    style A fill:#4caf50,color:#fff
    style B fill:#ff9800,color:#fff
    style C fill:#ff9800,color:#fff
parallel: cron every 5 min, task takes 7 min:
graph LR
    A["00:00 - runs"] -->|running| B["00:05 - runs"]
    B -->|both running| C["00:10 - runs"]
    style A fill:#4caf50,color:#fff
    style B fill:#4caf50,color:#fff
    style C fill:#4caf50,color:#fff
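The three timelines can be reproduced with a small simulation. This is a sketch of the policies as described on this page, not dotflow's scheduler code. The `simulate` function is hypothetical, time is counted in whole minutes, and it assumes a single worker for skip and queue with no limit on how many runs can be queued.

```python
def simulate(strategy, interval=5, duration=7, ticks=3):
    """Return (trigger_minute, decision, start_minute) for each cron tick."""
    if strategy not in ("skip", "queue", "parallel"):
        raise ValueError(f"unknown strategy: {strategy!r}")

    timeline = []
    busy_until = 0  # minute at which the last accepted run finishes

    for n in range(ticks):
        trigger = n * interval
        if strategy == "parallel":
            # Every trigger starts immediately, regardless of active runs.
            timeline.append((trigger, "run", trigger))
        elif strategy == "skip" and trigger < busy_until:
            # A run is still active, so this trigger is dropped.
            timeline.append((trigger, "skip", None))
        else:
            # Start now if idle; otherwise (queue) wait for the active run.
            start = max(trigger, busy_until)
            decision = "run" if start == trigger else "queue"
            timeline.append((trigger, decision, start))
            busy_until = start + duration
    return timeline


for name in ("skip", "queue", "parallel"):
    print(name, simulate(name))
```

With the default values, the queue timeline shows the 00:05 trigger starting at minute 7 and the 00:10 trigger starting at minute 14, so a task that consistently takes longer than the cron interval falls further behind on every tick.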
Graceful shutdown
The scheduler listens for the SIGINT (Ctrl+C) and SIGTERM signals. When either is received, the current execution finishes and the scheduler stops cleanly.
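The general pattern behind this behavior can be sketched with the standard library: the signal handler only sets a flag, and the loop checks that flag between runs, so a run that is in progress is never interrupted. `GracefulLoop` is a hypothetical class for illustration and is not dotflow's implementation. The demo calls the handler directly instead of sending a real signal.

```python
import signal
import threading


class GracefulLoop:
    """Run a job repeatedly until a stop is requested."""

    def __init__(self, job, interval_seconds):
        self.job = job
        self.interval_seconds = interval_seconds
        self._stop = threading.Event()

    def request_stop(self, signum=None, frame=None):
        # Only set a flag; the job that is currently running keeps going.
        self._stop.set()

    def install_signal_handlers(self):
        # signal.signal must be called from the main thread.
        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)

    def run(self):
        runs = 0
        while not self._stop.is_set():
            self.job()
            runs += 1
            # Sleep until the next run, waking early if a stop was requested.
            self._stop.wait(self.interval_seconds)
        return runs


def demo():
    events = []

    def job():
        events.append("start")
        if events.count("start") == 2:
            # Stands in for SIGTERM arriving while the second run is active.
            loop.request_stop(signal.SIGTERM)
        events.append("end")

    loop = GracefulLoop(job, interval_seconds=0.01)
    return loop.run(), events


print(demo())
```

In a real script, `install_signal_handlers()` would be called from the main thread before `run()`, so that Ctrl+C or a SIGTERM from the process manager triggers `request_stop`.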