
Server

dotflow.abc.server.Server

Bases: ABC

Abstract base class (ABC) for server providers that send workflow and task data to a remote API.

Implementations should handle HTTP communication with a server like dotflow-api, sending execution data (status, duration, errors, context) in real time.
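As a rough illustration of what such an implementation might look like, here is a minimal sketch built on the standard library. The class name `HttpServer`, the helper `post_json`, the routes (`/workflows`, `/tasks`, and so on) and the payload fields are all hypothetical; they are not taken from dotflow or dotflow-api. The `Server` class is re-declared locally only so that the sketch runs without dotflow installed; real code would import it from `dotflow.abc.server`.

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Callable
from urllib import request


class Server(ABC):
    """Local mirror of the documented interface (stand-in for dotflow.abc.server.Server)."""

    @abstractmethod
    def create_workflow(self, workflow: Any) -> None: ...

    @abstractmethod
    def update_workflow(self, workflow: Any, status: str = "") -> None: ...

    @abstractmethod
    def create_task(self, task: Any) -> None: ...

    @abstractmethod
    def update_task(self, task: Any) -> None: ...


def post_json(url: str, payload: dict) -> None:
    """Default transport: POST the payload as JSON using urllib."""
    req = request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with request.urlopen(req, timeout=5):
        pass  # the response body is ignored in this sketch


class HttpServer(Server):
    """Hypothetical provider; routes and payload fields are assumptions."""

    def __init__(self, base_url: str, send: Callable[[str, dict], None] = post_json):
        self.base_url = base_url.rstrip("/")
        self.send = send  # injectable, so tests can avoid real network calls

    def create_workflow(self, workflow: Any) -> None:
        self.send(f"{self.base_url}/workflows", {"workflow": str(workflow)})

    def update_workflow(self, workflow: Any, status: str = "") -> None:
        payload = {"workflow": str(workflow), "status": status}
        self.send(f"{self.base_url}/workflows/update", payload)

    def create_task(self, task: Any) -> None:
        self.send(f"{self.base_url}/tasks", {"task": str(task)})

    def update_task(self, task: Any) -> None:
        self.send(f"{self.base_url}/tasks/update", {"task": str(task)})
```

Passing the transport in as `send` is a design choice of this sketch, not something the interface requires; it lets the provider be exercised with a fake sender instead of a live server.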

Source code in dotflow/abc/server.py
from abc import ABC, abstractmethod
from typing import Any


class Server(ABC):
    """Server ABC provider for sending workflow and task
    data to a remote API.

    Implementations should handle HTTP communication with
    a server like dotflow-api, sending execution data
    (status, duration, errors, context) in real time.
    """

    @abstractmethod
    def create_workflow(self, workflow: Any) -> None:
        """Register a new workflow on the remote server."""

    @abstractmethod
    def update_workflow(self, workflow: Any, status: str = "") -> None:
        """Update workflow status on the remote server."""

    @abstractmethod
    def create_task(self, task: Any) -> None:
        """Register a new task on the remote server."""

    @abstractmethod
    def update_task(self, task: Any) -> None:
        """Update task data on the remote server."""

create_task(task) abstractmethod

Register a new task on the remote server.

Source code in dotflow/abc/server.py
@abstractmethod
def create_task(self, task: Any) -> None:
    """Register a new task on the remote server."""

create_workflow(workflow) abstractmethod

Register a new workflow on the remote server.

Source code in dotflow/abc/server.py
@abstractmethod
def create_workflow(self, workflow: Any) -> None:
    """Register a new workflow on the remote server."""

update_task(task) abstractmethod

Update task data on the remote server.

Source code in dotflow/abc/server.py
@abstractmethod
def update_task(self, task: Any) -> None:
    """Update task data on the remote server."""

update_workflow(workflow, status='') abstractmethod

Update workflow status on the remote server.

Source code in dotflow/abc/server.py
@abstractmethod
def update_workflow(self, workflow: Any, status: str = "") -> None:
    """Update workflow status on the remote server."""