12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class TracerOpenTelemetry(Tracer):
    """Tracer that opens one OpenTelemetry span per workflow and per task.

    Import:
        You can import the **TracerOpenTelemetry** class with:

            from dotflow.providers import TracerOpenTelemetry

    Example:
        `class` dotflow.providers.tracer_opentelemetry.TracerOpenTelemetry

            from dotflow import Config, DotFlow
            from dotflow.providers import TracerOpenTelemetry

            config = Config(
                tracer=TracerOpenTelemetry(service_name="my-pipeline"),
            )

            workflow = DotFlow(config=config)

    Args:
        service_name (str): The service name used in the OpenTelemetry tracer.
    """

    def __init__(self, service_name: str = "dotflow") -> None:
        """Set up the OpenTelemetry tracer used for all spans.

        Args:
            service_name (str): Value stored under the ``service.name``
                resource attribute.

        Raises:
            ModuleNotFound: If the ``opentelemetry`` packages cannot be
                imported (install ``dotflow[otel]``).
        """
        # Imported lazily so the dependency is only required when this
        # tracer is actually instantiated.
        try:
            from opentelemetry import trace
            from opentelemetry.sdk.resources import Resource
            from opentelemetry.sdk.trace import TracerProvider
            from opentelemetry.trace import StatusCode
        except ImportError as err:
            raise ModuleNotFound(
                module="opentelemetry", library="dotflow[otel]"
            ) from err

        # NOTE(review): no span processor/exporter is attached to the
        # provider here; presumably that is configured elsewhere - confirm.
        resource = Resource.create({"service.name": service_name})
        provider = TracerProvider(resource=resource)
        trace.set_tracer_provider(provider)

        # Keep handles on the module and status codes so the other methods
        # do not have to re-import opentelemetry.
        self._trace = trace
        self._tracer = trace.get_tracer("dotflow")
        self._status_ok = StatusCode.OK
        self._status_error = StatusCode.ERROR
        # Open spans keyed by "<workflow_id>" (workflow spans) or
        # "<workflow_id>:<task_id>" (task spans).
        self._spans: dict = {}

    def start_workflow(self, workflow_id: Any, **kwargs) -> None:
        """Open a span for the workflow unless one is already open.

        Args:
            workflow_id (Any): Workflow identifier; its ``str()`` form is the
                span key and the ``dotflow.workflow_id`` attribute.
            **kwargs: Extra values stored, stringified, as ``dotflow.<name>``
                span attributes.
        """
        key = str(workflow_id)
        if key in self._spans:
            # A span for this workflow is already open; keep it untouched.
            return

        span = self._tracer.start_span(name=f"workflow:{key}")
        span.set_attribute("dotflow.workflow_id", key)
        for name, value in kwargs.items():
            span.set_attribute(f"dotflow.{name}", str(value))
        self._spans[key] = span

    def end_workflow(self, workflow_id: Any, **kwargs) -> None:
        """Set the final status on the workflow span and end it.

        Does nothing when no span is open for ``workflow_id``.

        Args:
            workflow_id (Any): Workflow identifier used in ``start_workflow``.
            **kwargs: Recognised keys are ``failed`` (truthy marks the span
                as an error) and ``duration`` (stored as
                ``dotflow.duration`` when present and not ``None``).
        """
        span = self._spans.pop(str(workflow_id), None)
        if span is None:
            return

        if kwargs.get("failed", False):
            span.set_status(self._status_error, "workflow failed")
        else:
            span.set_status(self._status_ok)

        # None is not a valid attribute value, so skip it the same way
        # end_task skips a missing task duration.
        duration = kwargs.get("duration")
        if duration is not None:
            span.set_attribute("dotflow.duration", duration)

        span.end()

    def start_task(self, task: Any) -> None:
        """Open a span for the task, parented to its workflow span if open.

        Args:
            task (Any): Object exposing ``workflow_id`` and ``task_id``.
        """
        workflow_key = str(task.workflow_id)
        task_key = f"{workflow_key}:{task.task_id}"

        # Parent the task span to the workflow span when one is open;
        # otherwise no explicit context is passed to start_span.
        workflow_span = self._spans.get(workflow_key)
        if workflow_span is not None:
            ctx = self._trace.set_span_in_context(workflow_span)
        else:
            ctx = None

        span = self._tracer.start_span(
            name=f"task:{task.task_id}", context=ctx
        )
        span.set_attribute("dotflow.workflow_id", workflow_key)
        span.set_attribute("dotflow.task_id", str(task.task_id))
        # NOTE(review): a second start_task for the same task before
        # end_task replaces the stored span without ending it - confirm
        # callers never do this.
        self._spans[task_key] = span

    def end_task(self, task: Any) -> None:
        """Record the task outcome on its span and end it.

        Does nothing when no span is open for the task.

        Args:
            task (Any): Object exposing ``workflow_id``, ``task_id``,
                ``status``, ``duration``, ``retry_count`` and ``errors``.
        """
        task_key = f"{task.workflow_id}:{task.task_id}"
        span = self._spans.pop(task_key, None)
        if span is None:
            return

        span.set_attribute("dotflow.task.status", str(task.status))

        if task.duration is not None:
            span.set_attribute("dotflow.task.duration", task.duration)

        # A retry count of zero (or None) is not recorded.
        if task.retry_count:
            span.set_attribute("dotflow.task.retry_count", task.retry_count)

        if task.status == TypeStatus.FAILED:
            if task.errors:
                # Only the most recent error is attached to the span.
                last_error = task.errors[-1]
                span.add_event(
                    "exception",
                    attributes={
                        "exception.type": str(last_error.exception),
                        "exception.message": str(last_error.message),
                    },
                )
            span.set_status(self._status_error, str(task.status))
        else:
            span.set_status(self._status_ok)

        span.end()