Metrics OpenTelemetry

MetricsOpenTelemetry exports counters and histograms for workflow and task execution using the OpenTelemetry Metrics SDK. Use it to monitor throughput, duration, retries, and failures in Prometheus, Grafana, Datadog, or any OTLP-compatible backend.

Note

Requires pip install dotflow[otel]

Setup

pip install dotflow[otel]

Basic example

The SDK records metrics internally, so the provider works without an exporter. Add one only when you want to view the data.

from dotflow import Config, DotFlow, action
from dotflow.providers import MetricsOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


@action(retry=3)
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    config = Config(
        metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.start()

    return workflow


if __name__ == "__main__":
    main()

Exporting metrics

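To view metrics, attach an exporter. To send them to an OTLP backend, install the OTLP exporter package:

pip install opentelemetry-exporter-otlp-proto-grpc

The example below instead prints metrics to the console with ConsoleMetricExporter, which ships with the OpenTelemetry SDK and needs no extra package.
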
from opentelemetry import metrics
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

from dotflow import Config, DotFlow, action
from dotflow.providers import MetricsOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


@action(retry=3)
def load(previous_context):
    return {"saved": previous_context.storage}


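def main():
    # Create the dotflow metrics provider before looking up the global meter provider.
    m = MetricsOpenTelemetry(service_name="my-pipeline")

    # Periodically export collected metrics to stdout.
    reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
    provider = metrics.get_meter_provider()
    # Caution: _all_metric_readers is a private SDK attribute, not public API,
    # and may change between opentelemetry-sdk releases.
    provider._all_metric_readers.add(reader)

    config = Config(metrics=m)

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.start()

    # Export any pending metrics before returning.
    reader.force_flush()
    return workflow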


if __name__ == "__main__":
    main()

Exported metrics

| Metric | Type | When |
| --- | --- | --- |
| dotflow_workflow_total | Counter | Workflow starts, completes, or fails |
| dotflow_workflow_duration_seconds | Histogram | Workflow completes or fails |
| dotflow_task_total | Counter | Task completes or fails |
| dotflow_task_duration_seconds | Histogram | Task completes or fails |
| dotflow_task_retry_total | Counter | Task is retried |
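
On a backend, these instruments are usually combined into derived indicators. The sketch below shows the arithmetic for a failure rate, retries per finished task, and a mean duration taken from a histogram's sum and count. The function name, its parameters, and the sample values are hypothetical; a real backend would compute these from the exported time series.

```python
def summarize(task_completed, task_failed, task_retries, duration_sum, duration_count):
    """Derive simple health indicators from raw counter and histogram values."""
    finished = task_completed + task_failed
    if finished == 0 or duration_count == 0:
        # Nothing has finished yet, so the ratios are undefined.
        return None
    return {
        # Share of finished tasks that ended with status=failed.
        "failure_rate": task_failed / finished,
        # Average number of retries per finished task.
        "retries_per_task": task_retries / finished,
        # Histogram sum divided by count gives the mean duration.
        "mean_duration_seconds": duration_sum / duration_count,
    }


# Made-up sample: 8 completed, 2 failed, 5 retries, 10 durations summing to 25 s.
summary = summarize(8, 2, 5, 25.0, 10)
```

The early return keeps the function safe to call before any task has finished, when the counters are still zero.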

Lifecycle

| ABC method | When | Metric action |
| --- | --- | --- |
| workflow_started | Manager.init | Increments workflow_total{status=started} |
| workflow_completed | _callback_workflow (success) | Increments workflow_total{status=completed}, records duration |
| workflow_failed | _callback_workflow (failure) | Increments workflow_total{status=failed}, records duration |
| task_completed | Task status = Completed | Increments task_total{status=completed}, records duration |
| task_failed | Task status = Failed | Increments task_total{status=failed}, records duration |
| task_retried | Task status = Retry | Increments task_retry_total |
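
The mapping in this table can be illustrated with a dependency-free sketch. `LifecycleRecorder` is a hypothetical stand-in, not dotflow's class: plain dictionaries take the place of OpenTelemetry counters and histograms, and the method signatures (notably the `duration` argument) are assumptions made for illustration. The event sequence in `simulate_run` is likewise invented and not taken from dotflow's runtime.

```python
from collections import defaultdict


class LifecycleRecorder:
    """Hypothetical stand-in that mirrors the lifecycle table with plain dicts."""

    def __init__(self):
        # (metric name, status) -> count, standing in for labelled counters.
        self.counters = defaultdict(int)
        # metric name -> observed durations, standing in for histograms.
        self.durations = defaultdict(list)

    def workflow_started(self):
        self.counters[("dotflow_workflow_total", "started")] += 1

    def workflow_completed(self, duration):
        self._finish("workflow", "completed", duration)

    def workflow_failed(self, duration):
        self._finish("workflow", "failed", duration)

    def task_completed(self, duration):
        self._finish("task", "completed", duration)

    def task_failed(self, duration):
        self._finish("task", "failed", duration)

    def task_retried(self):
        # Retries have their own counter and record no duration.
        self.counters[("dotflow_task_retry_total", None)] += 1

    def _finish(self, scope, status, duration):
        # Terminal events increment the status counter and record a duration.
        self.counters[(f"dotflow_{scope}_total", status)] += 1
        self.durations[f"dotflow_{scope}_duration_seconds"].append(duration)


def simulate_run():
    """Replay an invented run: two tasks succeed, a third retries once, then succeeds."""
    recorder = LifecycleRecorder()
    recorder.workflow_started()
    recorder.task_completed(0.10)
    recorder.task_completed(0.20)
    recorder.task_retried()
    recorder.task_completed(0.30)
    recorder.workflow_completed(0.65)
    return recorder
```

Calling `simulate_run()` yields one started and one completed workflow, three completed tasks, and one retry, matching one row of the table per hook invoked.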

Compatible backends

| Backend | Exporter package |
| --- | --- |
| Prometheus | opentelemetry-exporter-prometheus |
| Grafana | opentelemetry-exporter-otlp-proto-grpc |
| Datadog | opentelemetry-exporter-otlp-proto-grpc |
| Console (debug) | Built-in (ConsoleMetricExporter) |

References