Ir para o conteúdo

ServerDefault

dotflow.providers.server_default.ServerDefault

Bases: Server

Default Server provider with auto-detected managed mode.

Source code in dotflow/providers/server_default.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class ServerDefault(Server):
    """Default Server provider with auto-detected managed mode.

    Managed mode is enabled when both a base URL and a user token are
    resolved at construction time. The public methods send workflow and
    task payloads to the ``/cli/workflows`` endpoints with bearer-token
    authentication. Request failures (``RequestException``), including
    HTTP error statuses, are logged rather than raised.
    """

    # Passed as ``max`` to ``task.result()``; the unit is whatever that
    # method defines (presumably bytes/characters -- confirm there).
    MAX_RESULT_SIZE = 5_000_000
    # Passed as ``timeout`` to every outgoing request.
    TIMEOUT = 15.0

    # URL path templates, appended to the configured base URL.
    ENDPOINT_WORKFLOWS = "/cli/workflows"
    ENDPOINT_WORKFLOW = "/cli/workflows/{workflow_id}"
    ENDPOINT_TASKS = "/cli/workflows/{workflow_id}/tasks"
    ENDPOINT_TASK = "/cli/workflows/{workflow_id}/tasks/{task_id}"

    def __init__(self) -> None:
        """Resolve connection settings and detect managed mode.

        ``resolve`` is asked for ``base_url`` / ``SERVER_BASE_URL`` and
        ``token`` / ``SERVER_USER_TOKEN``. Managed mode requires both to
        be truthy.
        """
        base_url = resolve(key="base_url", env_var="SERVER_BASE_URL")
        user_token = resolve(key="token", env_var="SERVER_USER_TOKEN")
        self._managed = bool(base_url and user_token)

        # Strip trailing slashes so endpoint paths (which start with "/")
        # can be concatenated without producing "//".
        self._base_url = base_url.rstrip("/") if base_url else None
        self._user_token = user_token

    @property
    def _headers(self) -> dict:
        """Return the HTTP headers sent with every request."""
        return {
            "Authorization": f"Bearer {self._user_token}",
            "Content-Type": "application/json",
        }

    def _post(self, url: str, json: dict) -> None:
        """Send a JSON POST request; log instead of raising on failure.

        Args:
            url: Absolute URL to post to.
            json: Payload serialized as the JSON request body.
        """
        try:
            response = post(
                url,
                json=json,
                headers=self._headers,
                timeout=self.TIMEOUT,
            )
            # Surface 4xx/5xx replies: the raised HTTPError is a
            # RequestException, so it is logged below instead of being
            # silently treated as success.
            response.raise_for_status()
        except RequestException as error:
            logger.error("POST %s failed: %s", url, error)

    def _patch(self, url: str, json: dict) -> None:
        """Send a JSON PATCH request; log instead of raising on failure.

        Args:
            url: Absolute URL to patch.
            json: Payload serialized as the JSON request body.
        """
        try:
            response = patch(
                url,
                json=json,
                headers=self._headers,
                timeout=self.TIMEOUT,
            )
            # Same rationale as in ``_post``: do not ignore HTTP error
            # statuses returned by the server.
            response.raise_for_status()
        except RequestException as error:
            logger.error("PATCH %s failed: %s", url, error)

    @managed
    def create_workflow(self, workflow: Any) -> None:
        """POST ``workflow.result()`` to the workflows collection endpoint.

        Args:
            workflow: Object exposing ``result()``, whose return value is
                sent as the JSON body.
        """
        self._post(
            self._base_url + self.ENDPOINT_WORKFLOWS,
            json=workflow.result(),
        )

    @managed
    def update_workflow(self, workflow: Any, status: str = "") -> None:
        """PATCH ``{"status": status}`` onto a single workflow resource.

        Args:
            workflow: Value interpolated into the URL as the workflow id.
                NOTE(review): it is formatted as-is, so callers presumably
                pass the id itself rather than a workflow object -- confirm.
            status: New status value to send.
        """
        self._patch(
            self._base_url
            + self.ENDPOINT_WORKFLOW.format(workflow_id=workflow),
            json={"status": status},
        )

    @managed
    def create_task(self, task: Any) -> None:
        """POST the task's result payload to its workflow's tasks endpoint.

        Args:
            task: Object exposing ``workflow_id`` and
                ``result(max=...)``; the result is capped via
                ``MAX_RESULT_SIZE``.
        """
        self._post(
            self._base_url
            + self.ENDPOINT_TASKS.format(workflow_id=task.workflow_id),
            json=task.result(max=self.MAX_RESULT_SIZE),
        )

    @managed
    def update_task(self, task: Any) -> None:
        """PATCH the task's result payload onto its own task resource.

        Args:
            task: Object exposing ``workflow_id``, ``task_id`` and
                ``result(max=...)``; the result is capped via
                ``MAX_RESULT_SIZE``.
        """
        self._patch(
            self._base_url
            + self.ENDPOINT_TASK.format(
                workflow_id=task.workflow_id,
                task_id=task.task_id,
            ),
            json=task.result(max=self.MAX_RESULT_SIZE),
        )

MAX_RESULT_SIZE = 5000000 class-attribute instance-attribute

TIMEOUT = 15.0 class-attribute instance-attribute

ENDPOINT_WORKFLOWS = '/cli/workflows' class-attribute instance-attribute

ENDPOINT_WORKFLOW = '/cli/workflows/{workflow_id}' class-attribute instance-attribute

ENDPOINT_TASKS = '/cli/workflows/{workflow_id}/tasks' class-attribute instance-attribute

ENDPOINT_TASK = '/cli/workflows/{workflow_id}/tasks/{task_id}' class-attribute instance-attribute

create_workflow(workflow)

Source code in dotflow/providers/server_default.py
74
75
76
77
78
79
@managed
def create_workflow(self, workflow: Any) -> None:
    """POST ``workflow.result()`` to the workflows collection endpoint."""
    endpoint = self._base_url + self.ENDPOINT_WORKFLOWS
    payload = workflow.result()
    self._post(endpoint, json=payload)

update_workflow(workflow, status='')

Source code in dotflow/providers/server_default.py
81
82
83
84
85
86
87
@managed
def update_workflow(self, workflow: Any, status: str = "") -> None:
    """PATCH ``{"status": status}`` onto a single workflow resource.

    ``workflow`` is interpolated as-is into the URL as the workflow id.
    """
    base = self._base_url
    path = self.ENDPOINT_WORKFLOW.format(workflow_id=workflow)
    body = {"status": status}
    self._patch(base + path, json=body)

create_task(task)

Source code in dotflow/providers/server_default.py
89
90
91
92
93
94
95
@managed
def create_task(self, task: Any) -> None:
    """POST the task's size-capped result to its workflow's tasks endpoint."""
    base = self._base_url
    path = self.ENDPOINT_TASKS.format(workflow_id=task.workflow_id)
    payload = task.result(max=self.MAX_RESULT_SIZE)
    self._post(base + path, json=payload)

update_task(task)

Source code in dotflow/providers/server_default.py
 97
 98
 99
100
101
102
103
104
105
106
@managed
def update_task(self, task: Any) -> None:
    """PATCH the task's size-capped result onto its own task resource."""
    base = self._base_url
    path = self.ENDPOINT_TASK.format(
        workflow_id=task.workflow_id,
        task_id=task.task_id,
    )
    payload = task.result(max=self.MAX_RESULT_SIZE)
    self._patch(base + path, json=payload)