Custom Storage
The Storage provider is called after every task completes, saving the task context so it can be retrieved by subsequent steps or used for checkpoint-based resume.
Methods
- `post(key, context)` — save a context object under the given key
- `get(key)` — retrieve a context object by key, returning a `Context` with `storage=None` if not found
- `key(task)` — generate a unique key for the given task
Example
from dotflow.abc.storage import Storage
from dotflow.core.context import Context
class StorageRedis(Storage):
    """Storage provider that persists task contexts in Redis as JSON text."""

    def __init__(self, host: str = "localhost", port: int = 6379):
        """Create the Redis client used by this provider.

        Args:
            host: Hostname of the Redis server.
            port: TCP port of the Redis server.
        """
        # Imported at call time rather than at module import time.
        import redis

        connection = redis.Redis(host=host, port=port)
        self.client = connection

    def post(self, key: str, context: Context) -> None:
        """Serialize ``context.storage`` to JSON and store it under ``key``.

        Args:
            key: Redis key to write to.
            context: Context whose ``storage`` attribute is saved.
        """
        import json

        payload = json.dumps(context.storage)
        self.client.set(key, payload)

    def get(self, key: str) -> Context:
        """Load the value stored under ``key`` and wrap it in a ``Context``.

        Args:
            key: Redis key to read from.

        Returns:
            A ``Context`` holding the decoded JSON value, or a ``Context``
            with ``storage=None`` when the client returns a falsy value
            (nothing stored, or an empty payload).
        """
        import json

        raw = self.client.get(key)
        if not raw:
            return Context(storage=None)
        return Context(storage=json.loads(raw))

    def key(self, task):
        """Build the storage key for ``task`` as ``<workflow_id>:<task_id>``.

        Args:
            task: Object exposing ``workflow_id`` and ``task_id`` attributes.

        Returns:
            The two identifiers joined by a colon.
        """
        workflow_id = task.workflow_id
        task_id = task.task_id
        return f"{workflow_id}:{task_id}"