Context

dotflow.core.context.Context

Bases: ContextInstance

Import

You can import the Context class directly from dotflow:

from dotflow import Context
Example

class dotflow.core.context.Context

Context(
    storage={"data": [0, 1, 2, 3]}
)

Parameters:

Name Type Description Default
storage Any

Attribute where any type of Python object can be stored.

None
task_id int

Task ID.

None
workflow_id UUID

Workflow ID.

None
Source code in dotflow/core/context.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class Context(ContextInstance):
    """
    Import:
        You can import the Context class directly from dotflow:

            from dotflow import Context

    Example:
        `class` dotflow.core.context.Context

            Context(
                storage={"data": [0, 1, 2, 3]}
            )

    Args:
        storage (Any): Attribute where any type of Python object can be stored.

        task_id (int): Task ID.

        workflow_id (UUID): Workflow ID.
    """

    def __init__(
            self,
            storage: Any = None,
            task_id: int = None,
            workflow_id: UUID = None,
    ) -> None:
        super().__init__(
            task_id,
            storage,
            task_id,
            workflow_id
        )
        self.time = datetime.now()
        self.task_id = task_id
        self.workflow_id = workflow_id
        self.storage = storage

    @property
    def time(self):
        return self._time

    @time.setter
    def time(self, value: datetime):
        self._time = value

    @property
    def task_id(self):
        return self._task_id

    @task_id.setter
    def task_id(self, value: int):
        if isinstance(value, int):
            self._task_id = value

        if not self.task_id:
            self._task_id = value

    @property
    def workflow_id(self):
        return self._workflow_id

    @workflow_id.setter
    def workflow_id(self, value: UUID):
        if isinstance(value, UUID):
            self._workflow_id = value

    @property
    def storage(self):
        return self._storage

    @storage.setter
    def storage(self, value: Any):
        if isinstance(value, Context):
            self._storage = value.storage

            self.time = value.time
            self.task_id = value.task_id
            self.workflow_id = value.workflow_id
        else:
            self._storage = value

time property writable

task_id property writable

workflow_id property writable

storage property writable