Skip to content

Task

dotflow.core.task.Task

Bases: TaskInstance

Import

You can import the Task class directly from dotflow:

from dotflow import Task
Example

class dotflow.core.task.Task

task = Task(
    task_id=1,
    step=my_step,
    callback=my_callback
)

Parameters:

Name Type Description Default
task_id int

Task ID.

required
step Callable

An argument that receives a callable object, which is basically a function. You can see one in this example.

required
callback Callable

Any callable object that receives args or kwargs, which is basically a function. You can see one in this example.

basic_callback
initial_context Any

Any python object.

None
workflow_id UUID

Workflow ID.

None
config Config

Configuration class.

None
group_name str

Group name of tasks.

'default'
Source code in dotflow/core/task.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class Task(TaskInstance):
    """
    Import:
        You can import the **Task** class directly from dotflow:

            from dotflow import Task

    Example:
        `class` dotflow.core.task.Task

            task = Task(
                task_id=1,
                step=my_step,
                callback=my_callback
            )

    Args:
        task_id (int): Task ID.

        step (Callable):
            An argument that receives an object of the callable type,
            which is basically a function. You can see one in this
            [example](https://dotflow-io.github.io/dotflow/nav/getting-started/#3-task-function).

        callback (Callable):
            Any callable object that receives **args** or **kwargs**,
            which is basically a function. You can see one in this
            [example](https://dotflow-io.github.io/dotflow/nav/getting-started/#2-callback-function).

        initial_context (Any): Any python object.

        workflow_id (UUID): Workflow ID.

        config (Config): Configuration class.

        group_name (str): Group name of tasks.
    """

    def __init__(
        self,
        task_id: int,
        step: Callable,
        callback: Callable = basic_callback,
        initial_context: Any = None,
        workflow_id: UUID = None,
        config: Config = None,
        group_name: str = "default"
    ) -> None:
        super().__init__(
            task_id,
            step,
            callback,
            initial_context,
            workflow_id,
            config,
            group_name
        )
        # Order matters: the assignments below go through property setters.
        # ``config`` is stored first because the ``status`` setter (last line)
        # reads ``self.config`` to notify and log.
        self.config = config
        self.group_name = group_name
        self.task_id = task_id
        self.workflow_id = workflow_id
        self.step = step
        self.callback = callback
        self.initial_context = initial_context
        self.status = TypeStatus.NOT_STARTED

    @property
    def step(self):
        """The validated step object stored by the setter."""
        return self._step

    @step.setter
    def step(self, value: Callable):
        """Store the step after validating it.

        A ``str`` value is first wrapped in ``Module(value=...)``
        (presumably resolving an import path - confirm against ``Module``).

        Raises:
            MissingActionDecorator: when the step's ``__module__`` differs
                from ``Action.__module__``.
        """
        new_step = value

        if isinstance(value, str):
            new_step = Module(value=value)

        if new_step.__module__ != Action.__module__:
            raise MissingActionDecorator()

        self._step = new_step

    @property
    def callback(self):
        """The validated callback stored by the setter."""
        return self._callback

    @callback.setter
    def callback(self, value: Callable):
        """Store the callback after validating it.

        A ``str`` value is first wrapped in ``Module(value=...)``.

        Raises:
            NotCallableObject: when the resulting object is not callable.
        """
        new_callback = value

        if isinstance(value, str):
            new_callback = Module(value=value)

        if not callable(new_callback):
            raise NotCallableObject(name=str(new_callback))

        self._callback = new_callback

    @property
    def previous_context(self):
        """Stored previous context, or an empty ``Context()`` when unset/falsy."""
        if not self._previous_context:
            return Context()
        return self._previous_context

    @previous_context.setter
    def previous_context(self, value: Context):
        """Wrap ``value`` in a ``Context`` and store it."""
        self._previous_context = Context(value)

    @property
    def initial_context(self):
        """Stored initial context, or an empty ``Context()`` when unset/falsy."""
        if not self._initial_context:
            return Context()
        return self._initial_context

    @initial_context.setter
    def initial_context(self, value: Context):
        """Wrap ``value`` in a ``Context`` and store it."""
        self._initial_context = Context(value)

    @property
    def current_context(self):
        """Stored current context, or an empty ``Context()`` when unset/falsy."""
        if not self._current_context:
            return Context()
        return self._current_context

    @current_context.setter
    def current_context(self, value: Context):
        """Store ``value`` as the current context and persist it.

        The value is wrapped in a ``Context`` tagged with this task's
        ``task_id`` and ``workflow_id``, then posted to ``config.storage``
        under the key that ``config.storage.key(task=self)`` returns.
        """
        self._current_context = Context(
            task_id=self.task_id,
            workflow_id=self.workflow_id,
            storage=value
        )

        self.config.storage.post(
            key=self.config.storage.key(task=self),
            context=self.current_context
        )

    @property
    def duration(self):
        """Duration value stored by the setter."""
        return self._duration

    @duration.setter
    def duration(self, value: float):
        self._duration = value

    @property
    def error(self):
        """Stored ``TaskError``, or an empty ``TaskError()`` when unset/falsy."""
        if not self._error:
            return TaskError()
        return self._error

    @error.setter
    def error(self, value: Exception) -> None:
        """Record an error on the task.

        A ``TaskError`` is stored as-is. An ``Exception`` is wrapped in a
        ``TaskError`` and reported through ``config.log.error``. Any other
        value is ignored.
        """
        # NOTE(review): the two checks are independent; if TaskError ever
        # derives from Exception it would be re-wrapped below - confirm.
        if isinstance(value, TaskError):
            self._error = value

        if isinstance(value, Exception):
            task_error = TaskError(value)
            self._error = task_error

            self.config.log.error(task=self)

    @property
    def status(self):
        """Stored status, or ``TypeStatus.NOT_STARTED`` when unset/falsy."""
        if not self._status:
            return TypeStatus.NOT_STARTED
        return self._status

    @status.setter
    def status(self, value: TypeStatus) -> None:
        """Store the status, then notify and log the change via ``config``."""
        self._status = value

        self.config.notify.send(task=self)
        self.config.log.info(task=self)

    @property
    def config(self):
        """Configuration used for storage, notification and logging.

        When no configuration was supplied, a default ``Config()`` is created
        on first access and cached, so that every later access shares the
        same storage/notify/log objects instead of getting a fresh
        ``Config()`` each time.
        """
        if not self._config:
            self._config = Config()
        return self._config

    @config.setter
    def config(self, value: Config):
        self._config = value

    def schema(self, max: int = None) -> SerializerTask:
        """Build a ``SerializerTask`` from this instance's ``__dict__``.

        Args:
            max: forwarded unchanged to ``SerializerTask`` as ``max``.
        """
        return SerializerTask(**self.__dict__, max=max)

    def result(self, max: int = None) -> dict:
        """Return the task schema as plain data.

        The schema is dumped with ``model_dump_json()`` and parsed back with
        ``json.loads``, so the result contains only JSON-compatible values.

        Args:
            max: forwarded unchanged to ``schema``.
        """
        item = self.schema(max=max).model_dump_json()
        return json.loads(item)

callback property writable

config property writable

current_context property writable

duration property writable

error property writable

group_name = group_name instance-attribute

initial_context property writable

previous_context property writable

status property writable

step property writable

task_id = task_id instance-attribute

workflow_id = workflow_id instance-attribute

__init__(task_id, step, callback=basic_callback, initial_context=None, workflow_id=None, config=None, group_name='default')

Source code in dotflow/core/task.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def __init__(
    self,
    task_id: int,
    step: Callable,
    callback: Callable = basic_callback,
    initial_context: Any = None,
    workflow_id: UUID = None,
    config: Config = None,
    group_name: str = "default"
) -> None:
    """Initialise the task and mark it as not started.

    Args:
        task_id: Task ID.
        step: the step to run; validated by the ``step`` property setter.
        callback: validated by the ``callback`` property setter.
        initial_context: any Python object; wrapped in a ``Context``.
        workflow_id: Workflow ID.
        config: Configuration class; may be ``None``.
        group_name: Group name of tasks.
    """
    # All arguments are forwarded positionally to the base initialiser.
    super().__init__(
        task_id,
        step,
        callback,
        initial_context,
        workflow_id,
        config,
        group_name
    )
    # ``config`` is assigned first: the ``status`` setter on the last line
    # reads ``self.config`` to send the notification and write the log entry.
    self.config = config
    self.group_name = group_name
    self.task_id = task_id
    self.workflow_id = workflow_id
    # ``step``, ``callback`` and ``initial_context`` go through property
    # setters, so validation errors are raised here, in this order.
    self.step = step
    self.callback = callback
    self.initial_context = initial_context
    self.status = TypeStatus.NOT_STARTED

result(max=None)

Source code in dotflow/core/task.py
234
235
236
def result(self, max: int = None) -> dict:
    """Return the task schema as plain data.

    The schema object is dumped with ``model_dump_json()`` and parsed back
    with ``json.loads``, so the value returned is the parsed JSON document
    rather than a serializer instance.

    Args:
        max: forwarded unchanged to ``self.schema``.
    """
    serialized = self.schema(max=max).model_dump_json()
    return json.loads(serialized)

schema(max=None)

Source code in dotflow/core/task.py
231
232
def schema(self, max: int = None) -> SerializerTask:
    """Build a ``SerializerTask`` from this object's instance attributes.

    Every entry of ``self.__dict__`` is passed as a keyword argument,
    together with ``max``.

    Args:
        max: forwarded unchanged to ``SerializerTask`` as ``max``.
    """
    attributes = self.__dict__
    return SerializerTask(**attributes, max=max)