Custom Metrics
The Metrics provider records counters and histograms for workflow and task execution.
Methods
- workflow_started(workflow_id, **kwargs) — increment workflow counter
- workflow_completed(workflow_id, duration) — record successful workflow duration
- workflow_failed(workflow_id, duration) — record failed workflow duration
- task_completed(task) — increment task success counter
- task_failed(task) — increment task failure counter
- task_retried(task) — increment retry counter
Example
from typing import Any
from dotflow.abc.metrics import Metrics
class MetricsStatsd(Metrics):
    """Metrics provider that forwards workflow and task events to StatsD.

    Every event increments a counter; workflow completion and failure
    additionally emit a timer with the workflow duration.
    """

    def __init__(self, host: str = "localhost", port: int = 8125):
        """Create the StatsD client.

        Args:
            host: Hostname passed to ``StatsClient``.
            port: Port passed to ``StatsClient``.
        """
        # Imported here rather than at module level, so ``statsd`` is only
        # needed once this provider is actually instantiated.
        from statsd import StatsClient

        self._client = StatsClient(host, port)

    def workflow_started(self, workflow_id: Any, **kwargs) -> None:
        """Increment the started-workflow counter."""
        self._client.incr("dotflow.workflow.started")

    def workflow_completed(self, workflow_id: Any, duration: float) -> None:
        """Increment the completed counter and record the workflow duration.

        Args:
            workflow_id: Identifier of the workflow (not used in the metric).
            duration: Elapsed time; multiplied by 1000 before being sent
                (assumes seconds in, milliseconds out -- confirm with caller).
        """
        self._client.incr("dotflow.workflow.completed")
        self._client.timing("dotflow.workflow.duration", duration * 1000)

    def workflow_failed(self, workflow_id: Any, duration: float) -> None:
        """Increment the failed counter and record the failed workflow duration.

        Args:
            workflow_id: Identifier of the workflow (not used in the metric).
            duration: Elapsed time; multiplied by 1000 before being sent
                (assumes seconds in, milliseconds out -- confirm with caller).
        """
        self._client.incr("dotflow.workflow.failed")
        # Reported under its own timer name so failed runs do not skew the
        # successful-run series emitted by ``workflow_completed``.
        self._client.timing(
            "dotflow.workflow.failed.duration", duration * 1000
        )

    def task_completed(self, task: Any) -> None:
        """Increment the task success counter."""
        self._client.incr("dotflow.task.completed")

    def task_failed(self, task: Any) -> None:
        """Increment the task failure counter."""
        self._client.incr("dotflow.task.failed")

    def task_retried(self, task: Any) -> None:
        """Increment the task retry counter."""
        self._client.incr("dotflow.task.retried")