Task

dotflow.core.task.Task

Bases: TaskInstance

Import

You can import the Task class directly from dotflow:

from dotflow import Task
Example

class dotflow.core.task.Task

task = Task(
    task_id=1,
    step=my_step,
    callback=my_callback
)

Parameters:

Name Type Description Default
task_id int

Task ID.

required
step Callable

A argument that receives an object of the callable type, which is basically a function. You can see in this example.

required
callback Callable

Any callable object that receives args or kwargs, which is basically a function. You can see in this example.

basic_callback
initial_context Any

Any python object.

None
workflow_id UUID

Workflow ID.

None
config Config

Configuration class.

None
Source code in dotflow/core/task.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class Task(TaskInstance):
    """
    Import:
        You can import the **Task** class directly from dotflow:

            from dotflow import Task

    Example:
        `class` dotflow.core.task.Task

            task = Task(
                task_id=1,
                step=my_step,
                callback=my_callback
            )

    Args:
        task_id (int): Task ID.

        step (Callable):
            A argument that receives an object of the callable type,
            which is basically a function. You can see in this
            [example](https://dotflow-io.github.io/dotflow/nav/getting-started/#3-task-function).

        callback (Callable):
            Any callable object that receives **args** or **kwargs**,
            which is basically a function. You can see in this
            [example](https://dotflow-io.github.io/dotflow/nav/getting-started/#2-callback-function).

        initial_context (Any): Any python object.

        workflow_id (UUID): Workflow ID.

        config (Config): Configuration class.
    """

    def __init__(
        self,
        task_id: int,
        step: Callable,
        callback: Callable = basic_callback,
        initial_context: Any = None,
        workflow_id: UUID = None,
        config: Config = None,
    ) -> None:
        super().__init__(
            task_id,
            step,
            callback,
            initial_context,
            workflow_id
        )
        self.task_id = task_id
        self.workflow_id = workflow_id
        self.step = step
        self.callback = callback
        self.initial_context = initial_context
        self.status = TaskStatus.NOT_STARTED
        self.config = config

    @property
    def step(self):
        return self._step

    @step.setter
    def step(self, value: Callable):
        new_step = value

        if isinstance(value, str):
            new_step = Module(value=value)

        if new_step.__module__ != Action.__module__:
            raise MissingActionDecorator()

        self._step = new_step

    @property
    def callback(self):
        return self._callback

    @callback.setter
    def callback(self, value: Callable):
        new_callback = value

        if isinstance(value, str):
            new_callback = Module(value=value)

        if not isinstance(new_callback, Callable):
            raise NotCallableObject(name=str(new_callback))

        self._callback = new_callback

    @property
    def previous_context(self):
        if not self._previous_context:
            return Context()
        return self._previous_context

    @previous_context.setter
    def previous_context(self, value: Context):
        self._previous_context = Context(value)

    @property
    def initial_context(self):
        if not self._initial_context:
            return Context()
        return self._initial_context

    @initial_context.setter
    def initial_context(self, value: Context):
        self._initial_context = Context(value)

    @property
    def current_context(self):
        if not self._current_context:
            return Context()
        return self._current_context

    @current_context.setter
    def current_context(self, value: Context):
        self._current_context = Context(
            task_id=self.task_id,
            workflow_id=self.workflow_id,
            storage=value
        )

        self.config.storage.post(
            key=self.config.storage.key(task=self),
            context=self.current_context
        )

    @property
    def duration(self):
        return self._duration

    @duration.setter
    def duration(self, value: float):
        self._duration = value

    @property
    def error(self):
        if not self._error:
            return TaskError()
        return self._error

    @error.setter
    def error(self, value: Exception) -> None:
        task_error = TaskError(value)
        self._error = task_error

        logger.error(
            "ID %s - %s - %s \n %s",
            self.workflow_id,
            self.task_id,
            self.status,
            task_error.traceback,
        )

        console = Console()
        console.print_exception(show_locals=True)

    @property
    def status(self):
        if not self._status:
            return TaskStatus.NOT_STARTED
        return self._status

    @status.setter
    def status(self, value: TaskStatus) -> None:
        self._status = value

        logger.info(
            "ID %s - %s - %s",
            self.workflow_id,
            self.task_id,
            self.status,
        )

    @property
    def config(self):
        if not self._config:
            return Config()
        return self._config

    @config.setter
    def config(self, value: Config):
        self._config = value

task_id = task_id instance-attribute

workflow_id = workflow_id instance-attribute

step property writable

callback property writable

initial_context property writable

status property writable

config property writable