Skip to content

StorageGCS

dotflow.providers.storage_gcs.StorageGCS

Bases: Storage

Import

You can import the StorageGCS class directly from dotflow providers:

from dotflow.providers import StorageGCS
Example

class dotflow.providers.storage_gcs.StorageGCS

from dotflow import Config
from dotflow.providers import StorageGCS

config = Config(
    storage=StorageGCS(
        bucket="my-dotflow-bucket",
        prefix="workflows/",
        project="my-gcp-project"
    )
)

Parameters:

| Name      | Type  | Description                              | Default      |
|-----------|-------|------------------------------------------|--------------|
| `bucket`  | `str` | GCS bucket name.                         | *required*   |
| `prefix`  | `str` | Key prefix for all stored objects.       | `'dotflow/'` |
| `project` | `str` | GCP project ID. Defaults to ADC project. | `None`       |
Source code in dotflow/providers/storage_gcs.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class StorageGCS(Storage):
    """
    Import:
        You can import the **StorageGCS** class directly from dotflow providers:

            from dotflow.providers import StorageGCS

    Example:
        `class` dotflow.providers.storage_gcs.StorageGCS

            from dotflow import Config
            from dotflow.providers import StorageGCS

            config = Config(
                storage=StorageGCS(
                    bucket="my-dotflow-bucket",
                    prefix="workflows/",
                    project="my-gcp-project"
                )
            )

    Args:
        bucket (str): GCS bucket name.

        prefix (str): Key prefix for all stored objects. Defaults to "dotflow/".

        project (str): GCP project ID. Defaults to ADC project.
    """

    def __init__(
        self,
        *args,
        bucket: str,
        prefix: str = "dotflow/",
        project: str = None,
        **kwargs,
    ):
        # Import lazily so dotflow is usable without the optional GCP extra.
        try:
            from google.api_core.exceptions import NotFound
            from google.cloud import storage as gcs
        except ImportError:
            raise ModuleNotFound(
                module="google-cloud-storage",
                library="dotflow[gcp]",
            ) from None

        # Keep the exception class so _read can catch missing blobs without
        # re-importing on every call.
        self._not_found = NotFound
        self.client = gcs.Client(project=project)
        self.bucket_obj = self.client.bucket(bucket)
        # Fail fast at construction time if the bucket is missing or
        # inaccessible, instead of on the first read/write.
        self.bucket_obj.reload()
        self.prefix = prefix

    def post(self, key: str, context: Context) -> None:
        """Serialize *context* and upload it to GCS under *key*.

        A list-valued `context.storage` is flattened into one JSON document
        holding each nested Context's payload; non-Context list items are
        skipped. Any other storage value is stored as a single record.
        """
        task_context = []

        if isinstance(context.storage, list):
            for item in context.storage:
                if isinstance(item, Context):
                    task_context.append(self._dumps(storage=item.storage))
        else:
            task_context.append(self._dumps(storage=context.storage))

        self._write(key=key, data=task_context)

    def get(self, key: str) -> Context:
        """Download and rebuild the Context stored under *key*.

        Returns an empty Context when no object exists, a single Context for
        one record, or a Context whose storage is a list of Contexts when
        multiple records were stored.
        """
        task_context = self._read(key)

        if len(task_context) == 0:
            return Context()

        if len(task_context) == 1:
            return self._loads(storage=task_context[0])

        contexts = Context(storage=[])
        for context in task_context:
            contexts.storage.append(self._loads(storage=context))

        return contexts

    def key(self, task: Callable) -> str:
        """Build the storage key for *task* as '<workflow_id>-<task_id>'."""
        return f"{task.workflow_id}-{task.task_id}"

    def _read(self, key: str) -> list:
        """Return the JSON payload stored under the prefixed *key*, or []
        when the blob does not exist."""
        blob = self.bucket_obj.blob(f"{self.prefix}{key}")
        try:
            data = blob.download_as_text()
            return loads(data)
        except self._not_found:
            return []

    def _write(self, key: str, data: list) -> None:
        """Upload *data* as a JSON document under the prefixed *key*."""
        blob = self.bucket_obj.blob(f"{self.prefix}{key}")
        blob.upload_from_string(
            dumps(data),
            content_type="application/json",
        )

    def _loads(self, storage: Any) -> Context:
        """Deserialize one stored record back into a Context.

        Falls back to wrapping the raw value when it is not valid JSON.
        """
        try:
            return Context(storage=loads(storage))
        except (TypeError, ValueError):
            # json.loads raises TypeError for non-string input and
            # JSONDecodeError (a ValueError subclass) for malformed JSON.
            # Catching only these avoids masking unrelated programming
            # errors, which the previous bare `except Exception` did.
            return Context(storage=storage)

    def _dumps(self, storage: Any) -> str:
        """Serialize a storage payload to JSON; fall back to str() for
        values JSON cannot encode."""
        try:
            return dumps(storage)
        except TypeError:
            return str(storage)

bucket_obj = self.client.bucket(bucket) instance-attribute

client = gcs.Client(project=project) instance-attribute

prefix = prefix instance-attribute

__init__(*args, bucket, prefix='dotflow/', project=None, **kwargs)

Source code in dotflow/providers/storage_gcs.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    *args,
    bucket: str,
    prefix: str = "dotflow/",
    project: str = None,
    **kwargs,
):
    """Connect to GCS and validate that *bucket* is reachable."""
    # google-cloud-storage is an optional dependency; raise the
    # dotflow-specific error when it is not installed.
    try:
        from google.cloud import storage as gcs
        from google.api_core.exceptions import NotFound
    except ImportError:
        raise ModuleNotFound(
            module="google-cloud-storage",
            library="dotflow[gcp]",
        ) from None

    self._not_found = NotFound
    self.prefix = prefix
    self.client = gcs.Client(project=project)
    self.bucket_obj = self.client.bucket(bucket)
    # Raises immediately if the bucket is missing or inaccessible.
    self.bucket_obj.reload()

get(key)

Source code in dotflow/providers/storage_gcs.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get(self, key: str) -> Context:
    """Fetch the record(s) stored under *key* and rebuild a Context.

    An absent key yields an empty Context; a single record yields one
    Context; several records yield a Context holding a list of Contexts.
    """
    records = self._read(key)

    if not records:
        return Context()

    if len(records) == 1:
        return self._loads(storage=records[0])

    return Context(
        storage=[self._loads(storage=record) for record in records]
    )

key(task)

Source code in dotflow/providers/storage_gcs.py
91
92
def key(self, task: Callable):
    """Compose the storage key for *task* from its workflow and task ids."""
    return "{}-{}".format(task.workflow_id, task.task_id)

post(key, context)

Source code in dotflow/providers/storage_gcs.py
64
65
66
67
68
69
70
71
72
73
74
def post(self, key: str, context: Context) -> None:
    """Serialize *context* and write it to storage under *key*.

    List-valued storage is flattened to one record per nested Context
    (other list items are ignored); anything else becomes one record.
    """
    payload = []
    storage = context.storage

    if isinstance(storage, list):
        payload.extend(
            self._dumps(storage=entry.storage)
            for entry in storage
            if isinstance(entry, Context)
        )
    else:
        payload.append(self._dumps(storage=storage))

    self._write(key=key, data=payload)