Action

dotflow.core.action.Action

Bases: object

Import

You can import the action decorator directly from dotflow:

from dotflow import action
Example

class dotflow.core.action.Action

Standard

@action
def my_task():
    print("task")

With Retry

@action(retry=5)
def my_task():
    print("task")

Parameters:

Name Type Description Default
func Callable
None
task Callable
None
retry int

Integer giving the maximum number of times the function is attempted when it keeps failing; the default of 1 means a single attempt with no retry.

1
Source code in dotflow/core/action.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class Action(object):
    """
    Decorator that runs a callable with retries and wraps its result in a
    Context.

    Import:
        You can import the **action** decorator directly from dotflow:

            from dotflow import action

    Example:
        `class` dotflow.core.action.Action

        Standard

            @action
            def my_task():
                print("task")

        With Retry

            @action(retry=5)
            def my_task():
                print("task")

    Args:
        func (Callable):
            Callable (function or class) to run. It is received here when
            the decorator is used bare (``@action``) and stays None when the
            decorator is configured first (``@action(retry=5)``).

        task (Callable):
            Stored on the instance as ``task``; no method of this class
            reads it.

        retry (int):
            Maximum number of times the callable is attempted before the
            last exception is re-raised. Values below 1 are treated as 1.
    """

    def __init__(
            self,
            func: Callable = None,
            task: Callable = None,
            retry: int = 1
    ) -> None:
        self.func = func
        self.task = task
        self.retry = retry
        # Names of the parameters declared by ``func``; filled by _set_params.
        self.params = []

    def __call__(self, *args, **kwargs):
        """Run the wrapped callable, or return the wrapper that will run it.

        Returns:
            A Context holding the callable's result when ``func`` is already
            known, otherwise a wrapper function around ``args[0]``.
        """
        # Bare usage (``@action``): the callable was handed to __init__, so
        # this call is the task invocation itself.
        if self.func:
            return self._execute(args, kwargs)

        # Configured usage (``@action(retry=5)``): this call receives the
        # callable to decorate as its first positional argument.
        def action(*_args, **_kwargs):
            # Bound at call time (not at decoration time) so that one
            # configured instance can decorate several callables.
            self.func = args[0]
            return self._execute(_args, _kwargs)

        return action

    def _execute(self, args, kwargs):
        """Run ``self.func`` with retries and wrap the result in a Context.

        ``kwargs`` must carry a ``task`` entry exposing ``task_id`` and
        ``workflow_id``; only the context keyword arguments the callable
        declares are forwarded to it.
        """
        self._set_params()

        task = self._get_task(kwargs=kwargs)
        contexts = self._get_context(kwargs=kwargs)

        # An empty ``contexts`` expands to no keyword arguments at all.
        return Context(
            storage=self._retry(*args, **contexts),
            task_id=task.task_id,
            workflow_id=task.workflow_id
        )

    def _retry(self, *args, **kwargs):
        """Call ``self.func`` until it succeeds or the attempts run out.

        Returns:
            Whatever ``self.func`` returns on its first successful call.

        Raises:
            Exception: the error raised by the final failed attempt.
        """
        # Always try at least once: a retry value of 0 or less used to skip
        # the call entirely and raise an empty Exception.
        attempts = max(1, self.retry)
        attempt = 0

        while True:
            try:
                return self.func(*args, **kwargs)
            except Exception:
                attempt += 1
                if attempt >= attempts:
                    raise

    def _set_params(self):
        """Store the parameter names declared by ``self.func`` in ``params``.

        Functions are inspected directly; classes are inspected through their
        ``__init__`` (so ``self`` is included). ``params`` is left untouched
        when no Python-level signature is available.
        """
        target = None
        if isinstance(self.func, FunctionType):
            target = self.func
        elif isinstance(self.func, type):
            target = getattr(self.func, "__init__", None)

        code = getattr(target, "__code__", None)
        if code is not None:
            # co_varnames also lists local variables; only the leading
            # entries are declared parameters.
            declared = code.co_argcount + code.co_kwonlyargcount
            self.params = list(code.co_varnames[:declared])

    def _get_context(self, kwargs: Dict):
        """Build the context keyword arguments the callable asked for.

        Each of ``initial_context`` / ``previous_context`` that appears in
        ``self.params`` is taken from ``kwargs`` and wrapped in a Context.
        """
        context = {}
        for key in ("initial_context", "previous_context"):
            if key in self.params:
                context[key] = Context(kwargs.get(key))

        return context

    def _get_task(self, kwargs: Dict):
        """Return the ``task`` entry of ``kwargs`` (None when absent)."""
        return kwargs.get("task")

func = func instance-attribute

task = task instance-attribute

retry = retry instance-attribute

params = [] instance-attribute

_retry(*args, **kwargs)

Source code in dotflow/core/action.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def _retry(self, *args, **kwargs):
    """Call ``self.func`` until it succeeds or the attempts run out.

    ``self.retry`` is the maximum number of attempts. Values below 1 are
    treated as 1, so the callable always runs at least once.

    Returns:
        Whatever ``self.func`` returns on its first successful call.

    Raises:
        Exception: the error raised by the final failed attempt.
    """
    # A retry value of 0 or less used to skip the call entirely and raise
    # an empty Exception; clamp it so one attempt is always made.
    attempts = max(1, self.retry)
    attempt = 0

    while True:
        try:
            return self.func(*args, **kwargs)
        except Exception:
            attempt += 1
            if attempt >= attempts:
                raise

_set_params()

Source code in dotflow/core/action.py
109
110
111
112
113
114
115
116
def _set_params(self):
    """Store the parameter names declared by ``self.func`` in ``self.params``.

    Functions are inspected directly; classes are inspected through their
    ``__init__`` (so ``self`` is included). ``self.params`` is left untouched
    when no Python-level signature is available.
    """
    target = None
    if isinstance(self.func, FunctionType):
        target = self.func
    elif isinstance(self.func, type):
        target = getattr(self.func, "__init__", None)

    code = getattr(target, "__code__", None)
    if code is not None:
        # co_varnames also lists local variables; only the leading entries
        # are declared parameters. Including locals made a callable with a
        # local named like a context argument receive an unexpected keyword.
        declared = code.co_argcount + code.co_kwonlyargcount
        self.params = list(code.co_varnames[:declared])

_get_context(kwargs)

Source code in dotflow/core/action.py
118
119
120
121
122
123
124
125
126
def _get_context(self, kwargs: Dict):
    """Build the context keyword arguments requested by the callable.

    For each of ``initial_context`` and ``previous_context`` that appears in
    ``self.params``, the matching entry of ``kwargs`` (None when missing) is
    wrapped in a Context. Returns an empty dict when neither is requested.
    """
    requested = {}
    for key in ("initial_context", "previous_context"):
        if key in self.params:
            requested[key] = Context(kwargs.get(key))
    return requested

_get_task(kwargs)

Source code in dotflow/core/action.py
128
129
def _get_task(self, kwargs: Dict):
    """Return the ``task`` entry of ``kwargs``, or None when it is absent."""
    task = kwargs.get("task", None)
    return task