Skip to content

Manager

dotflow.core.workflow.Manager

Import

You can import the Manager class with:

from dotflow.core.workflow import Manager
Example

class dotflow.core.workflow.Manager

# Constructing a Manager runs the workflow right away: per the __init__ source
# shown on this page, the tasks are executed and on_success / on_failure is
# invoked before the constructor returns.
workflow = Manager(
    tasks=[tasks],
    on_success=basic_callback,
    on_failure=basic_callback,
    keep_going=True
)

Parameters:

Name Type Description Default
tasks List[Task]

A list containing objects of type Task.

required
on_success Callable

Success function to be executed after the completion of the entire workflow. It's essentially a callback for successful scenarios.

basic_callback
on_failure Callable

Failure function to be executed after the completion of the entire workflow. It's essentially a callback for error scenarios.

basic_callback
mode TypeExecution

Parameter that defines the execution mode of the workflow. Currently, there are options to execute in sequential, background, or parallel mode. The sequential mode is used by default.

SEQUENTIAL
keep_going bool

A boolean that controls whether the workflow keeps running when a task raises an error during its execution. If it is True, the execution will continue; if it is False, the workflow will stop.

False
workflow_id UUID

Workflow ID.

None

Attributes:

Name Type Description
on_success Callable
on_failure Callable
workflow_id UUID
started datetime
Source code in dotflow/core/workflow.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class Manager:
    """
    Import:
        You can import the **Manager** class with:

            from dotflow.core.workflow import Manager

    Example:
        `class` dotflow.core.workflow.Manager

            workflow = Manager(
                tasks=[tasks],
                on_success=basic_callback,
                on_failure=basic_callback,
                keep_going=True
            )

    Note:
        The workflow is executed inside the constructor: the tasks are run
        with the selected mode and the matching callback is invoked before
        `Manager(...)` returns.

    Args:
        tasks (List[Task]):
            A list containing objects of type Task.

        on_success (Callable):
            Success function to be executed after the completion of the entire
            workflow. It's essentially a callback for successful scenarios.

        on_failure (Callable):
            Failure function to be executed after the completion of the entire
            workflow. It's essentially a callback for error scenarios.

        mode (TypeExecution):
            Parameter that defines the execution mode of the workflow. Currently,
            there are options to execute in **sequential**, **background**, or **parallel** mode.
            The sequential mode is used by default.

        keep_going (bool):
            A boolean that controls whether the workflow keeps running when a
            task raises an error during its execution. If it is **True**, the
            execution will continue; if it is **False**, the workflow will stop.

        workflow_id (UUID):
            Workflow ID. When omitted, a new one is generated with `uuid4()`.

    Attributes:
        tasks (List[Task]):
            The tasks returned by the execution mode once the workflow ran.

        on_success (Callable):
            Callback invoked with `tasks=...` when no task ended as failed.

        on_failure (Callable):
            Callback invoked with `tasks=...` when at least one task failed.

        workflow_id (UUID):
            The ID given by the caller, or the generated one.

        started (datetime):
            Local timestamp taken when the Manager was instantiated.

    Raises:
        ExecutionModeNotExist: If `mode` does not name an attribute of Manager.
    """

    def __init__(
        self,
        tasks: List[Task],
        on_success: Callable = basic_callback,
        on_failure: Callable = basic_callback,
        mode: TypeExecution = TypeExecution.SEQUENTIAL,
        keep_going: bool = False,
        workflow_id: UUID = None,
    ) -> None:
        self.tasks = tasks
        self.on_success = on_success
        self.on_failure = on_failure
        self.workflow_id = workflow_id or uuid4()
        self.started = datetime.now()

        groups = grouper(tasks=tasks)

        # The execution mode is resolved by name to one of the methods below.
        try:
            execution = getattr(self, mode)
        except AttributeError as err:
            raise ExecutionModeNotExist() from err

        # Hand over the resolved ID (self.workflow_id) rather than the raw
        # argument: when the caller omits workflow_id the raw value is None,
        # and the execution would otherwise run without the ID this Manager
        # exposes.
        self.tasks = execution(
            tasks=tasks,
            workflow_id=self.workflow_id,
            ignore=keep_going,
            groups=groups,
        )

        self._callback_workflow(tasks=self.tasks)

    def _callback_workflow(self, tasks: List[Task]):
        """Invoke on_failure if any task has status FAILED, else on_success.

        Args:
            tasks (List[Task]): Executed tasks; passed to the callback as the
                `tasks` keyword argument.
        """
        final_status = [task.status for task in tasks]

        if TypeStatus.FAILED in final_status:
            self.on_failure(tasks=tasks)
        else:
            self.on_success(tasks=tasks)

    def sequential(self, **kwargs) -> List[Task]:
        """Run the workflow through Sequential or SequentialGroup.

        SequentialGroup is used only when `groups` holds more than one entry
        and `is_darwin()` is false; otherwise Sequential is used. All keyword
        arguments are forwarded unchanged to the chosen class.

        Returns:
            List[Task]: The result of the process' `get_tasks()`.
        """
        if len(kwargs.get("groups", {})) > 1 and not is_darwin():
            process = SequentialGroup(**kwargs)
            return process.get_tasks()

        process = Sequential(**kwargs)
        return process.get_tasks()

    def sequential_group(self, **kwargs):
        """Run the workflow through SequentialGroup unconditionally.

        Returns:
            The result of the process' `get_tasks()`.
        """
        process = SequentialGroup(**kwargs)
        return process.get_tasks()

    def background(self, **kwargs) -> List[Task]:
        """Run the workflow through Background, forwarding all keyword arguments.

        Returns:
            List[Task]: The result of the process' `get_tasks()`.
        """
        process = Background(**kwargs)
        return process.get_tasks()

    def parallel(self, **kwargs) -> List[Task]:
        """Run the workflow through Parallel, or Sequential when on darwin.

        When `is_darwin()` is true a warning is emitted and the tasks are run
        with Sequential instead of Parallel.

        Returns:
            List[Task]: The result of the process' `get_tasks()`.
        """
        if is_darwin():
            warnings.warn(
                "Parallel mode does not work with MacOS."
                " Running tasks in sequence.",
                Warning
            )
            process = Sequential(**kwargs)
            return process.get_tasks()

        process = Parallel(**kwargs)
        return process.get_tasks()

_callback_workflow(tasks)

Source code in dotflow/core/workflow.py
120
121
122
123
124
125
126
def _callback_workflow(self, tasks: List[Task]):
    """Dispatch the workflow-level callback based on the tasks' statuses.

    Calls ``self.on_failure(tasks=tasks)`` when any task status equals
    ``TypeStatus.FAILED``; otherwise calls ``self.on_success(tasks=tasks)``.
    """
    # Collect every status first so each task's status is read exactly once.
    statuses = [item.status for item in tasks]
    any_failed = TypeStatus.FAILED in statuses

    callback = self.on_failure if any_failed else self.on_success
    callback(tasks=tasks)

sequential(**kwargs)

Source code in dotflow/core/workflow.py
128
129
130
131
132
133
134
def sequential(self, **kwargs) -> List[Task]:
    """Run the tasks with Sequential, or SequentialGroup for multiple groups.

    SequentialGroup is selected only when ``groups`` has more than one entry
    and ``is_darwin()`` is false. All keyword arguments are forwarded as-is.
    """
    group_count = len(kwargs.get("groups", {}))

    # is_darwin() is consulted only when there is more than one group.
    use_group_runner = group_count > 1 and not is_darwin()
    runner_cls = SequentialGroup if use_group_runner else Sequential

    return runner_cls(**kwargs).get_tasks()

background(**kwargs)

Source code in dotflow/core/workflow.py
140
141
142
def background(self, **kwargs) -> List[Task]:
    """Run the tasks with Background and return its ``get_tasks()`` result."""
    return Background(**kwargs).get_tasks()

parallel(**kwargs)

Source code in dotflow/core/workflow.py
144
145
146
147
148
149
150
151
152
153
154
155
def parallel(self, **kwargs) -> List[Task]:
    """Run the tasks with Parallel, falling back to Sequential on darwin.

    When ``is_darwin()`` is true a warning is emitted and Sequential is used
    in place of Parallel. All keyword arguments are forwarded as-is.
    """
    if not is_darwin():
        return Parallel(**kwargs).get_tasks()

    # Fallback path: warn the caller, then run the tasks one after another.
    warnings.warn(
        "Parallel mode does not work with MacOS."
        " Running tasks in sequence.",
        Warning
    )
    return Sequential(**kwargs).get_tasks()