Metrics OpenTelemetry
MetricsOpenTelemetry exports counters and histograms for workflow and task execution using the OpenTelemetry Metrics SDK. Use it to monitor throughput, duration, retries, and failures in Prometheus, Grafana, Datadog, or any OTLP-compatible backend.
Note
Requires pip install dotflow[otel]
Setup
pip install dotflow[otel]
Basic example
Metrics are recorded internally by the SDK, so the provider works without an exporter.
from dotflow import Config, DotFlow, action
from dotflow.providers import MetricsOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


@action(retry=3)
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    config = Config(
        metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
    )
    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.start()
    return workflow


if __name__ == "__main__":
    main()
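Exporting metrics
To inspect or visualize metrics, attach an exporter. The example below prints metrics to the console with ConsoleMetricExporter, which ships with the OpenTelemetry SDK. The OTLP exporter package below is only needed when sending metrics to an OTLP-compatible backend.
pip install opentelemetry-exporter-otlp-proto-grpc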
from opentelemetry import metrics
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

from dotflow import Config, DotFlow, action
from dotflow.providers import MetricsOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


@action(retry=3)
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    # Instantiate the dotflow metrics provider before looking up the global meter provider.
    m = MetricsOpenTelemetry(service_name="my-pipeline")

    # Reader that periodically pushes collected metrics to the console exporter.
    reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
    provider = metrics.get_meter_provider()
    # Note: _all_metric_readers is a private SDK attribute and may change
    # between OpenTelemetry releases.
    provider._all_metric_readers.add(reader)

    config = Config(metrics=m)
    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.start()

    # Flush pending metrics to the console before returning.
    reader.force_flush()
    return workflow


if __name__ == "__main__":
    main()
Exported metrics
| Metric | Type | When |
|---|---|---|
| dotflow_workflow_total | Counter | Workflow starts, completes, or fails |
| dotflow_workflow_duration_seconds | Histogram | Workflow completes or fails |
| dotflow_task_total | Counter | Task completes or fails |
| dotflow_task_duration_seconds | Histogram | Task completes or fails |
| dotflow_task_retry_total | Counter | Task is retried |
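To make the difference between the two instrument types concrete, here is a minimal standard-library sketch. It is not dotflow's or OpenTelemetry's implementation. A counter only accumulates a total per attribute value, while a histogram sorts each observed duration into buckets and tracks a running sum and count. The bucket boundaries and the sample outcomes are assumptions chosen for illustration.

```python
from bisect import bisect_left
from collections import Counter

# Hypothetical bucket upper bounds in seconds; real SDK defaults differ.
BOUNDS = [0.1, 0.5, 1.0, 5.0]


class HistogramSketch:
    """Toy histogram: counts observations per bucket and tracks sum and count."""

    def __init__(self, bounds):
        self.bounds = list(bounds)
        # One bucket per upper bound, plus a final overflow bucket.
        self.buckets = [0] * (len(self.bounds) + 1)
        self.total = 0.0
        self.count = 0

    def record(self, value):
        # bisect_left returns the index of the first bound >= value,
        # so each bucket includes its upper bound.
        self.buckets[bisect_left(self.bounds, value)] += 1
        self.total += value
        self.count += 1


# Counter keyed by status, standing in for dotflow_task_total.
task_total = Counter()
# Histogram standing in for dotflow_task_duration_seconds.
task_duration = HistogramSketch(BOUNDS)

# Simulated task outcomes: (status, duration in seconds).
for status, seconds in [("completed", 0.05), ("completed", 0.7), ("failed", 2.0)]:
    task_total[status] += 1
    task_duration.record(seconds)

# Counters support ratios such as the share of failed tasks.
failure_ratio = task_total["failed"] / sum(task_total.values())
print(dict(task_total))
print(task_duration.buckets)
```

Counters answer "how many" questions (throughput, failure ratio), while histograms answer "how long" questions (latency distribution), which is why each workflow and task event in the table has one of each.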
Lifecycle
| ABC method | When | Metric action |
|---|---|---|
| workflow_started | Manager.init | Increments workflow_total{status=started} |
| workflow_completed | _callback_workflow (success) | Increments workflow_total{status=completed}, records duration |
| workflow_failed | _callback_workflow (failure) | Increments workflow_total{status=failed}, records duration |
| task_completed | Task status = Completed | Increments task_total{status=completed}, records duration |
| task_failed | Task status = Failed | Increments task_total{status=failed}, records duration |
| task_retried | Task status = Retry | Increments task_retry_total |
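The lifecycle table can be read as a small interface. The sketch below illustrates that idea with the standard library only: an abstract base class carrying the six method names from the table, and an in-memory implementation that applies the listed metric actions. The class names `MetricsSketch` and `InMemoryMetrics` and the `duration` parameter are hypothetical; dotflow's actual ABC and its signatures may differ.

```python
from abc import ABC, abstractmethod
from collections import Counter, defaultdict


class MetricsSketch(ABC):
    """Hypothetical interface mirroring the method names in the table."""

    @abstractmethod
    def workflow_started(self): ...

    @abstractmethod
    def workflow_completed(self, duration): ...

    @abstractmethod
    def workflow_failed(self, duration): ...

    @abstractmethod
    def task_completed(self, duration): ...

    @abstractmethod
    def task_failed(self, duration): ...

    @abstractmethod
    def task_retried(self): ...


class InMemoryMetrics(MetricsSketch):
    """Stores counters and durations in plain Python containers."""

    def __init__(self):
        self.counters = Counter()           # keyed by (metric name, status)
        self.durations = defaultdict(list)  # keyed by metric name

    def _finish(self, scope, status, duration):
        # Terminal events increment a status counter and record a duration.
        self.counters[(f"{scope}_total", status)] += 1
        self.durations[f"{scope}_duration_seconds"].append(duration)

    def workflow_started(self):
        self.counters[("workflow_total", "started")] += 1

    def workflow_completed(self, duration):
        self._finish("workflow", "completed", duration)

    def workflow_failed(self, duration):
        self._finish("workflow", "failed", duration)

    def task_completed(self, duration):
        self._finish("task", "completed", duration)

    def task_failed(self, duration):
        self._finish("task", "failed", duration)

    def task_retried(self):
        # Retries have no status attribute and no duration.
        self.counters[("task_retry_total", None)] += 1


# Simulated run: one task succeeds, a second is retried twice and then fails.
m = InMemoryMetrics()
m.workflow_started()
m.task_completed(0.2)
m.task_retried()
m.task_retried()
m.task_failed(1.5)
m.workflow_failed(1.8)
print(dict(m.counters))
```

Keeping retries in a separate counter, as the table does, means a task that eventually succeeds after retries still counts once in `task_total` while its retry cost remains visible.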
Compatible backends
| Backend | Exporter package |
|---|---|
| Prometheus | opentelemetry-exporter-prometheus |
| Grafana | opentelemetry-exporter-otlp-proto-grpc |
| Datadog | opentelemetry-exporter-otlp-proto-grpc |
| Console (debug) | Built-in (ConsoleMetricExporter) |
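For readers targeting Prometheus, it may help to see roughly what a counter with a status label looks like in the Prometheus text exposition format. The following is a hand-written sketch of that format using only the standard library. It is not output captured from dotflow or from the OpenTelemetry Prometheus exporter, whose metric names, labels, and help text may differ. The helper `render_counter`, the sample values, and the help text are hypothetical.

```python
def render_counter(name, help_text, samples):
    """Render one counter family in Prometheus text exposition format.

    samples maps a tuple of (label, value) pairs to the counter value.
    """
    lines = [f"# HELP {name} {help_text}", f"# TYPE {name} counter"]
    for labels, value in sorted(samples.items()):
        label_str = ",".join(f'{k}="{v}"' for k, v in labels)
        # Doubled braces emit literal { and } around the label set.
        lines.append(f"{name}{{{label_str}}} {value}")
    return "\n".join(lines)


# Illustrative sample values, not real measurements.
samples = {
    (("status", "completed"),): 2,
    (("status", "failed"),): 1,
}
text = render_counter("dotflow_task_total", "Illustrative help text", samples)
print(text)
```

With data in this shape, a dashboard can group or filter by the `status` label, for example to chart failed tasks separately from completed ones.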