
Log OpenTelemetry

LogOpenTelemetry emits structured log records using the OpenTelemetry Logs SDK. Logs are exported to any OTLP-compatible backend or printed to the console.

Use this when you want your dotflow logs to appear alongside traces and metrics in the same observability platform.

Note

Requires pip install dotflow[otel]

Setup

pip install dotflow[otel]

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| service_name | str | "dotflow" | Service name used in the OTel resource |
| level | str | "INFO" | Minimum log level: DEBUG, INFO, WARNING, ERROR |
| output | str | "console" | Log destination: console, file, or both |
| path | str | ".output/flow.log" | Path to the log file (used when output is file or both) |
| format | str | "simple" | Message format: simple or json |
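
The accepted values in the table above can be checked before they reach the provider. The following is a minimal sketch and not part of dotflow: `build_log_options` is a hypothetical helper that validates `level`, `output`, and `format` against the documented values and returns keyword arguments. Whether dotflow performs its own validation is not stated here.

```python
# Hypothetical helper (not part of dotflow): validates options against the
# values documented in the parameters table.
ALLOWED_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
ALLOWED_OUTPUTS = ("console", "file", "both")
ALLOWED_FORMATS = ("simple", "json")


def build_log_options(
    service_name="dotflow",
    level="INFO",
    output="console",
    path=".output/flow.log",
    format="simple",
):
    """Return keyword arguments for LogOpenTelemetry, or raise ValueError."""
    if level not in ALLOWED_LEVELS:
        raise ValueError(f"level must be one of {ALLOWED_LEVELS}, got {level!r}")
    if output not in ALLOWED_OUTPUTS:
        raise ValueError(f"output must be one of {ALLOWED_OUTPUTS}, got {output!r}")
    if format not in ALLOWED_FORMATS:
        raise ValueError(f"format must be one of {ALLOWED_FORMATS}, got {format!r}")

    options = {
        "service_name": service_name,
        "level": level,
        "output": output,
        "format": format,
    }
    # The path is only used when records are written to a file.
    if output in ("file", "both"):
        options["path"] = path
    return options


options = build_log_options(
    service_name="my-pipeline", level="DEBUG", output="both", format="json"
)
```

The resulting dictionary is meant to be unpacked into the provider, for example `LogOpenTelemetry(**options)`.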

Basic example

Pass output="console" (the default) to print log records to the console, which is useful for local development.

from dotflow import Config, DotFlow, action
from dotflow.providers import LogOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


def main():
    config = Config(
        log=LogOpenTelemetry(service_name="my-pipeline", output="console"),
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.start()

    return workflow


if __name__ == "__main__":
    main()

Exporting logs

To send logs to an OTLP-compatible backend (Loki, Datadog, Elastic, etc.), register a LoggerProvider with an OTLP exporter before creating the LogOpenTelemetry instance. The gRPC exporter is installed as a separate package:

pip install opentelemetry-exporter-otlp-proto-grpc

from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor
from opentelemetry.sdk.resources import Resource

from dotflow import Config, DotFlow, action
from dotflow.providers import LogOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


def main():
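    # Attach the service name to every exported log record.
    resource = Resource.create({"service.name": "my-pipeline"})
    provider = LoggerProvider(resource=resource)
    # SimpleLogRecordProcessor exports each record as soon as it is emitted.
    provider.add_log_record_processor(
        SimpleLogRecordProcessor(OTLPLogExporter())
    )
    # Register the provider globally before LogOpenTelemetry is created.
    set_logger_provider(provider)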

    config = Config(
        log=LogOpenTelemetry(service_name="my-pipeline"),
    )

    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.start()

    return workflow


if __name__ == "__main__":
    main()
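
In the example above, OTLPLogExporter() is created without arguments, so the destination comes from the exporter's defaults. The OpenTelemetry specification defines the environment variables OTEL_EXPORTER_OTLP_LOGS_ENDPOINT and OTEL_EXPORTER_OTLP_ENDPOINT for this, with the logs-specific variable taking precedence. Below is a minimal standard-library sketch of that lookup order. `resolve_otlp_endpoint` is a hypothetical helper, and the fallback `http://localhost:4317` is the specification's default for gRPC; check the exporter documentation for the version you have installed.

```python
import os

# Default OTLP/gRPC endpoint per the OpenTelemetry specification (assumption:
# your exporter version follows the same default).
DEFAULT_OTLP_GRPC_ENDPOINT = "http://localhost:4317"


def resolve_otlp_endpoint(env=None):
    """Return the endpoint a log exporter would be expected to target.

    Lookup order: logs-specific variable, then the general variable,
    then the default.
    """
    env = os.environ if env is None else env
    return (
        env.get("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT")
        or env.get("OTEL_EXPORTER_OTLP_ENDPOINT")
        or DEFAULT_OTLP_GRPC_ENDPOINT
    )


if __name__ == "__main__":
    print(resolve_otlp_endpoint())
```

To make the destination explicit rather than implicit, the resolved value can be passed as `OTLPLogExporter(endpoint=resolve_otlp_endpoint())`.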

Log levels

| Level | Logged when |
| --- | --- |
| INFO | Task status changes (Not started, In progress, Completed) |
| WARNING | Task status changes to Retry |
| ERROR | Task status changes to Failed (includes traceback) |
| DEBUG | Available for custom use |
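
The table above, combined with the `level` parameter, determines which status changes end up in the log. The sketch below illustrates that interaction with the standard `logging` level constants. It is only an illustration of the table, not dotflow's implementation; `STATUS_TO_LEVEL` and `is_emitted` are hypothetical names, and it assumes `level` acts as a plain minimum-severity threshold.

```python
import logging

# Illustrative mapping taken from the table above; dotflow's internal
# implementation may differ.
STATUS_TO_LEVEL = {
    "Not started": logging.INFO,
    "In progress": logging.INFO,
    "Completed": logging.INFO,
    "Retry": logging.WARNING,
    "Failed": logging.ERROR,
}


def is_emitted(status, minimum="INFO"):
    """Return True if a record for this status passes the minimum level."""
    threshold = getattr(logging, minimum)
    return STATUS_TO_LEVEL[status] >= threshold


if __name__ == "__main__":
    for status in STATUS_TO_LEVEL:
        print(status, is_emitted(status, minimum="WARNING"))
```

Under these assumptions, setting level="WARNING" would keep only the Retry and Failed records, which can be useful for quiet production logs.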

How it differs from LogDefault

| Feature | LogDefault | LogOpenTelemetry |
| --- | --- | --- |
| Output | File / Console | OTel Logs SDK + File / Console |
| Format | Simple text or JSON | Simple text or JSON |
| Backend | Local file | Loki, Datadog, Elastic, any OTLP |
| Correlation | None | Shares service.name with Tracer/Metrics |

Full observability stack

Use all three OpenTelemetry providers together:

from dotflow import Config, DotFlow
from dotflow.providers import LogOpenTelemetry, TracerOpenTelemetry, MetricsOpenTelemetry

config = Config(
    log=LogOpenTelemetry(service_name="my-pipeline"),
    tracer=TracerOpenTelemetry(service_name="my-pipeline"),
    metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
)

workflow = DotFlow(config=config)
