Skip to content

Flow

dotflow.abc.flow.Flow

Bases: ABC

Source code in dotflow/abc/flow.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class Flow(ABC):
    """Abstract base class for a workflow execution strategy.

    Instantiating a concrete subclass stores the configuration and then
    immediately calls ``setup_queue()`` followed by ``run()``, so the flow
    starts executing from within the constructor.
    """

    def __init__(
        self,
        tasks: list[Task],
        workflow_id: UUID,
        ignore: bool,
        groups: dict[str, list[Task]],
        resume: bool = False,
    ) -> None:
        """Store the configuration, prepare the queue and run the flow.

        Args:
            tasks: Tasks handed to this flow.
            workflow_id: Identifier of the workflow these tasks belong to.
            ignore: Stored as ``self.ignore`` for subclasses to interpret.
                NOTE(review): presumably "ignore task failures" - confirm.
            groups: Tasks keyed by a string (presumably the group name).
            resume: When true, ``_try_restore_checkpoint`` consults each
                task's storage for a previously saved context.
        """
        self.queue = None
        self.tasks = tasks
        self.workflow_id = workflow_id
        self.ignore = ignore
        self.groups = groups
        self.resume = resume

        # Execution is kicked off here: subclasses must be ready to run as
        # soon as the attributes above are set.
        self.setup_queue()
        self.run()

    @abstractmethod
    def setup_queue(self) -> None:
        """Initialise ``self.queue``; the base implementation uses a list."""
        self.queue = []

    @abstractmethod
    def get_tasks(self) -> list[Task]:
        """Return the queue; the base implementation returns ``self.queue``."""
        return self.queue

    @abstractmethod
    def _flow_callback(self, task: Task) -> None:
        """Record *task*; the base implementation appends it to the queue."""
        self.queue.append(task)

    @abstractmethod
    def run(self) -> None:
        """Execute the flow; the base implementation does nothing."""
        return None

    def _try_restore_checkpoint(self, task: Task):
        """Attempt to restore a stored checkpoint for *task*.

        When ``self.resume`` is true, the task's storage is queried with the
        key it derives for the task. If a stored context is found, the task
        is marked ``TypeStatus.COMPLETED``, the context becomes its
        ``current_context`` and the task is passed to ``_flow_callback``.

        Returns:
            The restored context if one was found, ``None`` otherwise.
        """
        if not self.resume:
            return None

        storage = task.config.storage
        context = storage.get(key=storage.key(task=task))

        # "Nothing stored" may be reported either as ``None`` or as a context
        # whose ``storage`` is ``None``; both mean there is no checkpoint.
        if context is None or context.storage is None:
            return None

        task.status = TypeStatus.COMPLETED
        task.current_context = context
        self._flow_callback(task=task)

        return context

groups = groups instance-attribute

ignore = ignore instance-attribute

queue = None instance-attribute

resume = resume instance-attribute

tasks = tasks instance-attribute

workflow_id = workflow_id instance-attribute

__init__(tasks, workflow_id, ignore, groups, resume=False)

Source code in dotflow/abc/flow.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def __init__(
    self,
    tasks: list[Task],
    workflow_id: UUID,
    ignore: bool,
    groups: dict[str, list[Task]],
    resume: bool = False,
) -> None:
    """Store the flow configuration, then set up the queue and run.

    Every argument is kept on the instance under the same name, and
    ``queue`` starts out as ``None`` until ``setup_queue()`` replaces it.
    """
    # The queue placeholder comes first, as in the class listing.
    self.queue = None

    # What to run and how it is grouped.
    self.tasks = tasks
    self.workflow_id = workflow_id
    self.ignore = ignore
    self.groups = groups
    self.resume = resume

    # Construction doubles as execution: prepare the queue, then run.
    self.setup_queue()
    self.run()

get_tasks() abstractmethod

Source code in dotflow/abc/flow.py
35
36
37
@abstractmethod
def get_tasks(self) -> list[Task]:
    """Return the flow's queue exactly as stored in ``self.queue``."""
    queued = self.queue
    return queued

run() abstractmethod

Source code in dotflow/abc/flow.py
43
44
45
@abstractmethod
def run(self) -> None:
    """Execute the flow; this base implementation is a no-op."""
    return

setup_queue() abstractmethod

Source code in dotflow/abc/flow.py
31
32
33
@abstractmethod
def setup_queue(self) -> None:
    """Reset ``self.queue`` to a new, empty list."""
    self.queue = list()