Ir para o conteúdo

NotifyTelegram

dotflow.providers.notify_telegram.NotifyTelegram

Bases: Notify

Import

You can import the NotifyTelegram class with:

from dotflow.providers import NotifyTelegram
Example

class dotflow.providers.notify_telegram.NotifyTelegram

from dotflow import Config, DotFlow
from dotflow.providers import NotifyTelegram
from dotflow.core.types.status import TypeStatus

config = Config(
    notify=NotifyTelegram(
        token="YOUR_BOT_TOKEN",
        chat_id=123456789,
        notification_type=TypeStatus.FAILED,
    )
)

workflow = DotFlow(config=config)

Parameters:

Name Type Description Default
token str

Telegram bot token from BotFather.

required
chat_id int

Telegram chat ID to send messages to.

required
notification_type Optional[TypeStatus]

Filter notifications by task status. If None, all statuses are notified.

None
show_result bool

Include task result in the notification. Defaults to False.

False
timeout float

Request timeout in seconds.

1.5
Source code in dotflow/providers/notify_telegram.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class NotifyTelegram(Notify):
    """
    Import:
        You can import the **NotifyTelegram** class with:

            from dotflow.providers import NotifyTelegram

    Example:
        `class` dotflow.providers.notify_telegram.NotifyTelegram

            from dotflow import Config, DotFlow
            from dotflow.providers import NotifyTelegram
            from dotflow.core.types.status import TypeStatus

            config = Config(
                notify=NotifyTelegram(
                    token="YOUR_BOT_TOKEN",
                    chat_id=123456789,
                    notification_type=TypeStatus.FAILED,
                )
            )

            workflow = DotFlow(config=config)

    Args:
        token (str): Telegram bot token from BotFather.

        chat_id (int): Telegram chat ID to send messages to.

        notification_type (Optional[TypeStatus]): Filter notifications
            by task status. If None, all statuses are notified.

        show_result (bool): Include task result in the notification.
            Defaults to False.

        timeout (float): Request timeout in seconds.
    """

    API_URL = "https://api.telegram.org/bot{token}/sendMessage"

    def __init__(
        self,
        token: str,
        chat_id: int,
        notification_type: TypeStatus | None = None,
        show_result: bool = False,
        timeout: float = 1.5,
    ):
        self.token = token
        self.chat_id = chat_id
        self.notification_type = notification_type
        self.show_result = show_result
        self.timeout = timeout

    def hook_status_task(self, task: Any) -> None:
        if self.notification_type and self.notification_type != task.status:
            return

        try:
            response = post(
                url=self.API_URL.format(token=self.token),
                headers={"Content-Type": "application/json"},
                data=dumps(
                    {
                        "chat_id": self.chat_id,
                        "text": self._build_message(task),
                        "parse_mode": "markdown",
                    }
                ),
                timeout=self.timeout,
            )
            response.raise_for_status()
        except Exception as error:
            logger.error(
                "Internal problem sending notification on Telegram: %s",
                str(error),
            )

    def _build_message(self, task: Any) -> str:
        symbol = TypeStatus.get_symbol(task.status)
        header = f"{symbol} {task.status}"
        footer = f"`{task.workflow_id}` — Task {task.task_id}"

        parts = [header]

        if self.show_result:
            parts.append(f"```json\n{task.result(max=4000)}```")

        if task.status == TypeStatus.FAILED and task.errors:
            last_error = task.errors[-1]
            parts.append(f"`{last_error.exception}`: {last_error.message}")

        parts.append(footer)

        return "\n".join(parts)

API_URL = 'https://api.telegram.org/bot{token}/sendMessage' class-attribute instance-attribute

chat_id = chat_id instance-attribute

notification_type = notification_type instance-attribute

show_result = show_result instance-attribute

timeout = timeout instance-attribute

token = token instance-attribute

__init__(token, chat_id, notification_type=None, show_result=False, timeout=1.5)

Source code in dotflow/providers/notify_telegram.py
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(
    self,
    token: str,
    chat_id: int,
    notification_type: TypeStatus | None = None,
    show_result: bool = False,
    timeout: float = 1.5,
):
    self.token = token
    self.chat_id = chat_id
    self.notification_type = notification_type
    self.show_result = show_result
    self.timeout = timeout

hook_status_task(task)

Source code in dotflow/providers/notify_telegram.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def hook_status_task(self, task: Any) -> None:
    if self.notification_type and self.notification_type != task.status:
        return

    try:
        response = post(
            url=self.API_URL.format(token=self.token),
            headers={"Content-Type": "application/json"},
            data=dumps(
                {
                    "chat_id": self.chat_id,
                    "text": self._build_message(task),
                    "parse_mode": "markdown",
                }
            ),
            timeout=self.timeout,
        )
        response.raise_for_status()
    except Exception as error:
        logger.error(
            "Internal problem sending notification on Telegram: %s",
            str(error),
        )