StorageFile

dotflow.providers.storage_file.StorageFile

Bases: Storage

Storage

Source code in dotflow/providers/storage_file.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class StorageFile(Storage):
    """Storage"""

    def __init__(self, *args, path: str = settings.START_PATH, **kwargs):
        self.path = Path(path, "tasks")
        self.path.mkdir(parents=True, exist_ok=True)

    def post(self, key: str, context: Context) -> None:
        task_context = []

        if Path(self.path, key).exists():
            task_context = read_file(path=Path(self.path, key))

        if isinstance(context.storage, list):
            for item in context.storage:
                if isinstance(item, Context):
                    task_context.append(self._dumps(storage=item.storage))

            write_file(path=Path(self.path, key), content=task_context, mode="a")
            return None

        task_context.append(self._dumps(storage=context.storage))
        write_file(path=Path(self.path, key), content=task_context)
        return None

    def get(self, key: str) -> Context:
        task_context = []

        if Path(self.path, key).exists():
            task_context = read_file(path=Path(self.path, key))

        if len(task_context) == 1:
            return self._loads(storage=task_context[0])

        contexts = Context(storage=[])
        for context in task_context:
            contexts.storage.append(self._loads(storage=context))

        return contexts

    def key(self, task: Callable):
        return f"{task.workflow_id}-{task.task_id}.json"

    def _loads(self, storage: Any) -> Context:
        try:
            return Context(storage=loads(storage))
        except Exception:
            return Context(storage=storage)

    def _dumps(self, storage: Any) -> str:
        try:
            return dumps(storage)
        except TypeError:
            return str(storage)

path = Path(path, 'tasks') instance-attribute

__init__(*args, path=settings.START_PATH, **kwargs)

Source code in dotflow/providers/storage_file.py
16
17
18
def __init__(self, *args, path: str = settings.START_PATH, **kwargs):
    self.path = Path(path, "tasks")
    self.path.mkdir(parents=True, exist_ok=True)

_dumps(storage)

Source code in dotflow/providers/storage_file.py
62
63
64
65
66
def _dumps(self, storage: Any) -> str:
    try:
        return dumps(storage)
    except TypeError:
        return str(storage)

_loads(storage)

Source code in dotflow/providers/storage_file.py
56
57
58
59
60
def _loads(self, storage: Any) -> Context:
    try:
        return Context(storage=loads(storage))
    except Exception:
        return Context(storage=storage)

get(key)

Source code in dotflow/providers/storage_file.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def get(self, key: str) -> Context:
    task_context = []

    if Path(self.path, key).exists():
        task_context = read_file(path=Path(self.path, key))

    if len(task_context) == 1:
        return self._loads(storage=task_context[0])

    contexts = Context(storage=[])
    for context in task_context:
        contexts.storage.append(self._loads(storage=context))

    return contexts

key(task)

Source code in dotflow/providers/storage_file.py
53
54
def key(self, task: Callable):
    return f"{task.workflow_id}-{task.task_id}.json"

post(key, context)

Source code in dotflow/providers/storage_file.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def post(self, key: str, context: Context) -> None:
    task_context = []

    if Path(self.path, key).exists():
        task_context = read_file(path=Path(self.path, key))

    if isinstance(context.storage, list):
        for item in context.storage:
            if isinstance(item, Context):
                task_context.append(self._dumps(storage=item.storage))

        write_file(path=Path(self.path, key), content=task_context, mode="a")
        return None

    task_context.append(self._dumps(storage=context.storage))
    write_file(path=Path(self.path, key), content=task_context)
    return None