# Tracer OpenTelemetry
`TracerOpenTelemetry` exports distributed traces for every workflow and task execution using the OpenTelemetry standard. Each workflow becomes a parent span, and each task becomes a child span carrying duration, status, retry events, and error details.
> **Note**
> Requires `pip install dotflow[otel]`
## Setup

```shell
pip install dotflow[otel]
```
## Basic example
Traces are created internally by the SDK; no exporter is required for the provider to work.
```python
from dotflow import Config, DotFlow, action
from dotflow.providers import TracerOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


@action(retry=3)
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    config = Config(
        tracer=TracerOpenTelemetry(service_name="my-pipeline"),
    )
    workflow = DotFlow(config=config)

    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)

    workflow.start()
    return workflow


if __name__ == "__main__":
    main()
```
## Exporting traces
To visualize traces, add an exporter. The example below sends traces to the console and to any OTLP-compatible backend (Jaeger, Tempo, Datadog, etc.).
```shell
pip install opentelemetry-exporter-otlp-proto-grpc
```
```python
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from dotflow import Config, DotFlow, action
from dotflow.providers import TracerOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


@action(retry=3)
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    tracer = TracerOpenTelemetry(service_name="my-pipeline")

    provider = trace.get_tracer_provider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))

    config = Config(tracer=tracer)
    workflow = DotFlow(config=config)

    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)

    workflow.start()
    return workflow


if __name__ == "__main__":
    main()
```
## Running Jaeger locally

```shell
docker run -d --name jaeger \
  -p 16686:16686 \
  -p 4317:4317 \
  jaegertracing/all-in-one:latest
```
Run the example and open http://localhost:16686 to see traces.
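If the collector is not listening on the default `localhost:4317`, the OTLP exporter can be redirected with OpenTelemetry's standard environment variables. This is a configuration sketch: the variable name is part of the OpenTelemetry specification, not dotflow, and the host shown is a placeholder.

```shell
# Point the OTLP gRPC exporter at a remote collector
export OTEL_EXPORTER_OTLP_ENDPOINT="http://jaeger.internal:4317"
```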
## Trace structure

```
[Trace: workflow_id=550e8400...]
├── [Span: task:0] 0.5s OK — Completed
├── [Span: task:1] 1.2s OK — Completed
└── [Span: task:2] 0.3s ERROR — Failed
    └── [Event: exception — ValueError "connection refused"]
```
## Span attributes

| Attribute | Source |
|---|---|
| `dotflow.workflow_id` | `task.workflow_id` |
| `dotflow.task_id` | `task.task_id` |
| `dotflow.task.status` | `task.status` |
| `dotflow.task.duration` | `task.duration` |
| `dotflow.task.retry_count` | `task.retry_count` |
## Lifecycle

| ABC method | When | Span action |
|---|---|---|
| `start_workflow` | `Manager.init` | Creates parent span |
| `start_task` | `TaskEngine.start()` | Creates child span |
| `end_task` | `TaskEngine.start()` `finally` | Sets attributes, status, ends span |
| `end_workflow` | `_callback_workflow` | Sets workflow status, ends parent span |
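The span ordering implied by these four hooks can be mimicked with a small stand-alone tracer. The sketch below is hypothetical, not dotflow's actual ABC: the method names come from the table above, but the argument shapes (`workflow_id`, `task_id`, `status`) are assumptions for illustration.

```python
import time


class LoggingTracer:
    """Hypothetical tracer following the four lifecycle hooks above."""

    def __init__(self):
        self.open_spans = {}  # span key -> start time
        self.finished = []    # (span name, status, duration) tuples

    def start_workflow(self, workflow_id):
        # Parent span: opened once when the workflow initializes.
        self.open_spans[workflow_id] = time.perf_counter()

    def start_task(self, workflow_id, task_id):
        # Child span: opened when the task starts executing.
        self.open_spans[(workflow_id, task_id)] = time.perf_counter()

    def end_task(self, workflow_id, task_id, status="Completed"):
        # Closed in a finally block, so it runs even when the task fails.
        started = self.open_spans.pop((workflow_id, task_id))
        self.finished.append((f"task:{task_id}", status, time.perf_counter() - started))

    def end_workflow(self, workflow_id, status="Completed"):
        # Parent span closes last, after all child spans have ended.
        started = self.open_spans.pop(workflow_id)
        self.finished.append((workflow_id, status, time.perf_counter() - started))


tracer = LoggingTracer()
tracer.start_workflow("wf-1")
tracer.start_task("wf-1", 0)
tracer.end_task("wf-1", 0)
tracer.end_workflow("wf-1")
```

Because `end_task` pops from `open_spans`, a span left open at the end signals a hook that was never closed, mirroring how an unended OTel span never reaches the exporter.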
## Compatible backends

| Backend | Exporter package |
|---|---|
| Jaeger | `opentelemetry-exporter-otlp-proto-grpc` |
| Grafana Tempo | `opentelemetry-exporter-otlp-proto-grpc` |
| Datadog | `opentelemetry-exporter-otlp-proto-grpc` |
| Honeycomb | `opentelemetry-exporter-otlp-proto-grpc` |
| New Relic | `opentelemetry-exporter-otlp-proto-grpc` |
| AWS X-Ray | `opentelemetry-exporter-otlp-proto-grpc` |
| Google Cloud Trace | `opentelemetry-exporter-gcp-trace` |
| Console (debug) | Built-in (`ConsoleSpanExporter`) |