# async-worker-flow
**Stop waiting for the slowest task. Start processing smarter.**
<p align="center">
<img src="docs/images/worker-pool-concept.png" alt="async-worker-flow Worker Pool Concept" width="600">
</p>
<p align="center">
<em>async-worker-flow: Modern async execution library with concurrent.futures-style API and advanced pipelines</em>
</p>
---
## The Problem I Had to Solve
I was processing massive amounts of data using OpenAI's Batch API. The workflow was complex:
1. Upload batches of data to OpenAI
2. Wait for processing to complete
3. Download the results
4. Save to database
5. Repeat for the next batch
Initially, I processed 10 batches at a time using basic async. But here's the problem: **I had to wait for ALL 10 batches to complete before starting the next group**.
### The Bottleneck
Imagine this scenario:
- 9 batches complete in 5 minutes
- 1 batch gets stuck and takes 30 minutes
- **I waste 25 minutes waiting** for that one slow batch while my system sits idle
With hundreds of batches to process, these delays accumulated into **hours of wasted time**. Even worse, one failed batch would block the entire pipeline.
### The Solution: async-worker-flow
I built async-worker-flow to solve this exact problem. Instead of batch-by-batch processing, **async-worker-flow uses worker pools** where:
- ✅ **Each worker handles tasks independently**
- ✅ **When a worker finishes, it immediately grabs the next task**
- ✅ **Slow tasks don't block fast ones**
- ✅ **Always maintain optimal concurrency (e.g., 10 tasks running simultaneously)**
- ✅ **Built-in retry logic for failed tasks**
- ✅ **Multi-stage pipelines for complex workflows**
**Result**: My OpenAI batch processing went from taking hours to completing in a fraction of the time, with automatic retry handling and zero idle time.
---
## Quick Install
```bash
pip install async-worker-flow
```
---
## Quick Example
```python
import asyncio
from asyncflow import Pipeline, Stage
# Your actual work (placeholder bodies)
async def upload_batch(batch_data):
    # Upload to OpenAI API and return its batch ID
    batch_id = ...
    return batch_id

async def check_status(batch_id):
    # Check if the batch is ready and return the result URL
    result_url = ...
    return result_url

async def download_results(result_url):
    # Download processed data
    processed_data = ...
    return processed_data

async def save_to_db(processed_data):
    # Save results
    return "saved"

# Build the pipeline
upload_stage = Stage(name="Upload", workers=10, tasks=[upload_batch])
check_stage = Stage(name="Check", workers=10, tasks=[check_status])
download_stage = Stage(name="Download", workers=10, tasks=[download_results])
save_stage = Stage(name="Save", workers=5, tasks=[save_to_db])

pipeline = Pipeline(stages=[upload_stage, check_stage, download_stage, save_stage])

# Process 1000 batches efficiently (run inside an async function)
results = await pipeline.run(my_1000_batches)
```
**What happens**: 10 workers process uploads simultaneously. As soon as one finishes, it picks the next batch. No waiting. No idle time. Maximum throughput.
---
## Installation
```bash
pip install async-worker-flow
```
## Key Features
### 🚀 **Worker Pool Architecture**
- Independent workers that never block each other
- Automatic task distribution
- Optimal resource utilization
### 🔄 **Multi-Stage Pipelines**
- Chain operations with configurable worker pools per stage
- Each stage runs independently
- Data flows automatically between stages
### 💪 **Built-in Resilience**
- Per-task retry with exponential backoff
- Per-stage retry for transactional operations
- Failed tasks don't stop the pipeline
### 📊 **Real-time Monitoring**
- StatusTracker for real-time item tracking
- Query item status, history, and statistics
- Event-driven monitoring with callbacks
- Pipeline metrics and queue insights
### 🎯 **Familiar API**
- Drop-in async replacement for `concurrent.futures`
- `submit()`, `map()`, `as_completed()` methods
- Clean, intuitive interface
---
## Use Cases
### ✅ **Perfect for:**
- **Batch API Processing** - OpenAI, Anthropic, any batch API
- **ETL Pipelines** - Extract, transform, load at scale
- **Web Scraping** - Fetch, parse, store web data efficiently
- **Data Processing** - Process large datasets with retry logic
- **Microservices** - Chain async service calls with error handling
### ⚡ **Real-world Impact:**
- Process 1000+ items without bottlenecks
- Automatic retry for transient failures
- Zero idle time = maximum throughput
- Clear observability with metrics and callbacks
---
## AsyncExecutor: Simple Concurrent Execution
For straightforward parallel processing, AsyncExecutor provides a `concurrent.futures`-style API:
### Basic Usage
```python
import asyncio
from asyncflow import AsyncExecutor
async def process_item(x):
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    async with AsyncExecutor(max_workers=10) as executor:
        # Using map() for parallel processing
        results = []
        async for result in executor.map(process_item, range(100)):
            results.append(result)
        print(f"Processed {len(results)} items")

asyncio.run(main())
```
### Racing Tasks with wait()
Get the fastest result from multiple concurrent operations:
```python
from asyncflow import AsyncExecutor, WaitStrategy
async def main():
    async with AsyncExecutor(max_workers=10) as executor:
        # Submit same request to multiple API regions
        futures = [
            executor.submit(call_api_region_1, query),
            executor.submit(call_api_region_2, query),
            executor.submit(call_api_region_3, query),
        ]

        # Get the first result
        done, pending = await executor.wait(
            futures,
            return_when=WaitStrategy.FIRST_COMPLETED
        )

        fastest_result = await list(done)[0].result()
        print(f"Got result from fastest region: {fastest_result}")
```
### Waiting for Multiple Futures
```python
from asyncflow import WaitStrategy
# Wait for all to complete
done, pending = await executor.wait(futures)
# Wait for first to complete
done, pending = await executor.wait(
    futures,
    return_when=WaitStrategy.FIRST_COMPLETED
)

# Wait for first exception
done, pending = await executor.wait(
    futures,
    return_when=WaitStrategy.FIRST_EXCEPTION
)

# Wait with timeout
done, pending = await executor.wait(
    futures,
    timeout=5.0
)
```
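The Key Features list also mentions `as_completed()`. Here is a minimal sketch of streaming results in completion order, assuming `as_completed()` can be iterated with `async for` and that futures expose the same awaitable `result()` used in the `wait()` example above (both are assumptions, not confirmed by this README):

```python
import asyncio
from asyncflow import AsyncExecutor

async def fetch(i):
    # Simulate variable latency so completion order differs from submit order
    await asyncio.sleep(0.05 * (i % 3))
    return f"result_{i}"

async def main():
    async with AsyncExecutor(max_workers=10) as executor:
        futures = [executor.submit(fetch, i) for i in range(20)]
        # Handle each result the moment it is ready, fastest first
        async for future in executor.as_completed(futures):
            print(await future.result())

asyncio.run(main())
```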
---
## Pipeline: Multi-Stage Processing
For complex workflows with multiple steps:
```python
import asyncio
from asyncflow import Pipeline, Stage
async def fetch(x):
    await asyncio.sleep(0.1)
    return f"data_{x}"

async def process(x):
    await asyncio.sleep(0.1)
    return x.upper()

async def save(x):
    await asyncio.sleep(0.1)
    return f"saved_{x}"

async def main():
    # Define stages with different worker counts
    fetch_stage = Stage(name="Fetch", workers=10, tasks=[fetch])
    process_stage = Stage(name="Process", workers=5, tasks=[process])
    save_stage = Stage(name="Save", workers=3, tasks=[save])

    # Build pipeline
    pipeline = Pipeline(stages=[fetch_stage, process_stage, save_stage])

    # Process 100 items through all stages
    results = await pipeline.run(range(100))

    print(f"Completed: {len(results)} items")
    print(f"Stats: {pipeline.get_stats()}")

asyncio.run(main())
```
**Why different worker counts?**
- **Fetch**: I/O bound, use more workers (10)
- **Process**: CPU bound, moderate workers (5)
- **Save**: Rate-limited API, fewer workers (3)
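One practical note on the "CPU bound" Process stage: coroutines only yield at `await` points, so genuinely CPU-heavy work inside a task blocks the event loop no matter how many workers a stage has. A common workaround (plain asyncio, not an async-worker-flow feature) is to offload that step to a thread:

```python
import asyncio

def heavy_transform(x: str) -> str:
    # Plain synchronous, CPU-heavy function (placeholder)
    return x.upper()

async def process(x: str) -> str:
    # asyncio.to_thread (Python 3.9+) runs the blocking call in a worker
    # thread so the event loop stays responsive for the other workers.
    return await asyncio.to_thread(heavy_transform, x)
```

For heavy number crunching a process pool is usually the better fit; threads mainly help when the work is short or releases the GIL.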
---
## How It Works: Data Flow & Execution
### Pipeline is Like a DAG
async-worker-flow creates a **sequential pipeline** where each stage's output becomes the next stage's input:
```
Stage 1 → Queue → Stage 2 → Queue → Stage 3 → Results
```
**Key Point**: Stages are connected, but a worker never waits for the rest of a batch to finish: as soon as it completes one item, it hands the result to the next stage's queue and pulls another.
### What Does Each Stage Receive?
**Simple rule**: Each stage receives the `return` value from the last task of the previous stage.
```python
# Stage 1: Returns a dictionary
async def fetch_user(user_id: int) -> dict:
    return {"id": user_id, "name": "João", "email": "joao@example.com"}

stage1 = Stage(name="Fetch", workers=5, tasks=[fetch_user])

# Stage 2: Receives the DICTIONARY that Stage 1 returned
async def validate_user(user_data: dict) -> dict:
    # user_data IS the dict that fetch_user returned!
    # user_data = {"id": 123, "name": "João", "email": "joao@example.com"}

    if "@" not in user_data["email"]:
        raise ValueError("Invalid email")

    return {**user_data, "validated": True}

stage2 = Stage(name="Validate", workers=3, tasks=[validate_user])

# Stage 3: Receives the DICTIONARY that Stage 2 returned
async def save_user(user_data: dict) -> str:
    # user_data = {"id": 123, "name": "João", "email": "...", "validated": True}
    return f"saved_user_{user_data['id']}"

stage3 = Stage(name="Save", workers=2, tasks=[save_user])
```
**Data flow:**
```
Input: 123
  ↓
Stage 1: fetch_user(123)
         returns: {"id": 123, "name": "João", "email": "..."}
  ↓ (via Queue)
Stage 2: validate_user({"id": 123, ...})
         returns: {"id": 123, "name": "João", "email": "...", "validated": True}
  ↓ (via Queue)
Stage 3: save_user({"id": 123, ..., "validated": True})
         returns: "saved_user_123"
  ↓
Final Result: "saved_user_123"
```
### Multiple Tasks in One Stage
When a stage has multiple tasks, they execute **sequentially** per item, but workers process items in **parallel**:
```python
validation_stage = Stage(
    name="Validation",
    workers=5,                                      # 5 workers in PARALLEL
    tasks=[check_schema, validate_rules, sanitize]  # SEQUENTIAL per item
)
```
**Execution model:**
- ✅ **Workers = PARALLEL** → Process different items simultaneously
- ✅ **Tasks = SEQUENTIAL** → Each item goes through task1 → task2 → task3
- ✅ **Stages = CONNECTED** → Output of one stage feeds into the next via queues
**Example flow:**
```
Worker 1: Item A → check_schema → validate_rules → sanitize → next stage
Worker 2: Item B → check_schema → validate_rules → sanitize → next stage
Worker 3: Item C → check_schema → validate_rules → sanitize → next stage
```
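To make the sequential-tasks rule concrete, here is a minimal sketch of the three validation tasks, assuming each task in the list receives the previous task's return value (consistent with the stage-to-stage rule above); the function bodies are illustrative placeholders:

```python
async def check_schema(record: dict) -> dict:
    # Task 1: receives the item from the stage's input queue
    if "email" not in record or "name" not in record:
        raise ValueError("Missing required fields")
    return record

async def validate_rules(record: dict) -> dict:
    # Task 2: receives check_schema's return value
    if "@" not in record["email"]:
        raise ValueError("Invalid email")
    return record

async def sanitize(record: dict) -> dict:
    # Task 3: receives validate_rules' return value; whatever it returns
    # is what the next stage sees
    return {**record, "name": record["name"].strip()}

validation_stage = Stage(
    name="Validation",
    workers=5,
    tasks=[check_schema, validate_rules, sanitize],
)
```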
### The Bottleneck Solution Explained
**Problem (Traditional Batch):**
```python
# Process 10 items, wait for ALL to finish
batch = await asyncio.gather(*[process(i) for i in range(10)])
# If 9 finish in 1 min and 1 takes 30 min → 29 min wasted! 😢
```
**Solution (async-worker-flow):**
```python
# Workers continuously grab new items as they finish
stage = Stage(name="Process", workers=10, tasks=[process])
pipeline = Pipeline(stages=[stage])
results = await pipeline.run(range(100))
# Fast items don't wait for slow ones → Zero idle time! 🎉
```
**Why it's faster:**
1. Worker finishes fast item (1 min) → immediately grabs next item
2. Slow item (30 min) processes on one worker
3. Other 9 workers keep processing new items in parallel
4. **Result**: Maximum throughput, no wasted time
---
## Advanced Features
### Retry Strategies
**Per-Task Retry** (for independent operations):
```python
stage = Stage(
    name="APICall",
    workers=10,
    tasks=[call_api],
    retry="per_task",
    task_attempts=5,
    task_wait_seconds=2.0
)
```
**Per-Stage Retry** (for transactional operations):
```python
stage = Stage(
    name="Transaction",
    workers=3,
    tasks=[begin_tx, update_db, commit_tx],
    retry="per_stage",
    stage_attempts=3
)
```
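To show where `per_task` retry pays off, here is a rough sketch of a flaky call: raising on a transient failure lets the stage retry just that item (up to `task_attempts` times) while the other workers keep pulling new items. The `call_api` body and the `TransientAPIError` type are illustrative placeholders, not part of the library:

```python
import random

from asyncflow import Stage

class TransientAPIError(Exception):
    """Placeholder for a retryable failure (timeout, 429, 5xx)."""

async def call_api(payload):
    # Illustrative placeholder: fails transiently about 30% of the time
    if random.random() < 0.3:
        raise TransientAPIError("rate limited, try again")
    return {"payload": payload, "status": "ok"}

stage = Stage(
    name="APICall",
    workers=10,
    tasks=[call_api],
    retry="per_task",
    task_attempts=5,
    task_wait_seconds=2.0,
)
```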
### Real-Time Monitoring with StatusTracker
Track every item as it flows through your pipeline with **StatusTracker**. Get real-time status updates, query current states, and access complete event history.
#### How It Works
Every item in the pipeline goes through these states:
- `queued` - Waiting in a stage's queue
- `in_progress` - Being processed by a worker
- `completed` - Successfully finished a stage
- `failed` - Failed to complete a stage
#### Basic Usage
```python
from asyncflow import Pipeline, Stage, StatusTracker
tracker = StatusTracker()
pipeline = Pipeline(
    stages=[stage1, stage2, stage3],
    status_tracker=tracker
)
results = await pipeline.run(items)
# Query current status
stats = tracker.get_stats()
print(f"Completed: {stats['completed']}")
print(f"Failed: {stats['failed']}")
print(f"In Progress: {stats['in_progress']}")
```
#### Real-Time Event Monitoring
Get notified immediately when items change status:
```python
async def on_status_change(event):
    """Called whenever an item changes status"""
    print(f"{event.status.upper()}: Item {event.item_id} @ {event.stage}")

    if event.status == "failed":
        error = event.metadata.get("error")
        print(f"  Error: {error}")

    if event.status == "in_progress":
        print(f"  Worker: {event.worker}")

tracker = StatusTracker(on_status_change=on_status_change)
pipeline = Pipeline(stages=[...], status_tracker=tracker)
```
#### Query Methods
```python
# Get status of a specific item
status = tracker.get_status(item_id=42)
print(f"Item 42 is {status.status} at {status.stage}")
# Get all items in a specific status
failed_items = tracker.get_by_status("failed")
for event in failed_items:
    print(f"Item {event.item_id} failed: {event.metadata['error']}")
# Get complete event history for an item
history = tracker.get_history(item_id=42)
for event in history:
    print(f"{event.timestamp}: {event.status} @ {event.stage}")
# Get aggregate statistics
stats = tracker.get_stats()
# Returns: {"queued": 5, "in_progress": 10, "completed": 985, "failed": 0}
```
#### Real-World Example: Dashboard Integration
```python
from asyncflow import Pipeline, StatusTracker
# Track status changes for live dashboard
async def update_dashboard(event):
    """Send status updates to your dashboard"""
    await websocket.send_json({
        "item_id": event.item_id,
        "stage": event.stage,
        "status": event.status,
        "timestamp": event.timestamp
    })

tracker = StatusTracker(on_status_change=update_dashboard)
pipeline = Pipeline(stages=[...], status_tracker=tracker)

# Monitor progress in real-time
async def monitor():
    while True:
        await asyncio.sleep(1)
        stats = tracker.get_stats()
        print(f"Progress: {stats['completed']}/{total_items}")

async with asyncio.TaskGroup() as tg:
    tg.create_task(monitor())
    results = await pipeline.run(items)
```
#### Integration Examples
**Prometheus Metrics:**
```python
from prometheus_client import Gauge
items_by_status = Gauge("pipeline_items", "Items by status", ["status"])
async def update_metrics(event):
    items_by_status.labels(status=event.status).inc()
tracker = StatusTracker(on_status_change=update_metrics)
```
**Database Logging:**
```python
async def log_to_db(event):
    await db.execute("""
        INSERT INTO pipeline_events (item_id, stage, status, timestamp)
        VALUES ($1, $2, $3, $4)
    """, event.item_id, event.stage, event.status, event.timestamp)
tracker = StatusTracker(on_status_change=log_to_db)
```
### Worker-Level Tracking
Track which specific worker is processing each item:
```python
from asyncflow import Pipeline, Stage, StatusTracker
worker_assignments = {}
async def on_status_change(event):
    if event.status == "in_progress":
        print(f"Worker {event.worker_id} is processing {event.item_id}")
        worker_assignments[event.item_id] = event.worker_id
tracker = StatusTracker(on_status_change=on_status_change)
stage = Stage(name="Process", workers=10, tasks=[process])
pipeline = Pipeline(stages=[stage], status_tracker=tracker)
worker_names = pipeline.get_worker_names()
results = await pipeline.run(items)
```
#### Custom Item IDs (Optional)
By default, items are assigned sequential IDs (0, 1, 2, ...). You can provide custom IDs for better tracking:
```python
items = [
{"id": "batch_0001", "value": data1},
{"id": "batch_0002", "value": data2},
{"id": "user_12345", "value": user_data},
]
results = await pipeline.run(items)
```
**Note**: Custom IDs are optional. If you don't need tracking, just pass regular values:
```python
items = [data1, data2, data3]
results = await pipeline.run(items)
```
### Order Preservation
```python
# Maintain input order in results
pipeline = Pipeline(stages=[stage1, stage2], preserve_order=True)
# Or allow reordering for better performance
pipeline = Pipeline(stages=[stage1, stage2], preserve_order=False)
```
### Real-time Statistics
```python
stats = pipeline.get_stats()
print(f"Processed: {stats.items_processed}")
print(f"Failed: {stats.items_failed}")
print(f"In-flight: {stats.items_in_flight}")
print(f"Queue sizes: {stats.queue_sizes}")
```
---
## Comparison: Before vs After
### Before async-worker-flow ❌
```python
# Process batches sequentially in groups
for batch_group in chunks(batches, 10):
    results = await asyncio.gather(*[process(b) for b in batch_group])
    # Wait for ALL 10 to complete before continuing
    # One slow task blocks 9 fast ones
```
**Problems:**
- Idle workers waiting for slowest task
- No automatic retry
- Difficult to monitor progress
- Hard to optimize worker counts per stage
### After async-worker-flow ✅
```python
# Workers continuously process available tasks
stage = Stage(name="Process", workers=10, tasks=[process])
pipeline = Pipeline(stages=[stage])
results = await pipeline.run(batches)
```
**Benefits:**
- Zero idle time
- Automatic retry with backoff
- Real-time monitoring
- Easy to tune performance
---
## Documentation
📚 **Full documentation available at:** [asyncflow.readthedocs.io](https://asyncflow.readthedocs.io)
- [Getting Started Guide](docs/getting-started/quickstart.md)
- [AsyncExecutor Guide](docs/user-guide/executor.md)
- [Pipeline Guide](docs/user-guide/pipeline.md)
- [Error Handling & Retries](docs/user-guide/error-handling.md)
- [API Reference](docs/api/index.md)
- [Examples & Patterns](docs/examples/basic.md)
---
## Requirements
- Python 3.9+
- tenacity >= 8.0.0
**Note**: For Python 3.9-3.10, the `taskgroup` backport is automatically installed.
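The dashboard example above uses `asyncio.TaskGroup`, which only exists in the standard library from Python 3.11. On 3.9 and 3.10 the backport fills the gap; a version-guarded import could look like this (the `taskgroup` import path is an assumption, check the backport's documentation):

```python
import sys

if sys.version_info >= (3, 11):
    from asyncio import TaskGroup
else:
    # Provided by the `taskgroup` backport installed on Python 3.9-3.10
    # (import path assumed, verify against the backport's documentation)
    from taskgroup import TaskGroup
```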
---
## Contributing
Contributions are welcome! Please see our [Contributing Guidelines](CONTRIBUTING.md).
---
## License
MIT License - see [LICENSE](LICENSE) file for details.
---
## Why async-worker-flow?
I built async-worker-flow because I was tired of waiting for the slowest task in my batch processing pipelines. Every data engineer and ML engineer has faced this: you're processing hundreds or thousands of API calls, and one slow response blocks everything.
async-worker-flow solves this with a simple concept: **independent workers that never wait**. When a worker finishes a task, it immediately grabs the next one. No coordination overhead. No waiting. Just continuous, efficient processing.
If you're processing data through APIs, building ETL pipelines, or running any kind of batch async operations, async-worker-flow will save you time.
**Start processing smarter, not harder.** 🚀
---
<p align="center">
Made with ❤️ to solve real problems in production
</p>