Bases: Storage
In-memory storage using a dictionary.
Source code in dotflow/providers/storage_default.py
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | class StorageDefault(Storage):
"""In-memory storage using a dictionary."""
def __init__(self):
self._store: dict[str, Context] = {}
def post(self, key: str, context: Context) -> None:
self._store[key] = context
def get(self, key: str) -> Context:
return self._store.get(key, Context())
def key(self, task: Callable) -> str:
return f"{task.workflow_id}-{task.task_id}"
def clear(self, workflow_id: str) -> None:
prefix = f"{workflow_id}-"
stale = [k for k in self._store if k.startswith(prefix)]
for key in stale:
del self._store[key]
|
__init__()
Source code in dotflow/providers/storage_default.py
| def __init__(self):
self._store: dict[str, Context] = {}
|
clear(workflow_id)
Source code in dotflow/providers/storage_default.py
| def clear(self, workflow_id: str) -> None:
prefix = f"{workflow_id}-"
stale = [k for k in self._store if k.startswith(prefix)]
for key in stale:
del self._store[key]
|
get(key)
Source code in dotflow/providers/storage_default.py
| def get(self, key: str) -> Context:
return self._store.get(key, Context())
|
key(task)
Source code in dotflow/providers/storage_default.py
| def key(self, task: Callable) -> str:
return f"{task.workflow_id}-{task.task_id}"
|
post(key, context)
Source code in dotflow/providers/storage_default.py
| def post(self, key: str, context: Context) -> None:
self._store[key] = context
|