Skip to content

NotifyTelegram

dotflow.providers.notify_telegram.NotifyTelegram

Bases: Notify

Source code in dotflow/providers/notify_telegram.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class NotifyTelegram(Notify):

    MESSAGE = "{symbol} {status}\n```json\n{task}```\n{workflow_id}-{task_id}"
    API_TELEGRAM = "https://api.telegram.org/bot{token}/sendMessage"

    def __init__(
        self,
        token: str,
        chat_id: int,
        notification_type: Optional[TypeStatus] = None,
        timeout: int = 1.5
    ):
        self.token = token
        self.chat_id = chat_id
        self.notification_type = notification_type
        self.timeout = timeout

    def send(self, task: Any) -> None:
        if not self.notification_type or self.notification_type == task.status:
            data = {
                "chat_id": self.chat_id,
                "text": self._get_text(task=task),
                "parse_mode": "markdown",
            }
            try:
                response = post(
                    url=self.API_TELEGRAM.format(token=self.token),
                    headers={"Content-Type": "application/json"},
                    data=dumps(data),
                    timeout=self.timeout
                )
                response.raise_for_status()
            except Exception as error:
                logger.error(
                    "Internal problem sending notification on Telegram: %s",
                    str(error),
                )

    def _get_text(self, task: Any) -> str:
        return self.MESSAGE.format(
            symbol=TypeStatus.get_symbol(task.status),
            status=task.status,
            workflow_id=task.workflow_id,
            task_id=task.task_id,
            task=task.result(max=4000),
        )

API_TELEGRAM = 'https://api.telegram.org/bot{token}/sendMessage' class-attribute instance-attribute

MESSAGE = '{symbol} {status}\n```json\n{task}```\n{workflow_id}-{task_id}' class-attribute instance-attribute

chat_id = chat_id instance-attribute

notification_type = notification_type instance-attribute

timeout = timeout instance-attribute

token = token instance-attribute

__init__(token, chat_id, notification_type=None, timeout=1.5)

Source code in dotflow/providers/notify_telegram.py
18
19
20
21
22
23
24
25
26
27
28
def __init__(
    self,
    token: str,
    chat_id: int,
    notification_type: Optional[TypeStatus] = None,
    timeout: int = 1.5
):
    self.token = token
    self.chat_id = chat_id
    self.notification_type = notification_type
    self.timeout = timeout

send(task)

Source code in dotflow/providers/notify_telegram.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def send(self, task: Any) -> None:
    if not self.notification_type or self.notification_type == task.status:
        data = {
            "chat_id": self.chat_id,
            "text": self._get_text(task=task),
            "parse_mode": "markdown",
        }
        try:
            response = post(
                url=self.API_TELEGRAM.format(token=self.token),
                headers={"Content-Type": "application/json"},
                data=dumps(data),
                timeout=self.timeout
            )
            response.raise_for_status()
        except Exception as error:
            logger.error(
                "Internal problem sending notification on Telegram: %s",
                str(error),
            )