Ir para o conteúdo

StorageDefault

dotflow.providers.storage_default.StorageDefault

Bases: Storage

In-memory storage using a dictionary.

Source code in dotflow/providers/storage_default.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class StorageDefault(Storage):
    """In-memory storage using a dictionary."""

    def __init__(self):
        self._store: dict[str, Context] = {}

    def post(self, key: str, context: Context) -> None:
        self._store[key] = context

    def get(self, key: str) -> Context:
        return self._store.get(key, Context())

    def key(self, task: Callable) -> str:
        return f"{task.workflow_id}-{task.task_id}"

    def clear(self, workflow_id: str) -> None:
        prefix = f"{workflow_id}-"
        stale = [k for k in self._store if k.startswith(prefix)]

        for key in stale:
            del self._store[key]

__init__()

Source code in dotflow/providers/storage_default.py
12
13
def __init__(self):
    self._store: dict[str, Context] = {}

clear(workflow_id)

Source code in dotflow/providers/storage_default.py
24
25
26
27
28
29
def clear(self, workflow_id: str) -> None:
    prefix = f"{workflow_id}-"
    stale = [k for k in self._store if k.startswith(prefix)]

    for key in stale:
        del self._store[key]

get(key)

Source code in dotflow/providers/storage_default.py
18
19
def get(self, key: str) -> Context:
    return self._store.get(key, Context())

key(task)

Source code in dotflow/providers/storage_default.py
21
22
def key(self, task: Callable) -> str:
    return f"{task.workflow_id}-{task.task_id}"

post(key, context)

Source code in dotflow/providers/storage_default.py
15
16
def post(self, key: str, context: Context) -> None:
    self._store[key] = context