Skip to content

TaskBuilder

dotflow.core.task.TaskBuilder

Import

You can import the Task class with:

from dotflow.core.task import TaskBuilder
Example

class dotflow.core.task.TaskBuilder

from uuid import uuid4

build = TaskBuilder(
    config=config
    workflow_id=uuid4()
)

Parameters:

Name Type Description Default
config Config

Configuration class.

required
workflow_id UUID

Workflow ID.

None
Source code in dotflow/core/task.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
class TaskBuilder:
    """
    Import:
        You can import the **Task** class with:

            from dotflow.core.task import TaskBuilder

    Example:
        `class` dotflow.core.task.TaskBuilder

            from uuid import uuid4

            build = TaskBuilder(
                config=config
                workflow_id=uuid4()
            )

    Args:
        config (Config): Configuration class.
        workflow_id (UUID): Workflow ID.
    """

    def __init__(
        self,
        config: Config,
        workflow_id: UUID = None,
        workflow_name: str | None = None,
    ) -> None:
        self.queue: list[Callable] = []
        self.workflow_id = workflow_id
        self.workflow_name = workflow_name
        self.config = config

    def add(
        self,
        step: Callable,
        callback: Callable = basic_callback,
        initial_context: Any = None,
        group_name: str = "default",
    ) -> TaskBuilder:
        """
        Args:
            step (Callable):
                A argument that receives an object of the callable type,
                which is basically a function. You can see in this
                [example](https://dotflow-io.github.io/dotflow/#3-task-function).

            callback (Callable):
                Any callable object that receives **args** or **kwargs**,
                which is basically a function. You can see in this
                [example](https://dotflow-io.github.io/dotflow/#2-callback-function).

            initial_context (Context):
                The argument exists to include initial data in the execution
                of the workflow within the **function context**. This parameter
                can be accessed internally, for example: **initial_context**,
                to retrieve this information and manipulate it if necessary,
                according to the objective of the workflow.

            group_name (str): Group name of tasks.
        """
        if isinstance(step, list):
            for inside_step in step:
                self.add(
                    step=inside_step,
                    callback=callback,
                    initial_context=initial_context,
                    group_name=group_name,
                )
            return self

        task_id = str(ULID())

        self.queue.append(
            Task(
                task_id=task_id,
                step=step,
                callback=Module(value=callback),
                initial_context=initial_context,
                workflow_id=self.workflow_id,
                config=self.config,
                group_name=group_name,
            )
        )

        return self

    def count(self) -> int:
        return len(self.queue)

    def clear(self) -> None:
        self.queue.clear()

    def reverse(self) -> None:
        self.queue.reverse()

    def schema(self) -> SerializerWorkflow:
        return SerializerWorkflow(
            workflow_id=self.workflow_id,
            workflow_name=self.workflow_name,
            tasks=[item.schema() for item in self.queue],
        )

    def result(self) -> SerializerWorkflow:
        item = self.schema().model_dump_json()
        return json.loads(item)

add(step, callback=basic_callback, initial_context=None, group_name='default')

Parameters:

Name Type Description Default
step Callable

A argument that receives an object of the callable type, which is basically a function. You can see in this example.

required
callback Callable

Any callable object that receives args or kwargs, which is basically a function. You can see in this example.

basic_callback
initial_context Context

The argument exists to include initial data in the execution of the workflow within the function context. This parameter can be accessed internally, for example: initial_context, to retrieve this information and manipulate it if necessary, according to the objective of the workflow.

None
group_name str

Group name of tasks.

'default'
Source code in dotflow/core/task.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def add(
    self,
    step: Callable,
    callback: Callable = basic_callback,
    initial_context: Any = None,
    group_name: str = "default",
) -> TaskBuilder:
    """
    Args:
        step (Callable):
            A argument that receives an object of the callable type,
            which is basically a function. You can see in this
            [example](https://dotflow-io.github.io/dotflow/#3-task-function).

        callback (Callable):
            Any callable object that receives **args** or **kwargs**,
            which is basically a function. You can see in this
            [example](https://dotflow-io.github.io/dotflow/#2-callback-function).

        initial_context (Context):
            The argument exists to include initial data in the execution
            of the workflow within the **function context**. This parameter
            can be accessed internally, for example: **initial_context**,
            to retrieve this information and manipulate it if necessary,
            according to the objective of the workflow.

        group_name (str): Group name of tasks.
    """
    if isinstance(step, list):
        for inside_step in step:
            self.add(
                step=inside_step,
                callback=callback,
                initial_context=initial_context,
                group_name=group_name,
            )
        return self

    task_id = str(ULID())

    self.queue.append(
        Task(
            task_id=task_id,
            step=step,
            callback=Module(value=callback),
            initial_context=initial_context,
            workflow_id=self.workflow_id,
            config=self.config,
            group_name=group_name,
        )
    )

    return self

count()

Source code in dotflow/core/task.py
378
379
def count(self) -> int:
    return len(self.queue)

clear()

Source code in dotflow/core/task.py
381
382
def clear(self) -> None:
    self.queue.clear()

reverse()

Source code in dotflow/core/task.py
384
385
def reverse(self) -> None:
    self.queue.reverse()

schema()

Source code in dotflow/core/task.py
387
388
389
390
391
392
def schema(self) -> SerializerWorkflow:
    return SerializerWorkflow(
        workflow_id=self.workflow_id,
        workflow_name=self.workflow_name,
        tasks=[item.schema() for item in self.queue],
    )

result()

Source code in dotflow/core/task.py
394
395
396
def result(self) -> SerializerWorkflow:
    item = self.schema().model_dump_json()
    return json.loads(item)