Ir para o conteúdo

StorageGCS

dotflow.providers.storage_gcs.StorageGCS

Bases: Storage

Import

You can import the StorageGCS class directly from dotflow providers:

from dotflow.providers import StorageGCS
Example

class dotflow.providers.storage_gcs.StorageGCS

from dotflow import Config
from dotflow.providers import StorageGCS

config = Config(
    storage=StorageGCS(
        bucket="my-dotflow-bucket",
        prefix="workflows/",
        project="my-gcp-project"
    )
)

Parameters:

Name Type Description Default
bucket str

GCS bucket name.

required
prefix str

Key prefix for all stored objects.

'dotflow/'
project str

GCP project ID. Defaults to ADC project.

None
Source code in dotflow/providers/storage_gcs.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class StorageGCS(Storage):
    """
    Import:
        You can import the **StorageGCS** class directly from dotflow providers:

            from dotflow.providers import StorageGCS

    Example:
        `class` dotflow.providers.storage_gcs.StorageGCS

            from dotflow import Config
            from dotflow.providers import StorageGCS

            config = Config(
                storage=StorageGCS(
                    bucket="my-dotflow-bucket",
                    prefix="workflows/",
                    project="my-gcp-project"
                )
            )

    Args:
        bucket (str): GCS bucket name.

        prefix (str): Key prefix for all stored objects.

        project (str): GCP project ID. Defaults to ADC project.
    """

    def __init__(
        self,
        *args,
        bucket: str,
        prefix: str = "dotflow/",
        project: str = None,
        **kwargs,
    ):
        self._gcs = GCS(bucket=bucket, prefix=prefix, project=project)

    def post(self, key: str, context: Context) -> None:
        task_context = []

        if isinstance(context.storage, list):
            for item in context.storage:
                if isinstance(item, Context):
                    task_context.append(self._dumps(storage=item.storage))
        else:
            task_context.append(self._dumps(storage=context.storage))

        self._gcs.write(key=key, data=task_context)

    def get(self, key: str) -> Context:
        task_context = self._gcs.read(key)

        if len(task_context) == 0:
            return Context()

        if len(task_context) == 1:
            return self._loads(storage=task_context[0])

        contexts = Context(storage=[])
        for context in task_context:
            contexts.storage.append(self._loads(storage=context))

        return contexts

    def key(self, task: Callable):
        return f"{task.workflow_id}-{task.task_id}"

    def clear(self, workflow_id: str) -> None:
        self._gcs.delete_prefix(f"{workflow_id}-")

    def _loads(self, storage: Any) -> Context:
        try:
            return Context(storage=loads(storage))
        except Exception:
            return Context(storage=storage)

    def _dumps(self, storage: Any) -> str:
        try:
            return dumps(storage)
        except TypeError:
            return str(storage)

__init__(*args, bucket, prefix='dotflow/', project=None, **kwargs)

Source code in dotflow/providers/storage_gcs.py
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    *args,
    bucket: str,
    prefix: str = "dotflow/",
    project: str = None,
    **kwargs,
):
    self._gcs = GCS(bucket=bucket, prefix=prefix, project=project)

clear(workflow_id)

Source code in dotflow/providers/storage_gcs.py
81
82
def clear(self, workflow_id: str) -> None:
    self._gcs.delete_prefix(f"{workflow_id}-")

get(key)

Source code in dotflow/providers/storage_gcs.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def get(self, key: str) -> Context:
    task_context = self._gcs.read(key)

    if len(task_context) == 0:
        return Context()

    if len(task_context) == 1:
        return self._loads(storage=task_context[0])

    contexts = Context(storage=[])
    for context in task_context:
        contexts.storage.append(self._loads(storage=context))

    return contexts

key(task)

Source code in dotflow/providers/storage_gcs.py
78
79
def key(self, task: Callable):
    return f"{task.workflow_id}-{task.task_id}"

post(key, context)

Source code in dotflow/providers/storage_gcs.py
51
52
53
54
55
56
57
58
59
60
61
def post(self, key: str, context: Context) -> None:
    task_context = []

    if isinstance(context.storage, list):
        for item in context.storage:
            if isinstance(item, Context):
                task_context.append(self._dumps(storage=item.storage))
    else:
        task_context.append(self._dumps(storage=context.storage))

    self._gcs.write(key=key, data=task_context)