Log OpenTelemetry¶
LogOpenTelemetry emits structured log records using the OpenTelemetry Logs SDK. Logs are exported to any OTLP-compatible backend or printed to the console.
Use this when you want your dotflow logs to appear alongside traces and metrics in the same observability platform.
Note
Requires `pip install dotflow[otel]`
Setup¶
```shell
pip install dotflow[otel]
```
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `service_name` | `str` | `"dotflow"` | Service name used in the OTel resource |
| `level` | `str` | `"INFO"` | Minimum log level: `DEBUG`, `INFO`, `WARNING`, `ERROR` |
| `output` | `str` | `"console"` | Log destination: `console`, `file`, or `both` |
| `path` | `str` | `.output/flow.log` | Path to the log file (used when `output` is `file` or `both`) |
| `format` | `str` | `"simple"` | Message format: `simple` or `json` |
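For example, the parameters above can be combined to write JSON-formatted records to both the console and a file. This is a sketch: the argument names mirror the table, and the values shown are illustrative.

```python
from dotflow.providers import LogOpenTelemetry

# JSON records sent to both the console and a file.
# All values here are examples, not required settings.
log = LogOpenTelemetry(
    service_name="my-pipeline",
    level="DEBUG",
    output="both",
    path=".output/flow.log",
    format="json",
)
```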
Basic example¶
Pass `output="console"` to emit log records to the console, which is useful for local development.
```python
from dotflow import Config, DotFlow, action
from dotflow.providers import LogOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


def main():
    config = Config(
        log=LogOpenTelemetry(service_name="my-pipeline", output="console"),
    )
    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.start()
    return workflow


if __name__ == "__main__":
    main()
```
Exporting logs¶
To send logs to an OTLP-compatible backend (Loki, Datadog, Elastic, etc.), configure a logger provider with the OTLP exporter before creating the `LogOpenTelemetry` instance.
```shell
pip install opentelemetry-exporter-otlp-proto-grpc
```
```python
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor
from opentelemetry.sdk.resources import Resource

from dotflow import Config, DotFlow, action
from dotflow.providers import LogOpenTelemetry


@action
def extract():
    return {"data": "fetched"}


@action
def transform(previous_context):
    return {"result": previous_context.storage}


def main():
    resource = Resource.create({"service.name": "my-pipeline"})
    provider = LoggerProvider(resource=resource)
    provider.add_log_record_processor(
        SimpleLogRecordProcessor(OTLPLogExporter())
    )
    set_logger_provider(provider)

    config = Config(
        log=LogOpenTelemetry(service_name="my-pipeline"),
    )
    workflow = DotFlow(config=config)
    workflow.task.add(step=extract)
    workflow.task.add(step=transform)
    workflow.start()
    return workflow


if __name__ == "__main__":
    main()
```
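By default the gRPC `OTLPLogExporter` sends to `localhost:4317`. The endpoint can also be set through the standard OpenTelemetry environment variable instead of in code; the URL below is just an example.

```shell
# Standard OTel environment variable read by OTLPLogExporter;
# 4317 is the default OTLP/gRPC port.
export OTEL_EXPORTER_OTLP_ENDPOINT="http://collector.example.com:4317"
```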
Log levels¶
| Level | Logged when |
|---|---|
| `INFO` | Task status changes (Not started, In progress, Completed) |
| `WARNING` | Task status changes to Retry |
| `ERROR` | Task status changes to Failed (includes traceback) |
| `DEBUG` | Available for custom use |
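The mapping above can be sketched with the standard `logging` module. The status strings come from the table; the dictionary and helper function are illustrative, not part of dotflow's API.

```python
import logging

# Illustrative mapping from task status to log level, mirroring the
# table above. Statuses not listed fall through to DEBUG.
STATUS_LEVELS = {
    "Not started": logging.INFO,
    "In progress": logging.INFO,
    "Completed": logging.INFO,
    "Retry": logging.WARNING,
    "Failed": logging.ERROR,
}


def level_for(status: str) -> int:
    """Return the log level a status change would be emitted at."""
    return STATUS_LEVELS.get(status, logging.DEBUG)
```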
How it differs from LogDefault¶
| Feature | LogDefault | LogOpenTelemetry |
|---|---|---|
| Output | File / Console | OTel Logs SDK + File / Console |
| Format | Simple text or JSON | Simple text or JSON |
| Backend | Local file | Loki, Datadog, Elastic, any OTLP |
| Correlation | None | Shares `service.name` with Tracer/Metrics |
Full observability stack¶
Use all three OpenTelemetry providers together:
```python
from dotflow import Config, DotFlow
from dotflow.providers import LogOpenTelemetry, TracerOpenTelemetry, MetricsOpenTelemetry

config = Config(
    log=LogOpenTelemetry(service_name="my-pipeline"),
    tracer=TracerOpenTelemetry(service_name="my-pipeline"),
    metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
)
workflow = DotFlow(config=config)
```