
Scheduler Cron

Run workflows on a recurring schedule using cron expressions. No external tools are needed: everything stays in Python.

Note

Requires the scheduler extra: pip install dotflow[scheduler]

Example

from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron


@action
def extract():
    return {"data": "fetched"}


@action
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    config = Config(
        # "*/1 * * * *" fires once every minute
        scheduler=SchedulerCron(cron="*/1 * * * *"),
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=load)
    workflow.schedule()


if __name__ == "__main__":
    main()
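The cron strings on this page use the standard five-field layout: minute, hour, day of month, month, day of week. The matcher below is a rough illustration of how such an expression selects minutes. It is a minimal sketch and not dotflow's parser. It supports only "*", "*/n", single numbers and comma lists, and it ANDs all five fields together, whereas standard cron ORs day of month and day of week when both are restricted. The helper names field_matches and cron_matches are hypothetical.

```python
from datetime import datetime


def field_matches(spec: str, value: int, lo: int = 0) -> bool:
    """Return True if one cron field ("*", "*/5", "6", "1,15") matches value.

    lo is the field's minimum (0 for minute/hour/weekday, 1 for day/month),
    because "*/n" steps through the field's range starting at that minimum.
    """
    for part in spec.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if (value - lo) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, when: datetime) -> bool:
    """Check a five-field cron expression against a datetime (minute precision).

    Simplification: all fields are ANDed; real cron ORs day-of-month and
    day-of-week when both are restricted.
    """
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (when.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(dom, when.day, lo=1)
        and field_matches(month, when.month, lo=1)
        and field_matches(dow, cron_dow)
    )


print(cron_matches("0 6 * * *", datetime(2024, 1, 1, 6, 0)))     # True
print(cron_matches("0 6 * * *", datetime(2024, 1, 1, 6, 30)))    # False
print(cron_matches("*/5 * * * *", datetime(2024, 1, 1, 9, 10)))  # True
```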

With resume

Combine scheduling with checkpointing. If a run fails, the next scheduled run resumes from the last completed step.

from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron, StorageFile


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"transformed": previous_context.storage}


@action
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    config = Config(
        storage=StorageFile(),
        scheduler=SchedulerCron(cron="0 6 * * *"),  # every day at 06:00
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.schedule(resume=True)


if __name__ == "__main__":
    main()
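Conceptually, resume works like a checkpoint file that records which steps have already completed. A later run skips those steps and starts at the first unfinished one. The sketch below shows that idea with a plain JSON file and a hypothetical run_with_checkpoint helper. It does not reflect how StorageFile actually persists state. The sketch makes two further assumptions: every step's return value is JSON-serializable, and a fully successful run clears the checkpoint so the next tick starts again from the first step.

```python
import json
from pathlib import Path
from typing import Callable


def run_with_checkpoint(
    steps: list[tuple[str, Callable[[object], object]]],
    checkpoint: Path,
) -> object:
    """Run (name, step) pairs in order, skipping names already recorded as done."""
    state = json.loads(checkpoint.read_text()) if checkpoint.exists() else {}
    context = state.get("context")      # output of the last completed step
    done = set(state.get("done", []))
    for name, step in steps:
        if name in done:
            continue  # finished in an earlier run: resume after it
        context = step(context)  # may raise; earlier progress is already saved
        done.add(name)
        checkpoint.write_text(json.dumps({"done": sorted(done), "context": context}))
    checkpoint.unlink()  # assumption: a clean run resets the checkpoint
    return context
```

If transform raises on the first attempt, the checkpoint keeps extract's result. On the following attempt, extract is not called again.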

Overlap strategies

Controls what happens when a new run is triggered while the previous one is still running.

| Strategy | Behavior |
| --- | --- |
| skip (default) | If the previous run is still active, this run is skipped |
| queue | The run is queued and starts when the previous one finishes |
| parallel | The run starts independently, even if the previous one is still active |
import time

from dotflow import Config, DotFlow, action
from dotflow.providers import SchedulerCron


@action
def heavy_task():
    # Takes 7 minutes, longer than the 5-minute cron interval, so runs overlap
    time.sleep(420)
    return {"done": True}


def main():
    config_skip = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="skip"),
    )

    config_queue = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="queue"),
    )

    config_parallel = Config(
        scheduler=SchedulerCron(cron="*/5 * * * *", overlap="parallel"),
    )

    workflow = DotFlow(config=config_skip)  # swap in config_queue or config_parallel to compare
    workflow.task.add(step=heavy_task)
    workflow.schedule()


if __name__ == "__main__":
    main()
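As a concrete illustration, here is one way a scheduler could enforce the three strategies on each cron tick using only the standard library. OverlapDispatcher and its trigger method are hypothetical names, and dotflow's internal implementation may differ. "skip" uses a non-blocking lock acquire. "queue" hands ticks to a single worker thread. "parallel" starts a fresh thread for every tick.

```python
import queue
import threading
from typing import Callable


class OverlapDispatcher:
    """Hypothetical per-tick dispatcher for the skip / queue / parallel strategies."""

    def __init__(self, job: Callable[[], None], overlap: str = "skip") -> None:
        if overlap not in ("skip", "queue", "parallel"):
            raise ValueError(f"unknown overlap strategy: {overlap!r}")
        self.job = job
        self.overlap = overlap
        self._running = threading.Lock()  # held while a skip-mode run is active
        self._pending: queue.Queue = queue.Queue()
        if overlap == "queue":
            # A single worker drains ticks in arrival order, so runs never overlap.
            threading.Thread(target=self._drain, daemon=True).start()

    def _drain(self) -> None:
        while True:
            self._pending.get()
            try:
                self.job()
            finally:
                self._pending.task_done()

    def _run_and_release(self) -> None:
        try:
            self.job()
        finally:
            self._running.release()

    def trigger(self) -> bool:
        """Handle one cron tick; return False if the tick was skipped."""
        if self.overlap == "parallel":
            threading.Thread(target=self.job).start()
        elif self.overlap == "queue":
            self._pending.put(None)
        elif self._running.acquire(blocking=False):
            threading.Thread(target=self._run_and_release).start()
        else:
            return False  # skip: the previous run is still active
        return True
```

The "queue" strategy uses a single worker thread rather than several threads blocking on one lock. The reason is that threading.Lock does not guarantee which waiting thread acquires it next, so queued ticks could otherwise run out of order.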

Execution flow

skip: cron every 5 minutes, task takes 7 minutes:

graph LR
    A["00:00 runs"] -->|still running| B["00:05 skipped"]
    B -->|"finished ~00:07"| C["00:10 runs"]
    style A fill:#4caf50,color:#fff
    style B fill:#9e9e9e,color:#fff
    style C fill:#4caf50,color:#fff

queue: cron every 5 minutes, task takes 7 minutes:

graph LR
    A["00:00 runs"] -->|running| B["00:05 queued"]
    B -->|"starts ~00:07"| C["00:10 queued"]
    style A fill:#4caf50,color:#fff
    style B fill:#ff9800,color:#fff
    style C fill:#ff9800,color:#fff

parallel: cron every 5 minutes, task takes 7 minutes:

graph LR
    A["00:00 runs"] -->|running| B["00:05 runs"]
    B -->|both running| C["00:10 runs"]
    style A fill:#4caf50,color:#fff
    style B fill:#4caf50,color:#fff
    style C fill:#4caf50,color:#fff
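The three timelines above follow from simple arithmetic on trigger times and run duration. The hypothetical simulate function below reproduces them in minutes. It assumes every run takes exactly the same time and that a queued run starts the instant the previous one ends.

```python
from typing import Optional


def simulate(
    strategy: str, interval: int, duration: int, horizon: int
) -> list[tuple[int, Optional[int]]]:
    """Return (trigger_minute, start_minute or None if skipped) for each tick."""
    events: list[tuple[int, Optional[int]]] = []
    busy_until = 0  # minute at which the latest accepted run finishes
    for tick in range(0, horizon, interval):
        if strategy == "parallel":
            start = tick  # always start immediately
        elif strategy == "skip":
            if tick < busy_until:
                events.append((tick, None))  # previous run still active: drop
                continue
            start = tick
        elif strategy == "queue":
            start = max(tick, busy_until)  # wait for the previous run to end
        else:
            raise ValueError(f"unknown strategy: {strategy!r}")
        busy_until = max(busy_until, start + duration)
        events.append((tick, start))
    return events


for strategy in ("skip", "queue", "parallel"):
    print(strategy, simulate(strategy, interval=5, duration=7, horizon=15))
# skip [(0, 0), (5, None), (10, 10)]
# queue [(0, 0), (5, 7), (10, 14)]
# parallel [(0, 0), (5, 5), (10, 10)]
```

Under "queue", the start times drift further behind each tick whenever the task takes longer than the cron interval.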

Graceful shutdown

The scheduler listens for the SIGINT (Ctrl+C) and SIGTERM signals. When either is received, the current run finishes and the scheduler then stops cleanly.
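A common way to get this behavior in plain Python is to have the signal handler only set a flag, and to have the scheduling loop check that flag between runs. This way an in-flight run is never interrupted. The loop below is a hypothetical sketch of that pattern, not dotflow's code. The names run_forever and request_stop and the fixed interval_s are placeholders, and the sketch fires on a fixed interval rather than evaluating a cron expression.

```python
import signal
import threading
from typing import Callable

stop = threading.Event()


def request_stop(signum: int, frame: object) -> None:
    """Signal handler: only sets a flag, so the current run is not interrupted."""
    stop.set()


def run_forever(job: Callable[[], None], interval_s: float) -> int:
    """Run job every interval_s seconds until SIGINT/SIGTERM; return the run count."""
    signal.signal(signal.SIGINT, request_stop)   # Ctrl+C
    signal.signal(signal.SIGTERM, request_stop)  # e.g. `kill <pid>` (default signal)
    runs = 0
    while not stop.is_set():
        job()  # an in-flight run always completes before the flag is checked
        runs += 1
        stop.wait(interval_s)  # sleeps between runs, wakes early once stop is set
    return runs
```

For example, run_forever(my_job, 60.0) would call a hypothetical my_job roughly once a minute until Ctrl+C is pressed or the process receives SIGTERM. Signal handlers can only be installed from the main thread, so the loop has to run there.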

References