Ir para o conteúdo

MetricsOpenTelemetry

dotflow.providers.metrics_opentelemetry.MetricsOpenTelemetry

Bases: Metrics

Import

You can import the MetricsOpenTelemetry class with:

from dotflow.providers import MetricsOpenTelemetry
Example

class dotflow.providers.metrics_opentelemetry.MetricsOpenTelemetry

from dotflow import Config, DotFlow
from dotflow.providers import MetricsOpenTelemetry

config = Config(
    metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
)

workflow = DotFlow(config=config)

Parameters:

Name Type Description Default
service_name str

The service name used in the OpenTelemetry meter.

'dotflow'
Source code in dotflow/providers/metrics_opentelemetry.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class MetricsOpenTelemetry(Metrics):
    """
    Import:
        You can import the **MetricsOpenTelemetry** class with:

            from dotflow.providers import MetricsOpenTelemetry

    Example:
        `class` dotflow.providers.metrics_opentelemetry.MetricsOpenTelemetry

            from dotflow import Config, DotFlow
            from dotflow.providers import MetricsOpenTelemetry

            config = Config(
                metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
            )

            workflow = DotFlow(config=config)

    Args:
        service_name (str): The service name used in the OpenTelemetry meter.
    """

    def __init__(self, service_name: str = "dotflow") -> None:
        try:
            from opentelemetry import metrics
            from opentelemetry.sdk.metrics import MeterProvider
            from opentelemetry.sdk.resources import Resource
        except ImportError as err:
            raise ModuleNotFound(
                module="opentelemetry", library="dotflow[otel]"
            ) from err

        resource = Resource.create({"service.name": service_name})
        provider = MeterProvider(resource=resource)
        metrics.set_meter_provider(provider)

        meter = metrics.get_meter("dotflow")

        self._workflow_total = meter.create_counter(
            name="dotflow_workflow_total",
            description="Total workflows executed",
        )
        self._workflow_duration = meter.create_histogram(
            name="dotflow_workflow_duration_seconds",
            description="Workflow execution duration in seconds",
            unit="s",
        )
        self._task_total = meter.create_counter(
            name="dotflow_task_total",
            description="Total tasks executed by status",
        )
        self._task_duration = meter.create_histogram(
            name="dotflow_task_duration_seconds",
            description="Task execution duration in seconds",
            unit="s",
        )
        self._task_retry_total = meter.create_counter(
            name="dotflow_task_retry_total",
            description="Total task retries",
        )

    def workflow_started(self, workflow_id: Any, **kwargs) -> None:
        self._workflow_total.add(1, {"status": "started"})

    def workflow_completed(self, workflow_id: Any, duration: float) -> None:
        self._workflow_total.add(1, {"status": "completed"})
        self._workflow_duration.record(duration, {"status": "completed"})

    def workflow_failed(self, workflow_id: Any, duration: float) -> None:
        self._workflow_total.add(1, {"status": "failed"})
        self._workflow_duration.record(duration, {"status": "failed"})

    def task_completed(self, task: Any) -> None:
        self._task_total.add(1, {"status": "completed"})
        if task.duration is not None:
            self._task_duration.record(task.duration, {"status": "completed"})

    def task_failed(self, task: Any) -> None:
        self._task_total.add(1, {"status": "failed"})
        if task.duration is not None:
            self._task_duration.record(task.duration, {"status": "failed"})

    def task_retried(self, task: Any) -> None:
        self._task_retry_total.add(1)

__init__(service_name='dotflow')

Source code in dotflow/providers/metrics_opentelemetry.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(self, service_name: str = "dotflow") -> None:
    try:
        from opentelemetry import metrics
        from opentelemetry.sdk.metrics import MeterProvider
        from opentelemetry.sdk.resources import Resource
    except ImportError as err:
        raise ModuleNotFound(
            module="opentelemetry", library="dotflow[otel]"
        ) from err

    resource = Resource.create({"service.name": service_name})
    provider = MeterProvider(resource=resource)
    metrics.set_meter_provider(provider)

    meter = metrics.get_meter("dotflow")

    self._workflow_total = meter.create_counter(
        name="dotflow_workflow_total",
        description="Total workflows executed",
    )
    self._workflow_duration = meter.create_histogram(
        name="dotflow_workflow_duration_seconds",
        description="Workflow execution duration in seconds",
        unit="s",
    )
    self._task_total = meter.create_counter(
        name="dotflow_task_total",
        description="Total tasks executed by status",
    )
    self._task_duration = meter.create_histogram(
        name="dotflow_task_duration_seconds",
        description="Task execution duration in seconds",
        unit="s",
    )
    self._task_retry_total = meter.create_counter(
        name="dotflow_task_retry_total",
        description="Total task retries",
    )

task_completed(task)

Source code in dotflow/providers/metrics_opentelemetry.py
84
85
86
87
def task_completed(self, task: Any) -> None:
    self._task_total.add(1, {"status": "completed"})
    if task.duration is not None:
        self._task_duration.record(task.duration, {"status": "completed"})

task_failed(task)

Source code in dotflow/providers/metrics_opentelemetry.py
89
90
91
92
def task_failed(self, task: Any) -> None:
    self._task_total.add(1, {"status": "failed"})
    if task.duration is not None:
        self._task_duration.record(task.duration, {"status": "failed"})

task_retried(task)

Source code in dotflow/providers/metrics_opentelemetry.py
94
95
def task_retried(self, task: Any) -> None:
    self._task_retry_total.add(1)

workflow_completed(workflow_id, duration)

Source code in dotflow/providers/metrics_opentelemetry.py
76
77
78
def workflow_completed(self, workflow_id: Any, duration: float) -> None:
    self._workflow_total.add(1, {"status": "completed"})
    self._workflow_duration.record(duration, {"status": "completed"})

workflow_failed(workflow_id, duration)

Source code in dotflow/providers/metrics_opentelemetry.py
80
81
82
def workflow_failed(self, workflow_id: Any, duration: float) -> None:
    self._workflow_total.add(1, {"status": "failed"})
    self._workflow_duration.record(duration, {"status": "failed"})

workflow_started(workflow_id, **kwargs)

Source code in dotflow/providers/metrics_opentelemetry.py
73
74
def workflow_started(self, workflow_id: Any, **kwargs) -> None:
    self._workflow_total.add(1, {"status": "started"})