Skip to content

SchedulerCron

dotflow.providers.scheduler_cron.SchedulerCron

Bases: Scheduler

Import

You can import the SchedulerCron class with:

from dotflow.providers import SchedulerCron
Example

class dotflow.providers.scheduler_cron.SchedulerCron

from dotflow import DotFlow, Config
from dotflow.providers import SchedulerCron

config = Config(
    scheduler=SchedulerCron(
        cron="*/5 * * * *",
        overlap="skip",
    )
)

workflow = DotFlow(config=config)
workflow.task.add(step=extract)
workflow.task.add(step=load)
workflow.schedule()

Parameters:

Name Type Description Default
cron str

A cron expression defining the schedule (e.g. "*/5 * * * *" for every 5 minutes).

required
overlap str

Strategy when a previous run is still active. One of "skip", "queue", or "parallel". Defaults to "skip".

SKIP

Attributes:

Name Type Description
cron str

The cron expression.

overlap str

The overlap strategy.

running bool

Whether the scheduler loop is active.

Source code in dotflow/providers/scheduler_cron.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class SchedulerCron(Scheduler):
    """Scheduler that runs a workflow repeatedly according to a cron expression.

    Import:
        You can import the **SchedulerCron** class with:

            from dotflow.providers import SchedulerCron

    Example:
        `class` dotflow.providers.scheduler_cron.SchedulerCron

            from dotflow import DotFlow, Config
            from dotflow.providers import SchedulerCron

            config = Config(
                scheduler=SchedulerCron(
                    cron="*/5 * * * *",
                    overlap="skip",
                )
            )

            workflow = DotFlow(config=config)
            workflow.task.add(step=extract)
            workflow.task.add(step=load)
            workflow.schedule()

    Args:
        cron (str): A cron expression defining the schedule
            (e.g. ``"*/5 * * * *"`` for every 5 minutes).

        overlap (str): Strategy when a previous run is still active.
            One of ``"skip"``, ``"queue"``, or ``"parallel"``.
            Defaults to ``"skip"``.

    Attributes:
        cron (str): The cron expression.

        overlap (str): The overlap strategy.

        running (bool): Whether the scheduler loop is active.
    """

    def __init__(
        self,
        cron: str,
        overlap: str = TypeOverlap.SKIP,
    ) -> None:
        """Validate the configuration and initialise the runtime state.

        Raises:
            ModuleNotFound: If ``croniter`` cannot be imported.
            ValueError: If croniter rejects ``cron`` or ``overlap`` is not
                one of the supported strategies.
        """
        # croniter is imported lazily so a missing optional dependency is
        # reported with the project's own error type.
        try:
            from croniter import croniter
        except ImportError as err:
            raise ModuleNotFound(
                module="croniter", library="dotflow[scheduler]"
            ) from err

        # Building an iterator is enough to validate the expression.
        try:
            croniter(cron, datetime.now())
        except (ValueError, KeyError) as err:
            raise ValueError(
                f"Invalid cron expression {cron!r}: {err}"
            ) from err

        valid_overlaps = {
            TypeOverlap.SKIP,
            TypeOverlap.QUEUE,
            TypeOverlap.PARALLEL,
        }
        if overlap not in valid_overlaps:
            raise ValueError(
                f"Invalid overlap {overlap!r}. "
                f"Must be one of: {sorted(valid_overlaps)}"
            )

        self.cron = cron
        self.overlap = overlap
        self.running = False
        # True while a "skip"/"queue" run (or a chain of queued runs) is active.
        self._executing = False
        # Guards _executing, _queue_count and _threads.
        self._lock = threading.Lock()
        # Pending runs in "queue" mode; the dispatcher never raises it above 1.
        self._queue_count = 0
        # Caps "parallel" mode at 10 concurrent runs; extra ticks are dropped.
        self._parallel_semaphore = threading.Semaphore(10)
        self._threads: list[threading.Thread] = []
        # Handlers that were installed before start(), restored on stop().
        self._prev_sigint = None
        self._prev_sigterm = None

    def start(self, workflow: Callable, **kwargs) -> None:
        """Start the scheduler loop. Blocks the calling thread.

        The loop ends once ``running`` becomes False (through ``stop()`` or
        a SIGINT/SIGTERM handled by this scheduler). On the way out,
        ``stop()`` is always invoked so signal handlers are restored and
        in-flight workflow threads are joined, even when the loop is left
        through an exception.

        Args:
            workflow (Callable): The workflow start function to execute
                on each scheduled run.
            **kwargs: Additional keyword arguments passed to the workflow
                execution (e.g. mode, resume).
        """
        from croniter import croniter

        self.running = True
        self._register_signals()

        try:
            cron_iter = croniter(self.cron, datetime.now())

            while self.running:
                next_run = cron_iter.get_next(datetime)

                # Sleep in slices of at most one second so a stop request is
                # noticed quickly. The remaining time is re-read from the
                # clock on every pass instead of being counted down, so
                # oversleeping does not shift the run.
                while self.running:
                    remaining = (next_run - datetime.now()).total_seconds()
                    if remaining <= 0:
                        break
                    sleep(min(remaining, 1.0))

                if not self.running:
                    break

                self._dispatch(workflow=workflow, **kwargs)
        finally:
            # Idempotent: after an explicit stop() this only restores the
            # signal handlers (when on the main thread) and joins whatever
            # is still tracked.
            self.stop()

    def stop(self, timeout: float | None = None) -> None:
        """Stop the scheduler loop and wait for in-flight threads.

        Safe to call more than once. When called from one of the tracked
        workflow threads, that thread is skipped rather than joined.

        Args:
            timeout: Max seconds to wait for each thread. None = wait forever.
        """
        self.running = False
        self._restore_signals()
        with self._lock:
            threads, self._threads = self._threads, []
        current = threading.current_thread()
        for thread in threads:
            # Thread.join() raises RuntimeError when a thread joins itself.
            if thread is current:
                continue
            thread.join(timeout=timeout)

    def _dispatch(self, workflow: Callable, **kwargs) -> None:
        """Route one scheduled tick to the handler for ``self.overlap``."""
        if self.overlap == TypeOverlap.SKIP:
            self._dispatch_skip(workflow=workflow, **kwargs)
        elif self.overlap == TypeOverlap.QUEUE:
            self._dispatch_queue(workflow=workflow, **kwargs)
        elif self.overlap == TypeOverlap.PARALLEL:
            self._dispatch_parallel(workflow=workflow, **kwargs)
        else:
            logger.error(
                "Unknown overlap strategy %r — workflow not dispatched",
                self.overlap,
            )

    def _track_thread(self, thread: threading.Thread) -> None:
        """Record ``thread`` for ``stop()`` and forget finished threads.

        The dispatchers use ``_launch`` instead, which also starts the thread.
        """
        with self._lock:
            self._threads = [t for t in self._threads if t.is_alive()]
            self._threads.append(thread)

    def _launch(self, thread: threading.Thread) -> None:
        """Start ``thread`` and record it for ``stop()`` in one locked step.

        Starting and tracking under the same lock means ``stop()`` can never
        pick up a thread that has not been started yet; joining such a
        thread raises RuntimeError.
        """
        with self._lock:
            self._threads = [t for t in self._threads if t.is_alive()]
            thread.start()
            self._threads.append(thread)

    def _dispatch_skip(self, workflow: Callable, **kwargs) -> None:
        """Run the workflow unless the previous run is still executing."""
        with self._lock:
            if self._executing:
                return
            self._executing = True

        thread = threading.Thread(
            target=self._execute_and_reset,
            args=(workflow,),
            kwargs=kwargs,
        )
        self._launch(thread)

    def _dispatch_queue(self, workflow: Callable, **kwargs) -> None:
        """Run the workflow, or remember one pending run if one is active."""
        with self._lock:
            if self._executing:
                # Only a single pending run is remembered; further ticks
                # while busy collapse into it.
                if self._queue_count == 0:
                    self._queue_count = 1
                return
            self._executing = True

        thread = threading.Thread(
            target=self._execute_queued,
            args=(workflow,),
            kwargs=kwargs,
        )
        self._launch(thread)

    def _dispatch_parallel(self, workflow: Callable, **kwargs) -> None:
        """Run the workflow concurrently, dropping the tick when at capacity."""
        if not self._parallel_semaphore.acquire(blocking=False):
            return

        thread = threading.Thread(
            target=self._execute_parallel,
            args=(workflow,),
            kwargs=kwargs,
        )
        self._launch(thread)

    def _execute_parallel(self, workflow: Callable, **kwargs) -> None:
        """Thread body for "parallel" mode; frees its semaphore slot at the end."""
        try:
            workflow(**kwargs)
        except Exception:
            logger.exception("Scheduled workflow execution failed")
        finally:
            self._parallel_semaphore.release()

    def _execute_and_reset(self, workflow: Callable, **kwargs) -> None:
        """Thread body for "skip" mode; clears the executing flag at the end."""
        try:
            workflow(**kwargs)
        except Exception:
            logger.exception("Scheduled workflow execution failed")
        finally:
            with self._lock:
                self._executing = False

    def _execute_queued(self, workflow: Callable, **kwargs) -> None:
        """Thread body for "queue" mode; chains the pending run, if any."""
        try:
            workflow(**kwargs)
        except Exception:
            logger.exception("Scheduled workflow execution failed")
        finally:
            with self._lock:
                if self._queue_count > 0:
                    # Consume the pending run; _executing stays True so the
                    # dispatcher keeps queueing while the chain continues.
                    self._queue_count = 0
                    next_thread = threading.Thread(
                        target=self._execute_queued,
                        args=(workflow,),
                        kwargs=kwargs,
                    )
                else:
                    self._executing = False
                    next_thread = None

            # Started outside the block above because _launch takes the
            # same non-reentrant lock.
            if next_thread is not None:
                self._launch(next_thread)

    def _register_signals(self) -> None:
        """Install SIGINT/SIGTERM handlers; a no-op off the main thread."""
        if threading.current_thread() is not threading.main_thread():
            return
        self._prev_sigint = signal.signal(signal.SIGINT, self._handle_signal)
        self._prev_sigterm = signal.signal(signal.SIGTERM, self._handle_signal)

    def _restore_signals(self) -> None:
        """Reinstall the handlers saved by ``_register_signals``.

        A no-op off the main thread and when nothing was saved.
        """
        if threading.current_thread() is not threading.main_thread():
            return
        if self._prev_sigint is not None:
            signal.signal(signal.SIGINT, self._prev_sigint)
            self._prev_sigint = None
        if self._prev_sigterm is not None:
            signal.signal(signal.SIGTERM, self._prev_sigterm)
            self._prev_sigterm = None

    def _handle_signal(self, signum, frame) -> None:
        """Request shutdown; ``start()`` performs the actual ``stop()``.

        Python runs signal handlers on the main thread, which may be holding
        ``self._lock`` inside a dispatcher when the signal arrives. Calling
        ``stop()`` from here would then block forever on that non-reentrant
        lock, so the handler only clears the flag and lets the loop in
        ``start()`` exit and clean up.
        """
        self.running = False

cron = cron instance-attribute

overlap = overlap instance-attribute

running = False instance-attribute

__init__(cron, overlap=TypeOverlap.SKIP)

Source code in dotflow/providers/scheduler_cron.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(
    self,
    cron: str,
    overlap: str = TypeOverlap.SKIP,
) -> None:
    """Check the configuration, then set up the scheduler's runtime state.

    Args:
        cron: Cron expression; validated by building a croniter from it.
        overlap: Overlap strategy; must be the SKIP, QUEUE or PARALLEL
            value of TypeOverlap.

    Raises:
        ModuleNotFound: If importing croniter fails.
        ValueError: If croniter rejects ``cron`` or ``overlap`` is not a
            supported strategy.
    """
    # Lazy import: a missing croniter is reported with the project's error.
    try:
        from croniter import croniter
    except ImportError as exc:
        raise ModuleNotFound(
            module="croniter", library="dotflow[scheduler]"
        ) from exc

    # Constructing the iterator is what validates the expression.
    try:
        croniter(cron, datetime.now())
    except (ValueError, KeyError) as exc:
        message = f"Invalid cron expression {cron!r}: {exc}"
        raise ValueError(message) from exc

    supported = {TypeOverlap.SKIP, TypeOverlap.QUEUE, TypeOverlap.PARALLEL}
    if overlap not in supported:
        raise ValueError(
            f"Invalid overlap {overlap!r}. Must be one of: {sorted(supported)}"
        )

    # Public configuration and status.
    self.cron = cron
    self.overlap = overlap
    self.running = False

    # Bookkeeping shared with the worker threads.
    self._lock = threading.Lock()
    self._threads: list[threading.Thread] = []
    self._executing = False
    self._queue_count = 0
    self._parallel_semaphore = threading.Semaphore(10)

    # Previously installed signal handlers, filled in once signals are
    # registered.
    self._prev_sigint = None
    self._prev_sigterm = None

start(workflow, **kwargs)

Start the scheduler loop. Blocks the calling thread until the scheduler is stopped.

Parameters:

Name Type Description Default
workflow Callable

The workflow start function to execute on each scheduled run.

required
**kwargs

Additional keyword arguments passed to the workflow execution (e.g. mode, resume).

{}
Source code in dotflow/providers/scheduler_cron.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def start(self, workflow: Callable, **kwargs) -> None:
    """Start the scheduler loop. Blocks the calling thread.

    The loop ends once ``running`` becomes False. On the way out,
    ``stop()`` is always invoked so signal handlers are restored and
    in-flight workflow threads are joined, even when the loop is left
    through an exception.

    Args:
        workflow (Callable): The workflow start function to execute
            on each scheduled run.
        **kwargs: Additional keyword arguments passed to the workflow
            execution (e.g. mode, resume).
    """
    from croniter import croniter

    self.running = True
    self._register_signals()

    try:
        cron_iter = croniter(self.cron, datetime.now())

        while self.running:
            next_run = cron_iter.get_next(datetime)

            # Sleep in slices of at most one second so a stop request is
            # noticed quickly. The remaining time is re-read from the clock
            # on every pass instead of being counted down, so oversleeping
            # does not shift the run.
            while self.running:
                remaining = (next_run - datetime.now()).total_seconds()
                if remaining <= 0:
                    break
                sleep(min(remaining, 1.0))

            if not self.running:
                break

            self._dispatch(workflow=workflow, **kwargs)
    finally:
        # Idempotent: after an explicit stop() this only restores the
        # signal handlers (when on the main thread) and joins whatever is
        # still tracked.
        self.stop()

stop(timeout=None)

Stop the scheduler loop and wait for in-flight threads.

Parameters:

Name Type Description Default
timeout float | None

Maximum number of seconds to wait for each thread. None waits indefinitely.

None
Source code in dotflow/providers/scheduler_cron.py
129
130
131
132
133
134
135
136
137
138
139
140
def stop(self, timeout: float | None = None) -> None:
    """Stop the scheduler loop and wait for in-flight threads.

    Safe to call more than once. When called from one of the tracked
    workflow threads, that thread is skipped rather than joined.

    Args:
        timeout: Max seconds to wait for each thread. None = wait forever.
    """
    self.running = False
    self._restore_signals()
    # Swap the list out under the lock so the joins below run without it.
    with self._lock:
        threads, self._threads = self._threads, []
    current = threading.current_thread()
    for thread in threads:
        # Thread.join() raises RuntimeError when a thread joins itself.
        if thread is current:
            continue
        thread.join(timeout=timeout)