
Tracer OpenTelemetry

TracerOpenTelemetry exports a distributed trace for every workflow and task execution using the OpenTelemetry standard. Each workflow becomes a parent span, and each task becomes a child span that records duration, status, retry events, and error details.

Note

Requires pip install dotflow[otel]

Setup

pip install dotflow[otel]

Basic example

Traces are created internally by the SDK, so the provider works without an exporter. Without one, spans are recorded but not sent anywhere.

from dotflow import Config, DotFlow, action
from dotflow.providers import TracerOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


@action(retry=3)
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    config = Config(
        tracer=TracerOpenTelemetry(service_name="my-pipeline"),
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.start()

    return workflow


if __name__ == "__main__":
    main()

Exporting traces

To visualize traces, add an exporter. The example below sends traces to the console and to any OTLP-compatible backend (Jaeger, Tempo, Datadog, etc.).

pip install opentelemetry-exporter-otlp-proto-grpc

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from dotflow import Config, DotFlow, action
from dotflow.providers import TracerOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


@action(retry=3)
def load(previous_context):
    return {"saved": previous_context.storage}


def main():
    tracer = TracerOpenTelemetry(service_name="my-pipeline")

    provider = trace.get_tracer_provider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))

    config = Config(tracer=tracer)

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.task.add(step=load)
    workflow.start()

    return workflow


if __name__ == "__main__":
    main()

Running Jaeger locally

docker run -d --name jaeger -p 16686:16686 -p 4317:4317 jaegertracing/all-in-one:latest

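Run the exporter example from the previous section, then open the Jaeger UI at http://localhost:16686 to see the traces. Port 4317 is the OTLP gRPC port, which OTLPSpanExporter() targets on localhost by default.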
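Before running the workflow, it can help to confirm that something is listening on the OTLP port published by the docker command. The sketch below uses only the standard library; the helper name otlp_port_is_open is hypothetical and not part of dotflow or OpenTelemetry. It only checks that the TCP port accepts connections, not that the listener actually speaks OTLP.

```python
import socket


def otlp_port_is_open(host: str = "localhost", port: int = 4317, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical helper for illustration: 4317 is the OTLP gRPC port
    published by the docker command in this section.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    if otlp_port_is_open():
        print("localhost:4317 is reachable; run the example.")
    else:
        print("Nothing is listening on localhost:4317; is the Jaeger container running?")
```

If the check fails, `docker ps` should show whether the jaeger container started and whether the port mappings are in place.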

Trace structure

[Trace: workflow_id=550e8400...]
  └── [Span: task:0] 0.5s OK — Completed
  └── [Span: task:1] 1.2s OK — Completed
  └── [Span: task:2] 0.3s ERROR — Failed
        └── [Event: exception — ValueError "connection refused"]

Span attributes

Attribute                   Source
dotflow.workflow_id         task.workflow_id
dotflow.task_id             task.task_id
dotflow.task.status         task.status
dotflow.task.duration       task.duration
dotflow.task.retry_count    task.retry_count
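The mapping in the table can be sketched as a plain function that turns a task-like object into a dictionary of span attributes. This is an illustration only, not dotflow's actual implementation: FakeTask and span_attributes are hypothetical names, and the str() conversions assume that the workflow ID and status may be non-primitive objects (OpenTelemetry attribute values must be strings, booleans, numbers, or sequences of those).

```python
from dataclasses import dataclass


@dataclass
class FakeTask:
    """Hypothetical stand-in exposing only the fields named in the table."""

    workflow_id: str
    task_id: int
    status: str
    duration: float
    retry_count: int


def span_attributes(task) -> dict:
    """Build the span attribute dictionary from a task-like object."""
    return {
        # str() is an assumption: IDs and statuses may be UUIDs or enums,
        # which are not valid OpenTelemetry attribute values as-is.
        "dotflow.workflow_id": str(task.workflow_id),
        "dotflow.task_id": task.task_id,
        "dotflow.task.status": str(task.status),
        "dotflow.task.duration": task.duration,
        "dotflow.task.retry_count": task.retry_count,
    }


if __name__ == "__main__":
    task = FakeTask("550e8400", 2, "Failed", 0.3, 3)
    for key, value in span_attributes(task).items():
        print(f"{key} = {value!r}")
```

With a real OpenTelemetry span, a dictionary like this could be passed to span.set_attributes(...) in one call.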

Lifecycle

ABC method        When                          Span action
start_workflow    Manager.init                  Creates parent span
start_task        TaskEngine.start()            Creates child span
end_task          TaskEngine.start() finally    Sets attributes, status, ends span
end_workflow      _callback_workflow            Sets workflow status, ends parent span
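The order of the four hooks can be illustrated with a small recording tracer. This is a sketch under assumptions: only the method names come from the table, while the TracerSketch class, the method signatures, and the run_workflow driver are hypothetical and do not reflect dotflow's real interface. The point it shows is that calling end_task from a finally block closes the span even when the task raises.

```python
from abc import ABC, abstractmethod


class TracerSketch(ABC):
    """Hypothetical interface with the four hooks named in the table."""

    @abstractmethod
    def start_workflow(self, workflow_id): ...

    @abstractmethod
    def start_task(self, task_id): ...

    @abstractmethod
    def end_task(self, task_id, status): ...

    @abstractmethod
    def end_workflow(self, workflow_id): ...


class RecordingTracer(TracerSketch):
    """Records each hook call instead of creating real spans."""

    def __init__(self):
        self.events = []

    def start_workflow(self, workflow_id):
        self.events.append(("start_workflow", workflow_id))

    def start_task(self, task_id):
        self.events.append(("start_task", task_id))

    def end_task(self, task_id, status):
        self.events.append(("end_task", task_id, status))

    def end_workflow(self, workflow_id):
        self.events.append(("end_workflow", workflow_id))


def run_workflow(tracer, workflow_id, steps):
    """Hypothetical driver: one parent 'span', one child 'span' per step."""
    tracer.start_workflow(workflow_id)
    for task_id, step in enumerate(steps):
        tracer.start_task(task_id)
        status = "Completed"
        try:
            step()
        except Exception:
            status = "Failed"
        finally:
            # Runs on success and on failure, so every task span is closed.
            tracer.end_task(task_id, status)
    tracer.end_workflow(workflow_id)


def failing_step():
    raise ValueError("connection refused")


if __name__ == "__main__":
    tracer = RecordingTracer()
    run_workflow(tracer, "550e8400", [lambda: None, failing_step])
    for event in tracer.events:
        print(event)
```

In this sketch a failed task does not stop the workflow; whether dotflow continues or aborts after a failure is not stated here and is left as an assumption.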

Compatible backends

Backend               Exporter package
Jaeger                opentelemetry-exporter-otlp-proto-grpc
Grafana Tempo         opentelemetry-exporter-otlp-proto-grpc
Datadog               opentelemetry-exporter-otlp-proto-grpc
Honeycomb             opentelemetry-exporter-otlp-proto-grpc
New Relic             opentelemetry-exporter-otlp-proto-grpc
AWS X-Ray             opentelemetry-exporter-otlp-proto-grpc
Google Cloud Trace    opentelemetry-exporter-gcp-trace
Console (debug)       Built-in (ConsoleSpanExporter)
