Skip to content

Log

dotflow.abc.log.Log

Bases: ABC

Log

Base class for all log providers. Subclasses must set `self._logger`, `self._level` and `self._format` in their `__init__`.

Source code in dotflow/abc/log.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class Log(ABC):
    """Common base for every log provider.

    Subclasses must set ``self._logger``, ``self._level`` and
    ``self._format`` in their ``__init__``. Class-level defaults exist
    for ``_level`` (``logging.INFO``) and ``_format`` (``"simple"``);
    ``_logger`` has no default.
    """

    _logger: logging.Logger
    _level: int = logging.INFO
    _format: str = "simple"

    # Maps a ``_format`` value to the name of the method that renders it.
    # ``_log`` falls back to ``_format_text`` for values not listed here.
    _FORMATTERS = {
        "json": "_format_json",
        "simple": "_format_text",
    }

    def debug(self, **kwargs) -> None:
        """Send ``kwargs`` through ``_log`` at ``logging.DEBUG``."""
        self._log(logging.DEBUG, **kwargs)

    def info(self, **kwargs) -> None:
        """Send ``kwargs`` through ``_log`` at ``logging.INFO``."""
        self._log(logging.INFO, **kwargs)

    def warning(self, **kwargs) -> None:
        """Send ``kwargs`` through ``_log`` at ``logging.WARNING``."""
        self._log(logging.WARNING, **kwargs)

    def error(self, **kwargs) -> None:
        """Send ``kwargs`` through ``_log`` at ``logging.ERROR``."""
        self._log(logging.ERROR, **kwargs)

    def _log(self, level: int, **kwargs) -> None:
        """Render ``kwargs`` with the configured formatter and emit it.

        Nothing is emitted when ``level`` is below ``self._level``.
        When a truthy ``task`` keyword is supplied, only that task is
        handed to the formatter; otherwise all keywords are forwarded.
        """
        if level < self._level:
            return

        method_name = self._FORMATTERS.get(self._format, "_format_text")
        render = getattr(self, method_name)

        task = kwargs.get("task")
        payload = {"task": task} if task else kwargs

        self._logger.log(level, render(level, **payload))

    def _format_json(self, level: int, **kwargs) -> str:
        """Return the record for this call as a JSON string.

        Without a truthy ``task`` keyword, every keyword value is
        stringified and passed straight to ``LogRecord``. With a task,
        the record carries a UTC ISO timestamp, the level name and the
        stringified workflow id, task id and status; ``duration``,
        ``retry_count`` and the last error are attached conditionally.
        Both paths serialise with ``model_dump_json(exclude_none=True)``.
        """
        task = kwargs.get("task")
        if not task:
            stringified = {key: str(value) for key, value in kwargs.items()}
            return LogRecord(**stringified).model_dump_json(exclude_none=True)

        entry = LogRecord(
            timestamp=datetime.now(timezone.utc).isoformat(),
            level=logging.getLevelName(level),
            workflow_id=str(task.workflow_id),
            task_id=str(task.task_id),
            status=str(task.status),
        )

        # A duration of 0 is still recorded; only ``None`` is skipped.
        if task.duration is not None:
            entry.duration = task.duration

        # A falsy retry count (e.g. 0) is left off the record.
        if task.retry_count:
            entry.retry_count = task.retry_count

        # Error details are attached only for ERROR-or-higher records.
        if task.errors and level >= logging.ERROR:
            latest = task.errors[-1]
            entry.error = LogRecordError(
                exception=str(latest.exception),
                message=str(latest.message),
            )

        return entry.model_dump_json(exclude_none=True)

    def _format_text(self, level: int, **kwargs) -> str:
        """Return the plain-text message for this call.

        Without a truthy ``task`` keyword the result is ``str(kwargs)``.
        With a task it is the workflow id, task id and status, followed
        by the last error's traceback when the task has errors and
        ``level`` is ERROR or higher.
        """
        task = kwargs.get("task")
        if not task:
            return str(kwargs)

        pieces = [f"ID {task.workflow_id} - {task.task_id} - {task.status}"]
        if task.errors and level >= logging.ERROR:
            pieces.append(f" \n {task.errors[-1].traceback}")
        return "".join(pieces)

debug(**kwargs)

Debug

Source code in dotflow/abc/log.py
60
61
62
def debug(self, **kwargs) -> None:
    """Forward ``kwargs`` to ``self._log`` at ``logging.DEBUG`` severity."""
    severity = logging.DEBUG
    self._log(severity, **kwargs)

error(**kwargs)

Error

Source code in dotflow/abc/log.py
52
53
54
def error(self, **kwargs) -> None:
    """Forward ``kwargs`` to ``self._log`` at ``logging.ERROR`` severity."""
    severity = logging.ERROR
    self._log(severity, **kwargs)

info(**kwargs)

Info

Source code in dotflow/abc/log.py
48
49
50
def info(self, **kwargs) -> None:
    """Forward ``kwargs`` to ``self._log`` at ``logging.INFO`` severity."""
    severity = logging.INFO
    self._log(severity, **kwargs)

warning(**kwargs)

Warning

Source code in dotflow/abc/log.py
56
57
58
def warning(self, **kwargs) -> None:
    """Forward ``kwargs`` to ``self._log`` at ``logging.WARNING`` severity."""
    severity = logging.WARNING
    self._log(severity, **kwargs)