Skip to content

DotFlow

dotflow.core.dotflow.DotFlow

Import

You can import the Dotflow class directly from dotflow:

from dotflow import DotFlow, Config
from dotflow.providers import StorageFile
Example

class dotflow.core.dotflow.Dotflow

config = Config(
    storage=StorageFile()
)

workflow = DotFlow(config=config)

Parameters:

Name Type Description Default
config Optional[Config]

Configuration class.

None

Attributes:

Name Type Description
workflow_id UUID
task List[Task]
start Manager
Source code in dotflow/core/dotflow.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class DotFlow:
    """
    Import:
        You can import the **Dotflow** class directly from dotflow:

            from dotflow import DotFlow, Config
            from dotflow.providers import StorageFile

    Example:
        `class` dotflow.core.dotflow.Dotflow

            config = Config(
                storage=StorageFile()
            )

            workflow = DotFlow(config=config)

    Args:
        config (Optional[Config]): Configuration class.

    Attributes:
        workflow_id (UUID):

        task (List[Task]):

        start (Manager):
    """

    def __init__(
            self,
            config: Optional[Config] = None
    ) -> None:
        self.workflow_id = uuid4()
        config = config if config else Config()

        self.task = TaskBuilder(
            config=config,
            workflow_id=self.workflow_id
        )

        self.start = partial(
            Manager,
            tasks=self.task.queue,
            workflow_id=self.workflow_id
        )

    def result_task(self):
        """
        Returns:
            list (List[Task]): Returns a list of Task class.
        """
        return self.task.queue

    def result_context(self):
        """
        Returns:
            list (List[Context]): Returns a list of Context class.
        """
        return [task.current_context for task in self.task.queue]

    def result_storage(self):
        """
        Returns:
            list (List[Any]): Returns a list of assorted objects.
        """
        return [task.current_context.storage for task in self.task.queue]

    def result(self):
        return self.task.result()

workflow_id = uuid4() instance-attribute

task = TaskBuilder(config=config, workflow_id=self.workflow_id) instance-attribute

start = partial(Manager, tasks=self.task.queue, workflow_id=self.workflow_id) instance-attribute

result_task()

Returns:

Name Type Description
list List[Task]

Returns a list of Task class.

Source code in dotflow/core/dotflow.py
58
59
60
61
62
63
def result_task(self):
    """
    Returns:
        list (List[Task]): Returns a list of Task class.
    """
    return self.task.queue

result_context()

Returns:

Name Type Description
list List[Context]

Returns a list of Context class.

Source code in dotflow/core/dotflow.py
65
66
67
68
69
70
def result_context(self):
    """
    Returns:
        list (List[Context]): Returns a list of Context class.
    """
    return [task.current_context for task in self.task.queue]

result_storage()

Returns:

Name Type Description
list List[Any]

Returns a list of assorted objects.

Source code in dotflow/core/dotflow.py
72
73
74
75
76
77
def result_storage(self):
    """
    Returns:
        list (List[Any]): Returns a list of assorted objects.
    """
    return [task.current_context.storage for task in self.task.queue]