Skip to content

DotFlow

dotflow.core.dotflow.DotFlow

Import

You can import the DotFlow class directly from dotflow:

from dotflow import DotFlow, Config
from dotflow.providers import StorageFile
Example

class dotflow.core.dotflow.DotFlow

config = Config(
    storage=StorageFile()
)

workflow = DotFlow(config=config)

Parameters:

Name Type Description Default
config Optional[Config]

Configuration class.

None
workflow_id Optional[str]

Fixed workflow identifier for checkpoint resume. If not provided, the WORKFLOW_ID environment variable is used; if that is also unset, a random UUID is generated.

None
name Optional[str]

Human-readable label sent to the managed server on registration. Defaults to the machine hostname so runs from different hosts stay distinguishable in the dashboard.

None

Attributes:

Name Type Description
workflow_id UUID
name str
task List[Task]
start Manager
schedule Scheduler
Source code in dotflow/core/dotflow.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class DotFlow:
    """
    Workflow object that bundles a task builder, a run method and a
    scheduler hook around a single configuration.

    Import:
        You can import the **DotFlow** class directly from dotflow:

            from dotflow import DotFlow, Config
            from dotflow.providers import StorageFile

    Example:
        `class` dotflow.core.dotflow.DotFlow

            config = Config(
                storage=StorageFile()
            )

            workflow = DotFlow(config=config)

    Args:
        config (Optional[Config]): Configuration class. A default
            ``Config()`` is created when omitted.

        workflow_id (Optional[str]): Fixed workflow identifier for checkpoint
            resume. If not provided, the ``WORKFLOW_ID`` environment variable
            is used; if that is unset or empty as well, a random UUID is
            generated.

        name (Optional[str]): Human-readable label sent to the managed
            server on registration. Defaults to the machine hostname so
            runs from different hosts stay distinguishable in the
            dashboard.

    Attributes:
        workflow_id (str | UUID): The supplied (or environment-provided)
            identifier, otherwise a freshly generated ``uuid4()``.

        name (str): The supplied name, otherwise the value of ``hostname()``.

        task (TaskBuilder): Builder bound to this workflow's config, id and
            name; its ``queue`` holds the tasks of the workflow.

        start (Manager): Method that runs the queued tasks and returns the
            resulting ``Manager``.

        schedule (Scheduler): ``functools.partial`` of the configured
            scheduler's ``start`` with ``workflow=self.start`` pre-bound.
    """

    def __init__(
        self,
        config: Config | None = None,
        workflow_id: str | None = None,
        name: str | None = None,
    ) -> None:
        # Normalise first so the "externally provided" flag and the id that
        # is actually used can never disagree (e.g. WORKFLOW_ID="" in env).
        workflow_id = self._resolve_workflow_id(workflow_id)
        self._externally_provided_id = workflow_id is not None
        self.workflow_id = workflow_id or uuid4()
        self.name = name or hostname()
        self._config = config if config else Config()
        self._manager: Manager | None = None
        self._last_run_signature: tuple = ()

        self.task = TaskBuilder(
            config=self._config,
            workflow_id=self.workflow_id,
            workflow_name=self.name,
        )

        # Only workflows whose id was generated here are registered with the
        # server. NOTE(review): presumably an external id has already been
        # registered by whoever issued it -- confirm against the server API.
        if not self._externally_provided_id:
            self._config.server.create_workflow(workflow=self)

        self.schedule = partial(
            self._config.scheduler.start, workflow=self.start
        )

    @staticmethod
    def _resolve_workflow_id(workflow_id: str | None) -> str | None:
        """Return the explicit id, else ``WORKFLOW_ID`` from the environment.

        Empty values (``""``) from either source are treated as "not
        provided" and collapse to ``None``.
        """
        return workflow_id or os.getenv("WORKFLOW_ID") or None

    def start(self, **kwargs) -> Manager:
        """Run the workflow once; duplicate calls return the original Manager.

        A call counts as a duplicate when a Manager already exists and the
        tuple of ``task_id`` values in the queue equals the one recorded by
        the previous run.

        Args:
            **kwargs: Forwarded unchanged to ``Manager``.

        Returns:
            Manager: The Manager created by this call, or the previously
                created one for a duplicate call.
        """
        signature = tuple(task.task_id for task in self.task.queue)
        if self._manager is not None and signature == self._last_run_signature:
            logger.warning(
                "DotFlow.start() already ran for %s; ignoring duplicate call",
                self.workflow_id,
            )
            return self._manager

        self._last_run_signature = signature
        self._manager = Manager(
            tasks=self.task.queue,
            workflow_id=self.workflow_id,
            config=self._config,
            **kwargs,
        )
        return self._manager

    def result_task(self) -> list:
        """
        Returns:
            list (List[Task]): Returns a list of Task class.
        """
        return self.task.queue

    def result_context(self) -> list:
        """
        Returns:
            list (List[Context]): Returns a list of Context class.
        """
        return [task.current_context for task in self.task.queue]

    def result_storage(self) -> list:
        """
        Returns:
            list (List[Any]): Returns a list of assorted objects.
        """
        return [task.current_context.storage for task in self.task.queue]

    def result(self) -> dict:
        """
        Returns:
            dict: Returns the full workflow result serialized as a dictionary,
                  including workflow ID and all task schemas.
        """
        return self.task.result()
workflow_id = workflow_id or uuid4() instance-attribute

task = TaskBuilder(config=(self._config), workflow_id=(self.workflow_id), workflow_name=(self.name)) instance-attribute

start(**kwargs)

Run the workflow once; duplicate calls return the original Manager.

Source code in dotflow/core/dotflow.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def start(self, **kwargs) -> Manager:
    """Execute the queued tasks once and hand back the Manager.

    When a Manager already exists and the queue's task ids match the ones
    recorded on the previous run, a warning is logged and that existing
    Manager is returned instead of creating a new one.
    """
    queue_ids = tuple(item.task_id for item in self.task.queue)
    is_repeat = (
        self._manager is not None and queue_ids == self._last_run_signature
    )

    if not is_repeat:
        # Record the signature before building so a later identical call
        # is recognised as a repeat.
        self._last_run_signature = queue_ids
        self._manager = Manager(
            tasks=self.task.queue,
            workflow_id=self.workflow_id,
            config=self._config,
            **kwargs,
        )
        return self._manager

    logger.warning(
        "DotFlow.start() already ran for %s; ignoring duplicate call",
        self.workflow_id,
    )
    return self._manager

result_task()

Returns:

Name Type Description
list List[Task]

Returns a list of Task class.

Source code in dotflow/core/dotflow.py
102
103
104
105
106
107
def result_task(self):
    """
    Returns:
        list (List[Task]): The task queue held by the workflow's builder.
    """
    queued_tasks = self.task.queue
    return queued_tasks

result_context()

Returns:

Name Type Description
list List[Context]

Returns a list of Context class.

Source code in dotflow/core/dotflow.py
109
110
111
112
113
114
def result_context(self):
    """
    Returns:
        list (List[Context]): The ``current_context`` of every queued task,
            in queue order.
    """
    queued = self.task.queue
    return [entry.current_context for entry in queued]

result_storage()

Returns:

Name Type Description
list List[Any]

Returns a list of assorted objects.

Source code in dotflow/core/dotflow.py
116
117
118
119
120
121
def result_storage(self):
    """
    Returns:
        list (List[Any]): The ``storage`` value of each queued task's
            current context, in queue order.
    """
    contexts = (entry.current_context for entry in self.task.queue)
    return [context.storage for context in contexts]

result()

Returns:

Name Type Description
dict dict

Returns the full workflow result serialized as a dictionary, including workflow ID and all task schemas.

Source code in dotflow/core/dotflow.py
123
124
125
126
127
128
129
def result(self) -> dict:
    """
    Returns:
        dict: Whatever the task builder's ``result()`` produces: the full
              workflow result serialized as a dictionary, including
              workflow ID and all task schemas.
    """
    serialized = self.task.result()
    return serialized