Ir para o conteúdo

Action

dotflow.core.action.Action

Decorator for creating task steps.

Accepts configuration for retry, timeout, retry_delay, and backoff, which are stored as attributes and consumed by TaskEngine during execution.

Import

You can import the action decorator directly from dotflow:

from dotflow import action
Example

class dotflow.core.action.Action

Standard

@action
def my_task():
    print("task")

With Retry

@action(retry=5)
def my_task():
    print("task")

With Timeout

@action(timeout=60)
def my_task():
    print("task")

With Retry delay

@action(retry=5, retry_delay=5)
def my_task():
    print("task")

With Backoff

@action(retry=5, backoff=True)
def my_task():
    print("task")
Note

Retry, timeout, and backoff are enforced by TaskEngine.execute_with_retry(), not by Action itself. Action stores the configuration as attributes.

Parameters:

Name Type Description Default
func Callable
None
task Callable
None
retry int

Number of task retries on failure.

1
timeout int

Execution timeout for a task. Duration (in seconds)

0
retry_delay int

Delay between retries when the task fails. Duration (in seconds)

1
backoff bool

Exponential backoff. Doubles retry_delay after each attempt.

False
Source code in dotflow/core/action.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class Action:
    """Decorator for creating task steps.

    Accepts configuration for retry, timeout, retry_delay, and backoff,
    which are stored as attributes and consumed by TaskEngine during execution.

    Import:
        You can import the **action** decorator directly from dotflow:

            from dotflow import action

    Example:
        `class` dotflow.core.action.Action

        Standard

            @action
            def my_task():
                print("task")

        With Retry

            @action(retry=5)
            def my_task():
                print("task")

        With Timeout

            @action(timeout=60)
            def my_task():
                print("task")

        With Retry delay

            @action(retry=5, retry_delay=5)
            def my_task():
                print("task")

        With Backoff

            @action(retry=5, backoff=True)
            def my_task():
                print("task")

    Note:
        Retry, timeout, and backoff are enforced by TaskEngine.execute_with_retry(),
        not by Action itself. Action stores the configuration as attributes.

    Args:
        func (Callable): The callable being decorated when the decorator is
            applied without parentheses (``@action``). Left as ``None`` for
            the parameterized form (``@action(...)``), where the callable
            arrives on the first call instead.

        task (Callable): Stored on the instance as ``self.task``; it is not
            read anywhere else inside this class.

        retry (int): Number of task retries on failure.

        timeout (int): Execution timeout for a task. Duration (in seconds)

        retry_delay (int): Delay between retries when the task fails.
            Duration (in seconds)

        backoff (bool): Exponential backoff. Doubles retry_delay after each attempt.

    """

    def __init__(
        self,
        func: Callable = None,
        task: Callable = None,
        retry: int = 1,
        timeout: int = 0,
        retry_delay: int = 1,
        backoff: bool = False,
    ) -> None:
        self.func = func
        self.task = task
        self.retry = retry
        self.timeout = timeout
        self.retry_delay = retry_delay
        self.backoff = backoff
        # Parameter names of the wrapped callable; filled in by _set_params().
        self.params = []

    def __call__(self, *args, **kwargs):
        """Run the wrapped callable, or finish a parameterized decoration.

        When ``func`` was supplied at construction (bare ``@action``), this
        call executes the task and returns its ``Context``. Otherwise the
        first positional argument is taken to be the callable to decorate and
        a wrapper function carrying the retry/timeout/retry_delay/backoff
        attributes is returned.
        """
        if self.func:
            return self._execute(args, kwargs)

        def action(*_args, **_kwargs):
            # ``args`` is the outer call's tuple: args[0] is the decorated
            # callable received by ``@action(...)``.
            self.func = args[0]
            return self._execute(_args, _kwargs)

        # Expose the configuration on the wrapper itself so it can be read
        # without access to this Action instance.
        action.retry = self.retry
        action.timeout = self.timeout
        action.retry_delay = self.retry_delay
        action.backoff = self.backoff

        return action

    def _execute(self, args, kwargs):
        """Call ``self.func`` and wrap its result in a ``Context``.

        Args:
            args: Positional arguments forwarded to the wrapped callable.
            kwargs: Keyword arguments of the invocation. Only ``task``,
                ``initial_context`` and ``previous_context`` are read; the
                ``task`` entry must expose ``task_id`` and ``workflow_id``.

        Returns:
            A ``Context`` whose storage is the wrapped callable's return
            value, tagged with the task's ``task_id`` and ``workflow_id``.
        """
        self._set_params()

        task = self._get_task(kwargs=kwargs)
        contexts = self._get_context(kwargs=kwargs)

        # An empty ``contexts`` dict expands to no keyword arguments, so a
        # single call covers both the "with context" and "without" cases.
        return Context(
            storage=self._run_action(*args, **contexts),
            task_id=task.task_id,
            workflow_id=task.workflow_id,
        )

    def _run_action(self, *args, **kwargs):
        """Invoke the wrapped callable, translating one known internal error.

        Any exception recognised by ``is_execution_with_class_internal_error``
        is replaced by ``ExecutionWithClassError`` (with the original context
        suppressed); every other exception propagates unchanged.
        """
        is_async = inspect.iscoroutinefunction(self.func)

        try:
            return self._call_func(is_async, *args, **kwargs)
        except Exception as error:
            if is_execution_with_class_internal_error(error=error):
                raise ExecutionWithClassError() from None
            raise

    def _call_func(self, is_async, *args, **kwargs):
        """Call ``self.func`` synchronously, driving coroutines to completion.

        Args:
            is_async: Whether ``self.func`` is a coroutine function.
            *args: Positional arguments forwarded to ``self.func``.
            **kwargs: Keyword arguments forwarded to ``self.func``.

        Returns:
            The value returned by ``self.func`` (the awaited result for
            coroutine functions).
        """
        if not is_async:
            return self.func(*args, **kwargs)

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: asyncio.run() is safe here.
            return asyncio.run(self.func(*args, **kwargs))

        # A loop is already running in this thread, where asyncio.run() is
        # not allowed; run the coroutine on a worker thread and block on it.
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(
                asyncio.run, self.func(*args, **kwargs)
            ).result()

    def _set_params(self):
        """Record the parameter names of ``self.func`` in ``self.params``.

        Plain functions are inspected directly; classes are inspected through
        their ``__init__`` (so ``self`` is part of the list). Any other kind
        of callable leaves ``self.params`` untouched.
        """
        if isinstance(self.func, FunctionType):
            code = self.func.__code__
        elif isinstance(self.func, type) and hasattr(
            self.func.__init__, "__code__"
        ):
            # isinstance() rather than ``type(...) is type`` so that classes
            # built by a custom metaclass (e.g. abc.ABC subclasses) are
            # recognised as classes too.
            code = self.func.__init__.__code__
        else:
            return

        # Positional and keyword-only parameters come first in co_varnames;
        # the remaining entries are *args/**kwargs names and locals.
        self.params = list(
            code.co_varnames[: code.co_argcount + code.co_kwonlyargcount]
        )

    def _get_context(self, kwargs: dict):
        """Build the context keyword arguments the wrapped callable accepts.

        For each of ``initial_context`` and ``previous_context`` that appears
        in ``self.params``, the matching value from *kwargs* (``None`` when
        absent) is wrapped in a ``Context``.

        Returns:
            A dict with zero, one or both of those keys.
        """
        context = {}
        if "initial_context" in self.params:
            context["initial_context"] = Context(kwargs.get("initial_context"))

        if "previous_context" in self.params:
            context["previous_context"] = Context(
                kwargs.get("previous_context")
            )

        return context

    def _get_task(self, kwargs: dict):
        """Return the ``task`` entry of *kwargs*, or ``None`` when missing."""
        return kwargs.get("task")
func = func instance-attribute

task = task instance-attribute

retry = retry instance-attribute

timeout = timeout instance-attribute

retry_delay = retry_delay instance-attribute

backoff = backoff instance-attribute

params = [] instance-attribute

_run_action(*args, **kwargs)

Source code in dotflow/core/action.py
150
151
152
153
154
155
156
157
158
def _run_action(self, *args, **kwargs):
    """Invoke the wrapped callable through ``self._call_func``.

    Whether ``self.func`` is a coroutine function is determined first and
    passed along as the leading argument. If the call raises an exception
    that ``is_execution_with_class_internal_error`` recognises, it is
    replaced by ``ExecutionWithClassError`` with the original context
    suppressed; any other exception is re-raised unchanged.
    """
    coroutine_function = inspect.iscoroutinefunction(self.func)

    try:
        outcome = self._call_func(coroutine_function, *args, **kwargs)
    except Exception as exc:
        if not is_execution_with_class_internal_error(error=exc):
            raise
        raise ExecutionWithClassError() from None
    return outcome

_set_params()

Source code in dotflow/core/action.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def _set_params(self):
    """Record the parameter names of ``self.func`` in ``self.params``.

    Plain functions are inspected directly; classes are inspected through
    their ``__init__`` (so ``self`` is part of the list). Any other kind of
    callable leaves ``self.params`` untouched.
    """
    if isinstance(self.func, FunctionType):
        code = self.func.__code__
    elif isinstance(self.func, type) and hasattr(
        self.func.__init__, "__code__"
    ):
        # isinstance() rather than ``type(...) is type`` so that classes
        # built by a custom metaclass (e.g. abc.ABC subclasses) are
        # recognised as classes too.
        code = self.func.__init__.__code__
    else:
        return

    # Positional and keyword-only parameters come first in co_varnames; the
    # remaining entries are *args/**kwargs names and locals.
    self.params = list(
        code.co_varnames[: code.co_argcount + code.co_kwonlyargcount]
    )

_get_context(kwargs)

Source code in dotflow/core/action.py
191
192
193
194
195
196
197
198
199
200
201
def _get_context(self, kwargs: dict):
    """Collect the context keyword arguments the wrapped callable accepts.

    For each of ``initial_context`` and ``previous_context`` that is listed
    in ``self.params``, the matching entry of *kwargs* (``None`` when the key
    is absent) is wrapped in a ``Context``.

    Returns:
        A dict holding zero, one or both of those keys.
    """
    injected = {}
    for name in ("initial_context", "previous_context"):
        if name not in self.params:
            continue
        injected[name] = Context(kwargs.get(name))
    return injected

_get_task(kwargs)

Source code in dotflow/core/action.py
203
204
def _get_task(self, kwargs: dict):
    """Return the value stored under ``"task"`` in *kwargs*, or ``None``."""
    task = kwargs.get("task")
    return task