Workflow

dotflow.core.workflow.Workflow

Import

You can import the Workflow class with:

from dotflow.core.workflow import Workflow
Example

class dotflow.core.workflow.Workflow

# Constructing a Workflow starts executing it right away using the selected
# mode (sequential unless `mode` is given).
# keep_going=True: keep running the remaining tasks even if one of them fails.
workflow = Workflow(
    tasks=[tasks],
    success=basic_callback,
    failure=basic_callback,
    keep_going=True
)

Parameters:

Name Type Description Default
tasks List[Task]
required
success Callable
basic_callback
failure Callable
basic_callback
keep_going bool

A boolean that controls whether the workflow keeps running after a task fails. If it is True, the remaining tasks are still executed; if it is False, the workflow stops at the first failed task.

False
mode TypeExecution

A parameter for assigning the execution mode of the workflow. Currently, there is the option to execute in sequential mode or background mode. By default, it is in sequential mode.

SEQUENTIAL
workflow_id UUID
None

Attributes:

Name Type Description
workflow_id UUID
started datetime
tasks List[Task]
success Callable
failure Callable
Source code in dotflow/core/workflow.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class Workflow:
    """
    Import:
        You can import the **Workflow** class with:

            from dotflow.core.workflow import Workflow

    Example:
        `class` dotflow.core.workflow.Workflow

            workflow = Workflow(
                tasks=[tasks],
                success=basic_callback,
                failure=basic_callback,
                keep_going=True
            )

    Args:
        tasks (List[Task]):
            The tasks to run. In sequential mode they are executed in list order.

        success (Callable):
            Callback invoked as `success(tasks=tasks)` once the run finishes and
            no task has the **FAILED** status.

        failure (Callable):
            Callback invoked as `failure(tasks=tasks)` once the run finishes and
            at least one task has the **FAILED** status.

        keep_going (bool):
            A parameter that receives a boolean object with the purpose of continuing
            or not the execution of the workflow in case of an error during the
            execution of a task. If it is **True**, the execution will continue;
            if it is **False**, the workflow will stop.

        mode (TypeExecution):
            A parameter for assigning the execution mode of the workflow. Currently,
            there is the option to execute in **sequential** mode or **background** mode.
            By default, it is in **sequential** mode.

        workflow_id (UUID):
            Identifier of the workflow. When omitted, a new `uuid4()` is generated.

    Attributes:
        workflow_id (UUID): Identifier given by the caller or generated with `uuid4()`.
        started (datetime): `datetime.now()` captured when the workflow is created.
        tasks (List[Task]): The tasks received by the constructor.
        success (Callable): Callback used when no task failed.
        failure (Callable): Callback used when at least one task failed.

    Raises:
        ExecutionModeNotExist: If `mode` does not name an attribute of the workflow.
    """

    def __init__(
        self,
        tasks: List[Task],
        success: Callable = basic_callback,
        failure: Callable = basic_callback,
        keep_going: bool = False,
        mode: TypeExecution = TypeExecution.SEQUENTIAL,
        workflow_id: UUID = None
    ) -> None:
        self.workflow_id = workflow_id or uuid4()
        self.started = datetime.now()
        self.tasks = tasks
        self.success = success
        self.failure = failure

        # Resolve the execution mode on its own so that only an unknown mode
        # is reported as ExecutionModeNotExist. An AttributeError raised while
        # the tasks or callbacks are running must surface unchanged instead of
        # being mislabelled as a missing mode.
        try:
            runner = getattr(self, mode)
        except AttributeError as err:
            raise ExecutionModeNotExist() from err

        runner(keep_going=keep_going)

    def _callback_workflow(self, tasks: List[Task]):
        """Call `failure` if any task has the FAILED status, else `success`.

        Args:
            tasks (List[Task]): Tasks whose `status` is inspected; the same
                list is forwarded to the chosen callback as `tasks=`.
        """
        final_status = [task.status for task in tasks]
        if TaskStatus.FAILED in final_status:
            self.failure(tasks=tasks)
        else:
            self.success(tasks=tasks)

    def sequential(self, keep_going: bool = False):
        """Run the tasks one after another in the calling thread.

        Each task is handed to `Execution` together with the context read from
        the previous task's storage (the first task receives an initial
        `Context` with `task_id=0`). After the loop the workflow callback is
        invoked.

        Args:
            keep_going (bool): When False, stop at the first task whose status
                is FAILED; when True, always go through every task.

        Returns:
            The workflow's task list (`self.tasks`).
        """
        previous_context = Context(
            task_id=0,
            workflow_id=self.workflow_id
        )

        for task in self.tasks:
            Execution(
                task=task,
                workflow_id=self.workflow_id,
                previous_context=previous_context
            )

            # What this task left in its storage becomes the next task's input.
            previous_context = task.config.storage.get(
                key=task.config.storage.key(task=task)
            )

            if not keep_going and task.status == TaskStatus.FAILED:
                break

        self._callback_workflow(tasks=self.tasks)
        return self.tasks

    def background(self, keep_going: bool = False):
        """Run `sequential` in a newly started thread and return immediately.

        The thread is started but not joined, so this method (and therefore
        the constructor) returns before the tasks have finished.

        Args:
            keep_going (bool): Forwarded unchanged to `sequential`.
        """
        th = threading.Thread(target=self.sequential, args=(keep_going,))
        th.start()

    def parallel(self, keep_going: bool = False):
        """Not implemented"""

    def data_store(self, keep_going: bool = False):
        """Not implemented"""

_callback_workflow(tasks)

Source code in dotflow/core/workflow.py
82
83
84
85
86
87
def _callback_workflow(self, tasks: List[Task]):
    """Call `failure` if any task has the FAILED status, else `success`.

    Args:
        tasks (List[Task]): Tasks whose `status` is inspected; the same list
            is forwarded to the chosen callback as `tasks=`.
    """
    final_status = [task.status for task in tasks]
    # Only the callback that is actually needed is looked up on the instance.
    if TaskStatus.FAILED in final_status:
        callback = self.failure
    else:
        callback = self.success
    callback(tasks=tasks)

sequential(keep_going=False)

Sequential

Source code in dotflow/core/workflow.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def sequential(self, keep_going: bool = False):
    """Run the tasks one after another in the calling thread.

    Each task is handed to `Execution` together with the context read from
    the previous task's storage (the first task receives an initial `Context`
    with `task_id=0`). After the loop the workflow callback is invoked.

    Args:
        keep_going (bool): When False, stop at the first task whose status is
            FAILED; when True, always go through every task.

    Returns:
        The workflow's task list (`self.tasks`).
    """
    context = Context(task_id=0, workflow_id=self.workflow_id)

    for current in self.tasks:
        Execution(
            task=current,
            workflow_id=self.workflow_id,
            previous_context=context,
        )

        # What this task left in its storage becomes the next task's input.
        storage_key = current.config.storage.key(task=current)
        context = current.config.storage.get(key=storage_key)

        if not keep_going and current.status == TaskStatus.FAILED:
            break

    self._callback_workflow(tasks=self.tasks)
    return self.tasks

background(keep_going=False)

Background

Source code in dotflow/core/workflow.py
114
115
116
117
def background(self, keep_going: bool = False):
    """Run `sequential` in a newly started thread and return immediately.

    The thread is started but not joined, so the caller does not wait for the
    tasks to finish.

    Args:
        keep_going (bool): Forwarded positionally to `sequential`.
    """
    worker = threading.Thread(
        target=self.sequential,
        args=(keep_going,),
    )
    worker.start()

parallel(keep_going=False)

Not implemented

Source code in dotflow/core/workflow.py
119
120
def parallel(self, keep_going: bool = False):
    """Not implemented.

    Placeholder for a parallel execution mode: it accepts `keep_going` like
    the other modes but performs no work and returns None.
    """
    return None

data_store(keep_going=False)

Not implemented

Source code in dotflow/core/workflow.py
122
123
def data_store(self, keep_going: bool = False):
    """Not implemented.

    Placeholder for a data-store execution mode: it accepts `keep_going` like
    the other modes but performs no work and returns None.
    """
    return None