# SwarmFlow
A distributed multi-agent orchestration framework for building scalable AI workflows with comprehensive observability.
## 🚀 Features
- **Dead-Simple API**: Minimal `@swarm_task` decorator and `run()` function
- **Auto-Dependency Inference**: Dependencies automatically inferred from function parameters
- **Agent Orchestration**: Create complex workflows with multiple AI agents
- **Retry Logic**: Built-in retry mechanisms for resilient agent execution
- **Observability**: OpenTelemetry integration for tracing and monitoring
- **Error Handling**: Graceful failure propagation and recovery
- **Real-time Monitoring**: Send task traces to your monitoring dashboard
- **Cycle Detection**: Automatic detection of circular dependencies
- **Production Ready**: Comprehensive error handling and logging
- **Hooks System**: Powerful before/after/error/final hooks for custom orchestration logic
- **Shared Memory**: Cross-task state sharing with `flow.memory`
- **Policy Enforcement**: DAG-level rules for cost limits, abort conditions, and validation
- **Modular Telemetry**: Comprehensive provider support with automatic metadata extraction
- **Cost Tracking**: Automatic cost calculation and tracking across all major LLM providers
## 📦 Installation
```bash
pip install swarmflow
```
## 🎯 Quick Start
```python
from swarmflow import swarm_task, run

@swarm_task
def fetch_data():
    return "Some data from API"

@swarm_task
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task
def display_result(process_data):
    print(f"Final result: {process_data}")

# Run workflow - that's it!
run()
```
**That's it!** No complex setup, no manual dependency management. SwarmFlow automatically:
- ✅ Registers your tasks
- ✅ Infers dependencies from function parameters
- ✅ Executes in the correct order
- ✅ Handles retries and errors
- ✅ Sends traces to your dashboard
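To make "infers dependencies from function parameters" concrete, here is a minimal sketch of the idea, written without SwarmFlow. The `TASKS` registry, the `task` decorator (a stand-in for `@swarm_task`), and `run_all` are hypothetical names for illustration, not SwarmFlow's internals. A parameter counts as a dependency when it matches the name of another registered task, and tasks run in topological order (Kahn's algorithm), with each upstream result passed in as the keyword argument of the same name.

```python
import inspect
from collections import deque

# Hypothetical registry: task name -> function (not SwarmFlow's internal structure).
TASKS = {}

def task(fn):
    """Register fn under its own name; a stand-in for @swarm_task."""
    TASKS[fn.__name__] = fn
    return fn

def infer_dependencies(tasks):
    """A parameter is a dependency when it names another registered task."""
    return {
        name: [p for p in inspect.signature(fn).parameters if p in tasks]
        for name, fn in tasks.items()
    }

def run_all(tasks):
    deps = infer_dependencies(tasks)
    # Kahn's algorithm: track how many unfinished dependencies each task has.
    remaining = {name: len(d) for name, d in deps.items()}
    dependents = {name: [] for name in tasks}
    for name, d in deps.items():
        for dep in d:
            dependents[dep].append(name)
    ready = deque(name for name, n in remaining.items() if n == 0)
    results = {}
    while ready:
        name = ready.popleft()
        # Pass each upstream result as the keyword argument of the same name.
        results[name] = tasks[name](**{dep: results[dep] for dep in deps[name]})
        for child in dependents[name]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)
    if len(results) != len(tasks):
        raise ValueError("circular dependency detected")
    return results

@task
def fetch_data():
    return "Some data from API"

@task
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

results = run_all(TASKS)
print(results["process_data"])  # Processed: Some data from API
```

Parameters that do not match a task name (for example an optional `input_data=None`) are simply left to their defaults in this sketch.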
## 🔧 Advanced Usage
### Retry Logic
```python
from swarmflow import swarm_task

@swarm_task(retries=3)
def unreliable_task():
    # This task will retry up to 3 times on failure
    pass
```
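The retry behaviour can be pictured with a small decorator. This is a sketch, not SwarmFlow's code: `with_retries` is a hypothetical name, and it assumes `retries=3` means one initial attempt plus up to three retries (four attempts in total), re-raising the last error once they are exhausted. Check the SwarmFlow source if the exact attempt count matters to you.

```python
import functools

def with_retries(retries):
    """Hypothetical retry decorator: 1 initial attempt + up to `retries` retries."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            last_error = None
            for _ in range(max(retries, 0) + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:  # retry on any exception
                    last_error = exc
            raise last_error  # every attempt failed
        return wrapper
    return decorator

calls = {"count": 0}

@with_retries(3)
def unreliable_task():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(unreliable_task(), calls["count"])  # ok 3

failures = {"count": 0}

@with_retries(2)
def always_fails():
    failures["count"] += 1
    raise ValueError("permanent failure")

try:
    always_fails()
except ValueError as exc:
    print(f"gave up after {failures['count']} attempts: {exc}")  # gave up after 3 attempts: permanent failure
```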
### Multiple Dependencies
```python
from swarmflow import swarm_task, run

@swarm_task
def step1():
    return "Step 1 completed"

@swarm_task
def step2():
    return "Step 2 completed"

@swarm_task
def step3():
    return "Step 3 completed"

@swarm_task
def final_step(step1, step2, step3):
    # Dependencies automatically inferred from parameter names
    return f"Combined: {step1}, {step2}, {step3}"

run()
```
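Once parameter names have been turned into a dependency graph like the one above, a circular dependency (task A needs B, B needs A) can be found with a depth-first search. The sketch below is an illustration of that standard technique, not SwarmFlow's implementation; `find_cycle` is a hypothetical helper that takes a mapping from each task name to the names it depends on and returns one offending cycle, or `None`.

```python
def find_cycle(deps):
    """Return one dependency cycle as a list of task names, or None.

    `deps` maps each task name to the names of the tasks it depends on.
    """
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {name: WHITE for name in deps}
    path = []

    def visit(name):
        color[name] = GRAY
        path.append(name)
        for dep in deps.get(name, []):
            if color.get(dep, WHITE) == GRAY:  # back edge: we are inside a cycle
                return path[path.index(dep):] + [dep]
            if color.get(dep, WHITE) == WHITE:
                cycle = visit(dep)
                if cycle:
                    return cycle
        path.pop()
        color[name] = BLACK
        return None

    for name in deps:
        if color[name] == WHITE:
            cycle = visit(name)
            if cycle:
                return cycle
    return None

graph = {"step1": [], "step2": [], "step3": [], "final_step": ["step1", "step2", "step3"]}
print(find_cycle(graph))                      # None
print(find_cycle({"a": ["b"], "b": ["a"]}))   # ['a', 'b', 'a']
```

The recursive search is fine for workflows of a few hundred tasks; very deep chains would need an iterative version to stay under Python's recursion limit.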
### Multi-Provider LLM Support
```python
from anthropic import Anthropic
from groq import Groq
from openai import OpenAI

from swarmflow import swarm_task, run

@swarm_task
def groq_task():
    client = Groq()  # reads GROQ_API_KEY from the environment
    response = client.chat.completions.create(
        model="llama-3-70b",  # use a model ID available to your Groq account
        messages=[{"role": "user", "content": "Hello"}],
    )
    return response

@swarm_task
def openai_task():
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}],
    )
    return response

@swarm_task
def anthropic_task():
    client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=100,
        messages=[{"role": "user", "content": "Hello"}],
    )
    return response

run()

# SwarmFlow automatically detects and extracts metadata from all providers:
# - Model name and provider identification
# - Token usage (prompt + completion tokens)
# - Precise cost calculation (USD) using current pricing
# - Timing metrics (queue, prompt, completion, total time)
# - All added to task.metadata automatically

# Example output with metadata:
# [SwarmFlow] Task: groq_task
#   ↳ Status: success
#   ↳ Duration: 1234 ms
#   ↳ Output: <Groq ChatCompletion object>
#   ↳ Metadata: {'provider': 'Groq', 'model': 'llama-3-70b', 'tokens_used': 150, 'cost_usd': 0.000089, 'queue_time_s': 0.1, 'prompt_time_s': 0.5, 'completion_time_s': 0.8, 'total_time_s': 1.4}
```
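Cost tracking boils down to reading token counts from the provider's usage object and multiplying by per-token prices. The sketch below shows that arithmetic under stated assumptions: the `PRICES` table holds placeholder rates (not real Groq pricing), and `token_counts` / `estimate_cost_usd` are hypothetical helpers, not SwarmFlow functions. The attribute names it reads (`prompt_tokens`/`completion_tokens` on OpenAI and Groq responses, `input_tokens`/`output_tokens` on Anthropic responses) are those providers' usage fields; a `SimpleNamespace` stands in for `response.usage` so the example runs offline.

```python
from dataclasses import dataclass
from types import SimpleNamespace

@dataclass(frozen=True)
class Price:
    input_per_million: float   # USD per 1M prompt/input tokens
    output_per_million: float  # USD per 1M completion/output tokens

# Placeholder rates for illustration only; take real numbers from each provider's pricing page.
PRICES = {
    ("Groq", "llama-3-70b"): Price(input_per_million=0.50, output_per_million=1.00),
}

def token_counts(usage):
    """Return (prompt_tokens, completion_tokens) from a provider usage object."""
    if hasattr(usage, "prompt_tokens"):  # OpenAI and Groq chat completions
        return usage.prompt_tokens, usage.completion_tokens
    return usage.input_tokens, usage.output_tokens  # Anthropic messages

def estimate_cost_usd(provider, model, usage):
    price = PRICES.get((provider, model))
    if price is None:
        return None  # unknown model: report no cost rather than guess
    prompt, completion = token_counts(usage)
    return (prompt * price.input_per_million + completion * price.output_per_million) / 1_000_000

# Stand-in for response.usage so the example runs without an API call.
usage = SimpleNamespace(prompt_tokens=100, completion_tokens=50)
print(sum(token_counts(usage)))                          # 150
print(estimate_cost_usd("Groq", "llama-3-70b", usage))   # 0.0001
```

Returning `None` for unknown models keeps a missing price visible instead of silently reporting a cost of zero.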
### API Key Configuration
SwarmFlow looks up your API key automatically. Use one of the following options:
```python
import os

from swarmflow import run

# Option 1: Set the environment variable (recommended).
# In your shell: export SWARMFLOW_API_KEY="sk_abc123..."
# or from Python, before calling run():
os.environ["SWARMFLOW_API_KEY"] = "sk_abc123..."
run()  # Automatically uses the key from the environment

# Option 2: Pass the key directly
run(api_key="sk_abc123...")

# Option 3: No key (logs a warning, but the workflow still runs)
run()
```
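The three options can be read as a lookup order. The sketch below assumes that an explicit `api_key` argument takes priority over `SWARMFLOW_API_KEY`, and that a missing key only logs a warning; `resolve_api_key` is a hypothetical helper for illustration, not a SwarmFlow function, and the real precedence may differ.

```python
import logging
import os

logger = logging.getLogger("swarmflow_sketch")

def resolve_api_key(api_key=None):
    """Hypothetical lookup order: explicit argument, then SWARMFLOW_API_KEY, then warn."""
    if api_key:
        return api_key
    env_key = os.environ.get("SWARMFLOW_API_KEY")
    if env_key:
        return env_key
    logger.warning("No SwarmFlow API key found; continuing without authentication.")
    return None

print(resolve_api_key("sk_explicit"))  # sk_explicit
```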
### Hooks System & Shared Memory
SwarmFlow includes a hooks system for custom orchestration logic and a shared memory store (`flow.memory`) for passing state between tasks:
```python
from swarmflow import swarm_task, run
from swarmflow.hooks import write_output_to_memory, read_memory_into_arg, log_input_output

# log_input_output() returns a pair of hooks: [0] runs before the task, [1] after it.
log_hooks = log_input_output()

@swarm_task(before=log_hooks[0], after=log_hooks[1])
def fetch_data():
    return "Some data from API"

@swarm_task(after=write_output_to_memory("processed_data"))
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task(before=read_memory_into_arg("processed_data", "input_data"))
def display_result(process_data, input_data=None):
    print(f"Final result: {process_data}")
    print(f"From memory: {input_data}")

# Run workflow - that's it!
run()
```
**Available Hooks:**
- `before`: Execute before task runs
- `after`: Execute after task succeeds
- `on_error`: Execute when task fails
- `on_final`: Execute after task completes (success or failure)
**Built-in Hook Utilities:**
- `write_output_to_memory(key)`: Save task output to shared memory
- `read_memory_into_arg(mem_key, arg_name)`: Inject memory value into task arguments
- `log_input_output()`: Log task inputs and outputs
- `enforce_max_cost(max_usd)`: Abort if total cost exceeds limit
- `set_flag_on_failure(flag_key)`: Set memory flag when task fails
- `skip_if_flag_set(flag_key)`: Skip task if memory flag is True
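The order in which the four hooks fire can be sketched in plain Python. Everything here is hypothetical: `run_task_with_hooks`, the `ctx` dictionary passed to each hook, and the three hook factories (`write_output_hook`, `read_memory_hook`, `flag_on_failure_hook`, loose analogues of `write_output_to_memory`, `read_memory_into_arg`, and `set_flag_on_failure`) are illustrations of the semantics listed above, not SwarmFlow's actual hook signatures.

```python
def run_task_with_hooks(name, fn, kwargs, memory, before=None, after=None,
                        on_error=None, on_final=None):
    """Run fn(**kwargs), firing hooks at the points described above."""
    ctx = {"task": name, "kwargs": dict(kwargs), "memory": memory,
           "output": None, "error": None}
    try:
        if before:
            before(ctx)                       # may add or change ctx["kwargs"]
        ctx["output"] = fn(**ctx["kwargs"])
        if after:
            after(ctx)                        # success only
        return ctx["output"]
    except Exception as exc:
        ctx["error"] = exc
        if on_error:
            on_error(ctx)                     # failure only
        raise
    finally:
        if on_final:
            on_final(ctx)                     # success or failure

# Hypothetical hook factories; each returns a function that receives ctx.
def write_output_hook(key):
    def hook(ctx):
        ctx["memory"][key] = ctx["output"]
    return hook

def read_memory_hook(mem_key, arg_name):
    def hook(ctx):
        ctx["kwargs"][arg_name] = ctx["memory"].get(mem_key)
    return hook

def flag_on_failure_hook(flag_key):
    def hook(ctx):
        ctx["memory"][flag_key] = True
    return hook

memory = {}
finished = []
run_task_with_hooks("process_data", lambda fetch_data: f"Processed: {fetch_data}",
                    {"fetch_data": "raw"}, memory,
                    after=write_output_hook("processed_data"))
shown = run_task_with_hooks("display_result", lambda input_data=None: input_data, {}, memory,
                            before=read_memory_hook("processed_data", "input_data"),
                            on_final=lambda ctx: finished.append(ctx["task"]))

def broken_task():
    raise RuntimeError("upstream API failed")

try:
    run_task_with_hooks("broken_task", broken_task, {}, memory,
                        on_error=flag_on_failure_hook("error_detected"),
                        on_final=lambda ctx: finished.append(ctx["task"]))
except RuntimeError:
    pass

print(shown, memory["error_detected"], finished)  # Processed: raw True ['display_result', 'broken_task']
```

Keeping shared state in one dictionary that every hook can see is what lets a failure in one task (the `error_detected` flag) influence later tasks or policies.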
### Policy Enforcement
Set DAG-level policies for cost limits, abort conditions, and validation:
```python
from swarmflow import swarm_task, run, SwarmFlow

# Create flow for policy configuration
flow = SwarmFlow()
flow.set_policy("max_cost", 0.10)  # Abort if total cost > $0.10
flow.set_policy("abort_on_flag", "error_detected")  # Abort if the "error_detected" flag is True
flow.set_policy("require_outputs", ["final_result"])  # Abort if these outputs are missing

@swarm_task
def task1():
    return "Task 1 result"

@swarm_task
def task2(task1):
    return "Task 2 result"

# Run with policies enforced
run()
```
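The three policies amount to simple checks over the run's state. The sketch below is a hypothetical `check_policies` helper, not SwarmFlow's enforcement code, and it assumes that `require_outputs` lists task names whose results must exist and that `abort_on_flag` names a key in shared memory. Under that assumption, the configuration above would flag `final_result` as missing, because no task with that name is defined.

```python
def check_policies(policy, memory, total_cost_usd, outputs):
    """Hypothetical sketch: return a list of violation messages (empty means OK)."""
    violations = []
    max_cost = policy.get("max_cost")
    if max_cost is not None and total_cost_usd > max_cost:
        violations.append(f"total cost ${total_cost_usd:.2f} exceeds max_cost ${max_cost:.2f}")
    flag = policy.get("abort_on_flag")
    if flag is not None and memory.get(flag) is True:
        violations.append(f"abort flag {flag!r} is set")
    missing = [name for name in policy.get("require_outputs", []) if name not in outputs]
    if missing:
        violations.append(f"missing required outputs: {missing}")
    return violations

policy = {"max_cost": 0.10, "abort_on_flag": "error_detected", "require_outputs": ["final_result"]}
outputs = {"task1": "Task 1 result", "task2": "Task 2 result"}
print(check_policies(policy, {}, 0.05, outputs))  # ["missing required outputs: ['final_result']"]
```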
### Real-time Monitoring
SwarmFlow automatically sends task traces to the SwarmFlow backend service at `http://localhost:8000/api/trace` for real-time monitoring and analytics.
**Trace Structure:**
```json
{
  "id": "task-uuid",
  "run_id": "dag-run-uuid",
  "name": "task_name",
  "status": "success|failure|retrying|skipped",
  "duration_ms": 1234,
  "output": "task output",
  "metadata": {
    "agent": "LLMProcessor",
    "provider": "Groq",
    "model": "llama-3-70b",
    "tokens_used": 150,
    "cost_usd": 0.000089
  },
  "dependencies": ["dep1", "dep2"],
  "flow_memory": {"key": "value"},
  "flow_policy": {"max_cost": 0.10}
}
```
- `run_id` is the same for every task in one DAG run, so traces can be grouped per run.
- `status` is one of `success`, `failure`, `retrying`, or `skipped`.
- `flow_memory` holds the shared memory state, and `flow_policy` lists the active policies.
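A payload with the fields shown above can be assembled with the standard library. This is a sketch: `build_trace` is a hypothetical helper, and stringifying `output` is an assumption made so that arbitrary return values (such as an LLM response object) serialize to JSON. The key point it illustrates is that `run_id` is generated once per run and reused for every task, while each task trace gets its own `id`.

```python
import json
import time
import uuid

def build_trace(name, status, started, output, dependencies, run_id,
                metadata=None, memory=None, policy=None):
    """Hypothetical helper assembling a trace with the field names shown above."""
    return {
        "id": str(uuid.uuid4()),          # unique per task trace
        "run_id": run_id,                 # shared by every task in one DAG run
        "name": name,
        "status": status,
        "duration_ms": int((time.perf_counter() - started) * 1000),
        "output": str(output),            # assumption: outputs are sent as strings
        "metadata": metadata or {},
        "dependencies": list(dependencies),
        "flow_memory": dict(memory or {}),
        "flow_policy": dict(policy or {}),
    }

run_id = str(uuid.uuid4())  # generated once per run
started = time.perf_counter()
traces = [
    build_trace("task1", "success", started, "Task 1 result", [], run_id),
    build_trace("task2", "success", started, "Task 2 result", ["task1"], run_id,
                policy={"max_cost": 0.10}),
]
payload = json.dumps(traces[1])
print(payload)
```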
### Observability
SwarmFlow automatically provides:
- **Task execution traces** with OpenTelemetry
- **Performance metrics** (execution time, success rates)
- **Dependency visualization** and cycle detection
- **Error tracking** and failure propagation
- **Multi-provider metadata extraction** (Groq, OpenAI, Anthropic with precise cost calculation and timing metrics)
- **Comprehensive cost tracking** across all supported LLM providers
## 🏗️ Architecture
SwarmFlow is designed for **production multi-agent systems** with dead-simple usage:
```
User's Agent Functions → @swarm_task decorator → run() → Observability Dashboard
```
- **Minimal**: Just decorator + run function
- **Scalable**: Handles complex dependency graphs
- **Observable**: Real-time monitoring and debugging
- **Resilient**: Built-in retry logic and error handling
## 📊 Monitoring Dashboard
Get comprehensive insights into your multi-agent workflows:
- **Real-time execution** monitoring
- **Performance analytics** and optimization
- **Error tracking** and debugging
- **Cost analysis** for LLM usage (auto-calculated across all providers)
- **Workflow visualization** and dependency graphs
- **Multi-provider metadata extraction** (Groq, OpenAI, Anthropic with comprehensive model support)
- **DAG run tracking** with unique run_id for grouping and analytics
## 🚀 Deployment Configuration
### Backend Configuration
SwarmFlow automatically sends traces to `http://localhost:8000/api/trace`. For production deployment, update the backend URL in the SDK code to point to your centralized backend service.
## 🤝 Contributing
We welcome contributions! Please see our [Contributing Guidelines](https://github.com/anirame128/swarmflow/blob/main/CONTRIBUTING.md).
## 📚 Documentation
For detailed documentation, visit: [https://github.com/anirame128/swarmflow](https://github.com/anirame128/swarmflow)
## 📄 License
### SDK License
The SwarmFlow SDK is licensed under the MIT License. See the [LICENSE](https://github.com/anirame128/swarmflow/blob/main/LICENSE) file for details.
### Backend Services
SwarmFlow backend services, dashboard, and infrastructure require separate service agreements and API keys. The SDK is designed to work with official SwarmFlow backend services only.
**Why this model?**
- ✅ **Free SDK**: Developers can use the SDK without restrictions
- ✅ **Paid Services**: Backend services and dashboard require API keys
- ✅ **Industry Standard**: Follows the same model as Google Maps, Stripe, AWS SDKs
- ✅ **Developer Friendly**: Maximizes adoption while sustaining the hosted services
## 📋 Package Metadata
- **Version**: 0.5.0 (uploaded to PyPI on 2025-08-08)
- **Author and maintainer**: Anirudh Ramesh
- **Python**: >=3.8
- **Keywords**: ai, agents, orchestration, workflow, llm, multi-agent, distributed, observability
- **Links**: [Bug Reports](https://github.com/anirame128/swarmflow/issues), [Changelog](https://github.com/anirame128/swarmflow/blob/main/CHANGELOG.md), [Source](https://github.com/anirame128/swarmflow)