Skip to content

Config

dotflow.core.config.Config

Import

You can import the Config class with:

from dotflow import Config

from dotflow.providers import (
    StorageDefault,
    NotifyDefault,
    LogDefault,
    SchedulerDefault,
)
Example

class dotflow.core.config.Config

config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyDefault(),
    log=LogDefault()
)

Parameters:

Name Type Description Default
storage Optional[Storage]

Type of the storage.

None
notify Optional[Notify]

Type of the notify.

None
log Optional[Log]

Type of the log.

None
server Optional[Server]

Type of the server.

None
scheduler Optional[Scheduler]

Type of the scheduler.

None
tracer Optional[Tracer]

Type of the tracer.

None
metrics Optional[Metrics]

Type of the metrics.

None

Attributes:

Name Type Description
storage Optional[Storage]
notify Optional[Notify]
log Optional[Log]
server Optional[Server]
scheduler Optional[Scheduler]
tracer Optional[Tracer]
metrics Optional[Metrics]
Source code in dotflow/core/config.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class Config:
    """
    Bundle of provider instances: storage, notify, log, server, scheduler,
    tracer and metrics.

    Every argument is optional. An argument left as ``None`` is replaced by
    a fresh instance of the matching ``*Default`` provider, and each
    resulting provider is then checked against the base class registered
    for it in ``_PROVIDERS``.

    Import:
        You can import the **Config** class with:

            from dotflow import Config

            from dotflow.providers import (
                StorageDefault,
                NotifyDefault,
                LogDefault,
                SchedulerDefault,
            )

    Example:
        `class` dotflow.core.config.Config

            config = Config(
                storage=StorageFile(path=".output"),
                notify=NotifyDefault(),
                log=LogDefault()
            )

    Args:
        storage (Optional[Storage]): Storage provider. Falls back to
            ``StorageDefault()`` when ``None``.
        notify (Optional[Notify]): Notify provider. Falls back to
            ``NotifyDefault()`` when ``None``.
        log (Optional[Log]): Log provider. Falls back to
            ``LogDefault()`` when ``None``.
        server (Optional[Server]): Server provider. Falls back to
            ``ServerDefault()`` when ``None``.
        scheduler (Optional[Scheduler]): Scheduler provider. Falls back to
            ``SchedulerDefault()`` when ``None``.
        tracer (Optional[Tracer]): Tracer provider. Falls back to
            ``TracerDefault()`` when ``None``.
        metrics (Optional[Metrics]): Metrics provider. Falls back to
            ``MetricsDefault()`` when ``None``.

    Raises:
        NotCallableObject: If a provider is set and is not an instance of
            the base class registered under its name in ``_PROVIDERS``.

    Attributes:
        storage (Optional[Storage]): Storage provider in use.
        notify (Optional[Notify]): Notify provider in use.
        log (Optional[Log]): Log provider in use.
        server (Optional[Server]): Server provider in use.
        scheduler (Optional[Scheduler]): Scheduler provider in use.
        tracer (Optional[Tracer]): Tracer provider in use.
        metrics (Optional[Metrics]): Metrics provider in use.
    """

    # Attribute name -> base class that the attribute's value must be an
    # instance of; consulted by ``_validate``.
    _PROVIDERS = {
        "storage": Storage,
        "notify": Notify,
        "log": Log,
        "server": Server,
        "scheduler": Scheduler,
        "tracer": Tracer,
        "metrics": Metrics,
    }

    def __init__(
        self,
        storage: Storage | None = None,
        notify: Notify | None = None,
        log: Log | None = None,
        server: Server | None = None,
        scheduler: Scheduler | None = None,
        tracer: Tracer | None = None,
        metrics: Metrics | None = None,
    ) -> None:
        # Each default is only constructed when its argument is missing,
        # and the attributes are assigned in declaration order.
        if storage is None:
            storage = StorageDefault()
        self.storage = storage

        if notify is None:
            notify = NotifyDefault()
        self.notify = notify

        if log is None:
            log = LogDefault()
        self.log = log

        if server is None:
            server = ServerDefault()
        self.server = server

        if scheduler is None:
            scheduler = SchedulerDefault()
        self.scheduler = scheduler

        if tracer is None:
            tracer = TracerDefault()
        self.tracer = tracer

        if metrics is None:
            metrics = MetricsDefault()
        self.metrics = metrics

        self._validate()

    def _validate(self) -> None:
        """Raise ``NotCallableObject`` for the first provider of the wrong type.

        Attributes holding ``None`` are skipped; any other value must be an
        instance of the base class mapped to its name in ``_PROVIDERS``.
        """
        for attr_name, expected_base in self._PROVIDERS.items():
            provider = getattr(self, attr_name)
            if provider is None or isinstance(provider, expected_base):
                continue
            raise NotCallableObject(name=attr_name)

storage = storage if storage is not None else StorageDefault() instance-attribute

notify = notify if notify is not None else NotifyDefault() instance-attribute

log = log if log is not None else LogDefault() instance-attribute

server = server if server is not None else ServerDefault() instance-attribute

scheduler = scheduler if scheduler is not None else SchedulerDefault() instance-attribute

tracer = tracer if tracer is not None else TracerDefault() instance-attribute

metrics = metrics if metrics is not None else MetricsDefault() instance-attribute