
StorageS3

dotflow.providers.storage_s3.StorageS3

Bases: Storage

Import

You can import the StorageS3 class directly from dotflow providers:

from dotflow.providers import StorageS3
Example

class dotflow.providers.storage_s3.StorageS3

from dotflow import Config
from dotflow.providers import StorageS3

config = Config(
    storage=StorageS3(
        bucket="my-dotflow-bucket",
        prefix="workflows/",
        region="us-east-1"
    )
)
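
boto3 is an optional dependency; when it is missing the constructor raises ModuleNotFound and points at the dotflow[aws] extra. A minimal round-trip sketch, assuming the bucket already exists, AWS credentials are configured, and that Context is importable as shown (the import path is an assumption):

from dotflow.providers import StorageS3
from dotflow.core.context import Context  # import path is an assumption

# Assumes the bucket exists and credentials are already configured
storage = StorageS3(bucket="my-dotflow-bucket", prefix="workflows/")

# Serialize a Context to s3://my-dotflow-bucket/workflows/example-key
storage.post(key="example-key", context=Context(storage={"status": "done"}))

# Read it back; a single stored entry comes back as one Context
restored = storage.get(key="example-key")
print(restored.storage)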

Parameters:

| Name   | Type | Description                                 | Default     |
|--------|------|---------------------------------------------|-------------|
| bucket | str  | S3 bucket name.                             | *required*  |
| prefix | str  | Key prefix for all stored objects.          | `'dotflow/'`|
| region | str  | AWS region name. Defaults to boto3 default. | `None`      |
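
region is optional: when it is omitted, boto3.client("s3", region_name=None) falls back to the region configured in the environment or in the shared AWS config files. A short sketch, assuming that configuration is already in place:

# Region omitted: boto3 resolves it from AWS_DEFAULT_REGION or ~/.aws/config
# prefix keeps its default value "dotflow/"
storage = StorageS3(bucket="my-dotflow-bucket")
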
Source code in dotflow/providers/storage_s3.py
class StorageS3(Storage):
    """
    Import:
        You can import the **StorageS3** class directly from dotflow providers:

            from dotflow.providers import StorageS3

    Example:
        `class` dotflow.providers.storage_s3.StorageS3

            from dotflow import Config
            from dotflow.providers import StorageS3

            config = Config(
                storage=StorageS3(
                    bucket="my-dotflow-bucket",
                    prefix="workflows/",
                    region="us-east-1"
                )
            )

    Args:
        bucket (str): S3 bucket name.

        prefix (str): Key prefix for all stored objects.

        region (str): AWS region name. Defaults to boto3 default.
    """

    def __init__(
        self,
        *args,
        bucket: str,
        prefix: str = "dotflow/",
        region: str = None,
        **kwargs,
    ):
        try:
            import boto3
        except ImportError:
            raise ModuleNotFound(
                module="boto3", library="dotflow[aws]"
            ) from None

        self.s3 = boto3.client("s3", region_name=region)
        self.bucket = bucket
        self.prefix = prefix

        self.s3.head_bucket(Bucket=self.bucket)

    def post(self, key: str, context: Context) -> None:
        task_context = []

        if isinstance(context.storage, list):
            for item in context.storage:
                if isinstance(item, Context):
                    task_context.append(self._dumps(storage=item.storage))
        else:
            task_context.append(self._dumps(storage=context.storage))

        self._write(key=key, data=task_context)

    def get(self, key: str) -> Context:
        task_context = self._read(key)

        if len(task_context) == 0:
            return Context()

        if len(task_context) == 1:
            return self._loads(storage=task_context[0])

        contexts = Context(storage=[])
        for context in task_context:
            contexts.storage.append(self._loads(storage=context))

        return contexts

    def key(self, task: Callable):
        return f"{task.workflow_id}-{task.task_id}"

    def _read(self, key: str) -> list:
        try:
            response = self.s3.get_object(
                Bucket=self.bucket,
                Key=f"{self.prefix}{key}",
            )
            data = response["Body"].read().decode("utf-8")
            return loads(data)
        except self.s3.exceptions.NoSuchKey:
            return []

    def _write(self, key: str, data: list) -> None:
        self.s3.put_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}{key}",
            Body=dumps(data),
            ContentType="application/json",
        )

    def _loads(self, storage: Any) -> Context:
        try:
            return Context(storage=loads(storage))
        except Exception:
            return Context(storage=storage)

    def _dumps(self, storage: Any) -> str:
        try:
            return dumps(storage)
        except TypeError:
            return str(storage)

bucket = bucket instance-attribute

prefix = prefix instance-attribute

s3 = boto3.client('s3', region_name=region) instance-attribute

__init__(*args, bucket, prefix='dotflow/', region=None, **kwargs)

Source code in dotflow/providers/storage_s3.py
def __init__(
    self,
    *args,
    bucket: str,
    prefix: str = "dotflow/",
    region: str = None,
    **kwargs,
):
    try:
        import boto3
    except ImportError:
        raise ModuleNotFound(
            module="boto3", library="dotflow[aws]"
        ) from None

    self.s3 = boto3.client("s3", region_name=region)
    self.bucket = bucket
    self.prefix = prefix

    self.s3.head_bucket(Bucket=self.bucket)
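
The trailing head_bucket call makes the constructor fail fast when the bucket is missing or unreadable. A hedged sketch of catching that failure, assuming boto3 surfaces it as the usual botocore ClientError:

from botocore.exceptions import ClientError

try:
    storage = StorageS3(bucket="my-dotflow-bucket")
except ClientError as error:
    # head_bucket typically reports 404 for a missing bucket and 403 for denied access
    print(error.response["Error"]["Code"])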

get(key)

Source code in dotflow/providers/storage_s3.py
def get(self, key: str) -> Context:
    task_context = self._read(key)

    if len(task_context) == 0:
        return Context()

    if len(task_context) == 1:
        return self._loads(storage=task_context[0])

    contexts = Context(storage=[])
    for context in task_context:
        contexts.storage.append(self._loads(storage=context))

    return contexts
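
The return shape depends on how many entries were stored under the key; a missing key is absorbed by the NoSuchKey branch in _read. When several entries were stored, the result is a Context whose .storage is a list of Contexts. A sketch reusing the storage instance from the examples above:

# Missing key: _read returns [] and get falls back to an empty Context
missing = storage.get(key="never-written")

# One stored entry: that Context is returned directly
single = storage.get(key="example-key")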

key(task)

Source code in dotflow/providers/storage_s3.py
def key(self, task: Callable):
    return f"{task.workflow_id}-{task.task_id}"

post(key, context)

Source code in dotflow/providers/storage_s3.py
def post(self, key: str, context: Context) -> None:
    task_context = []

    if isinstance(context.storage, list):
        for item in context.storage:
            if isinstance(item, Context):
                task_context.append(self._dumps(storage=item.storage))
    else:
        task_context.append(self._dumps(storage=context.storage))

    self._write(key=key, data=task_context)
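
When context.storage is a list, post serializes each Context item individually and silently skips entries that are not Contexts; the result is written as a single JSON array under prefix + key. A sketch continuing the earlier example:

# Only the Context items in the list are serialized; the plain string is skipped
batch = Context(storage=[Context(storage={"step": 1}), "ignored", Context(storage={"step": 2})])
storage.post(key="batch-key", context=batch)

# Reading it back yields a Context whose .storage is a list of two Contexts
print(len(storage.get(key="batch-key").storage))  # 2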