Skip to content

TracerSentry

dotflow.providers.tracer_sentry.TracerSentry

Bases: Tracer

Import

You can import the TracerSentry class with:

from dotflow.providers import TracerSentry
Example

class dotflow.providers.tracer_sentry.TracerSentry

from dotflow import Config, DotFlow
from dotflow.providers import TracerSentry

config = Config(
    tracer=TracerSentry(),
)

workflow = DotFlow(config=config)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dsn | str | Sentry DSN for the project. If None, uses the SDK already initialized by LogSentry or manually. | None |
| environment | str | Environment tag sent to Sentry. Defaults to None. | None |
| traces_sample_rate | float | Sample rate for performance traces (0.0 to 1.0). Defaults to 1.0. | 1.0 |
Source code in dotflow/providers/tracer_sentry.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class TracerSentry(Tracer):
    """
    Tracer that records each workflow as a Sentry transaction and each
    task as a child span of its workflow's transaction.

    Import:
        You can import the **TracerSentry** class with:

            from dotflow.providers import TracerSentry

    Example:
        `class` dotflow.providers.tracer_sentry.TracerSentry

            from dotflow import Config, DotFlow
            from dotflow.providers import TracerSentry

            config = Config(
                tracer=TracerSentry(),
            )

            workflow = DotFlow(config=config)

    Args:
        dsn (str): Sentry DSN for the project. If None, uses the SDK
            already initialized by LogSentry or manually.
        environment (str): Environment tag sent to Sentry.
            Defaults to None.
        traces_sample_rate (float): Sample rate for performance traces (0.0 to 1.0).
            Defaults to 1.0.
    """

    def __init__(
        self,
        dsn: str | None = None,
        environment: str | None = None,
        traces_sample_rate: float = 1.0,
    ) -> None:
        """Import ``sentry_sdk`` and, when a DSN is given, initialize it.

        ``environment`` and ``traces_sample_rate`` are only forwarded to
        ``sentry_sdk.init`` when ``dsn`` is truthy; without a DSN the SDK
        is left exactly as the caller configured it.

        Raises:
            ModuleNotFound: If ``sentry_sdk`` cannot be imported.
        """
        # Imported lazily so the dependency is only required when this
        # provider is actually instantiated.
        try:
            import sentry_sdk
        except ImportError as err:
            raise ModuleNotFound(
                module="sentry-sdk", library="dotflow[sentry]"
            ) from err

        if dsn:
            sentry_sdk.init(
                dsn=dsn,
                environment=environment,
                traces_sample_rate=traces_sample_rate,
            )

        self._sentry = sentry_sdk
        # Open transactions, keyed by str(workflow_id).
        self._transactions: dict[str, Any] = {}
        # Open task spans, keyed by "<workflow_id>:<task_id>".
        self._spans: dict[str, Any] = {}

    def start_workflow(self, workflow_id: Any, **kwargs) -> None:
        """Open a transaction for ``workflow_id`` and remember it.

        Every keyword argument is attached to the transaction as a
        ``dotflow.<name>`` tag with its value converted to ``str``.
        """
        key = str(workflow_id)
        transaction = self._sentry.start_transaction(
            op="workflow",
            name=f"workflow:{key}",
        )
        transaction.set_tag("dotflow.workflow_id", key)

        for k, v in kwargs.items():
            transaction.set_tag(f"dotflow.{k}", str(v))

        self._transactions[key] = transaction

    def end_workflow(self, workflow_id: Any, **kwargs) -> None:
        """Finish the transaction opened for ``workflow_id``.

        Does nothing when no transaction is registered for the workflow.

        Keyword Args:
            failed: When truthy the status is ``"internal_error"``,
                otherwise ``"ok"``. Defaults to False.
            duration: Optional value stored as ``dotflow.duration``.
        """
        key = str(workflow_id)
        transaction = self._transactions.pop(key, None)
        if not transaction:
            return

        failed = kwargs.get("failed", False)
        transaction.set_status("internal_error" if failed else "ok")

        if "duration" in kwargs:
            transaction.set_data("dotflow.duration", kwargs["duration"])

        # Task spans that never received end_task would otherwise stay in
        # self._spans forever. Finish and drop them before the transaction
        # itself is finished. Keys are "<workflow_id>:<task_id>", so the
        # trailing colon keeps "wf" from matching "wf2:...".
        # NOTE(review): assumes workflow ids do not themselves contain ":".
        prefix = f"{key}:"
        for task_key in [k for k in self._spans if k.startswith(prefix)]:
            self._spans.pop(task_key).finish()

        transaction.finish()

    def start_task(self, task: Any) -> None:
        """Open a child span for ``task`` under its workflow transaction.

        Reads ``task.workflow_id`` and ``task.task_id``. Does nothing when
        no transaction is registered for the task's workflow.
        """
        workflow_key = str(task.workflow_id)
        task_key = f"{workflow_key}:{task.task_id}"

        transaction = self._transactions.get(workflow_key)
        if not transaction:
            return

        span = transaction.start_child(
            op="task",
            name=f"task:{task.task_id}",
        )
        span.set_tag("dotflow.workflow_id", workflow_key)
        span.set_tag("dotflow.task_id", str(task.task_id))
        self._spans[task_key] = span

    def end_task(self, task: Any) -> None:
        """Record the outcome of ``task`` on its span and finish the span.

        Reads ``task.status``, ``task.duration``, ``task.retry_count`` and
        ``task.errors``. Does nothing when no span is registered for the
        task.
        """
        task_key = f"{task.workflow_id}:{task.task_id}"
        span = self._spans.pop(task_key, None)
        if not span:
            return

        span.set_tag("dotflow.task.status", str(task.status))

        if task.duration is not None:
            span.set_data("dotflow.task.duration", task.duration)
        # A retry count of 0 (or None) is not recorded.
        if task.retry_count:
            span.set_data("dotflow.task.retry_count", task.retry_count)

        if task.errors:
            # Only the most recent error is attached to the span.
            last_error = task.errors[-1]
            span.set_data("dotflow.task.exception", str(last_error.exception))
            span.set_status("internal_error")
        else:
            span.set_status("ok")

        span.finish()

__init__(dsn=None, environment=None, traces_sample_rate=1.0)

Source code in dotflow/providers/tracer_sentry.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(
    self,
    dsn: str | None = None,
    environment: str | None = None,
    traces_sample_rate: float = 1.0,
) -> None:
    """Import ``sentry_sdk`` and, when a DSN is given, initialize it.

    Without a DSN the SDK is not initialized here, so ``environment``
    and ``traces_sample_rate`` are not used.

    Raises:
        ModuleNotFound: If ``sentry_sdk`` cannot be imported.
    """
    # Lazy import: the dependency is only needed once this provider is built.
    try:
        import sentry_sdk
    except ImportError as err:
        raise ModuleNotFound(
            module="sentry-sdk", library="dotflow[sentry]"
        ) from err

    if dsn:
        init_options = {
            "dsn": dsn,
            "environment": environment,
            "traces_sample_rate": traces_sample_rate,
        }
        sentry_sdk.init(**init_options)

    self._sentry = sentry_sdk
    # Open transactions keyed by workflow id; open spans keyed by task key.
    self._transactions: dict[str, Any] = {}
    self._spans: dict[str, Any] = {}

end_task(task)

Source code in dotflow/providers/tracer_sentry.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def end_task(self, task: Any) -> None:
    """Record the outcome of ``task`` on its span and finish the span.

    Reads ``task.status``, ``task.duration``, ``task.retry_count`` and
    ``task.errors``. Does nothing when no span is registered under
    ``"<workflow_id>:<task_id>"``.
    """
    open_span = self._spans.pop(f"{task.workflow_id}:{task.task_id}", None)
    if open_span:
        open_span.set_tag("dotflow.task.status", str(task.status))

        elapsed = task.duration
        if elapsed is not None:
            open_span.set_data("dotflow.task.duration", elapsed)

        # A retry count of 0 (or None) is not recorded.
        retries = task.retry_count
        if retries:
            open_span.set_data("dotflow.task.retry_count", retries)

        failures = task.errors
        if not failures:
            outcome = "ok"
        else:
            # Only the most recent error is attached to the span.
            open_span.set_data(
                "dotflow.task.exception", str(failures[-1].exception)
            )
            outcome = "internal_error"

        open_span.set_status(outcome)
        open_span.finish()

end_workflow(workflow_id, **kwargs)

Source code in dotflow/providers/tracer_sentry.py
76
77
78
79
80
81
82
83
84
85
86
87
88
def end_workflow(self, workflow_id: Any, **kwargs) -> None:
    """Finish the transaction opened for ``workflow_id``.

    Does nothing when no transaction is registered for the workflow.
    Task spans of this workflow that are still registered in
    ``self._spans`` are finished and removed as well.

    Keyword Args:
        failed: When truthy the status is ``"internal_error"``,
            otherwise ``"ok"``. Defaults to False.
        duration: Optional value stored as ``dotflow.duration``.
    """
    key = str(workflow_id)
    transaction = self._transactions.pop(key, None)
    if not transaction:
        return

    failed = kwargs.get("failed", False)
    transaction.set_status("internal_error" if failed else "ok")

    if "duration" in kwargs:
        transaction.set_data("dotflow.duration", kwargs["duration"])

    # Task spans that never received end_task would otherwise stay in
    # self._spans forever. Finish and drop them before the transaction
    # itself is finished. Span keys are "<workflow_id>:<task_id>", so the
    # trailing colon keeps "wf" from matching "wf2:...".
    # NOTE(review): assumes workflow ids do not themselves contain ":".
    prefix = f"{key}:"
    for task_key in [k for k in self._spans if k.startswith(prefix)]:
        self._spans.pop(task_key).finish()

    transaction.finish()

start_task(task)

Source code in dotflow/providers/tracer_sentry.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def start_task(self, task: Any) -> None:
    """Open a child span for ``task`` under its workflow transaction.

    Reads ``task.workflow_id`` and ``task.task_id``. The span is stored
    under ``"<workflow_id>:<task_id>"``. Does nothing when no transaction
    is registered for the task's workflow.
    """
    wf_key = str(task.workflow_id)
    span_key = f"{wf_key}:{task.task_id}"

    parent = self._transactions.get(wf_key)
    if parent:
        child = parent.start_child(op="task", name=f"task:{task.task_id}")
        child.set_tag("dotflow.workflow_id", wf_key)
        child.set_tag("dotflow.task_id", str(task.task_id))
        self._spans[span_key] = child

start_workflow(workflow_id, **kwargs)

Source code in dotflow/providers/tracer_sentry.py
63
64
65
66
67
68
69
70
71
72
73
74
def start_workflow(self, workflow_id: Any, **kwargs) -> None:
    """Open a transaction for ``workflow_id`` and remember it.

    The transaction is stored under ``str(workflow_id)``. Every keyword
    argument is attached as a ``dotflow.<name>`` tag with its value
    converted to ``str``.
    """
    workflow_key = str(workflow_id)
    txn = self._sentry.start_transaction(
        op="workflow", name=f"workflow:{workflow_key}"
    )

    # Lazy generator: each value is stringified right before it is tagged.
    extra_tags = (
        (f"dotflow.{name}", str(value)) for name, value in kwargs.items()
    )
    txn.set_tag("dotflow.workflow_id", workflow_key)
    for tag_name, tag_value in extra_tags:
        txn.set_tag(tag_name, tag_value)

    self._transactions[workflow_key] = txn