Async Actions¶
Dotflow supports async def actions for I/O-bound tasks like HTTP calls, database queries, and file operations.
Example¶
import asyncio
from dotflow import DotFlow, action
@action
async def fetch_users():
await asyncio.sleep(0.1)
return {"users": ["Alice", "Bob"]}
@action
async def fetch_orders():
await asyncio.sleep(0.1)
return {"orders": [1, 2, 3]}
@action
def process(previous_context):
return {"processed": previous_context.storage}
def main():
workflow = DotFlow()
workflow.task.add(step=fetch_users)
workflow.task.add(step=process)
workflow.start()
for task in workflow.result_task():
print(f"Task {task.task_id}: {task.status}")
return workflow
if __name__ == "__main__":
main()
How it works¶
When an action is an async def function, dotflow detects it automatically and runs it with asyncio.run(). No changes needed in the workflow — just use async def instead of def.
# Sync — blocks the thread while waiting
@action
def step_sync():
return requests.get("https://api.com").json()
# Async — frees the thread while waiting
@action
async def step_async():
async with aiohttp.ClientSession() as session:
resp = await session.get("https://api.com")
return await resp.json()
Mixing sync and async¶
You can mix sync and async actions in the same workflow:
workflow = DotFlow()
workflow.task.add(step=async_step) # async
workflow.task.add(step=sync_step) # sync
workflow.task.add(step=async_step) # async
workflow.start()
Retry and timeout¶
Retry, timeout, and backoff work the same way with async actions:
@action(retry=3, timeout=30, backoff=True)
async def unreliable_api():
async with aiohttp.ClientSession() as session:
resp = await session.get("https://api.com/data")
return await resp.json()
When to use async¶
| Scenario | Use async? |
|---|---|
| HTTP calls | Yes |
| Database queries | Yes |
| File I/O | Yes |
| CPU-bound computation | No — use sync |
| Simple data transformation | No — use sync |