Ir para o conteúdo

StorageFile

dotflow.providers.storage_file.StorageFile

Bases: Storage

Storage

Source code in dotflow/providers/storage_file.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class StorageFile(Storage):
    """Storage"""

    def __init__(self, *args, path: str = settings.START_PATH, **kwargs):
        self.path = Path(path, "tasks")
        self.path.mkdir(parents=True, exist_ok=True)

    def post(self, key: str, context: Context) -> None:
        task_context = []

        if Path(self.path, key).exists():
            data = read_file(path=Path(self.path, key))
            if isinstance(data, list):
                task_context = data

        if isinstance(context.storage, list):
            for item in context.storage:
                if isinstance(item, Context):
                    task_context.append(self._dumps(storage=item.storage))
        else:
            task_context.append(self._dumps(storage=context.storage))

        write_file(path=Path(self.path, key), content=task_context)
        return None

    def get(self, key: str) -> Context:
        task_context = []

        if Path(self.path, key).exists():
            data = read_file(path=Path(self.path, key))
            if isinstance(data, list):
                task_context = data

        if not task_context:
            return Context()

        if len(task_context) == 1:
            return self._loads(storage=task_context[0])

        contexts = Context(storage=[])
        for context in task_context:
            contexts.storage.append(self._loads(storage=context))

        return contexts

    def key(self, task: Callable):
        return f"{task.workflow_id}-{task.task_id}.json"

    def clear(self, workflow_id: str) -> None:
        prefix = f"{workflow_id}-"

        for entry in self.path.iterdir():
            if entry.is_file() and entry.name.startswith(prefix):
                entry.unlink(missing_ok=True)

    def _loads(self, storage: Any) -> Context:
        try:
            return Context(storage=loads(storage))
        except Exception:
            return Context(storage=storage)

    def _dumps(self, storage: Any) -> str:
        try:
            return dumps(storage)
        except TypeError:
            return str(storage)

path = Path(path, 'tasks') instance-attribute

__init__(*args, path=settings.START_PATH, **kwargs)

Source code in dotflow/providers/storage_file.py
17
18
19
def __init__(self, *args, path: str = settings.START_PATH, **kwargs):
    self.path = Path(path, "tasks")
    self.path.mkdir(parents=True, exist_ok=True)

clear(workflow_id)

Source code in dotflow/providers/storage_file.py
62
63
64
65
66
67
def clear(self, workflow_id: str) -> None:
    prefix = f"{workflow_id}-"

    for entry in self.path.iterdir():
        if entry.is_file() and entry.name.startswith(prefix):
            entry.unlink(missing_ok=True)

get(key)

Source code in dotflow/providers/storage_file.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def get(self, key: str) -> Context:
    task_context = []

    if Path(self.path, key).exists():
        data = read_file(path=Path(self.path, key))
        if isinstance(data, list):
            task_context = data

    if not task_context:
        return Context()

    if len(task_context) == 1:
        return self._loads(storage=task_context[0])

    contexts = Context(storage=[])
    for context in task_context:
        contexts.storage.append(self._loads(storage=context))

    return contexts

key(task)

Source code in dotflow/providers/storage_file.py
59
60
def key(self, task: Callable):
    return f"{task.workflow_id}-{task.task_id}.json"

post(key, context)

Source code in dotflow/providers/storage_file.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def post(self, key: str, context: Context) -> None:
    task_context = []

    if Path(self.path, key).exists():
        data = read_file(path=Path(self.path, key))
        if isinstance(data, list):
            task_context = data

    if isinstance(context.storage, list):
        for item in context.storage:
            if isinstance(item, Context):
                task_context.append(self._dumps(storage=item.storage))
    else:
        task_context.append(self._dumps(storage=context.storage))

    write_file(path=Path(self.path, key), content=task_context)
    return None