# Arshai
**A powerful AI application framework built on clean architecture principles with complete developer control.**
```mermaid
graph TD
subgraph L3 ["Layer 3: Agentic Systems"]
WF[Workflows] --> AG[Agents]
MEM[Memory Systems] --> AG
ORCH[Orchestration] --> AG
end
subgraph L2 ["Layer 2: Agents"]
AG --> TOOLS[Tool Integration]
AG --> LOGIC[Custom Logic]
end
subgraph L1 ["Layer 1: LLM Clients"]
LOGIC --> OPENAI[OpenAI]
LOGIC --> AZURE[Azure OpenAI]
LOGIC --> GEMINI[Google Gemini]
LOGIC --> ROUTER[OpenRouter]
end
style L3 fill:#e1f5fe
style L2 fill:#f3e5f5
style L1 fill:#e8f5e8
```
## Philosophy: Developer Authority First
Arshai empowers you to build exactly what you need, when you need it, how you need it. **You control every component, dependency, and interaction.** No magic, no hidden behavior, no framework lock-in.
### The Arshai Way vs Traditional Frameworks
**ā Traditional Framework Approach:**
```python
# Framework controls everything - what's happening inside?
framework = AIFramework()
framework.load_config("config.yaml")
agent = framework.create_agent("chatbot") # Hidden creation
response = agent.chat("Hello") # Opaque behavior
```
**ā
Arshai Approach - You're In Control:**
```python
# You create, configure, and control everything
llm_config = ILLMConfig(model="gpt-4", temperature=0.7)
llm_client = OpenAIClient(llm_config) # You create it
agent = ChatAgent(
llm_client=llm_client, # You inject dependencies
system_prompt="Be helpful", # You configure behavior
memory=memory_manager # You manage state
)
response = await agent.process(input_data) # You control the flow
```
## Three-Layer Architecture
### Layer 1: LLM Clients (Foundation)
**Minimal Developer Authority** - Standardized access to AI providers
```python
from arshai.llms.openai import OpenAIClient
from arshai.core.interfaces.illm import ILLMConfig, ILLMInput
# Direct instantiation - you create when needed
config = ILLMConfig(model="gpt-4", temperature=0.7)
llm = OpenAIClient(config)
# Unified interface across all providers
llm_input = ILLMInput(
system_prompt="You are a helpful assistant",
user_message="What's the weather like?",
regular_functions={"get_weather": get_weather_func},
background_tasks={"log_interaction": log_func} # Fire-and-forget
)
response = await llm.chat(llm_input)
```
**Supported Providers:**
- **OpenAI** - Direct API with streaming and function calling
- **Azure OpenAI** - Enterprise deployment with native parsing
- **Google Gemini** - Native SDK with dual authentication
- **OpenRouter** - Access to Claude, Llama, and 200+ models
- **Extensible** - Add new providers by implementing `ILLM` interface
### Layer 2: Agents (Business Logic)
**Moderate Developer Authority** - Purpose-driven AI components
```python
from arshai.agents.base import BaseAgent
from arshai.core.interfaces.iagent import IAgentInput
class CustomerServiceAgent(BaseAgent):
"""Your custom agent with your business logic."""
def __init__(self, llm_client: ILLM, knowledge_base: KnowledgeBase):
super().__init__(llm_client, "You are a customer service expert")
self.knowledge_base = knowledge_base
async def process(self, input: IAgentInput) -> Dict[str, Any]:
# Your custom preprocessing
context = await self._analyze_request(input.message)
# Define tools for this specific agent
async def search_kb(query: str) -> str:
return await self.knowledge_base.search(query)
async def log_issue(issue: str) -> None:
"""BACKGROUND TASK: Log customer issue."""
await self.issue_tracker.create_ticket(issue)
# Your LLM integration with custom tools
llm_input = ILLMInput(
system_prompt=self.system_prompt,
user_message=f"Context: {context}\nUser: {input.message}",
regular_functions={"search_knowledge": search_kb},
background_tasks={"log_issue": log_issue}
)
result = await self.llm_client.chat(llm_input)
# Your custom post-processing
return {
"response": result["llm_response"],
"confidence": self._calculate_confidence(result),
"escalate": self._should_escalate(result)
}
```
### Layer 3: Agentic Systems (Complete Control)
**Maximum Developer Authority** - Orchestrate everything exactly as you need
```python
class CustomerSupportSystem:
"""Your complete system - you control everything."""
def __init__(self):
# You create all components
llm = OpenAIClient(ILLMConfig(model="gpt-4"))
memory = RedisMemoryManager(redis_url=os.getenv("REDIS_URL"))
knowledge_base = VectorKnowledgeBase(milvus_client, embeddings)
# You compose the system
self.triage_agent = TriageAgent(llm, "Route customer requests")
self.support_agent = CustomerServiceAgent(llm, knowledge_base)
self.escalation_agent = EscalationAgent(llm, ticket_system)
# You define the workflow
self.memory = memory
async def handle_request(self, user_message: str, user_id: str) -> Dict[str, Any]:
"""Your orchestration logic."""
# Step 1: Retrieve user history
history = await self.memory.retrieve({"user_id": user_id})
# Step 2: Triage the request
triage_input = IAgentInput(
message=user_message,
metadata={"history": history, "user_id": user_id}
)
triage_result = await self.triage_agent.process(triage_input)
# Step 3: Route based on triage (your business logic)
if triage_result["urgency"] == "high":
agent = self.escalation_agent
else:
agent = self.support_agent
# Step 4: Process with selected agent
support_input = IAgentInput(
message=user_message,
metadata={
"user_id": user_id,
"triage": triage_result,
"history": history
}
)
final_result = await agent.process(support_input)
# Step 5: Update memory (your state management)
await self.memory.store({
"user_id": user_id,
"interaction": {
"request": user_message,
"response": final_result["response"],
"agent_used": agent.__class__.__name__,
"timestamp": datetime.now().isoformat()
}
})
# Your final response format
return {
"response": final_result["response"],
"agent_used": agent.__class__.__name__,
"escalated": isinstance(agent, EscalationAgent),
"confidence": final_result.get("confidence", 0.0)
}
```
## Comprehensive Feature Overview
### šÆ **Developer Authority**
- **Direct Instantiation**: You create every component explicitly
- **Dependency Injection**: All dependencies passed through constructors
- **No Hidden Magic**: Every behavior is visible in your code
- **Complete Control**: You decide architecture, not the framework
- **Interface-First Design**: Well-defined contracts for easy testing
- **Zero Framework Lock-in**: Use what you need, ignore the rest
### š¤ **Advanced LLM Client System**
#### **Multi-Provider Support**
- **OpenAI Integration**: Native API with GPT-4o, GPT-4o-mini, and all models
- Streaming with real-time function execution
- Advanced function calling with parallel execution
- Structured outputs with Pydantic models
- Token usage tracking and cost optimization
- Rate limiting and error handling
- **Azure OpenAI Enterprise**: Full Azure deployment support
- Azure authentication and endpoint management
- Content filtering and compliance features
- Regional deployment flexibility
- Enterprise security and audit logging
- **Google Gemini**: Native SDK integration
- Gemini Pro, Flash, and specialized models
- Dual authentication (API key and Service Account)
- Advanced safety settings and content filtering
- Image and multimodal capabilities
- Real-time streaming with function calls
- **OpenRouter**: Access to 200+ models including:
- Anthropic Claude 3.5 Sonnet, Haiku, Opus
- Meta Llama 3.1, 3.2 (8B, 70B, 405B)
- Mistral Large, 8x7B, 8x22B
- Cohere Command R+
- Specialized models for coding, reasoning, and creativity
#### **Unified Function Calling Architecture**
```python
# Works identically across all providers
def get_weather(city: str) -> dict:
"""Get current weather for a city."""
return {"temperature": 72, "condition": "sunny"}
async def send_notification(message: str) -> None:
"""BACKGROUND TASK: Send notification."""
# Runs independently, doesn't block conversation
print(f"š§ Notification: {message}")
# Same interface for any provider
llm_input = ILLMInput(
system_prompt="You are a helpful assistant",
user_message="What's the weather in San Francisco?",
regular_functions={"get_weather": get_weather},
background_tasks={"send_notification": send_notification}
)
# Works with OpenAI, Azure, Gemini, OpenRouter
response = await llm_client.chat(llm_input)
```
#### **Progressive Streaming**
- **Real-time Function Execution**: Functions execute immediately during streaming
- **Parallel Tool Execution**: Regular tools run concurrently for speed
- **Background Task Management**: Fire-and-forget operations tracked automatically
- **Enhanced Context Building**: Function results integrated into conversation
- **Safe Usage Accumulation**: Robust token and cost tracking during streaming
### š§ **Sophisticated Agent System**
#### **Base Agent Capabilities**
- **Flexible Return Types**: Return strings, dicts, generators, or custom objects
- **Stateless Design**: Pure functions with no hidden state
- **Tool Integration**: Easy integration of custom functions and APIs
- **Memory Compatibility**: Works with any memory backend
- **Error Resilience**: Graceful handling of LLM failures
#### **Specialized Agent Types**
```python
# Working Memory Agent - Maintains conversation context
from arshai.agents.working_memory import WorkingMemoryAgent
# Base Agent - Flexible foundation for custom agents
from arshai.agents.base import BaseAgent
# Custom domain-specific agents
class CustomerServiceAgent(BaseAgent):
def __init__(self, llm_client, knowledge_base, escalation_system):
super().__init__(llm_client, "Customer service expert")
self.knowledge_base = knowledge_base
self.escalation_system = escalation_system
async def process(self, input: IAgentInput) -> Dict[str, Any]:
# Your custom business logic
# Access to knowledge base, escalation, metrics
# Return structured responses with confidence scores
pass
```
#### **Agent Development Patterns**
- **Domain Specialization**: Create agents for specific business domains
- **Tool Composition**: Combine multiple tools and APIs per agent
- **Response Formatting**: Structure outputs exactly as needed
- **Confidence Scoring**: Built-in or custom confidence assessment
- **Escalation Logic**: Automatic escalation to human agents or specialized systems
### š§ **Comprehensive Tool Ecosystem**
#### **Built-in Tools**
- **Web Search**: SearxNG integration with connection pooling
- Multiple search engines aggregation
- Result filtering and ranking
- Performance optimization for production
- **Knowledge Base**: Vector database integration
- Milvus support with hybrid search
- Document ingestion and chunking
- Semantic search with embedding models
- Metadata filtering and faceted search
- **MCP (Model Context Protocol) Tools**: Dynamic tool loading
- File system operations (read, write, search)
- Database connections and queries
- API integrations and webhooks
- Custom tool development framework
- Thread pool management for performance
#### **Custom Tool Development**
```python
from arshai.core.interfaces.itool import ITool
class DatabaseTool(ITool):
def __init__(self, db_connection):
self.db = db_connection
async def search_products(self, query: str, category: str = None) -> List[Dict]:
"""Search products in database."""
# Your database logic
return results
async def log_interaction(self, user_id: str, action: str) -> None:
"""BACKGROUND TASK: Log user interaction."""
# Background logging, doesn't affect conversation
await self.db.log(user_id, action)
# Easy integration with agents
db_tool = DatabaseTool(connection)
agent = CustomAgent(
llm_client,
regular_functions={"search_products": db_tool.search_products},
background_tasks={"log_interaction": db_tool.log_interaction}
)
```
### 𧬠**Agentic System Orchestration**
#### **Workflow Management**
- **State-Driven Workflows**: Complex multi-step processes
- **Conditional Branching**: Dynamic execution paths based on results
- **Error Recovery**: Automatic retry and fallback mechanisms
- **Parallel Execution**: Concurrent agent execution for performance
- **State Persistence**: Resume workflows across sessions
#### **Multi-Agent Coordination**
```python
class IntelligentHelpdesk:
def __init__(self):
# Create specialized agents
self.triage_agent = TriageAgent(llm_client)
self.technical_agent = TechnicalSupportAgent(llm_client, knowledge_base)
self.billing_agent = BillingAgent(llm_client, billing_system)
self.escalation_agent = EscalationAgent(llm_client, human_queue)
# Shared resources
self.memory_manager = RedisMemoryManager()
self.metrics_collector = MetricsCollector()
async def handle_request(self, request: str, user_id: str) -> Dict[str, Any]:
# Step 1: Triage determines routing
triage_result = await self.triage_agent.classify(request)
# Step 2: Route to appropriate specialist
if triage_result['category'] == 'technical':
agent = self.technical_agent
elif triage_result['category'] == 'billing':
agent = self.billing_agent
elif triage_result['urgency'] == 'critical':
agent = self.escalation_agent
# Step 3: Process with context and memory
context = await self.memory_manager.get_user_context(user_id)
result = await agent.process_with_context(request, context)
# Step 4: Update memory and metrics
await self.memory_manager.update_context(user_id, result)
await self.metrics_collector.record_interaction(result)
return result
```
### š¾ **Advanced Memory Management**
#### **Memory Backend Options**
- **Redis Memory Manager**: Production-ready distributed memory
- Automatic TTL management
- Connection pooling and failover
- Pub/sub for real-time updates
- Memory cleanup and optimization
- **In-Memory Manager**: Fast development and testing
- LRU eviction policies
- Background cleanup tasks
- Memory usage monitoring
- **Working Memory System**: Context-aware conversation tracking
- Automatic context summarization
- Relevance scoring and pruning
- Cross-session continuity
#### **Memory Usage Patterns**
```python
# Environment-specific memory selection
def create_memory_system(environment: str) -> IMemoryManager:
if environment == "production":
return RedisMemoryManager(
redis_url=os.getenv("REDIS_URL"),
ttl=3600, # 1 hour TTL
key_prefix="app_prod",
connection_pool_size=10
)
elif environment == "development":
return InMemoryManager(
ttl=1800, # 30 minutes
max_size=1000,
cleanup_interval=300 # 5 minutes
)
else:
return InMemoryManager(ttl=300) # Testing
# Advanced conversation memory
class ConversationMemory:
def __init__(self, memory_manager, summarizer_agent):
self.memory = memory_manager
self.summarizer = summarizer_agent
async def store_conversation(self, user_id: str, conversation: List[Dict]):
# Automatic summarization for long conversations
if len(conversation) > 10:
summary = await self.summarizer.summarize(conversation[-10:])
conversation = conversation[:-10] + [summary]
await self.memory.store({
"user_id": user_id,
"conversation": conversation,
"timestamp": datetime.now().isoformat()
})
```
### š **Production-Grade Observability**
#### **OpenTelemetry Integration**
- **Automatic Instrumentation**: No code changes required
- **Distributed Tracing**: Track requests across services
- **Custom Metrics**: Business-specific monitoring
- **Export Flexibility**: Send to any OTLP-compatible backend
#### **Built-in Metrics**
```python
# Automatic metrics collection
metrics_collected = {
"llm_requests_total": "Counter of LLM API calls",
"llm_request_duration_seconds": "Histogram of request latencies",
"llm_tokens_total": "Counter of tokens used (input/output/total)",
"llm_cost_total": "Counter of API costs by provider",
"llm_time_to_first_token_seconds": "Time to start streaming",
"llm_time_to_last_token_seconds": "Total generation time",
"agent_processing_duration_seconds": "Agent processing time",
"memory_operations_total": "Memory store/retrieve operations",
"tool_execution_duration_seconds": "Tool execution timing",
"background_task_duration_seconds": "Background task timing"
}
# Zero-fallback monitoring - always works
from arshai.observability import ObservabilityConfig
# Optional: Advanced configuration
obs_config = ObservabilityConfig(
endpoint="http://jaeger:14268/api/traces",
service_name="arshai_app",
environment="production",
custom_metrics={"business_metric": "counter"}
)
# Automatic instrumentation
client = OpenAIClient(
config=llm_config,
observability_config=obs_config # Optional
)
# Metrics automatically exported to your monitoring system
```
#### **Performance Monitoring**
- **Real-time Dashboards**: Track system health and performance
- **Alert Configuration**: Custom alerts for SLA violations
- **Cost Tracking**: Detailed cost breakdown by model and usage
- **Error Rate Monitoring**: Track and analyze failure patterns
- **Capacity Planning**: Usage trends and scaling recommendations
### šļø **Vector Database Integration**
#### **Milvus Support**
- **Hybrid Search**: Dense + sparse vector combination
- Semantic similarity with dense vectors
- Keyword matching with sparse vectors
- Weighted ranking and reranking
- **Production Features**:
- Connection pooling and async operations
- Batch insertion with configurable batch sizes
- Automatic index creation and optimization
- Metadata filtering and complex queries
- Schema versioning and migration support
```python
# Advanced vector operations
from arshai.vector_db.milvus_client import MilvusClient
from arshai.core.interfaces.ivector_db_client import ICollectionConfig
# Async operations for performance
client = MilvusClient()
config = ICollectionConfig(
collection_name="knowledge_base",
dense_dim=1536, # OpenAI embeddings
is_hybrid=True, # Enable sparse + dense
schema_model=CustomDocument # Structured metadata
)
# Hybrid search with filtering
results = await client.hybrid_search(
config=config,
dense_vectors=dense_embeddings,
sparse_vectors=sparse_embeddings,
expr="metadata['category'] == 'technical'",
limit=10
)
```
### š¦ **Document Processing Pipeline**
#### **Ingestion and Chunking**
- **Multi-format Support**: PDF, DOCX, TXT, HTML, Markdown
- **Intelligent Chunking**: Semantic and structural chunking
- **Metadata Extraction**: Automatic metadata detection
- **Batch Processing**: Efficient large-scale ingestion
- **Deduplication**: Content-based duplicate detection
#### **Embedding Generation**
- **Multiple Providers**: OpenAI, Google, Hugging Face
- **Model Selection**: Choose optimal models per use case
- **Batch Processing**: Efficient embedding generation
- **Caching**: Avoid re-processing unchanged content
### š **Enterprise Performance & Scalability**
#### **Connection Management**
- **HTTP Connection Pooling**: Configurable limits prevent resource exhaustion
- Default: 100 max connections, 10 per host
- Configurable via environment variables
- Automatic connection reuse and cleanup
- **Thread Pool Management**: Bounded execution prevents deadlocks
- CPU-aware thread limits: `min(32, cpu_count * 2)`
- Shared pools across MCP tools
- Graceful shutdown and cleanup
#### **Async Architecture**
- **Non-blocking Operations**: All I/O operations are async
- **Concurrent Processing**: Parallel function execution
- **Background Tasks**: Fire-and-forget operations
- **Resource Management**: Automatic cleanup and monitoring
#### **Production Optimizations**
```python
# Environment-based configuration
os.environ.update({
'ARSHAI_MAX_CONNECTIONS': '200', # HTTP connection pool
'ARSHAI_MAX_CONNECTIONS_PER_HOST': '20', # Per-host limit
'ARSHAI_MAX_THREADS': '64', # Thread pool size
'ARSHAI_MAX_MEMORY_MB': '4096', # Memory limit
'ARSHAI_CLEANUP_INTERVAL': '300', # Background cleanup
'ARSHAI_BATCH_SIZE': '100' # Vector DB batch size
})
# Production deployment patterns
class ProductionApp:
def __init__(self):
# Shared resources for efficiency
self.llm_pool = {
'fast': [OpenAIClient(fast_config) for _ in range(3)],
'powerful': [OpenAIClient(powerful_config) for _ in range(2)]
}
self.memory_manager = RedisMemoryManager(
redis_url=os.getenv("REDIS_URL"),
connection_pool_size=20,
ttl=7200
)
# Load balancing and circuit breakers
self.load_balancer = LoadBalancer(self.llm_pool)
self.circuit_breaker = CircuitBreaker(failure_threshold=5)
async def handle_concurrent_requests(self, requests: List[str]) -> List[str]:
# Process up to 100 concurrent requests
semaphore = asyncio.Semaphore(100)
async def process_one(request):
async with semaphore:
agent = self.create_agent()
return await agent.process(IAgentInput(message=request))
tasks = [process_one(req) for req in requests]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
```
### š ļø **Development and Testing**
#### **Testing Framework**
- **Mock Support**: Easy mocking of LLM clients and dependencies
- **Integration Testing**: Real LLM integration tests
- **Load Testing**: Performance and scalability validation
- **Async Testing**: Full async/await test support
```python
# Easy testing with dependency injection
async def test_customer_agent():
mock_llm = AsyncMock()
mock_llm.chat.return_value = {
"llm_response": "Test response",
"usage": {"total_tokens": 50}
}
mock_kb = AsyncMock()
mock_kb.search.return_value = ["relevant info"]
# Create agent with mocked dependencies
agent = CustomerServiceAgent(mock_llm, mock_kb)
result = await agent.process(IAgentInput(
message="Help with billing",
metadata={"user_id": "123"}
))
assert "Test response" in result["response"]
mock_llm.chat.assert_called_once()
mock_kb.search.assert_called_once()
```
#### **Development Patterns**
- **Configuration Management**: Environment-based settings
- **Hot Reloading**: Development server with auto-reload
- **Debugging Tools**: Comprehensive logging and tracing
- **Performance Profiling**: Built-in performance analysis
## Installation & Setup
### Basic Installation
```bash
# Core framework only
pip install arshai
# With all optional dependencies
pip install arshai[all] # Includes all features below
# Specific feature sets
pip install arshai[redis,milvus,observability] # Production essentials
pip install arshai[docs] # Documentation building
```
### Feature-Specific Installations
```bash
# Memory backends
pip install arshai[redis] # Redis memory manager
# Vector databases
pip install arshai[milvus] # Milvus vector database
# Advanced search
pip install arshai[flashrank] # Re-ranking capabilities
# Monitoring
pip install arshai[observability] # OpenTelemetry integration
# Development tools
pip install arshai[dev] # Testing and development utilities
```
### Environment Setup
```bash
# LLM Provider API Keys
export OPENAI_API_KEY="your-openai-key"
export AZURE_OPENAI_API_KEY="your-azure-key"
export AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/"
export GOOGLE_API_KEY="your-gemini-key"
export OPENROUTER_API_KEY="your-openrouter-key"
# Optional: Google Service Account (alternative to API key)
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
# Memory backends
export REDIS_URL="redis://localhost:6379"
export REDIS_TTL="3600" # 1 hour
# Vector database
export MILVUS_HOST="localhost"
export MILVUS_PORT="19530"
export MILVUS_DB_NAME="default"
# Performance tuning
export ARSHAI_MAX_CONNECTIONS="100"
export ARSHAI_MAX_THREADS="32"
export ARSHAI_BATCH_SIZE="50"
# Observability
export OTEL_EXPORTER_OTLP_ENDPOINT="http://jaeger:14268"
export OTEL_SERVICE_NAME="arshai_app"
```
### Docker Setup
```dockerfile
# Production Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install production dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Performance configuration
ENV ARSHAI_MAX_CONNECTIONS=200
ENV ARSHAI_MAX_CONNECTIONS_PER_HOST=20
ENV ARSHAI_MAX_THREADS=64
ENV ARSHAI_MAX_MEMORY_MB=4096
# Redis connection
ENV REDIS_URL=redis://redis:6379
ENV REDIS_TTL=7200
# Milvus connection
ENV MILVUS_HOST=milvus
ENV MILVUS_PORT=19530
COPY . .
CMD ["python", "app.py"]
```
### Docker Compose for Development
```yaml
# docker-compose.yml
version: '3.8'
services:
app:
build: .
environment:
- REDIS_URL=redis://redis:6379
- MILVUS_HOST=milvus
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- redis
- milvus
- jaeger
redis:
image: redis:7-alpine
ports:
- "6379:6379"
milvus:
image: milvusdb/milvus:latest
ports:
- "19530:19530"
environment:
- ETCD_ENDPOINTS=etcd:2379
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "14268:14268"
```
## Quick Start Guide š
### 1. Quick Start - Your First Agent
```python
from arshai.llms.openai import OpenAIClient
from arshai.agents.base import BaseAgent
from arshai.core.interfaces.illm import ILLMConfig
from arshai.core.interfaces.iagent import IAgentInput
# Step 1: Create your LLM client (you control this)
llm_config = ILLMConfig(
model="gpt-4o", # Latest model
temperature=0.7, # Creativity level
max_tokens=2000 # Response length
)
llm_client = OpenAIClient(llm_config)
# Step 2: Create your agent (you define the behavior)
agent = BaseAgent(
llm_client=llm_client,
system_prompt="You are an expert Python programming assistant. Provide clear, practical examples with explanations."
)
# Step 3: Use your agent (you control the interaction)
response = await agent.process(IAgentInput(
message="How do I handle async/await in Python? Show me a practical example."
))
print(response)
# Output: Detailed explanation with code examples
```
### 1b. Agent with Memory (Conversation Context)
```python
from arshai.agents.working_memory import WorkingMemoryAgent
from arshai.memory.working_memory.in_memory_manager import InMemoryManager
# Create memory system for conversation context
memory = InMemoryManager(ttl=1800) # 30-minute memory
# Create agent with memory capabilities
conversation_agent = WorkingMemoryAgent(
llm_client=llm_client,
memory_manager=memory,
system_prompt="You are a helpful assistant that remembers our conversation context."
)
# First interaction
response1 = await conversation_agent.process(IAgentInput(
message="My name is Alex and I'm learning Python.",
metadata={"conversation_id": "conv_123"}
))
# Second interaction - agent remembers context
response2 = await conversation_agent.process(IAgentInput(
message="What was my name again?",
metadata={"conversation_id": "conv_123"}
))
print(response2) # Will remember "Alex"
```
### 1c. Multi-Provider Setup
```python
# Use different providers for different tasks
from arshai.llms.openai import OpenAIClient
from arshai.llms.google_genai import GeminiClient
from arshai.llms.openrouter import OpenRouterClient
# Fast model for simple tasks
fast_llm = OpenAIClient(ILLMConfig(model="gpt-4o-mini", temperature=0.3))
# Powerful model for complex analysis
strong_llm = OpenAIClient(ILLMConfig(model="gpt-4o", temperature=0.5))
# Alternative provider for variety
gemini_llm = GeminiClient(ILLMConfig(model="gemini-1.5-pro", temperature=0.7))
# Access to specialized models
openrouter_llm = OpenRouterClient(ILLMConfig(
model="anthropic/claude-3.5-sonnet", # Claude via OpenRouter
temperature=0.6
))
# Create specialized agents
quick_agent = BaseAgent(fast_llm, "Quick responses for simple questions")
analysis_agent = BaseAgent(strong_llm, "Deep analysis and complex reasoning")
creative_agent = BaseAgent(gemini_llm, "Creative writing and brainstorming")
research_agent = BaseAgent(openrouter_llm, "Research and detailed explanations")
# Use the right agent for each task
quick_response = await quick_agent.process(IAgentInput(
message="What's 15% tip on $42?"
))
analysis_response = await analysis_agent.process(IAgentInput(
message="Analyze the pros and cons of microservices vs monolithic architecture for a startup"
))
```
### 2. Advanced Agent with Multiple Tool Types
```python
# Comprehensive restaurant agent with multiple capabilities
class SmartRestaurantAgent(BaseAgent):
"""Intelligent restaurant agent with real-time data and background tasks."""
def __init__(self, llm_client: ILLM, reservation_system, pos_system, notification_service):
super().__init__(llm_client, "You are an expert restaurant assistant with access to real-time data")
self.reservation_system = reservation_system
self.pos_system = pos_system
self.notification_service = notification_service
async def process(self, input: IAgentInput) -> Dict[str, Any]:
# Define regular tools (return results to conversation)
def get_current_time() -> Dict[str, str]:
"""Get current time with timezone info."""
from datetime import datetime
import pytz
now = datetime.now(pytz.UTC)
local_time = now.astimezone(pytz.timezone('America/New_York'))
return {
"utc_time": now.isoformat(),
"local_time": local_time.strftime("%I:%M %p"),
"date": local_time.strftime("%A, %B %d, %Y"),
"timezone": "Eastern Time"
}
def calculate_bill_details(subtotal: float, tax_rate: float = 0.08, tip_percent: float = 20) -> Dict[str, float]:
"""Calculate comprehensive bill breakdown."""
tax = subtotal * tax_rate
tip = subtotal * (tip_percent / 100)
total = subtotal + tax + tip
return {
"subtotal": round(subtotal, 2),
"tax": round(tax, 2),
"tip": round(tip, 2),
"total": round(total, 2),
"tip_percentage": tip_percent
}
async def check_table_availability(party_size: int, preferred_time: str) -> Dict[str, Any]:
"""Check real-time table availability."""
# Your reservation system integration
available_times = await self.reservation_system.check_availability(
party_size=party_size,
date=preferred_time
)
return {
"available": len(available_times) > 0,
"available_times": available_times[:5], # Show top 5 options
"party_size": party_size,
"wait_time_estimate": "15-20 minutes" if available_times else "45+ minutes"
}
async def get_menu_recommendations(dietary_restrictions: str = None, budget_range: str = "moderate") -> List[Dict[str, Any]]:
"""Get personalized menu recommendations."""
# Your menu system integration with filtering
menu_items = await self.pos_system.get_filtered_menu(
dietary_restrictions=dietary_restrictions,
price_range=budget_range
)
return [
{
"name": item["name"],
"description": item["description"],
"price": item["price"],
"dietary_tags": item["tags"],
"popularity_score": item["popularity"]
}
for item in menu_items[:8] # Top 8 recommendations
]
# Define background tasks (fire-and-forget)
async def log_customer_interaction(user_id: str, interaction_type: str, details: Dict[str, Any]) -> None:
"""BACKGROUND TASK: Log interaction for analytics."""
await self.pos_system.log_interaction({
"user_id": user_id,
"type": interaction_type,
"details": details,
"timestamp": datetime.now().isoformat()
})
async def send_follow_up_notification(user_id: str, message: str, delay_minutes: int = 60) -> None:
"""BACKGROUND TASK: Schedule follow-up notification."""
await self.notification_service.schedule_notification(
user_id=user_id,
message=message,
delay_minutes=delay_minutes
)
async def update_customer_preferences(user_id: str, preferences: Dict[str, Any]) -> None:
"""BACKGROUND TASK: Update customer profile."""
await self.pos_system.update_customer_profile(
user_id=user_id,
preferences=preferences
)
async def trigger_marketing_follow_up(user_id: str, interaction_context: str) -> None:
"""BACKGROUND TASK: Trigger marketing automation."""
# Your CRM/marketing automation integration
await self.notification_service.trigger_campaign(
user_id=user_id,
campaign_type="restaurant_follow_up",
context=interaction_context
)
# Prepare LLM input with all tools
llm_input = ILLMInput(
system_prompt="""You are a sophisticated restaurant assistant with access to real-time systems.
You can check availability, provide menu recommendations, calculate bills, and help with reservations.
Always be helpful, accurate, and proactive in offering relevant information.
When customers show interest, set up appropriate follow-ups.""",
user_message=input.message,
# Regular functions return results to continue conversation
regular_functions={
"get_current_time": get_current_time,
"calculate_bill_details": calculate_bill_details,
"check_table_availability": check_table_availability,
"get_menu_recommendations": get_menu_recommendations
},
# Background tasks run independently
background_tasks={
"log_customer_interaction": log_customer_interaction,
"send_follow_up_notification": send_follow_up_notification,
"update_customer_preferences": update_customer_preferences,
"trigger_marketing_follow_up": trigger_marketing_follow_up
}
)
# Get LLM response with tool usage
result = await self.llm_client.chat(llm_input)
# Return structured response
return {
"response": result["llm_response"],
"tools_used": result.get("tools_used", []),
"background_tasks_triggered": result.get("background_tasks", []),
"user_id": input.metadata.get("user_id"),
"session_id": input.metadata.get("session_id"),
"timestamp": datetime.now().isoformat()
}
# Production usage
async def main():
# Create dependencies
llm = OpenAIClient(ILLMConfig(model="gpt-4o", temperature=0.7))
reservation_system = ReservationAPI()
pos_system = POSIntegration()
notification_service = NotificationService()
# Create the restaurant agent
restaurant_agent = SmartRestaurantAgent(
llm_client=llm,
reservation_system=reservation_system,
pos_system=pos_system,
notification_service=notification_service
)
# Example interaction
response = await restaurant_agent.process(IAgentInput(
message="Hi! I'm looking for a table for 4 people tonight around 7 PM. We have one vegetarian in our group and prefer mid-range prices. Also, what's a good tip for a $85 bill?",
metadata={
"user_id": "customer_123",
"session_id": "session_456"
}
))
print(f"Response: {response['response']}")
print(f"Tools used: {response['tools_used']}")
print(f"Background tasks: {response['background_tasks_triggered']}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
```
### 3. Intelligent Multi-Agent System
```python
class IntelligentHelpDeskSystem:
"""Production-ready helpdesk with multiple AI agents."""
def __init__(self):
# Shared LLM clients for different use cases
self.fast_llm = OpenAIClient(ILLMConfig(model="gpt-4o-mini", temperature=0.3))
self.powerful_llm = OpenAIClient(ILLMConfig(model="gpt-4o", temperature=0.3))
# Persistent memory with Redis
self.memory = RedisMemoryManager(
redis_url="redis://localhost:6379",
ttl=3600, # 1 hour conversation memory
key_prefix="helpdesk"
)
# Knowledge base with vector search
self.knowledge_base = VectorKnowledgeBase(
vector_client=MilvusClient(),
collection_name="support_docs"
)
# External integrations
self.ticket_system = JiraIntegration()
self.billing_api = StripeAPI()
self.user_db = PostgreSQLConnection()
# Specialized agents with different capabilities
self.agents = self._create_agents()
# Smart routing with classification
self.router = IntentClassifier(self.fast_llm)
def _create_agents(self) -> Dict[str, IAgent]:
"""Create specialized agents with unique tools and knowledge."""
return {
"triage": TriageAgent(
llm_client=self.fast_llm,
system_prompt="You classify and prioritize customer requests"
),
"technical": TechnicalSupportAgent(
llm_client=self.powerful_llm,
knowledge_base=self.knowledge_base,
tools={
"search_docs": self.knowledge_base.search,
"check_system_status": self._check_system_status,
"create_bug_report": self._create_bug_report
},
background_tasks={
"log_technical_issue": self._log_technical_issue,
"notify_engineering": self._notify_engineering
}
),
"billing": BillingAgent(
llm_client=self.fast_llm,
billing_api=self.billing_api,
tools={
"get_account_status": self.billing_api.get_account,
"process_refund": self.billing_api.process_refund,
"update_payment_method": self.billing_api.update_payment
},
background_tasks={
"log_billing_interaction": self._log_billing_interaction,
"send_receipt": self._send_receipt
}
),
"escalation": EscalationAgent(
llm_client=self.powerful_llm,
ticket_system=self.ticket_system,
tools={
"create_priority_ticket": self.ticket_system.create_urgent,
"schedule_callback": self._schedule_callback,
"escalate_to_manager": self._escalate_to_manager
},
background_tasks={
"notify_management": self._notify_management,
"update_crm": self._update_crm
}
),
"general": GeneralSupportAgent(
llm_client=self.fast_llm,
knowledge_base=self.knowledge_base,
tools={
"search_faq": self.knowledge_base.search_faq,
"get_account_info": self._get_basic_account_info
}
)
}
async def handle_customer_request(
self,
message: str,
user_id: str,
session_id: Optional[str] = None
) -> Dict[str, Any]:
"""Main entry point for customer requests."""
# Generate session ID if not provided
if not session_id:
session_id = f"session_{user_id}_{int(time.time())}"
# Retrieve conversation history and user context
conversation_history = await self.memory.retrieve({
"session_id": session_id,
"user_id": user_id
})
user_context = await self._get_user_context(user_id)
# Step 1: Intelligent routing with context
routing_result = await self.router.classify_request(
message=message,
conversation_history=conversation_history,
user_context=user_context
)
# Step 2: Select appropriate agent based on classification
agent_type = self._select_agent(routing_result)
agent = self.agents[agent_type]
# Step 3: Prepare comprehensive input for the agent
agent_input = IAgentInput(
message=message,
metadata={
"user_id": user_id,
"session_id": session_id,
"user_context": user_context,
"conversation_history": conversation_history,
"routing_info": routing_result,
"timestamp": datetime.now().isoformat()
}
)
# Step 4: Process request with selected agent
agent_response = await agent.process(agent_input)
# Step 5: Post-process and enhance response
enhanced_response = await self._enhance_response(
agent_response,
routing_result,
user_context
)
# Step 6: Update conversation memory
await self.memory.store({
"session_id": session_id,
"user_id": user_id,
"interaction": {
"user_message": message,
"agent_response": enhanced_response,
"agent_used": agent_type,
"confidence": routing_result.get("confidence", 0.0),
"resolved": enhanced_response.get("resolved", False),
"timestamp": datetime.now().isoformat()
}
})
# Step 7: Return comprehensive response
return {
"response": enhanced_response["message"],
"agent_used": agent_type,
"confidence": routing_result.get("confidence", 0.0),
"session_id": session_id,
"resolved": enhanced_response.get("resolved", False),
"escalated": enhanced_response.get("escalated", False),
"next_steps": enhanced_response.get("next_steps", []),
"estimated_resolution_time": enhanced_response.get("eta"),
"satisfaction_survey": enhanced_response.get("survey_link")
}
def _select_agent(self, routing_result: Dict[str, Any]) -> str:
"""Smart agent selection based on classification and context."""
intent = routing_result.get("intent", "general")
urgency = routing_result.get("urgency", "low")
complexity = routing_result.get("complexity", "simple")
# Escalation logic
if urgency == "critical" or routing_result.get("escalate_immediately"):
return "escalation"
# Intent-based routing
if intent in ["technical", "bug", "outage"]:
return "technical" if complexity != "high" else "escalation"
elif intent in ["billing", "payment", "subscription"]:
return "billing"
elif intent in ["complaint", "refund", "cancellation"]:
return "escalation"
else:
return "general"
# Helper methods for agent tools
async def _check_system_status(self) -> Dict[str, str]:
"""Check current system status."""
# Your system monitoring integration
return {"status": "operational", "uptime": "99.9%"}
async def _get_user_context(self, user_id: str) -> Dict[str, Any]:
"""Retrieve user context from various systems."""
# Aggregate user data from multiple sources
return {
"account_type": "premium",
"support_tier": "priority",
"previous_issues": 2,
"satisfaction_score": 4.5
}
# Production usage
async def main():
helpdesk = IntelligentHelpDeskSystem()
# Handle customer request
response = await helpdesk.handle_customer_request(
message="My API is returning 500 errors since this morning. This is affecting our production system!",
user_id="enterprise_customer_123"
)
print(f"Agent used: {response['agent_used']}")
print(f"Response: {response['response']}")
print(f"Escalated: {response['escalated']}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
```
### 4. Advanced: Real-Time Streaming & Multi-Modal
```python
# Advanced streaming with real-time function execution
async def financial_advisor_stream():
"""Financial advisor with real-time data and streaming."""
def get_stock_price(symbol: str) -> Dict[str, Any]:
"""Get real-time stock price and metrics."""
# Your real-time data source integration
return {
"symbol": symbol,
"price": 150.25,
"change": "+2.5%",
"volume": "1.2M",
"market_cap": "2.8T"
}
def get_market_news(symbol: str) -> List[Dict[str, str]]:
"""Get latest news for a stock symbol."""
# Your news API integration
return [
{"headline": "Strong Q3 earnings reported", "sentiment": "positive"},
{"headline": "New product launch announced", "sentiment": "positive"}
]
async def send_price_alert(symbol: str, threshold: float, current_price: float) -> None:
"""BACKGROUND TASK: Set up price monitoring."""
print(f"šØ Alert set: {symbol} @ ${threshold} (current: ${current_price})")
# Your alert system integration - runs independently
async def log_trading_interest(symbol: str, user_intent: str) -> None:
"""BACKGROUND TASK: Log user trading interest."""
# Your analytics system - fire and forget
print(f"š Logged interest: {user_intent} for {symbol}")
# Multi-function streaming setup
llm_input = ILLMInput(
system_prompt="""You are an expert financial advisor with access to real-time market data.
Provide comprehensive analysis using current stock prices, news, and market sentiment.
Set up alerts when users express interest in price monitoring.""",
user_message="What's Apple's current situation? I'm thinking about buying and want to be alerted if it drops below $140",
regular_functions={
"get_stock_price": get_stock_price,
"get_market_news": get_market_news
},
background_tasks={
"send_price_alert": send_price_alert,
"log_trading_interest": log_trading_interest
}
)
print("š Financial Advisor:")
# Stream with real-time function execution
async for chunk in llm.stream(llm_input):
if chunk.get("llm_response"):
print(chunk["llm_response"], end="", flush=True)
# Functions execute immediately as LLM calls them
# Regular functions return results to continue conversation
# Background tasks run independently without blocking
print("\n\nā
Analysis complete with active monitoring set up!")
# Multi-modal capabilities (Gemini)
async def image_analysis_stream():
"""Analyze images with streaming responses."""
gemini_config = ILLMConfig(model="gemini-1.5-pro-vision")
gemini = GeminiClient(gemini_config)
llm_input = ILLMInput(
system_prompt="You are an expert image analyst. Provide detailed, structured analysis.",
user_message="Analyze this product image for e-commerce listing",
images=["path/to/product-image.jpg"], # Multi-modal input
regular_functions={
"generate_seo_tags": lambda description: ["electronics", "gadget", "premium"],
"suggest_price_range": lambda category, features: {"min": 99, "max": 149}
}
)
print("š¼ļø Product Analysis:")
async for chunk in gemini.stream(llm_input):
if chunk.get("llm_response"):
print(chunk["llm_response"], end="", flush=True)
print("\n\nā
Product analysis complete!")
# Usage
if __name__ == "__main__":
import asyncio
asyncio.run(financial_advisor_stream())
```
## Memory Management
```python
from arshai.memory.working_memory.redis_memory_manager import RedisMemoryManager
from arshai.memory.working_memory.in_memory_manager import InMemoryManager
# You choose the memory implementation
def create_memory(env: str) -> IMemoryManager:
if env == "production":
return RedisMemoryManager(
redis_url=os.getenv("REDIS_URL"),
ttl=3600,
key_prefix="prod_app"
)
else:
return InMemoryManager(ttl=1800) # Development
# You integrate it with agents
memory = create_memory("production")
agent = ConversationAgent(llm, memory, "You remember our conversation")
```
## Production Deployment
### Performance Optimizations
```python
# Configure performance settings via environment variables
import os
# HTTP connection pooling
os.environ['ARSHAI_MAX_CONNECTIONS'] = '100'
os.environ['ARSHAI_MAX_CONNECTIONS_PER_HOST'] = '10'
# Thread pool management
os.environ['ARSHAI_MAX_THREADS'] = '32'
# Memory management
os.environ['ARSHAI_MAX_MEMORY_MB'] = '2048'
os.environ['ARSHAI_CLEANUP_INTERVAL'] = '300'
# Your production setup benefits automatically
llm = OpenAIClient(config) # Uses connection pooling
agent = CustomAgent(llm) # Uses managed threads
```
### Docker Deployment
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
# Performance configuration
ENV ARSHAI_MAX_CONNECTIONS=100
ENV ARSHAI_MAX_CONNECTIONS_PER_HOST=20
ENV ARSHAI_MAX_THREADS=32
COPY . .
CMD ["python", "main.py"]
```
### Observability
```python
from arshai.observability import ObservabilityConfig
# Configure monitoring (optional - your choice)
obs_config = ObservabilityConfig.from_yaml("monitoring.yaml")
client = LLMFactory.create_with_observability(
provider="openai",
config=llm_config,
observability_config=obs_config
)
# Automatic metrics collection:
# ā
llm_time_to_first_token_seconds
# ā
llm_time_to_last_token_seconds
# ā
llm_completion_tokens
# ā
llm_requests_total
```
## Key Interfaces
### LLM Provider Interface
```python
from arshai.core.interfaces.illm import ILLM, ILLMConfig, ILLMInput
# All LLM providers implement this interface
class CustomLLMProvider(ILLM):
def __init__(self, config: ILLMConfig):
self.config = config
async def chat(self, input: ILLMInput) -> Dict[str, Any]:
# Your implementation
pass
async def stream(self, input: ILLMInput) -> AsyncGenerator:
# Your streaming implementation
pass
```
### Agent Interface
```python
from arshai.core.interfaces.iagent import IAgent, IAgentInput
class MyCustomAgent(IAgent):
def __init__(self, llm_client: ILLM, **kwargs):
self.llm_client = llm_client
# You control initialization
async def process(self, input: IAgentInput) -> Any:
# You control the processing logic
# Return anything - string, dict, object, generator
pass
```
## Testing
```python
import pytest
from unittest.mock import AsyncMock
async def test_my_agent():
# Easy testing with dependency injection
mock_llm = AsyncMock()
mock_llm.chat.return_value = {
"llm_response": "Test response",
"usage": {"total_tokens": 50}
}
# Your agent with mocked dependencies
agent = MyAgent(mock_llm, "Test prompt")
result = await agent.process(IAgentInput(message="Test"))
assert result == expected_result
mock_llm.chat.assert_called_once()
```
## Extension and Customization
### Add New LLM Provider
```python
class MyLLMProvider(ILLM):
"""Your custom LLM provider."""
def __init__(self, config: ILLMConfig):
self.config = config
# Your initialization
async def chat(self, input: ILLMInput) -> Dict[str, Any]:
# Your provider implementation
return {
"llm_response": "Your response",
"usage": {"total_tokens": 100}
}
```
### Create Custom Agent Types
```python
class DomainSpecificAgent(BaseAgent):
"""Agent specialized for your domain."""
def __init__(self, llm_client: ILLM, domain_tools: List[Callable]):
super().__init__(llm_client, "Domain-specific system prompt")
self.domain_tools = {tool.__name__: tool for tool in domain_tools}
async def process(self, input: IAgentInput) -> Dict[str, Any]:
# Your domain-specific processing
llm_input = ILLMInput(
system_prompt=self.system_prompt,
user_message=input.message,
regular_functions=self.domain_tools
)
result = await self.llm_client.chat(llm_input)
return self._format_domain_response(result)
## Observability and Monitoring
Arshai provides enterprise-grade observability for production AI systems with comprehensive OpenTelemetry integration and zero-fallback monitoring.
### šÆ Key Features
- **Zero-Fallback Monitoring**: Always capture the 4 critical LLM metrics without any fallback mechanisms
- **OpenTelemetry Native**: Export to any OTLP-compatible backend (Jaeger, Prometheus, Datadog, New Relic)
- **Phoenix AI Observability**: Advanced LLM interaction monitoring with comprehensive input/output tracing
- **Automatic Factory Integration**: Zero-code observability through intelligent factory wrapping
- **Real-time Input/Output Capture**: Automatic capture of prompts, responses, and usage metrics
- **LLM Usage Data Integration**: Accurate token counting from LLM response usage data
- **Streaming Observability**: Real-time token-level timing for streaming responses
- **Non-Intrusive Design**: Zero side effects on LLM calls with graceful degradation
- **Production-Ready**: Comprehensive configuration, privacy controls, and performance optimization
### š The Four Key Metrics
Every LLM interaction automatically captures these critical performance metrics:
| Metric | Description | Use Case |
|--------|-------------|----------|
| `llm_time_to_first_token_seconds` | Latency from request start to first token | User experience, response time SLAs |
| `llm_time_to_last_token_seconds` | Total response generation time | End-to-end performance monitoring |
| `llm_duration_first_to_last_token_seconds` | Token generation duration | Throughput analysis, model performance |
| `llm_completion_tokens` | Accurate completion token count | Cost tracking, usage monitoring |
### šļø Architecture
```mermaid
graph TD
A[LLM Calls] --> B[Observable Factory]
B --> C[Observability Manager]
C --> D[Metrics Collector]
C --> E[Trace Exporter]
C --> F[Phoenix Client]
D --> G[OpenTelemetry Collector]
E --> G
F --> H[Phoenix AI Platform]
G --> I[Jaeger/Zipkin]
G --> J[Prometheus]
G --> K[DataDog/New Relic]
J --> L[Grafana Dashboards]
subgraph "Automatic Capture"
M[Input Messages]
N[Output Responses]
O[Token Usage]
P[Timing Data]
end
B --> M
B --> N
B --> O
B --> P
```
### š Quick Setup
#### 1. Basic Configuration
```yaml
# config.yaml
observability:
service_name: "my-ai-service"
track_token_timing: true
otlp_endpoint: "http://localhost:4317"
# Phoenix AI Observability
phoenix_enabled: true
phoenix_endpoint: "http://localhost:6006"
# Privacy and data capture
log_prompts: true
log_responses: true
```
#### 2. Automatic Factory Integration
```python
from arshai.config.settings import Settings
from arshai.core.interfaces.illm import ILLMInput
# Settings automatically detects observability configuration
settings = Settings()
# Create LLM - observability is automatically enabled if configured
llm = settings.create_llm()
# All calls are automatically instrumented with zero configuration
input_data = ILLMInput(
system_prompt="You are a helpful assistant.",
user_message="Hello!"
)
response = llm.chat_completion(input_data)
# Automatic capture includes:
# ā
Input messages (system prompt + user message)
# ā
Output response (full LLM response)
# ā
Usage metrics (prompt/completion/total tokens)
# ā
Timing data (first token, last token, total duration)
# ā
Invocation parameters (model, temperature, provider)
# ā
Span naming (llm.chat_completion not llm.<lambda>)
```
## Complete Examples & Use Cases
### Production-Ready Examples
```bash
# Start complete observability stack
cd tests/e2e/observability/
docker-compose up -d
# Access observability platforms:
# Phoenix AI Platform: http://localhost:6006 (LLM interactions, input/output tracing)
# Jaeger Traces: http://localhost:16686 (distributed tracing)
# Prometheus Metrics: http://localhost:9090 (metrics and queries)
# Grafana Dashboards: http://localhost:3000 (visualization)
```
### š Supported Backends
#### AI Observability Platforms
- **Phoenix AI Platform**: Advanced LLM interaction monitoring with input/output tracing
- **Arize AI**: Enterprise LLM observability and evaluation platform
#### Metrics Backends
- **Prometheus** + Grafana
- **DataDog** (via OTLP)
- **New Relic** (via OTLP)
- **AWS CloudWatch** (via OTLP)
- **Azure Monitor** (via OTLP)
- **Google Cloud Monitoring** (via OTLP)
#### Tracing Backends
- **Jaeger**
- **Zipkin**
- **DataDog APM**
- **New Relic Distributed Tracing**
- **AWS X-Ray**
- **Azure Application Insights**
### š§ Advanced Configuration
#### Provider-Specific Settings
```yaml
observability:
provider_configs:
openai:
track_token_timing: true
anthropic:
track_token_timing: true
google:
track_token_timing: false # Disable for specific providers
```
#### Privacy and Security
```yaml
observability:
# Privacy controls (recommended for production)
log_prompts: false
log_responses: false
max_prompt_length: 1000
max_response_length: 1000
# Custom attributes (avoid sensitive data)
custom_attributes:
team: "ai-platform"
environment: "production"
```
#### Performance Tuning
```yaml
observability:
# High-throughput settings
metric_export_interval: 30
trace_sampling_rate: 0.05 # 5% sampling
max_span_attributes: 64
# Async processing for better performance
non_intrusive: true
```
### š§Ŗ Testing and Development
#### End-to-End Test Suite
```bash
# Complete observability testing with real backends
cd tests/e2e/observability/
export OPENAI_API_KEY="your-key"
./run_test.sh
# Tests verify:
# ā
All 4 key metrics collected
# ā
OTLP export working
# ā
Multiple provider support
# ā
Streaming observability
# ā
No side effects on LLM calls
```
#### Development Setup
```yaml
# Development configuration
observability:
service_name: "arshai-dev"
environment: "development"
# Debug settings
log_prompts: true
log_responses: true
trace_sampling_rate: 1.0 # 100% sampling
# Local OTLP collector
otlp_endpoint: "http://localhost:4317"
```
### š Production Deployment
#### Docker Compose for Production
The `tests/e2e/observability/docker-compose.yml` provides a production-ready template:
```bash
# Production observability stack
docker-compose -f tests/e2e/observability/docker-compose.yml up -d
# Includes:
# - OpenTelemetry Collector
# - Jaeger for distributed tracing
# - Prometheus for metrics collection
# - Grafana with pre-built dashboards
```
#### Kubernetes Deployment
```yaml
# Example Kubernetes configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: observability-config
data:
config.yaml: |
observability:
service_name: "arshai-prod"
otlp_endpoint: "http://otel-collector:4317"
track_token_timing: true
log_prompts: false
log_responses: false
```
### š Monitoring and Alerting
#### Key Metrics to Monitor
```promql
# Prometheus queries for monitoring
# Average time to first token
histogram_quantile(0.95, rate(llm_time_to_first_token_seconds_bucket[5m]))
# Token throughput
rate(llm_completion_tokens[5m])
# Error rate
rate(llm_requests_failed[5m]) / rate(llm_requests_total[5m])
# Active requests
llm_active_requests
```
#### Grafana Dashboard
Pre-built dashboards available in `tests/e2e/observability/dashboards/`:
- **LLM Performance**: Token timing metrics with percentiles
- **Cost Tracking**: Token usage and provider distribution
- **Error Monitoring**: Failed requests and error patterns
- **Throughput Analysis**: Request rates and concurrent processing
### š Integration Examples
#### With Existing Monitoring
#### **Customer Support System**
```python
# examples/customer_support_system.py
from arshai.llms.openai import OpenAIClient
from arshai.agents.base import BaseAgent
from arshai.memory.working_memory.redis_memory_manager import RedisMemoryManager
from arshai.tools.web_search import SearxNGClient
from arshai.vector_db.milvus_client import MilvusClient
class IntelligentCustomerSupport:
"""Production-ready customer support with multiple agents."""
def __init__(self):
# Shared LLM client with connection pooling
llm_config = ILLMConfig(model="gpt-4o", temperature=0.3)
self.llm = OpenAIClient(llm_config)
# Memory system for conversation continuity
self.memory = RedisMemoryManager(
redis_url=os.getenv("REDIS_URL"),
ttl=3600,
key_prefix="support"
)
# Knowledge base with vector search
self.kb = KnowledgeBase(
vector_client=MilvusClient(),
embedding_model="text-embedding-3-large"
)
# Specialized agents
self.agents = {
"triage": TriageAgent(self.llm, "Route customer requests"),
"technical": TechnicalAgent(self.llm, self.kb),
"billing": BillingAgent(self.llm, billing_api),
"escalation": EscalationAgent(self.llm, human_queue)
}
async def handle_request(self, message: str, user_id: str, session_id: str) -> Dict[str, Any]:
"""Handle customer request with full context."""
# Retrieve conversation history
context = await self.memory.retrieve({
"user_id": user_id,
"session_id": session_id
})
# Triage the request
triage_result = await self.agents["triage"].classify(
message=message,
context=context,
user_history=await self.get_user_history(user_id)
)
# Select appropriate agent
agent_type = self.select_agent(triage_result)
agent = self.agents[agent_type]
# Process with full context
result = await agent.process(
IAgentInput(
message=message,
metadata={
"user_id": user_id,
"session_id": session_id,
"triage": triage_result,
"context": context
}
)
)
# Store updated context
await self.memory.store({
"user_id": user_id,
"session_id": session_id,
"interaction": {
"request": message,
"response": result["response"],
"agent_used": agent_type,
"confidence": result.get("confidence", 0.0),
"timestamp": datetime.now().isoformat()
}
})
return {
"response": result["response"],
"agent_used": agent_type,
"confidence": result.get("confidence", 0.0),
"escalated": agent_type == "escalation",
"session_id": session_id
}
```
#### **Content Generation Pipeline**
```python
# examples/content_generation_pipeline.py
class ContentGenerationPipeline:
"""Multi-stage content generation with quality assurance."""
def __init__(self):
# Different models for different tasks
self.creative_llm = OpenAIClient(ILLMConfig(model="gpt-4o", temperature=0.9))
self.editor_llm = OpenAIClient(ILLMConfig(model="gpt-4o", temperature=0.3))
self.fact_checker_llm = GeminiClient(ILLMConfig(model="gemini-1.5-pro"))
# Specialized agents
self.writer = CreativeWriterAgent(self.creative_llm)
self.editor = EditorAgent(self.editor_llm)
self.fact_checker = FactCheckerAgent(self.fact_checker_llm)
self.seo_optimizer = SEOAgent(self.editor_llm)
# Tools
self.web_search = SearxNGClient()
self.plagiarism_checker = PlagiarismTool()
async def generate_article(self, topic: str, requirements: Dict[str, Any]) -> Dict[str, Any]:
"""Generate high-quality article with multiple review stages."""
# Stage 1: Research and outline
research_data = await self.research_topic(topic)
outline = await self.writer.create_outline(topic, research_data, requirements)
# Stage 2: Content generation
content = await self.writer.write_content(outline, requirements)
# Stage 3: Editorial review
edited_content = await self.editor.review_and_edit(
content,
style_guide=requirements.get("style_guide"),
target_audience=requirements.get("audience")
)
# Stage 4: Fact checking
fact_check_result = await self.fact_checker.verify_claims(
edited_content,
research_sources=research_data["sources"]
)
if fact_check_result["issues_found"]:
# Correct factual issues
edited_content = await self.editor.correct_facts(
edited_content,
fact_check_result["corrections"]
)
# Stage 5: SEO optimization
optimized_content = await self.seo_optimizer.optimize(
edited_content,
target_keywords=requirements.get("keywords", []),
seo_requirements=requirements.get("seo", {})
)
# Stage 6: Quality assurance
quality_score = await self.assess_quality(optimized_content)
return {
"title": optimized_content["title"],
"content": optimized_content["body"],
"meta_description": optimized_content["meta_description"],
"keywords": optimized_content["keywords"],
"quality_score": quality_score,
"word_count": len(optimized_content["body"].split()),
"readability_score": optimized_content.get("readability", 0),
"fact_check_passed": not fact_check_result["issues_found"],
"seo_score": optimized_content.get("seo_score", 0)
}
```
#### **Data Analysis Assistant**
```python
# examples/data_analysis_assistant.py
class DataAnalysisAssistant:
"""Intelligent data analysis with visualization and insights."""
def __init__(self):
self.llm = OpenAIClient(ILLMConfig(model="gpt-4o"))
self.code_executor = CodeExecutorTool()
self.viz_generator = VisualizationTool()
# Data analysis agent with specialized tools
self.analyst = DataAnalystAgent(
llm_client=self.llm,
tools={
"execute_python": self.code_executor.execute,
"create_visualization": self.viz_generator.create,
"statistical_analysis": self.statistical_analysis,
"data_profiling": self.profile_data
}
)
async def analyze_dataset(self, data_path: str, analysis_request: str) -> Dict[str, Any]:
"""Perform comprehensive data analysis."""
# Load and profile the data
data_profile = await self.profile_data(data_path)
# Generate analysis plan
analysis_plan = await self.analyst.create_analysis_plan(
data_profile=data_profile,
request=analysis_request
)
results = {}
# Execute analysis steps
for step in analysis_plan["steps"]:
if step["type"] == "statistical_analysis":
result = await self.statistical_analysis(
data_path,
step["parameters"]
)
elif step["type"] == "visualization":
result = await self.viz_generator.create(
data_path,
chart_type=step["chart_type"],
parameters=step["parameters"]
)
elif step["type"] == "custom_code":
result = await self.code_executor.execute(
step["code"],
data_path=data_path
)
results[step["name"]] = result
# Generate insights and recommendations
insights = await self.analyst.generate_insights(
analysis_results=results,
original_request=analysis_request
)
return {
"data_profile": data_profile,
"analysis_plan": analysis_plan,
"results": results,
"insights": insights,
"visualizations": [r for r in results.values() if r.get("type") == "chart"],
"recommendations": insights.get("recommendations", [])
}
```
### Working Examples Directory
- **[Basic Usage](examples/basic_usage.py)** - Simple agent creation and usage patterns
- **[Advanced Agents](examples/agent_patterns.py)** - Custom agent development patterns
- **[Multi-Agent Systems](examples/multi_agent_system.py)** - Complex system orchestration
- **[Memory Integration](examples/memory_usage.py)** - Redis and in-memory patterns
- **[Tool Development](examples/tool_integration.py)** - Custom tool creation
- **[Production Deployment](examples/production_deployment.py)** - Enterprise patterns
- **[Performance Testing](tests/performance/)** - Load testing and validation
- **[Customer Support](examples/customer_support_system.py)** - Complete support system
- **[Content Pipeline](examples/content_generation_pipeline.py)** - Multi-stage content creation
- **[Data Analysis](examples/data_analysis_assistant.py)** - Intelligent data analysis
- **[E-commerce Assistant](examples/ecommerce_assistant.py)** - Shopping and recommendations
- **[Code Review Bot](examples/code_review_agent.py)** - Automated code analysis
- **[Research Assistant](examples/research_assistant.py)** - Academic research support
- **[Meeting Summarizer](examples/meeting_summarizer.py)** - Meeting analysis and action items
## Comprehensive Documentation
### š Getting Started
- **[Quick Start Guide](docs/01-getting-started/)** - Your first Arshai application in 5 minutes
- **[Installation Guide](docs/01-getting-started/installation.md)** - Detailed setup instructions
- **[Core Concepts](docs/01-getting-started/core-concepts.md)** - Understanding the framework
- **[First Agent](docs/01-getting-started/first-agent.md)** - Build your first intelligent agent
### š§ Philosophy & Architecture
- **[Developer Authority](docs/00-philosophy/developer-authority.md)** - Core framework philosophy
- **[Three-Layer Architecture](docs/00-philosophy/three-layer-architecture.md)** - Architectural foundations
- **[Design Decisions](docs/00-philosophy/design-decisions.md)** - Why we made these choices
- **[Comparison with Other Frameworks](docs/00-philosophy/comparisons.md)** - How Arshai differs
### šļø Layer-by-Layer Guides
#### **Layer 1: LLM Clients**
- **[OpenAI Integration](docs/02-layer-guides/layer1-llm-clients/openai.md)** - Complete OpenAI setup
- **[Azure OpenAI](docs/02-layer-guides/layer1-llm-clients/azure.md)** - Enterprise Azure deployment
- **[Google Gemini](docs/02-layer-guides/layer1-llm-clients/gemini.md)** - Gemini Pro integration
- **[OpenRouter](docs/02-layer-guides/layer1-llm-clients/openrouter.md)** - Access to 200+ models
- **[Function Calling](docs/02-layer-guides/layer1-llm-clients/function-calling.md)** - Advanced tool integration
- **[Streaming](docs/02-layer-guides/layer1-llm-clients/streaming.md)** - Real-time responses
- **[Custom Providers](docs/02-layer-guides/layer1-llm-clients/custom-providers.md)** - Adding new LLM providers
#### **Layer 2: Agents**
- **[Agent Fundamentals](docs/02-layer-guides/layer2-agents/fundamentals.md)** - Core agent concepts
- **[BaseAgent Guide](docs/02-layer-guides/layer2-agents/base-agent.md)** - Using the base agent class
- **[Custom Agents](docs/02-layer-guides/layer2-agents/custom-agents.md)** - Building specialized agents
- **[Working Memory](docs/02-layer-guides/layer2-agents/working-memory.md)** - Context-aware agents
- **[Tool Integration](docs/02-layer-guides/layer2-agents/tools.md)** - Adding capabilities to agents
- **[Error Handling](docs/02-layer-guides/layer2-agents/error-handling.md)** - Robust agent design
- **[Testing Agents](docs/02-layer-guides/layer2-agents/testing.md)** - Agent testing patterns
#### **Layer 3: Agentic Systems**
- **[System Design](docs/02-layer-guides/layer3-systems/design.md)** - Architecting complex systems
- **[Multi-Agent Coordination](docs/02-layer-guides/layer3-systems/coordination.md)** - Agent orchestration
- **[Workflow Management](docs/02-layer-guides/layer3-systems/workflows.md)** - Process automation
- **[State Management](docs/02-layer-guides/layer3-systems/state.md)** - System state handling
- **[Error Recovery](docs/02-layer-guides/layer3-systems/recovery.md)** - Resilient system design
- **[Scaling Patterns](docs/02-layer-guides/layer3-systems/scaling.md)** - High-volume architectures
### š ļø Development Patterns
- **[Direct Instantiation](docs/03-patterns/direct-instantiation.md)** - Core development pattern
- **[Dependency Injection](docs/03-patterns/dependency-injection.md)** - Managing dependencies
- **[Configuration Management](docs/03-patterns/configuration.md)** - Environment-based config
- **[Error Handling](docs/03-patterns/error-handling.md)** - Robust error strategies
- **[Testing Strategies](docs/03-patterns/testing.md)** - Comprehensive testing approaches
- **[Performance Patterns](docs/03-patterns/performance.md)** - Optimization techniques
### š§° Component Guides
#### **Memory Systems**
- **[Memory Overview](docs/04-components/memory/overview.md)** - Memory system architecture
- **[Redis Memory](docs/04-components/memory/redis.md)** - Production memory backend
- **[In-Memory Manager](docs/04-components/memory/in-memory.md)** - Development and testing
- **[Working Memory](docs/04-components/memory/working-memory.md)** - Conversation context
- **[Custom Memory](docs/04-components/memory/custom.md)** - Building custom backends
#### **Tools & Integrations**
- **[Tool Development](docs/04-components/tools/development.md)** - Creating custom tools
- **[Web Search Tools](docs/04-components/tools/web-search.md)** - SearxNG integration
- **[Database Tools](docs/04-components/tools/database.md)** - Database connectivity
- **[API Integration](docs/04-components/tools/api-integration.md)** - External API tools
- **[MCP Tools](docs/04-components/tools/mcp.md)** - Model Context Protocol
- **[Background Tasks](docs/04-components/tools/background-tasks.md)** - Fire-and-forget operations
#### **Vector Databases**
- **[Vector Database Overview](docs/04-components/vector-db/overview.md)** - Vector storage concepts
- **[Milvus Integration](docs/04-components/vector-db/milvus.md)** - Production vector database
- **[Hybrid Search](docs/04-components/vector-db/hybrid-search.md)** - Dense + sparse vectors
- **[Embedding Models](docs/04-components/vector-db/embeddings.md)** - Choosing embedding models
- **[Performance Tuning](docs/04-components/vector-db/performance.md)** - Optimization strategies
#### **Document Processing**
- **[Document Ingestion](docs/04-components/documents/ingestion.md)** - Processing various formats
- **[Chunking Strategies](docs/04-components/documents/chunking.md)** - Content segmentation
- **[Metadata Extraction](docs/04-components/documents/metadata.md)** - Automatic metadata detection
- **[Batch Processing](docs/04-components/documents/batch-processing.md)** - Large-scale document handling
### š Observability & Monitoring
- **[Observability Overview](docs/05-observability/overview.md)** - Monitoring philosophy
- **[OpenTelemetry Integration](docs/05-observability/opentelemetry.md)** - Distributed tracing
- **[Metrics Collection](docs/05-observability/metrics.md)** - Performance monitoring
- **[Custom Metrics](docs/05-observability/custom-metrics.md)** - Business-specific tracking
- **[Alerting Setup](docs/05-observability/alerting.md)** - Proactive monitoring
- **[Cost Tracking](docs/05-observability/cost-tracking.md)** - LLM cost optimization
- **[Debugging Guide](docs/05-observability/debugging.md)** - Troubleshooting techniques
### š Migration & Upgrades
- **[Migration Guide](docs/06-migration/)** - Upgrading from older versions
- **[Breaking Changes](docs/06-migration/breaking-changes.md)** - Version compatibility
- **[Legacy Support](docs/06-migration/legacy-support.md)** - Backward compatibility
- **[Migration Tools](docs/06-migration/tools.md)** - Automated migration utilities
### š§Ŗ Testing & Quality Assurance
- **[Testing Philosophy](docs/07-testing/philosophy.md)** - Arshai testing approach
- **[Unit Testing](docs/07-testing/unit-testing.md)** - Component-level testing
- **[Integration Testing](docs/07-testing/integration-testing.md)** - System-level testing
- **[Load Testing](docs/07-testing/load-testing.md)** - Performance validation
- **[Mock Strategies](docs/07-testing/mocking.md)** - Testing with mocks
- **[CI/CD Integration](docs/07-testing/cicd.md)** - Automated testing pipelines
### š¤ Contributing & Extending
- **[Contributing Guide](docs/08-contributing/)** - How to contribute
- **[Code Standards](docs/08-contributing/code-standards.md)** - Style and quality guidelines
- **[Adding LLM Providers](docs/08-contributing/llm-providers.md)** - Extending provider support
- **[Adding Tools](docs/08-contributing/tools.md)** - Contributing new tools
- **[Documentation](docs/08-contributing/documentation.md)** - Improving documentation
- **[Release Process](docs/08-contributing/releases.md)** - How releases are managed
### š Deployment & Production
- **[Production Deployment](docs/09-deployment/)** - Enterprise deployment guide
- **[Performance Optimization](docs/09-deployment/performance-optimization.md)** - Production optimization
- **[Docker Deployment](docs/09-deployment/docker.md)** - Containerized deployment
- **[Kubernetes Deployment](docs/09-deployment/kubernetes.md)** - Scalable orchestration
- **[Security Best Practices](docs/09-deployment/security.md)** - Production security
- **[Monitoring Setup](docs/09-deployment/monitoring.md)** - Production monitoring
- **[Backup & Recovery](docs/09-deployment/backup-recovery.md)** - Data protection
- **[Capacity Planning](docs/09-deployment/capacity-planning.md)** - Scaling strategies
### š API Reference
- **[Core Interfaces](docs/10-api-reference/interfaces/)** - Framework contracts
- **[LLM Clients API](docs/10-api-reference/llm-clients/)** - LLM provider APIs
- **[Agents API](docs/10-api-reference/agents/)** - Agent class references
- **[Tools API](docs/10-api-reference/tools/)** - Tool development APIs
- **[Memory API](docs/10-api-reference/memory/)** - Memory system APIs
- **[Configuration API](docs/10-api-reference/configuration/)** - Settings and config APIs
### š Tutorials & Learning Resources
- **[Video Tutorials](docs/11-tutorials/videos/)** - Step-by-step video guides
- **[Interactive Notebooks](docs/11-tutorials/notebooks/)** - Jupyter notebook tutorials
- **[Use Case Studies](docs/11-tutorials/case-studies/)** - Real-world implementations
- **[Best Practices](docs/11-tutorials/best-practices/)** - Proven patterns and approaches
- **[Troubleshooting](docs/11-tutorials/troubleshooting/)** - Common issues and solutions
### š Integration Guides
- **[FastAPI Integration](docs/12-integrations/fastapi.md)** - Web API development
- **[Streamlit Integration](docs/12-integrations/streamlit.md)** - Rapid UI development
- **[Discord Bot](docs/12-integrations/discord.md)** - Chatbot development
- **[Slack Integration](docs/12-integrations/slack.md)** - Workspace automation
- **[Database Integration](docs/12-integrations/databases.md)** - Data persistence
- **[Cloud Services](docs/12-integrations/cloud.md)** - AWS, Azure, GCP integration
## Why Choose Arshai? š
### ā
**Complete Developer Control**
- **No Hidden Magic**: Every component creation is explicit and visible
- **Direct Instantiation**: You create components when and how you want
- **Dependency Injection**: All dependencies passed through constructors
- **Architectural Freedom**: Design systems exactly as needed
- **Zero Lock-in**: Use framework components selectively
- **Interface-First**: Well-defined contracts make everything testable
### ā
**Production-Grade Performance**
- **Tested at Scale**: Validated for 1000+ concurrent users
- **Connection Management**: HTTP connection pooling prevents resource exhaustion
- **Thread Pool Control**: Bounded execution prevents system deadlocks
- **Memory Optimization**: Automatic cleanup and TTL management
- **Async Architecture**: Non-blocking I/O throughout the system
- **Load Balancing**: Built-in patterns for high-availability deployment
### ā
**Enterprise-Ready Features**
- **Multi-Provider Support**: OpenAI, Azure, Gemini, OpenRouter with unified interface
- **Advanced Function Calling**: Parallel execution with background tasks
- **Distributed Memory**: Redis-backed memory with automatic failover
- **Vector Database Integration**: Milvus with hybrid search capabilities
- **Comprehensive Monitoring**: OpenTelemetry integration with custom metrics
- **Security**: Built-in security best practices and audit logging
### ā
**Developer Experience Excellence**
- **Minimal Boilerplate**: Get started quickly with sensible defaults
- **Rich Documentation**: Comprehensive guides, examples, and API references
- **Easy Testing**: Mock-friendly design with dependency injection
- **Type Safety**: Full TypeScript-style type hints for Python
- **Hot Reloading**: Development server with automatic reload
- **Debugging Tools**: Comprehensive logging and tracing capabilities
### ā
**Flexible & Extensible**
- **Custom LLM Providers**: Add any LLM with standardized interface
- **Tool Ecosystem**: Create custom tools with simple interfaces
- **Agent Specialization**: Build domain-specific agents easily
- **Memory Backends**: Implement custom memory systems
- **Modular Design**: Use only the components you need
- **Plugin Architecture**: Extend functionality without core changes
### ā
**Cost Optimization**
- **Token Tracking**: Detailed usage analytics across all providers
- **Smart Caching**: Reduce redundant API calls automatically
- **Model Selection**: Use appropriate models for different tasks
- **Batch Processing**: Efficient processing of multiple requests
- **Cost Alerts**: Monitor and control spending with custom alerts
- **Usage Analytics**: Detailed cost breakdown and optimization recommendations
### ā
**Real-World Battle Tested**
```python
# Production deployments successfully running:
# Customer Support Systems
# - 500+ simultaneous conversations
# - 24/7 uptime with Redis failover
# - Multi-language support with specialized agents
# - Integration with Zendesk, Salesforce, custom CRMs
# Content Generation Pipelines
# - Processing 10,000+ articles daily
# - Multi-stage review with fact-checking
# - SEO optimization and quality scoring
# - Integration with CMS systems and publishing workflows
# Data Analysis Platforms
# - Processing GB-scale datasets
# - Real-time visualization generation
# - Statistical analysis with confidence intervals
# - Integration with Jupyter, Pandas, enterprise BI tools
# E-commerce Assistants
# - Handling 1000+ concurrent shoppers
# - Personalized recommendations with vector search
# - Real-time inventory and pricing integration
# - Multi-step checkout assistance with payment processing
```
### š **Framework Comparison**
| Feature | Arshai | LangChain | LlamaIndex | Custom Build |
|---------|--------|-----------|------------|-------------|
| **Developer Control** | ā
Complete | ā Limited | ā Framework-driven | ā
Complete |
| **Production Ready** | ā
Enterprise | ā ļø Community | ā ļø Research | ā Build yourself |
| **Performance** | ā
Optimized | ā Variable | ā Single-purpose | ā Depends |
| **Testing** | ā
Easy mocking | ā Complex | ā Framework bound | ā
Full control |
| **Documentation** | ā
Comprehensive | ā ļø Partial | ā ļø Limited | ā DIY |
| **Multi-Provider** | ā
Unified API | ā ļø Inconsistent | ā Limited | ā Manual |
| **Observability** | ā
Built-in | ā Manual setup | ā Basic | ā Custom build |
| **Scalability** | ā
1000+ users | ā ļø Unknown | ā ļø Research only | ā Your effort |
| **Time to Production** | ā
Days | ā ļø Weeks | ā ļø Months | ā Months |
### š **Success Stories**
> "We migrated from LangChain to Arshai and reduced our production issues by 80%. The explicit dependency injection made testing trivial, and the connection pooling solved our resource exhaustion problems."
> ā **Senior Engineer, Fortune 500 Company**
> "Arshai's three-layer architecture allowed us to build exactly what we needed without framework overhead. Our customer support system handles 500+ concurrent conversations with 99.9% uptime."
> ā **CTO, SaaS Startup**
> "The developer authority philosophy is game-changing. We can debug issues, optimize performance, and extend functionality without fighting the framework."
> ā **Lead AI Engineer, Enterprise Corp**
## Community & Enterprise Support
### š **Open Source Community**
- **Documentation**: [Complete documentation](docs/) with guides and API reference
- **Examples**: [Production-ready examples](examples/) for every use case
- **Issues**: [GitHub Issues](https://github.com/nimunzn/arshai/issues) for bugs and feature requests
- **Discussions**: [GitHub Discussions](https://github.com/nimunzn/arshai/discussions) for Q&A and ideas
- **Contributing**: [Contributing Guide](CONTRIBUTING.md) - help improve Arshai
### š¼ **Enterprise Support Options**
- **Priority Support**: Direct access to core maintainers
- **Custom Development**: Tailored features for enterprise needs
- **Architecture Review**: Expert guidance on system design
- **Training & Consulting**: Team training and implementation support
- **SLA Guarantees**: Production support with uptime commitments
- **Security Reviews**: Enterprise security audits and compliance
### š **Professional Services**
- **Migration Services**: Expert migration from other frameworks
- **Performance Optimization**: Production tuning and scaling
- **Custom Integrations**: Connect to proprietary systems and APIs
- **Team Training**: Comprehensive developer training programs
- **Architecture Design**: System design review and recommendations
- **24/7 Support**: Round-the-clock production support
**Contact**: [enterprise@arshai.dev](mailto:enterprise@arshai.dev)
## License & Legal
**MIT License** - see [LICENSE](LICENSE) file for complete details.
### š **What This Means For You**
- ā
**Commercial Use**: Use Arshai in commercial products
- ā
**Modification**: Modify the code for your needs
- ā
**Distribution**: Distribute your applications using Arshai
- ā
**Private Use**: Use in private/internal projects
- ā
**Patent Grant**: Protection against patent litigation
### š **Enterprise Considerations**
- **No Copyleft**: MIT license is business-friendly
- **No Vendor Lock-in**: Open source prevents dependency risks
- **Enterprise Support**: Commercial support available
- **Compliance**: Compatible with most enterprise license policies
- **Auditable Code**: Full source code transparency
### š **Third-Party Licenses**
Arshai includes components under compatible licenses:
- **Apache 2.0**: Various dependencies (compatible)
- **BSD**: Some utility libraries (compatible)
- **MIT**: Core dependencies (same license)
All dependencies are carefully vetted for license compatibility and security.
---
## Ready to Build? š
```bash
# Install Arshai and get started in 5 minutes
pip install arshai[all]
# Create your first intelligent agent
touch my_first_agent.py
```
```python
# my_first_agent.py
from arshai.llms.openai import OpenAIClient
from arshai.agents.base import BaseAgent
from arshai.core.interfaces.illm import ILLMConfig
from arshai.core.interfaces.iagent import IAgentInput
# You create the LLM client
llm_config = ILLMConfig(model="gpt-4o-mini", temperature=0.7)
llm_client = OpenAIClient(llm_config)
# You create the agent
agent = BaseAgent(llm_client, "You are a helpful coding assistant")
# You control the interaction
response = await agent.process(IAgentInput(
message="How do I handle async/await in Python?"
))
print(response)
```
### Next Steps
1. **[Quick Start Guide](docs/01-getting-started/)** - Build your first application
2. **[Examples Directory](examples/)** - Explore production-ready examples
3. **[Community Discord](https://discord.gg/arshai)** - Get help and share ideas
4. **[GitHub Discussions](https://github.com/nimunzn/arshai/discussions)** - Technical discussions
### Join the Community
- š¬ **[Discord Server](https://discord.gg/arshai)** - Real-time help and discussions
- š§ **[Newsletter](https://arshai.dev/newsletter)** - Updates and best practices
- š¦ **[Twitter/X](https://twitter.com/arshaidev)** - News and quick tips
- šŗ **[YouTube](https://youtube.com/@arshaidev)** - Video tutorials and demos
- š **[Blog](https://arshai.dev/blog)** - In-depth articles and case studies
---
**Build AI applications your way.** š
Arshai provides enterprise-grade building blocks. You architect the solution.
Raw data
{
"_id": null,
"home_page": "https://github.com/nimunzn/arshai",
"name": "arshai",
"maintainer": null,
"docs_url": null,
"requires_python": "<3.13,>=3.11",
"maintainer_email": null,
"keywords": "ai, agents, llm, workflow, rag, conversational-ai, multi-agent, vector-db, embeddings",
"author": "Nima Nazarian",
"author_email": "nimunzn@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/fd/ad/2f0422e90a9410dedc6a8a8502b20f4b79190097516e26d2e5fb78e8c64a/arshai-1.5.4.tar.gz",
"platform": null,
"description": "# Arshai\n\n**A powerful AI application framework built on clean architecture principles with complete developer control.**\n\n```mermaid\ngraph TD\n subgraph L3 [\"Layer 3: Agentic Systems\"]\n WF[Workflows] --> AG[Agents]\n MEM[Memory Systems] --> AG\n ORCH[Orchestration] --> AG\n end\n \n subgraph L2 [\"Layer 2: Agents\"] \n AG --> TOOLS[Tool Integration]\n AG --> LOGIC[Custom Logic]\n end\n \n subgraph L1 [\"Layer 1: LLM Clients\"]\n LOGIC --> OPENAI[OpenAI]\n LOGIC --> AZURE[Azure OpenAI]\n LOGIC --> GEMINI[Google Gemini]\n LOGIC --> ROUTER[OpenRouter]\n end\n \n style L3 fill:#e1f5fe\n style L2 fill:#f3e5f5\n style L1 fill:#e8f5e8\n```\n\n## Philosophy: Developer Authority First\n\nArshai empowers you to build exactly what you need, when you need it, how you need it. **You control every component, dependency, and interaction.** No magic, no hidden behavior, no framework lock-in.\n\n### The Arshai Way vs Traditional Frameworks\n\n**\u274c Traditional Framework Approach:**\n```python\n# Framework controls everything - what's happening inside?\nframework = AIFramework()\nframework.load_config(\"config.yaml\") \nagent = framework.create_agent(\"chatbot\") # Hidden creation\nresponse = agent.chat(\"Hello\") # Opaque behavior\n```\n\n**\u2705 Arshai Approach - You're In Control:**\n```python\n# You create, configure, and control everything\nllm_config = ILLMConfig(model=\"gpt-4\", temperature=0.7)\nllm_client = OpenAIClient(llm_config) # You create it\n\nagent = ChatAgent(\n llm_client=llm_client, # You inject dependencies \n system_prompt=\"Be helpful\", # You configure behavior\n memory=memory_manager # You manage state\n)\n\nresponse = await agent.process(input_data) # You control the flow\n```\n\n## Three-Layer Architecture\n\n### Layer 1: LLM Clients (Foundation)\n**Minimal Developer Authority** - Standardized access to AI providers\n\n```python\nfrom arshai.llms.openai import OpenAIClient\nfrom arshai.core.interfaces.illm import ILLMConfig, ILLMInput\n\n# Direct instantiation - you create when needed\nconfig = ILLMConfig(model=\"gpt-4\", temperature=0.7)\nllm = OpenAIClient(config)\n\n# Unified interface across all providers\nllm_input = ILLMInput(\n system_prompt=\"You are a helpful assistant\",\n user_message=\"What's the weather like?\",\n regular_functions={\"get_weather\": get_weather_func},\n background_tasks={\"log_interaction\": log_func} # Fire-and-forget\n)\n\nresponse = await llm.chat(llm_input)\n```\n\n**Supported Providers:**\n- **OpenAI** - Direct API with streaming and function calling\n- **Azure OpenAI** - Enterprise deployment with native parsing\n- **Google Gemini** - Native SDK with dual authentication \n- **OpenRouter** - Access to Claude, Llama, and 200+ models\n- **Extensible** - Add new providers by implementing `ILLM` interface\n\n### Layer 2: Agents (Business Logic)\n**Moderate Developer Authority** - Purpose-driven AI components\n\n```python\nfrom arshai.agents.base import BaseAgent\nfrom arshai.core.interfaces.iagent import IAgentInput\n\nclass CustomerServiceAgent(BaseAgent):\n \"\"\"Your custom agent with your business logic.\"\"\"\n \n def __init__(self, llm_client: ILLM, knowledge_base: KnowledgeBase):\n super().__init__(llm_client, \"You are a customer service expert\")\n self.knowledge_base = knowledge_base\n \n async def process(self, input: IAgentInput) -> Dict[str, Any]:\n # Your custom preprocessing\n context = await self._analyze_request(input.message)\n \n # Define tools for this specific agent\n async def search_kb(query: str) -> str:\n return await self.knowledge_base.search(query)\n \n async def log_issue(issue: str) -> None:\n \"\"\"BACKGROUND TASK: Log customer issue.\"\"\"\n await self.issue_tracker.create_ticket(issue)\n \n # Your LLM integration with custom tools\n llm_input = ILLMInput(\n system_prompt=self.system_prompt,\n user_message=f\"Context: {context}\\nUser: {input.message}\",\n regular_functions={\"search_knowledge\": search_kb},\n background_tasks={\"log_issue\": log_issue}\n )\n \n result = await self.llm_client.chat(llm_input)\n \n # Your custom post-processing\n return {\n \"response\": result[\"llm_response\"],\n \"confidence\": self._calculate_confidence(result),\n \"escalate\": self._should_escalate(result)\n }\n```\n\n### Layer 3: Agentic Systems (Complete Control) \n**Maximum Developer Authority** - Orchestrate everything exactly as you need\n\n```python\nclass CustomerSupportSystem:\n \"\"\"Your complete system - you control everything.\"\"\"\n \n def __init__(self):\n # You create all components\n llm = OpenAIClient(ILLMConfig(model=\"gpt-4\"))\n memory = RedisMemoryManager(redis_url=os.getenv(\"REDIS_URL\"))\n knowledge_base = VectorKnowledgeBase(milvus_client, embeddings)\n \n # You compose the system\n self.triage_agent = TriageAgent(llm, \"Route customer requests\")\n self.support_agent = CustomerServiceAgent(llm, knowledge_base)\n self.escalation_agent = EscalationAgent(llm, ticket_system)\n \n # You define the workflow\n self.memory = memory\n \n async def handle_request(self, user_message: str, user_id: str) -> Dict[str, Any]:\n \"\"\"Your orchestration logic.\"\"\"\n \n # Step 1: Retrieve user history\n history = await self.memory.retrieve({\"user_id\": user_id})\n \n # Step 2: Triage the request\n triage_input = IAgentInput(\n message=user_message,\n metadata={\"history\": history, \"user_id\": user_id}\n )\n triage_result = await self.triage_agent.process(triage_input)\n \n # Step 3: Route based on triage (your business logic)\n if triage_result[\"urgency\"] == \"high\":\n agent = self.escalation_agent\n else:\n agent = self.support_agent\n \n # Step 4: Process with selected agent\n support_input = IAgentInput(\n message=user_message,\n metadata={\n \"user_id\": user_id,\n \"triage\": triage_result,\n \"history\": history\n }\n )\n final_result = await agent.process(support_input)\n \n # Step 5: Update memory (your state management)\n await self.memory.store({\n \"user_id\": user_id,\n \"interaction\": {\n \"request\": user_message,\n \"response\": final_result[\"response\"],\n \"agent_used\": agent.__class__.__name__,\n \"timestamp\": datetime.now().isoformat()\n }\n })\n \n # Your final response format\n return {\n \"response\": final_result[\"response\"],\n \"agent_used\": agent.__class__.__name__,\n \"escalated\": isinstance(agent, EscalationAgent),\n \"confidence\": final_result.get(\"confidence\", 0.0)\n }\n```\n\n## Comprehensive Feature Overview\n\n### \ud83c\udfaf **Developer Authority**\n- **Direct Instantiation**: You create every component explicitly\n- **Dependency Injection**: All dependencies passed through constructors\n- **No Hidden Magic**: Every behavior is visible in your code\n- **Complete Control**: You decide architecture, not the framework\n- **Interface-First Design**: Well-defined contracts for easy testing\n- **Zero Framework Lock-in**: Use what you need, ignore the rest\n\n### \ud83e\udd16 **Advanced LLM Client System**\n\n#### **Multi-Provider Support**\n- **OpenAI Integration**: Native API with GPT-4o, GPT-4o-mini, and all models\n - Streaming with real-time function execution\n - Advanced function calling with parallel execution\n - Structured outputs with Pydantic models\n - Token usage tracking and cost optimization\n - Rate limiting and error handling\n\n- **Azure OpenAI Enterprise**: Full Azure deployment support\n - Azure authentication and endpoint management\n - Content filtering and compliance features\n - Regional deployment flexibility\n - Enterprise security and audit logging\n\n- **Google Gemini**: Native SDK integration\n - Gemini Pro, Flash, and specialized models\n - Dual authentication (API key and Service Account)\n - Advanced safety settings and content filtering\n - Image and multimodal capabilities\n - Real-time streaming with function calls\n\n- **OpenRouter**: Access to 200+ models including:\n - Anthropic Claude 3.5 Sonnet, Haiku, Opus\n - Meta Llama 3.1, 3.2 (8B, 70B, 405B)\n - Mistral Large, 8x7B, 8x22B\n - Cohere Command R+\n - Specialized models for coding, reasoning, and creativity\n\n#### **Unified Function Calling Architecture**\n```python\n# Works identically across all providers\ndef get_weather(city: str) -> dict:\n \"\"\"Get current weather for a city.\"\"\"\n return {\"temperature\": 72, \"condition\": \"sunny\"}\n\nasync def send_notification(message: str) -> None:\n \"\"\"BACKGROUND TASK: Send notification.\"\"\"\n # Runs independently, doesn't block conversation\n print(f\"\ud83d\udce7 Notification: {message}\")\n\n# Same interface for any provider\nllm_input = ILLMInput(\n system_prompt=\"You are a helpful assistant\",\n user_message=\"What's the weather in San Francisco?\",\n regular_functions={\"get_weather\": get_weather},\n background_tasks={\"send_notification\": send_notification}\n)\n\n# Works with OpenAI, Azure, Gemini, OpenRouter\nresponse = await llm_client.chat(llm_input)\n```\n\n#### **Progressive Streaming**\n- **Real-time Function Execution**: Functions execute immediately during streaming\n- **Parallel Tool Execution**: Regular tools run concurrently for speed\n- **Background Task Management**: Fire-and-forget operations tracked automatically\n- **Enhanced Context Building**: Function results integrated into conversation\n- **Safe Usage Accumulation**: Robust token and cost tracking during streaming\n\n### \ud83e\udde0 **Sophisticated Agent System**\n\n#### **Base Agent Capabilities**\n- **Flexible Return Types**: Return strings, dicts, generators, or custom objects\n- **Stateless Design**: Pure functions with no hidden state\n- **Tool Integration**: Easy integration of custom functions and APIs\n- **Memory Compatibility**: Works with any memory backend\n- **Error Resilience**: Graceful handling of LLM failures\n\n#### **Specialized Agent Types**\n```python\n# Working Memory Agent - Maintains conversation context\nfrom arshai.agents.working_memory import WorkingMemoryAgent\n\n# Base Agent - Flexible foundation for custom agents\nfrom arshai.agents.base import BaseAgent\n\n# Custom domain-specific agents\nclass CustomerServiceAgent(BaseAgent):\n def __init__(self, llm_client, knowledge_base, escalation_system):\n super().__init__(llm_client, \"Customer service expert\")\n self.knowledge_base = knowledge_base\n self.escalation_system = escalation_system\n \n async def process(self, input: IAgentInput) -> Dict[str, Any]:\n # Your custom business logic\n # Access to knowledge base, escalation, metrics\n # Return structured responses with confidence scores\n pass\n```\n\n#### **Agent Development Patterns**\n- **Domain Specialization**: Create agents for specific business domains\n- **Tool Composition**: Combine multiple tools and APIs per agent\n- **Response Formatting**: Structure outputs exactly as needed\n- **Confidence Scoring**: Built-in or custom confidence assessment\n- **Escalation Logic**: Automatic escalation to human agents or specialized systems\n\n### \ud83d\udd27 **Comprehensive Tool Ecosystem**\n\n#### **Built-in Tools**\n- **Web Search**: SearxNG integration with connection pooling\n - Multiple search engines aggregation\n - Result filtering and ranking\n - Performance optimization for production\n\n- **Knowledge Base**: Vector database integration\n - Milvus support with hybrid search\n - Document ingestion and chunking\n - Semantic search with embedding models\n - Metadata filtering and faceted search\n\n- **MCP (Model Context Protocol) Tools**: Dynamic tool loading\n - File system operations (read, write, search)\n - Database connections and queries \n - API integrations and webhooks\n - Custom tool development framework\n - Thread pool management for performance\n\n#### **Custom Tool Development**\n```python\nfrom arshai.core.interfaces.itool import ITool\n\nclass DatabaseTool(ITool):\n def __init__(self, db_connection):\n self.db = db_connection\n \n async def search_products(self, query: str, category: str = None) -> List[Dict]:\n \"\"\"Search products in database.\"\"\"\n # Your database logic\n return results\n \n async def log_interaction(self, user_id: str, action: str) -> None:\n \"\"\"BACKGROUND TASK: Log user interaction.\"\"\"\n # Background logging, doesn't affect conversation\n await self.db.log(user_id, action)\n\n# Easy integration with agents\ndb_tool = DatabaseTool(connection)\nagent = CustomAgent(\n llm_client, \n regular_functions={\"search_products\": db_tool.search_products},\n background_tasks={\"log_interaction\": db_tool.log_interaction}\n)\n```\n\n### \ud83e\uddec **Agentic System Orchestration**\n\n#### **Workflow Management**\n- **State-Driven Workflows**: Complex multi-step processes\n- **Conditional Branching**: Dynamic execution paths based on results\n- **Error Recovery**: Automatic retry and fallback mechanisms\n- **Parallel Execution**: Concurrent agent execution for performance\n- **State Persistence**: Resume workflows across sessions\n\n#### **Multi-Agent Coordination**\n```python\nclass IntelligentHelpdesk:\n def __init__(self):\n # Create specialized agents\n self.triage_agent = TriageAgent(llm_client)\n self.technical_agent = TechnicalSupportAgent(llm_client, knowledge_base)\n self.billing_agent = BillingAgent(llm_client, billing_system)\n self.escalation_agent = EscalationAgent(llm_client, human_queue)\n \n # Shared resources\n self.memory_manager = RedisMemoryManager()\n self.metrics_collector = MetricsCollector()\n \n async def handle_request(self, request: str, user_id: str) -> Dict[str, Any]:\n # Step 1: Triage determines routing\n triage_result = await self.triage_agent.classify(request)\n \n # Step 2: Route to appropriate specialist\n if triage_result['category'] == 'technical':\n agent = self.technical_agent\n elif triage_result['category'] == 'billing':\n agent = self.billing_agent\n elif triage_result['urgency'] == 'critical':\n agent = self.escalation_agent\n \n # Step 3: Process with context and memory\n context = await self.memory_manager.get_user_context(user_id)\n result = await agent.process_with_context(request, context)\n \n # Step 4: Update memory and metrics\n await self.memory_manager.update_context(user_id, result)\n await self.metrics_collector.record_interaction(result)\n \n return result\n```\n\n### \ud83d\udcbe **Advanced Memory Management**\n\n#### **Memory Backend Options**\n- **Redis Memory Manager**: Production-ready distributed memory\n - Automatic TTL management\n - Connection pooling and failover\n - Pub/sub for real-time updates\n - Memory cleanup and optimization\n\n- **In-Memory Manager**: Fast development and testing\n - LRU eviction policies\n - Background cleanup tasks\n - Memory usage monitoring\n\n- **Working Memory System**: Context-aware conversation tracking\n - Automatic context summarization\n - Relevance scoring and pruning\n - Cross-session continuity\n\n#### **Memory Usage Patterns**\n```python\n# Environment-specific memory selection\ndef create_memory_system(environment: str) -> IMemoryManager:\n if environment == \"production\":\n return RedisMemoryManager(\n redis_url=os.getenv(\"REDIS_URL\"),\n ttl=3600, # 1 hour TTL\n key_prefix=\"app_prod\",\n connection_pool_size=10\n )\n elif environment == \"development\":\n return InMemoryManager(\n ttl=1800, # 30 minutes\n max_size=1000,\n cleanup_interval=300 # 5 minutes\n )\n else:\n return InMemoryManager(ttl=300) # Testing\n\n# Advanced conversation memory\nclass ConversationMemory:\n def __init__(self, memory_manager, summarizer_agent):\n self.memory = memory_manager\n self.summarizer = summarizer_agent\n \n async def store_conversation(self, user_id: str, conversation: List[Dict]):\n # Automatic summarization for long conversations\n if len(conversation) > 10:\n summary = await self.summarizer.summarize(conversation[-10:])\n conversation = conversation[:-10] + [summary]\n \n await self.memory.store({\n \"user_id\": user_id,\n \"conversation\": conversation,\n \"timestamp\": datetime.now().isoformat()\n })\n```\n\n### \ud83d\udcca **Production-Grade Observability**\n\n#### **OpenTelemetry Integration**\n- **Automatic Instrumentation**: No code changes required\n- **Distributed Tracing**: Track requests across services\n- **Custom Metrics**: Business-specific monitoring\n- **Export Flexibility**: Send to any OTLP-compatible backend\n\n#### **Built-in Metrics**\n```python\n# Automatic metrics collection\nmetrics_collected = {\n \"llm_requests_total\": \"Counter of LLM API calls\",\n \"llm_request_duration_seconds\": \"Histogram of request latencies\",\n \"llm_tokens_total\": \"Counter of tokens used (input/output/total)\",\n \"llm_cost_total\": \"Counter of API costs by provider\",\n \"llm_time_to_first_token_seconds\": \"Time to start streaming\",\n \"llm_time_to_last_token_seconds\": \"Total generation time\",\n \"agent_processing_duration_seconds\": \"Agent processing time\",\n \"memory_operations_total\": \"Memory store/retrieve operations\",\n \"tool_execution_duration_seconds\": \"Tool execution timing\",\n \"background_task_duration_seconds\": \"Background task timing\"\n}\n\n# Zero-fallback monitoring - always works\nfrom arshai.observability import ObservabilityConfig\n\n# Optional: Advanced configuration\nobs_config = ObservabilityConfig(\n endpoint=\"http://jaeger:14268/api/traces\",\n service_name=\"arshai_app\",\n environment=\"production\",\n custom_metrics={\"business_metric\": \"counter\"}\n)\n\n# Automatic instrumentation\nclient = OpenAIClient(\n config=llm_config,\n observability_config=obs_config # Optional\n)\n\n# Metrics automatically exported to your monitoring system\n```\n\n#### **Performance Monitoring**\n- **Real-time Dashboards**: Track system health and performance\n- **Alert Configuration**: Custom alerts for SLA violations\n- **Cost Tracking**: Detailed cost breakdown by model and usage\n- **Error Rate Monitoring**: Track and analyze failure patterns\n- **Capacity Planning**: Usage trends and scaling recommendations\n\n### \ud83d\uddc4\ufe0f **Vector Database Integration**\n\n#### **Milvus Support**\n- **Hybrid Search**: Dense + sparse vector combination\n - Semantic similarity with dense vectors\n - Keyword matching with sparse vectors\n - Weighted ranking and reranking\n\n- **Production Features**:\n - Connection pooling and async operations\n - Batch insertion with configurable batch sizes\n - Automatic index creation and optimization\n - Metadata filtering and complex queries\n - Schema versioning and migration support\n\n```python\n# Advanced vector operations\nfrom arshai.vector_db.milvus_client import MilvusClient\nfrom arshai.core.interfaces.ivector_db_client import ICollectionConfig\n\n# Async operations for performance\nclient = MilvusClient()\nconfig = ICollectionConfig(\n collection_name=\"knowledge_base\",\n dense_dim=1536, # OpenAI embeddings\n is_hybrid=True, # Enable sparse + dense\n schema_model=CustomDocument # Structured metadata\n)\n\n# Hybrid search with filtering\nresults = await client.hybrid_search(\n config=config,\n dense_vectors=dense_embeddings,\n sparse_vectors=sparse_embeddings,\n expr=\"metadata['category'] == 'technical'\",\n limit=10\n)\n```\n\n### \ud83d\udce6 **Document Processing Pipeline**\n\n#### **Ingestion and Chunking**\n- **Multi-format Support**: PDF, DOCX, TXT, HTML, Markdown\n- **Intelligent Chunking**: Semantic and structural chunking\n- **Metadata Extraction**: Automatic metadata detection\n- **Batch Processing**: Efficient large-scale ingestion\n- **Deduplication**: Content-based duplicate detection\n\n#### **Embedding Generation**\n- **Multiple Providers**: OpenAI, Google, Hugging Face\n- **Model Selection**: Choose optimal models per use case\n- **Batch Processing**: Efficient embedding generation\n- **Caching**: Avoid re-processing unchanged content\n\n### \ud83d\ude80 **Enterprise Performance & Scalability**\n\n#### **Connection Management**\n- **HTTP Connection Pooling**: Configurable limits prevent resource exhaustion\n - Default: 100 max connections, 10 per host\n - Configurable via environment variables\n - Automatic connection reuse and cleanup\n\n- **Thread Pool Management**: Bounded execution prevents deadlocks\n - CPU-aware thread limits: `min(32, cpu_count * 2)`\n - Shared pools across MCP tools\n - Graceful shutdown and cleanup\n\n#### **Async Architecture**\n- **Non-blocking Operations**: All I/O operations are async\n- **Concurrent Processing**: Parallel function execution\n- **Background Tasks**: Fire-and-forget operations\n- **Resource Management**: Automatic cleanup and monitoring\n\n#### **Production Optimizations**\n```python\n# Environment-based configuration\nos.environ.update({\n 'ARSHAI_MAX_CONNECTIONS': '200', # HTTP connection pool\n 'ARSHAI_MAX_CONNECTIONS_PER_HOST': '20', # Per-host limit\n 'ARSHAI_MAX_THREADS': '64', # Thread pool size\n 'ARSHAI_MAX_MEMORY_MB': '4096', # Memory limit\n 'ARSHAI_CLEANUP_INTERVAL': '300', # Background cleanup\n 'ARSHAI_BATCH_SIZE': '100' # Vector DB batch size\n})\n\n# Production deployment patterns\nclass ProductionApp:\n def __init__(self):\n # Shared resources for efficiency\n self.llm_pool = {\n 'fast': [OpenAIClient(fast_config) for _ in range(3)],\n 'powerful': [OpenAIClient(powerful_config) for _ in range(2)]\n }\n \n self.memory_manager = RedisMemoryManager(\n redis_url=os.getenv(\"REDIS_URL\"),\n connection_pool_size=20,\n ttl=7200\n )\n \n # Load balancing and circuit breakers\n self.load_balancer = LoadBalancer(self.llm_pool)\n self.circuit_breaker = CircuitBreaker(failure_threshold=5)\n \n async def handle_concurrent_requests(self, requests: List[str]) -> List[str]:\n # Process up to 100 concurrent requests\n semaphore = asyncio.Semaphore(100)\n \n async def process_one(request):\n async with semaphore:\n agent = self.create_agent()\n return await agent.process(IAgentInput(message=request))\n \n tasks = [process_one(req) for req in requests]\n results = await asyncio.gather(*tasks, return_exceptions=True)\n return results\n```\n\n### \ud83d\udee0\ufe0f **Development and Testing**\n\n#### **Testing Framework**\n- **Mock Support**: Easy mocking of LLM clients and dependencies\n- **Integration Testing**: Real LLM integration tests\n- **Load Testing**: Performance and scalability validation\n- **Async Testing**: Full async/await test support\n\n```python\n# Easy testing with dependency injection\nasync def test_customer_agent():\n mock_llm = AsyncMock()\n mock_llm.chat.return_value = {\n \"llm_response\": \"Test response\",\n \"usage\": {\"total_tokens\": 50}\n }\n \n mock_kb = AsyncMock()\n mock_kb.search.return_value = [\"relevant info\"]\n \n # Create agent with mocked dependencies\n agent = CustomerServiceAgent(mock_llm, mock_kb)\n \n result = await agent.process(IAgentInput(\n message=\"Help with billing\",\n metadata={\"user_id\": \"123\"}\n ))\n \n assert \"Test response\" in result[\"response\"]\n mock_llm.chat.assert_called_once()\n mock_kb.search.assert_called_once()\n```\n\n#### **Development Patterns**\n- **Configuration Management**: Environment-based settings\n- **Hot Reloading**: Development server with auto-reload\n- **Debugging Tools**: Comprehensive logging and tracing\n- **Performance Profiling**: Built-in performance analysis\n\n## Installation & Setup\n\n### Basic Installation\n```bash\n# Core framework only\npip install arshai\n\n# With all optional dependencies\npip install arshai[all] # Includes all features below\n\n# Specific feature sets\npip install arshai[redis,milvus,observability] # Production essentials\npip install arshai[docs] # Documentation building\n```\n\n### Feature-Specific Installations\n```bash\n# Memory backends\npip install arshai[redis] # Redis memory manager\n\n# Vector databases \npip install arshai[milvus] # Milvus vector database\n\n# Advanced search\npip install arshai[flashrank] # Re-ranking capabilities\n\n# Monitoring\npip install arshai[observability] # OpenTelemetry integration\n\n# Development tools\npip install arshai[dev] # Testing and development utilities\n```\n\n### Environment Setup\n```bash\n# LLM Provider API Keys\nexport OPENAI_API_KEY=\"your-openai-key\"\nexport AZURE_OPENAI_API_KEY=\"your-azure-key\"\nexport AZURE_OPENAI_ENDPOINT=\"https://your-resource.openai.azure.com/\"\nexport GOOGLE_API_KEY=\"your-gemini-key\"\nexport OPENROUTER_API_KEY=\"your-openrouter-key\"\n\n# Optional: Google Service Account (alternative to API key)\nexport GOOGLE_APPLICATION_CREDENTIALS=\"/path/to/service-account.json\"\n\n# Memory backends\nexport REDIS_URL=\"redis://localhost:6379\"\nexport REDIS_TTL=\"3600\" # 1 hour\n\n# Vector database\nexport MILVUS_HOST=\"localhost\"\nexport MILVUS_PORT=\"19530\"\nexport MILVUS_DB_NAME=\"default\"\n\n# Performance tuning\nexport ARSHAI_MAX_CONNECTIONS=\"100\"\nexport ARSHAI_MAX_THREADS=\"32\"\nexport ARSHAI_BATCH_SIZE=\"50\"\n\n# Observability\nexport OTEL_EXPORTER_OTLP_ENDPOINT=\"http://jaeger:14268\"\nexport OTEL_SERVICE_NAME=\"arshai_app\"\n```\n\n### Docker Setup\n```dockerfile\n# Production Dockerfile\nFROM python:3.11-slim\n\nWORKDIR /app\n\n# Install production dependencies\nCOPY requirements.txt .\nRUN pip install -r requirements.txt\n\n# Performance configuration\nENV ARSHAI_MAX_CONNECTIONS=200\nENV ARSHAI_MAX_CONNECTIONS_PER_HOST=20\nENV ARSHAI_MAX_THREADS=64\nENV ARSHAI_MAX_MEMORY_MB=4096\n\n# Redis connection\nENV REDIS_URL=redis://redis:6379\nENV REDIS_TTL=7200\n\n# Milvus connection\nENV MILVUS_HOST=milvus\nENV MILVUS_PORT=19530\n\nCOPY . .\nCMD [\"python\", \"app.py\"]\n```\n\n### Docker Compose for Development\n```yaml\n# docker-compose.yml\nversion: '3.8'\n\nservices:\n app:\n build: .\n environment:\n - REDIS_URL=redis://redis:6379\n - MILVUS_HOST=milvus\n - OPENAI_API_KEY=${OPENAI_API_KEY}\n depends_on:\n - redis\n - milvus\n - jaeger\n \n redis:\n image: redis:7-alpine\n ports:\n - \"6379:6379\"\n \n milvus:\n image: milvusdb/milvus:latest\n ports:\n - \"19530:19530\"\n environment:\n - ETCD_ENDPOINTS=etcd:2379\n \n jaeger:\n image: jaegertracing/all-in-one:latest\n ports:\n - \"16686:16686\"\n - \"14268:14268\"\n```\n\n## Quick Start Guide \ud83d\ude80\n\n### 1. Quick Start - Your First Agent\n\n```python\nfrom arshai.llms.openai import OpenAIClient\nfrom arshai.agents.base import BaseAgent\nfrom arshai.core.interfaces.illm import ILLMConfig\nfrom arshai.core.interfaces.iagent import IAgentInput\n\n# Step 1: Create your LLM client (you control this)\nllm_config = ILLMConfig(\n model=\"gpt-4o\", # Latest model\n temperature=0.7, # Creativity level\n max_tokens=2000 # Response length\n)\nllm_client = OpenAIClient(llm_config)\n\n# Step 2: Create your agent (you define the behavior)\nagent = BaseAgent(\n llm_client=llm_client, \n system_prompt=\"You are an expert Python programming assistant. Provide clear, practical examples with explanations.\"\n)\n\n# Step 3: Use your agent (you control the interaction)\nresponse = await agent.process(IAgentInput(\n message=\"How do I handle async/await in Python? Show me a practical example.\"\n))\n\nprint(response)\n# Output: Detailed explanation with code examples\n```\n\n### 1b. Agent with Memory (Conversation Context)\n\n```python\nfrom arshai.agents.working_memory import WorkingMemoryAgent\nfrom arshai.memory.working_memory.in_memory_manager import InMemoryManager\n\n# Create memory system for conversation context\nmemory = InMemoryManager(ttl=1800) # 30-minute memory\n\n# Create agent with memory capabilities\nconversation_agent = WorkingMemoryAgent(\n llm_client=llm_client,\n memory_manager=memory,\n system_prompt=\"You are a helpful assistant that remembers our conversation context.\"\n)\n\n# First interaction\nresponse1 = await conversation_agent.process(IAgentInput(\n message=\"My name is Alex and I'm learning Python.\",\n metadata={\"conversation_id\": \"conv_123\"}\n))\n\n# Second interaction - agent remembers context\nresponse2 = await conversation_agent.process(IAgentInput(\n message=\"What was my name again?\",\n metadata={\"conversation_id\": \"conv_123\"}\n))\n\nprint(response2) # Will remember \"Alex\"\n```\n\n### 1c. Multi-Provider Setup\n\n```python\n# Use different providers for different tasks\nfrom arshai.llms.openai import OpenAIClient\nfrom arshai.llms.google_genai import GeminiClient\nfrom arshai.llms.openrouter import OpenRouterClient\n\n# Fast model for simple tasks\nfast_llm = OpenAIClient(ILLMConfig(model=\"gpt-4o-mini\", temperature=0.3))\n\n# Powerful model for complex analysis\nstrong_llm = OpenAIClient(ILLMConfig(model=\"gpt-4o\", temperature=0.5))\n\n# Alternative provider for variety\ngemini_llm = GeminiClient(ILLMConfig(model=\"gemini-1.5-pro\", temperature=0.7))\n\n# Access to specialized models\nopenrouter_llm = OpenRouterClient(ILLMConfig(\n model=\"anthropic/claude-3.5-sonnet\", # Claude via OpenRouter\n temperature=0.6\n))\n\n# Create specialized agents\nquick_agent = BaseAgent(fast_llm, \"Quick responses for simple questions\")\nanalysis_agent = BaseAgent(strong_llm, \"Deep analysis and complex reasoning\")\ncreative_agent = BaseAgent(gemini_llm, \"Creative writing and brainstorming\")\nresearch_agent = BaseAgent(openrouter_llm, \"Research and detailed explanations\")\n\n# Use the right agent for each task\nquick_response = await quick_agent.process(IAgentInput(\n message=\"What's 15% tip on $42?\"\n))\n\nanalysis_response = await analysis_agent.process(IAgentInput(\n message=\"Analyze the pros and cons of microservices vs monolithic architecture for a startup\"\n))\n```\n\n### 2. Advanced Agent with Multiple Tool Types\n\n```python\n# Comprehensive restaurant agent with multiple capabilities\nclass SmartRestaurantAgent(BaseAgent):\n \"\"\"Intelligent restaurant agent with real-time data and background tasks.\"\"\"\n \n def __init__(self, llm_client: ILLM, reservation_system, pos_system, notification_service):\n super().__init__(llm_client, \"You are an expert restaurant assistant with access to real-time data\")\n self.reservation_system = reservation_system\n self.pos_system = pos_system\n self.notification_service = notification_service\n \n async def process(self, input: IAgentInput) -> Dict[str, Any]:\n # Define regular tools (return results to conversation)\n def get_current_time() -> Dict[str, str]:\n \"\"\"Get current time with timezone info.\"\"\"\n from datetime import datetime\n import pytz\n \n now = datetime.now(pytz.UTC)\n local_time = now.astimezone(pytz.timezone('America/New_York'))\n \n return {\n \"utc_time\": now.isoformat(),\n \"local_time\": local_time.strftime(\"%I:%M %p\"),\n \"date\": local_time.strftime(\"%A, %B %d, %Y\"),\n \"timezone\": \"Eastern Time\"\n }\n \n def calculate_bill_details(subtotal: float, tax_rate: float = 0.08, tip_percent: float = 20) -> Dict[str, float]:\n \"\"\"Calculate comprehensive bill breakdown.\"\"\"\n tax = subtotal * tax_rate\n tip = subtotal * (tip_percent / 100)\n total = subtotal + tax + tip\n \n return {\n \"subtotal\": round(subtotal, 2),\n \"tax\": round(tax, 2),\n \"tip\": round(tip, 2),\n \"total\": round(total, 2),\n \"tip_percentage\": tip_percent\n }\n \n async def check_table_availability(party_size: int, preferred_time: str) -> Dict[str, Any]:\n \"\"\"Check real-time table availability.\"\"\"\n # Your reservation system integration\n available_times = await self.reservation_system.check_availability(\n party_size=party_size,\n date=preferred_time\n )\n \n return {\n \"available\": len(available_times) > 0,\n \"available_times\": available_times[:5], # Show top 5 options\n \"party_size\": party_size,\n \"wait_time_estimate\": \"15-20 minutes\" if available_times else \"45+ minutes\"\n }\n \n async def get_menu_recommendations(dietary_restrictions: str = None, budget_range: str = \"moderate\") -> List[Dict[str, Any]]:\n \"\"\"Get personalized menu recommendations.\"\"\"\n # Your menu system integration with filtering\n menu_items = await self.pos_system.get_filtered_menu(\n dietary_restrictions=dietary_restrictions,\n price_range=budget_range\n )\n \n return [\n {\n \"name\": item[\"name\"],\n \"description\": item[\"description\"],\n \"price\": item[\"price\"],\n \"dietary_tags\": item[\"tags\"],\n \"popularity_score\": item[\"popularity\"]\n }\n for item in menu_items[:8] # Top 8 recommendations\n ]\n \n # Define background tasks (fire-and-forget)\n async def log_customer_interaction(user_id: str, interaction_type: str, details: Dict[str, Any]) -> None:\n \"\"\"BACKGROUND TASK: Log interaction for analytics.\"\"\"\n await self.pos_system.log_interaction({\n \"user_id\": user_id,\n \"type\": interaction_type,\n \"details\": details,\n \"timestamp\": datetime.now().isoformat()\n })\n \n async def send_follow_up_notification(user_id: str, message: str, delay_minutes: int = 60) -> None:\n \"\"\"BACKGROUND TASK: Schedule follow-up notification.\"\"\"\n await self.notification_service.schedule_notification(\n user_id=user_id,\n message=message,\n delay_minutes=delay_minutes\n )\n \n async def update_customer_preferences(user_id: str, preferences: Dict[str, Any]) -> None:\n \"\"\"BACKGROUND TASK: Update customer profile.\"\"\"\n await self.pos_system.update_customer_profile(\n user_id=user_id,\n preferences=preferences\n )\n \n async def trigger_marketing_follow_up(user_id: str, interaction_context: str) -> None:\n \"\"\"BACKGROUND TASK: Trigger marketing automation.\"\"\"\n # Your CRM/marketing automation integration\n await self.notification_service.trigger_campaign(\n user_id=user_id,\n campaign_type=\"restaurant_follow_up\",\n context=interaction_context\n )\n \n # Prepare LLM input with all tools\n llm_input = ILLMInput(\n system_prompt=\"\"\"You are a sophisticated restaurant assistant with access to real-time systems.\n You can check availability, provide menu recommendations, calculate bills, and help with reservations.\n Always be helpful, accurate, and proactive in offering relevant information.\n When customers show interest, set up appropriate follow-ups.\"\"\",\n \n user_message=input.message,\n \n # Regular functions return results to continue conversation\n regular_functions={\n \"get_current_time\": get_current_time,\n \"calculate_bill_details\": calculate_bill_details,\n \"check_table_availability\": check_table_availability,\n \"get_menu_recommendations\": get_menu_recommendations\n },\n \n # Background tasks run independently\n background_tasks={\n \"log_customer_interaction\": log_customer_interaction,\n \"send_follow_up_notification\": send_follow_up_notification,\n \"update_customer_preferences\": update_customer_preferences,\n \"trigger_marketing_follow_up\": trigger_marketing_follow_up\n }\n )\n \n # Get LLM response with tool usage\n result = await self.llm_client.chat(llm_input)\n \n # Return structured response\n return {\n \"response\": result[\"llm_response\"],\n \"tools_used\": result.get(\"tools_used\", []),\n \"background_tasks_triggered\": result.get(\"background_tasks\", []),\n \"user_id\": input.metadata.get(\"user_id\"),\n \"session_id\": input.metadata.get(\"session_id\"),\n \"timestamp\": datetime.now().isoformat()\n }\n\n# Production usage\nasync def main():\n # Create dependencies\n llm = OpenAIClient(ILLMConfig(model=\"gpt-4o\", temperature=0.7))\n reservation_system = ReservationAPI()\n pos_system = POSIntegration()\n notification_service = NotificationService()\n \n # Create the restaurant agent\n restaurant_agent = SmartRestaurantAgent(\n llm_client=llm,\n reservation_system=reservation_system,\n pos_system=pos_system,\n notification_service=notification_service\n )\n \n # Example interaction\n response = await restaurant_agent.process(IAgentInput(\n message=\"Hi! I'm looking for a table for 4 people tonight around 7 PM. We have one vegetarian in our group and prefer mid-range prices. Also, what's a good tip for a $85 bill?\",\n metadata={\n \"user_id\": \"customer_123\",\n \"session_id\": \"session_456\"\n }\n ))\n \n print(f\"Response: {response['response']}\")\n print(f\"Tools used: {response['tools_used']}\")\n print(f\"Background tasks: {response['background_tasks_triggered']}\")\n\nif __name__ == \"__main__\":\n import asyncio\n asyncio.run(main())\n```\n\n### 3. Intelligent Multi-Agent System\n\n```python\nclass IntelligentHelpDeskSystem:\n \"\"\"Production-ready helpdesk with multiple AI agents.\"\"\"\n \n def __init__(self):\n # Shared LLM clients for different use cases\n self.fast_llm = OpenAIClient(ILLMConfig(model=\"gpt-4o-mini\", temperature=0.3))\n self.powerful_llm = OpenAIClient(ILLMConfig(model=\"gpt-4o\", temperature=0.3))\n \n # Persistent memory with Redis\n self.memory = RedisMemoryManager(\n redis_url=\"redis://localhost:6379\",\n ttl=3600, # 1 hour conversation memory\n key_prefix=\"helpdesk\"\n )\n \n # Knowledge base with vector search\n self.knowledge_base = VectorKnowledgeBase(\n vector_client=MilvusClient(),\n collection_name=\"support_docs\"\n )\n \n # External integrations\n self.ticket_system = JiraIntegration()\n self.billing_api = StripeAPI()\n self.user_db = PostgreSQLConnection()\n \n # Specialized agents with different capabilities\n self.agents = self._create_agents()\n \n # Smart routing with classification\n self.router = IntentClassifier(self.fast_llm)\n \n def _create_agents(self) -> Dict[str, IAgent]:\n \"\"\"Create specialized agents with unique tools and knowledge.\"\"\"\n \n return {\n \"triage\": TriageAgent(\n llm_client=self.fast_llm,\n system_prompt=\"You classify and prioritize customer requests\"\n ),\n \n \"technical\": TechnicalSupportAgent(\n llm_client=self.powerful_llm,\n knowledge_base=self.knowledge_base,\n tools={\n \"search_docs\": self.knowledge_base.search,\n \"check_system_status\": self._check_system_status,\n \"create_bug_report\": self._create_bug_report\n },\n background_tasks={\n \"log_technical_issue\": self._log_technical_issue,\n \"notify_engineering\": self._notify_engineering\n }\n ),\n \n \"billing\": BillingAgent(\n llm_client=self.fast_llm,\n billing_api=self.billing_api,\n tools={\n \"get_account_status\": self.billing_api.get_account,\n \"process_refund\": self.billing_api.process_refund,\n \"update_payment_method\": self.billing_api.update_payment\n },\n background_tasks={\n \"log_billing_interaction\": self._log_billing_interaction,\n \"send_receipt\": self._send_receipt\n }\n ),\n \n \"escalation\": EscalationAgent(\n llm_client=self.powerful_llm,\n ticket_system=self.ticket_system,\n tools={\n \"create_priority_ticket\": self.ticket_system.create_urgent,\n \"schedule_callback\": self._schedule_callback,\n \"escalate_to_manager\": self._escalate_to_manager\n },\n background_tasks={\n \"notify_management\": self._notify_management,\n \"update_crm\": self._update_crm\n }\n ),\n \n \"general\": GeneralSupportAgent(\n llm_client=self.fast_llm,\n knowledge_base=self.knowledge_base,\n tools={\n \"search_faq\": self.knowledge_base.search_faq,\n \"get_account_info\": self._get_basic_account_info\n }\n )\n }\n \n async def handle_customer_request(\n self, \n message: str, \n user_id: str, \n session_id: Optional[str] = None\n ) -> Dict[str, Any]:\n \"\"\"Main entry point for customer requests.\"\"\"\n \n # Generate session ID if not provided\n if not session_id:\n session_id = f\"session_{user_id}_{int(time.time())}\"\n \n # Retrieve conversation history and user context\n conversation_history = await self.memory.retrieve({\n \"session_id\": session_id,\n \"user_id\": user_id\n })\n \n user_context = await self._get_user_context(user_id)\n \n # Step 1: Intelligent routing with context\n routing_result = await self.router.classify_request(\n message=message,\n conversation_history=conversation_history,\n user_context=user_context\n )\n \n # Step 2: Select appropriate agent based on classification\n agent_type = self._select_agent(routing_result)\n agent = self.agents[agent_type]\n \n # Step 3: Prepare comprehensive input for the agent\n agent_input = IAgentInput(\n message=message,\n metadata={\n \"user_id\": user_id,\n \"session_id\": session_id,\n \"user_context\": user_context,\n \"conversation_history\": conversation_history,\n \"routing_info\": routing_result,\n \"timestamp\": datetime.now().isoformat()\n }\n )\n \n # Step 4: Process request with selected agent\n agent_response = await agent.process(agent_input)\n \n # Step 5: Post-process and enhance response\n enhanced_response = await self._enhance_response(\n agent_response, \n routing_result, \n user_context\n )\n \n # Step 6: Update conversation memory\n await self.memory.store({\n \"session_id\": session_id,\n \"user_id\": user_id,\n \"interaction\": {\n \"user_message\": message,\n \"agent_response\": enhanced_response,\n \"agent_used\": agent_type,\n \"confidence\": routing_result.get(\"confidence\", 0.0),\n \"resolved\": enhanced_response.get(\"resolved\", False),\n \"timestamp\": datetime.now().isoformat()\n }\n })\n \n # Step 7: Return comprehensive response\n return {\n \"response\": enhanced_response[\"message\"],\n \"agent_used\": agent_type,\n \"confidence\": routing_result.get(\"confidence\", 0.0),\n \"session_id\": session_id,\n \"resolved\": enhanced_response.get(\"resolved\", False),\n \"escalated\": enhanced_response.get(\"escalated\", False),\n \"next_steps\": enhanced_response.get(\"next_steps\", []),\n \"estimated_resolution_time\": enhanced_response.get(\"eta\"),\n \"satisfaction_survey\": enhanced_response.get(\"survey_link\")\n }\n \n def _select_agent(self, routing_result: Dict[str, Any]) -> str:\n \"\"\"Smart agent selection based on classification and context.\"\"\"\n intent = routing_result.get(\"intent\", \"general\")\n urgency = routing_result.get(\"urgency\", \"low\")\n complexity = routing_result.get(\"complexity\", \"simple\")\n \n # Escalation logic\n if urgency == \"critical\" or routing_result.get(\"escalate_immediately\"):\n return \"escalation\"\n \n # Intent-based routing\n if intent in [\"technical\", \"bug\", \"outage\"]:\n return \"technical\" if complexity != \"high\" else \"escalation\"\n elif intent in [\"billing\", \"payment\", \"subscription\"]:\n return \"billing\"\n elif intent in [\"complaint\", \"refund\", \"cancellation\"]:\n return \"escalation\"\n else:\n return \"general\"\n \n # Helper methods for agent tools\n async def _check_system_status(self) -> Dict[str, str]:\n \"\"\"Check current system status.\"\"\"\n # Your system monitoring integration\n return {\"status\": \"operational\", \"uptime\": \"99.9%\"}\n \n async def _get_user_context(self, user_id: str) -> Dict[str, Any]:\n \"\"\"Retrieve user context from various systems.\"\"\"\n # Aggregate user data from multiple sources\n return {\n \"account_type\": \"premium\",\n \"support_tier\": \"priority\",\n \"previous_issues\": 2,\n \"satisfaction_score\": 4.5\n }\n\n# Production usage\nasync def main():\n helpdesk = IntelligentHelpDeskSystem()\n \n # Handle customer request\n response = await helpdesk.handle_customer_request(\n message=\"My API is returning 500 errors since this morning. This is affecting our production system!\",\n user_id=\"enterprise_customer_123\"\n )\n \n print(f\"Agent used: {response['agent_used']}\")\n print(f\"Response: {response['response']}\")\n print(f\"Escalated: {response['escalated']}\")\n\nif __name__ == \"__main__\":\n import asyncio\n asyncio.run(main())\n```\n\n### 4. Advanced: Real-Time Streaming & Multi-Modal\n\n```python\n# Advanced streaming with real-time function execution\nasync def financial_advisor_stream():\n \"\"\"Financial advisor with real-time data and streaming.\"\"\"\n \n def get_stock_price(symbol: str) -> Dict[str, Any]:\n \"\"\"Get real-time stock price and metrics.\"\"\"\n # Your real-time data source integration\n return {\n \"symbol\": symbol,\n \"price\": 150.25,\n \"change\": \"+2.5%\", \n \"volume\": \"1.2M\",\n \"market_cap\": \"2.8T\"\n }\n \n def get_market_news(symbol: str) -> List[Dict[str, str]]:\n \"\"\"Get latest news for a stock symbol.\"\"\"\n # Your news API integration\n return [\n {\"headline\": \"Strong Q3 earnings reported\", \"sentiment\": \"positive\"},\n {\"headline\": \"New product launch announced\", \"sentiment\": \"positive\"}\n ]\n \n async def send_price_alert(symbol: str, threshold: float, current_price: float) -> None:\n \"\"\"BACKGROUND TASK: Set up price monitoring.\"\"\"\n print(f\"\ud83d\udce8 Alert set: {symbol} @ ${threshold} (current: ${current_price})\")\n # Your alert system integration - runs independently\n \n async def log_trading_interest(symbol: str, user_intent: str) -> None:\n \"\"\"BACKGROUND TASK: Log user trading interest.\"\"\"\n # Your analytics system - fire and forget\n print(f\"\ud83d\udcca Logged interest: {user_intent} for {symbol}\")\n \n # Multi-function streaming setup\n llm_input = ILLMInput(\n system_prompt=\"\"\"You are an expert financial advisor with access to real-time market data.\n Provide comprehensive analysis using current stock prices, news, and market sentiment.\n Set up alerts when users express interest in price monitoring.\"\"\",\n user_message=\"What's Apple's current situation? I'm thinking about buying and want to be alerted if it drops below $140\",\n regular_functions={\n \"get_stock_price\": get_stock_price,\n \"get_market_news\": get_market_news\n },\n background_tasks={\n \"send_price_alert\": send_price_alert,\n \"log_trading_interest\": log_trading_interest\n }\n )\n \n print(\"\ud83d\udcca Financial Advisor:\")\n \n # Stream with real-time function execution\n async for chunk in llm.stream(llm_input):\n if chunk.get(\"llm_response\"):\n print(chunk[\"llm_response\"], end=\"\", flush=True)\n \n # Functions execute immediately as LLM calls them\n # Regular functions return results to continue conversation\n # Background tasks run independently without blocking\n \n print(\"\\n\\n\u2705 Analysis complete with active monitoring set up!\")\n\n# Multi-modal capabilities (Gemini)\nasync def image_analysis_stream():\n \"\"\"Analyze images with streaming responses.\"\"\"\n \n gemini_config = ILLMConfig(model=\"gemini-1.5-pro-vision\")\n gemini = GeminiClient(gemini_config)\n \n llm_input = ILLMInput(\n system_prompt=\"You are an expert image analyst. Provide detailed, structured analysis.\",\n user_message=\"Analyze this product image for e-commerce listing\",\n images=[\"path/to/product-image.jpg\"], # Multi-modal input\n regular_functions={\n \"generate_seo_tags\": lambda description: [\"electronics\", \"gadget\", \"premium\"],\n \"suggest_price_range\": lambda category, features: {\"min\": 99, \"max\": 149}\n }\n )\n \n print(\"\ud83d\uddbc\ufe0f Product Analysis:\")\n \n async for chunk in gemini.stream(llm_input):\n if chunk.get(\"llm_response\"):\n print(chunk[\"llm_response\"], end=\"\", flush=True)\n \n print(\"\\n\\n\u2705 Product analysis complete!\")\n\n# Usage\nif __name__ == \"__main__\":\n import asyncio\n asyncio.run(financial_advisor_stream())\n```\n\n## Memory Management\n\n```python\nfrom arshai.memory.working_memory.redis_memory_manager import RedisMemoryManager\nfrom arshai.memory.working_memory.in_memory_manager import InMemoryManager\n\n# You choose the memory implementation\ndef create_memory(env: str) -> IMemoryManager:\n if env == \"production\":\n return RedisMemoryManager(\n redis_url=os.getenv(\"REDIS_URL\"),\n ttl=3600,\n key_prefix=\"prod_app\"\n )\n else:\n return InMemoryManager(ttl=1800) # Development\n\n# You integrate it with agents\nmemory = create_memory(\"production\")\nagent = ConversationAgent(llm, memory, \"You remember our conversation\")\n```\n\n## Production Deployment\n\n### Performance Optimizations\n\n```python\n# Configure performance settings via environment variables\nimport os\n\n# HTTP connection pooling\nos.environ['ARSHAI_MAX_CONNECTIONS'] = '100'\nos.environ['ARSHAI_MAX_CONNECTIONS_PER_HOST'] = '10'\n\n# Thread pool management \nos.environ['ARSHAI_MAX_THREADS'] = '32'\n\n# Memory management\nos.environ['ARSHAI_MAX_MEMORY_MB'] = '2048'\nos.environ['ARSHAI_CLEANUP_INTERVAL'] = '300'\n\n# Your production setup benefits automatically\nllm = OpenAIClient(config) # Uses connection pooling\nagent = CustomAgent(llm) # Uses managed threads\n```\n\n### Docker Deployment\n\n```dockerfile\nFROM python:3.11-slim\n\nWORKDIR /app\nCOPY requirements.txt .\nRUN pip install -r requirements.txt\n\n# Performance configuration\nENV ARSHAI_MAX_CONNECTIONS=100\nENV ARSHAI_MAX_CONNECTIONS_PER_HOST=20\nENV ARSHAI_MAX_THREADS=32\n\nCOPY . .\nCMD [\"python\", \"main.py\"]\n```\n\n### Observability\n\n```python\nfrom arshai.observability import ObservabilityConfig\n\n# Configure monitoring (optional - your choice)\nobs_config = ObservabilityConfig.from_yaml(\"monitoring.yaml\")\nclient = LLMFactory.create_with_observability(\n provider=\"openai\",\n config=llm_config,\n observability_config=obs_config\n)\n\n# Automatic metrics collection:\n# \u2705 llm_time_to_first_token_seconds\n# \u2705 llm_time_to_last_token_seconds \n# \u2705 llm_completion_tokens\n# \u2705 llm_requests_total\n```\n\n## Key Interfaces\n\n### LLM Provider Interface\n\n```python\nfrom arshai.core.interfaces.illm import ILLM, ILLMConfig, ILLMInput\n\n# All LLM providers implement this interface\nclass CustomLLMProvider(ILLM):\n def __init__(self, config: ILLMConfig):\n self.config = config\n \n async def chat(self, input: ILLMInput) -> Dict[str, Any]:\n # Your implementation\n pass\n \n async def stream(self, input: ILLMInput) -> AsyncGenerator:\n # Your streaming implementation \n pass\n```\n\n### Agent Interface\n\n```python\nfrom arshai.core.interfaces.iagent import IAgent, IAgentInput\n\nclass MyCustomAgent(IAgent):\n def __init__(self, llm_client: ILLM, **kwargs):\n self.llm_client = llm_client\n # You control initialization\n \n async def process(self, input: IAgentInput) -> Any:\n # You control the processing logic\n # Return anything - string, dict, object, generator\n pass\n```\n\n## Testing\n\n```python\nimport pytest\nfrom unittest.mock import AsyncMock\n\nasync def test_my_agent():\n # Easy testing with dependency injection\n mock_llm = AsyncMock()\n mock_llm.chat.return_value = {\n \"llm_response\": \"Test response\",\n \"usage\": {\"total_tokens\": 50}\n }\n \n # Your agent with mocked dependencies\n agent = MyAgent(mock_llm, \"Test prompt\")\n \n result = await agent.process(IAgentInput(message=\"Test\"))\n \n assert result == expected_result\n mock_llm.chat.assert_called_once()\n```\n\n## Extension and Customization\n\n### Add New LLM Provider\n\n```python\nclass MyLLMProvider(ILLM):\n \"\"\"Your custom LLM provider.\"\"\"\n \n def __init__(self, config: ILLMConfig):\n self.config = config\n # Your initialization\n \n async def chat(self, input: ILLMInput) -> Dict[str, Any]:\n # Your provider implementation\n return {\n \"llm_response\": \"Your response\",\n \"usage\": {\"total_tokens\": 100}\n }\n```\n\n### Create Custom Agent Types\n\n```python\nclass DomainSpecificAgent(BaseAgent):\n \"\"\"Agent specialized for your domain.\"\"\"\n \n def __init__(self, llm_client: ILLM, domain_tools: List[Callable]):\n super().__init__(llm_client, \"Domain-specific system prompt\")\n self.domain_tools = {tool.__name__: tool for tool in domain_tools}\n \n async def process(self, input: IAgentInput) -> Dict[str, Any]:\n # Your domain-specific processing\n llm_input = ILLMInput(\n system_prompt=self.system_prompt,\n user_message=input.message,\n regular_functions=self.domain_tools\n )\n \n result = await self.llm_client.chat(llm_input)\n return self._format_domain_response(result)\n## Observability and Monitoring\n\nArshai provides enterprise-grade observability for production AI systems with comprehensive OpenTelemetry integration and zero-fallback monitoring.\n\n### \ud83c\udfaf Key Features\n\n- **Zero-Fallback Monitoring**: Always capture the 4 critical LLM metrics without any fallback mechanisms\n- **OpenTelemetry Native**: Export to any OTLP-compatible backend (Jaeger, Prometheus, Datadog, New Relic)\n- **Phoenix AI Observability**: Advanced LLM interaction monitoring with comprehensive input/output tracing\n- **Automatic Factory Integration**: Zero-code observability through intelligent factory wrapping\n- **Real-time Input/Output Capture**: Automatic capture of prompts, responses, and usage metrics\n- **LLM Usage Data Integration**: Accurate token counting from LLM response usage data\n- **Streaming Observability**: Real-time token-level timing for streaming responses\n- **Non-Intrusive Design**: Zero side effects on LLM calls with graceful degradation\n- **Production-Ready**: Comprehensive configuration, privacy controls, and performance optimization\n\n### \ud83d\udcca The Four Key Metrics\n\nEvery LLM interaction automatically captures these critical performance metrics:\n\n| Metric | Description | Use Case |\n|--------|-------------|----------|\n| `llm_time_to_first_token_seconds` | Latency from request start to first token | User experience, response time SLAs |\n| `llm_time_to_last_token_seconds` | Total response generation time | End-to-end performance monitoring |\n| `llm_duration_first_to_last_token_seconds` | Token generation duration | Throughput analysis, model performance |\n| `llm_completion_tokens` | Accurate completion token count | Cost tracking, usage monitoring |\n\n### \ud83c\udfd7\ufe0f Architecture\n\n```mermaid\ngraph TD\n A[LLM Calls] --> B[Observable Factory]\n B --> C[Observability Manager]\n C --> D[Metrics Collector]\n C --> E[Trace Exporter]\n C --> F[Phoenix Client]\n D --> G[OpenTelemetry Collector]\n E --> G\n F --> H[Phoenix AI Platform]\n G --> I[Jaeger/Zipkin]\n G --> J[Prometheus]\n G --> K[DataDog/New Relic]\n J --> L[Grafana Dashboards]\n \n subgraph \"Automatic Capture\"\n M[Input Messages]\n N[Output Responses]\n O[Token Usage]\n P[Timing Data]\n end\n \n B --> M\n B --> N\n B --> O\n B --> P\n```\n\n### \ud83d\ude80 Quick Setup\n\n#### 1. Basic Configuration\n\n```yaml\n# config.yaml\nobservability:\n service_name: \"my-ai-service\"\n track_token_timing: true\n otlp_endpoint: \"http://localhost:4317\"\n \n # Phoenix AI Observability\n phoenix_enabled: true\n phoenix_endpoint: \"http://localhost:6006\"\n \n # Privacy and data capture\n log_prompts: true\n log_responses: true\n```\n\n#### 2. Automatic Factory Integration\n\n```python\nfrom arshai.config.settings import Settings\nfrom arshai.core.interfaces.illm import ILLMInput\n\n# Settings automatically detects observability configuration\nsettings = Settings()\n\n# Create LLM - observability is automatically enabled if configured\nllm = settings.create_llm() \n\n# All calls are automatically instrumented with zero configuration\ninput_data = ILLMInput(\n system_prompt=\"You are a helpful assistant.\",\n user_message=\"Hello!\"\n)\n\nresponse = llm.chat_completion(input_data)\n\n# Automatic capture includes:\n# \u2705 Input messages (system prompt + user message)\n# \u2705 Output response (full LLM response)\n# \u2705 Usage metrics (prompt/completion/total tokens)\n# \u2705 Timing data (first token, last token, total duration)\n# \u2705 Invocation parameters (model, temperature, provider)\n# \u2705 Span naming (llm.chat_completion not llm.<lambda>)\n```\n\n## Complete Examples & Use Cases\n\n### Production-Ready Examples\n```bash\n# Start complete observability stack\ncd tests/e2e/observability/\ndocker-compose up -d\n\n# Access observability platforms:\n# Phoenix AI Platform: http://localhost:6006 (LLM interactions, input/output tracing)\n# Jaeger Traces: http://localhost:16686 (distributed tracing)\n# Prometheus Metrics: http://localhost:9090 (metrics and queries)\n# Grafana Dashboards: http://localhost:3000 (visualization)\n```\n\n### \ud83d\udcc8 Supported Backends\n\n#### AI Observability Platforms\n- **Phoenix AI Platform**: Advanced LLM interaction monitoring with input/output tracing\n- **Arize AI**: Enterprise LLM observability and evaluation platform\n\n#### Metrics Backends\n- **Prometheus** + Grafana\n- **DataDog** (via OTLP)\n- **New Relic** (via OTLP)\n- **AWS CloudWatch** (via OTLP)\n- **Azure Monitor** (via OTLP)\n- **Google Cloud Monitoring** (via OTLP)\n\n#### Tracing Backends\n- **Jaeger**\n- **Zipkin**\n- **DataDog APM**\n- **New Relic Distributed Tracing**\n- **AWS X-Ray**\n- **Azure Application Insights**\n\n### \ud83d\udd27 Advanced Configuration\n\n#### Provider-Specific Settings\n\n```yaml\nobservability:\n provider_configs:\n openai:\n track_token_timing: true\n anthropic:\n track_token_timing: true\n google:\n track_token_timing: false # Disable for specific providers\n```\n\n#### Privacy and Security\n\n```yaml\nobservability:\n # Privacy controls (recommended for production)\n log_prompts: false\n log_responses: false\n max_prompt_length: 1000\n max_response_length: 1000\n \n # Custom attributes (avoid sensitive data)\n custom_attributes:\n team: \"ai-platform\"\n environment: \"production\"\n```\n\n#### Performance Tuning\n\n```yaml\nobservability:\n # High-throughput settings\n metric_export_interval: 30\n trace_sampling_rate: 0.05 # 5% sampling\n max_span_attributes: 64\n \n # Async processing for better performance\n non_intrusive: true\n```\n\n### \ud83e\uddea Testing and Development\n\n#### End-to-End Test Suite\n\n```bash\n# Complete observability testing with real backends\ncd tests/e2e/observability/\nexport OPENAI_API_KEY=\"your-key\"\n./run_test.sh\n\n# Tests verify:\n# \u2705 All 4 key metrics collected\n# \u2705 OTLP export working\n# \u2705 Multiple provider support\n# \u2705 Streaming observability\n# \u2705 No side effects on LLM calls\n```\n\n#### Development Setup\n\n```yaml\n# Development configuration\nobservability:\n service_name: \"arshai-dev\"\n environment: \"development\"\n \n # Debug settings\n log_prompts: true\n log_responses: true\n trace_sampling_rate: 1.0 # 100% sampling\n \n # Local OTLP collector\n otlp_endpoint: \"http://localhost:4317\"\n```\n\n### \ud83d\udcda Production Deployment\n\n#### Docker Compose for Production\n\nThe `tests/e2e/observability/docker-compose.yml` provides a production-ready template:\n\n```bash\n# Production observability stack\ndocker-compose -f tests/e2e/observability/docker-compose.yml up -d\n\n# Includes:\n# - OpenTelemetry Collector\n# - Jaeger for distributed tracing\n# - Prometheus for metrics collection\n# - Grafana with pre-built dashboards\n```\n\n#### Kubernetes Deployment\n\n```yaml\n# Example Kubernetes configuration\napiVersion: v1\nkind: ConfigMap\nmetadata:\n name: observability-config\ndata:\n config.yaml: |\n observability:\n service_name: \"arshai-prod\"\n otlp_endpoint: \"http://otel-collector:4317\"\n track_token_timing: true\n log_prompts: false\n log_responses: false\n```\n\n### \ud83d\udd0d Monitoring and Alerting\n\n#### Key Metrics to Monitor\n\n```promql\n# Prometheus queries for monitoring\n# Average time to first token\nhistogram_quantile(0.95, rate(llm_time_to_first_token_seconds_bucket[5m]))\n\n# Token throughput\nrate(llm_completion_tokens[5m])\n\n# Error rate\nrate(llm_requests_failed[5m]) / rate(llm_requests_total[5m])\n\n# Active requests\nllm_active_requests\n```\n\n#### Grafana Dashboard\n\nPre-built dashboards available in `tests/e2e/observability/dashboards/`:\n- **LLM Performance**: Token timing metrics with percentiles\n- **Cost Tracking**: Token usage and provider distribution\n- **Error Monitoring**: Failed requests and error patterns\n- **Throughput Analysis**: Request rates and concurrent processing\n\n### \ud83d\udd17 Integration Examples\n\n#### With Existing Monitoring\n\n#### **Customer Support System**\n```python\n# examples/customer_support_system.py\nfrom arshai.llms.openai import OpenAIClient\nfrom arshai.agents.base import BaseAgent\nfrom arshai.memory.working_memory.redis_memory_manager import RedisMemoryManager\nfrom arshai.tools.web_search import SearxNGClient\nfrom arshai.vector_db.milvus_client import MilvusClient\n\nclass IntelligentCustomerSupport:\n \"\"\"Production-ready customer support with multiple agents.\"\"\"\n \n def __init__(self):\n # Shared LLM client with connection pooling\n llm_config = ILLMConfig(model=\"gpt-4o\", temperature=0.3)\n self.llm = OpenAIClient(llm_config)\n \n # Memory system for conversation continuity\n self.memory = RedisMemoryManager(\n redis_url=os.getenv(\"REDIS_URL\"),\n ttl=3600,\n key_prefix=\"support\"\n )\n \n # Knowledge base with vector search\n self.kb = KnowledgeBase(\n vector_client=MilvusClient(),\n embedding_model=\"text-embedding-3-large\"\n )\n \n # Specialized agents\n self.agents = {\n \"triage\": TriageAgent(self.llm, \"Route customer requests\"),\n \"technical\": TechnicalAgent(self.llm, self.kb),\n \"billing\": BillingAgent(self.llm, billing_api),\n \"escalation\": EscalationAgent(self.llm, human_queue)\n }\n \n async def handle_request(self, message: str, user_id: str, session_id: str) -> Dict[str, Any]:\n \"\"\"Handle customer request with full context.\"\"\"\n \n # Retrieve conversation history\n context = await self.memory.retrieve({\n \"user_id\": user_id,\n \"session_id\": session_id\n })\n \n # Triage the request\n triage_result = await self.agents[\"triage\"].classify(\n message=message,\n context=context,\n user_history=await self.get_user_history(user_id)\n )\n \n # Select appropriate agent\n agent_type = self.select_agent(triage_result)\n agent = self.agents[agent_type]\n \n # Process with full context\n result = await agent.process(\n IAgentInput(\n message=message,\n metadata={\n \"user_id\": user_id,\n \"session_id\": session_id,\n \"triage\": triage_result,\n \"context\": context\n }\n )\n )\n \n # Store updated context\n await self.memory.store({\n \"user_id\": user_id,\n \"session_id\": session_id,\n \"interaction\": {\n \"request\": message,\n \"response\": result[\"response\"],\n \"agent_used\": agent_type,\n \"confidence\": result.get(\"confidence\", 0.0),\n \"timestamp\": datetime.now().isoformat()\n }\n })\n \n return {\n \"response\": result[\"response\"],\n \"agent_used\": agent_type,\n \"confidence\": result.get(\"confidence\", 0.0),\n \"escalated\": agent_type == \"escalation\",\n \"session_id\": session_id\n }\n```\n\n#### **Content Generation Pipeline**\n```python\n# examples/content_generation_pipeline.py\nclass ContentGenerationPipeline:\n \"\"\"Multi-stage content generation with quality assurance.\"\"\"\n \n def __init__(self):\n # Different models for different tasks\n self.creative_llm = OpenAIClient(ILLMConfig(model=\"gpt-4o\", temperature=0.9))\n self.editor_llm = OpenAIClient(ILLMConfig(model=\"gpt-4o\", temperature=0.3))\n self.fact_checker_llm = GeminiClient(ILLMConfig(model=\"gemini-1.5-pro\"))\n \n # Specialized agents\n self.writer = CreativeWriterAgent(self.creative_llm)\n self.editor = EditorAgent(self.editor_llm)\n self.fact_checker = FactCheckerAgent(self.fact_checker_llm)\n self.seo_optimizer = SEOAgent(self.editor_llm)\n \n # Tools\n self.web_search = SearxNGClient()\n self.plagiarism_checker = PlagiarismTool()\n \n async def generate_article(self, topic: str, requirements: Dict[str, Any]) -> Dict[str, Any]:\n \"\"\"Generate high-quality article with multiple review stages.\"\"\"\n \n # Stage 1: Research and outline\n research_data = await self.research_topic(topic)\n outline = await self.writer.create_outline(topic, research_data, requirements)\n \n # Stage 2: Content generation\n content = await self.writer.write_content(outline, requirements)\n \n # Stage 3: Editorial review\n edited_content = await self.editor.review_and_edit(\n content, \n style_guide=requirements.get(\"style_guide\"),\n target_audience=requirements.get(\"audience\")\n )\n \n # Stage 4: Fact checking\n fact_check_result = await self.fact_checker.verify_claims(\n edited_content,\n research_sources=research_data[\"sources\"]\n )\n \n if fact_check_result[\"issues_found\"]:\n # Correct factual issues\n edited_content = await self.editor.correct_facts(\n edited_content,\n fact_check_result[\"corrections\"]\n )\n \n # Stage 5: SEO optimization\n optimized_content = await self.seo_optimizer.optimize(\n edited_content,\n target_keywords=requirements.get(\"keywords\", []),\n seo_requirements=requirements.get(\"seo\", {})\n )\n \n # Stage 6: Quality assurance\n quality_score = await self.assess_quality(optimized_content)\n \n return {\n \"title\": optimized_content[\"title\"],\n \"content\": optimized_content[\"body\"],\n \"meta_description\": optimized_content[\"meta_description\"],\n \"keywords\": optimized_content[\"keywords\"],\n \"quality_score\": quality_score,\n \"word_count\": len(optimized_content[\"body\"].split()),\n \"readability_score\": optimized_content.get(\"readability\", 0),\n \"fact_check_passed\": not fact_check_result[\"issues_found\"],\n \"seo_score\": optimized_content.get(\"seo_score\", 0)\n }\n```\n\n#### **Data Analysis Assistant**\n```python\n# examples/data_analysis_assistant.py\nclass DataAnalysisAssistant:\n \"\"\"Intelligent data analysis with visualization and insights.\"\"\"\n \n def __init__(self):\n self.llm = OpenAIClient(ILLMConfig(model=\"gpt-4o\"))\n self.code_executor = CodeExecutorTool()\n self.viz_generator = VisualizationTool()\n \n # Data analysis agent with specialized tools\n self.analyst = DataAnalystAgent(\n llm_client=self.llm,\n tools={\n \"execute_python\": self.code_executor.execute,\n \"create_visualization\": self.viz_generator.create,\n \"statistical_analysis\": self.statistical_analysis,\n \"data_profiling\": self.profile_data\n }\n )\n \n async def analyze_dataset(self, data_path: str, analysis_request: str) -> Dict[str, Any]:\n \"\"\"Perform comprehensive data analysis.\"\"\"\n \n # Load and profile the data\n data_profile = await self.profile_data(data_path)\n \n # Generate analysis plan\n analysis_plan = await self.analyst.create_analysis_plan(\n data_profile=data_profile,\n request=analysis_request\n )\n \n results = {}\n \n # Execute analysis steps\n for step in analysis_plan[\"steps\"]:\n if step[\"type\"] == \"statistical_analysis\":\n result = await self.statistical_analysis(\n data_path, \n step[\"parameters\"]\n )\n elif step[\"type\"] == \"visualization\":\n result = await self.viz_generator.create(\n data_path,\n chart_type=step[\"chart_type\"],\n parameters=step[\"parameters\"]\n )\n elif step[\"type\"] == \"custom_code\":\n result = await self.code_executor.execute(\n step[\"code\"],\n data_path=data_path\n )\n \n results[step[\"name\"]] = result\n \n # Generate insights and recommendations\n insights = await self.analyst.generate_insights(\n analysis_results=results,\n original_request=analysis_request\n )\n \n return {\n \"data_profile\": data_profile,\n \"analysis_plan\": analysis_plan,\n \"results\": results,\n \"insights\": insights,\n \"visualizations\": [r for r in results.values() if r.get(\"type\") == \"chart\"],\n \"recommendations\": insights.get(\"recommendations\", [])\n }\n```\n\n### Working Examples Directory\n\n- **[Basic Usage](examples/basic_usage.py)** - Simple agent creation and usage patterns\n- **[Advanced Agents](examples/agent_patterns.py)** - Custom agent development patterns \n- **[Multi-Agent Systems](examples/multi_agent_system.py)** - Complex system orchestration\n- **[Memory Integration](examples/memory_usage.py)** - Redis and in-memory patterns\n- **[Tool Development](examples/tool_integration.py)** - Custom tool creation\n- **[Production Deployment](examples/production_deployment.py)** - Enterprise patterns\n- **[Performance Testing](tests/performance/)** - Load testing and validation\n- **[Customer Support](examples/customer_support_system.py)** - Complete support system\n- **[Content Pipeline](examples/content_generation_pipeline.py)** - Multi-stage content creation\n- **[Data Analysis](examples/data_analysis_assistant.py)** - Intelligent data analysis\n- **[E-commerce Assistant](examples/ecommerce_assistant.py)** - Shopping and recommendations\n- **[Code Review Bot](examples/code_review_agent.py)** - Automated code analysis\n- **[Research Assistant](examples/research_assistant.py)** - Academic research support\n- **[Meeting Summarizer](examples/meeting_summarizer.py)** - Meeting analysis and action items\n\n## Comprehensive Documentation\n\n### \ud83d\ude80 Getting Started\n- **[Quick Start Guide](docs/01-getting-started/)** - Your first Arshai application in 5 minutes\n- **[Installation Guide](docs/01-getting-started/installation.md)** - Detailed setup instructions\n- **[Core Concepts](docs/01-getting-started/core-concepts.md)** - Understanding the framework\n- **[First Agent](docs/01-getting-started/first-agent.md)** - Build your first intelligent agent\n\n### \ud83e\udde0 Philosophy & Architecture\n- **[Developer Authority](docs/00-philosophy/developer-authority.md)** - Core framework philosophy\n- **[Three-Layer Architecture](docs/00-philosophy/three-layer-architecture.md)** - Architectural foundations\n- **[Design Decisions](docs/00-philosophy/design-decisions.md)** - Why we made these choices\n- **[Comparison with Other Frameworks](docs/00-philosophy/comparisons.md)** - How Arshai differs\n\n### \ud83c\udfd7\ufe0f Layer-by-Layer Guides\n\n#### **Layer 1: LLM Clients**\n- **[OpenAI Integration](docs/02-layer-guides/layer1-llm-clients/openai.md)** - Complete OpenAI setup\n- **[Azure OpenAI](docs/02-layer-guides/layer1-llm-clients/azure.md)** - Enterprise Azure deployment\n- **[Google Gemini](docs/02-layer-guides/layer1-llm-clients/gemini.md)** - Gemini Pro integration\n- **[OpenRouter](docs/02-layer-guides/layer1-llm-clients/openrouter.md)** - Access to 200+ models\n- **[Function Calling](docs/02-layer-guides/layer1-llm-clients/function-calling.md)** - Advanced tool integration\n- **[Streaming](docs/02-layer-guides/layer1-llm-clients/streaming.md)** - Real-time responses\n- **[Custom Providers](docs/02-layer-guides/layer1-llm-clients/custom-providers.md)** - Adding new LLM providers\n\n#### **Layer 2: Agents**\n- **[Agent Fundamentals](docs/02-layer-guides/layer2-agents/fundamentals.md)** - Core agent concepts\n- **[BaseAgent Guide](docs/02-layer-guides/layer2-agents/base-agent.md)** - Using the base agent class\n- **[Custom Agents](docs/02-layer-guides/layer2-agents/custom-agents.md)** - Building specialized agents\n- **[Working Memory](docs/02-layer-guides/layer2-agents/working-memory.md)** - Context-aware agents\n- **[Tool Integration](docs/02-layer-guides/layer2-agents/tools.md)** - Adding capabilities to agents\n- **[Error Handling](docs/02-layer-guides/layer2-agents/error-handling.md)** - Robust agent design\n- **[Testing Agents](docs/02-layer-guides/layer2-agents/testing.md)** - Agent testing patterns\n\n#### **Layer 3: Agentic Systems**\n- **[System Design](docs/02-layer-guides/layer3-systems/design.md)** - Architecting complex systems\n- **[Multi-Agent Coordination](docs/02-layer-guides/layer3-systems/coordination.md)** - Agent orchestration\n- **[Workflow Management](docs/02-layer-guides/layer3-systems/workflows.md)** - Process automation\n- **[State Management](docs/02-layer-guides/layer3-systems/state.md)** - System state handling\n- **[Error Recovery](docs/02-layer-guides/layer3-systems/recovery.md)** - Resilient system design\n- **[Scaling Patterns](docs/02-layer-guides/layer3-systems/scaling.md)** - High-volume architectures\n\n### \ud83d\udee0\ufe0f Development Patterns\n- **[Direct Instantiation](docs/03-patterns/direct-instantiation.md)** - Core development pattern\n- **[Dependency Injection](docs/03-patterns/dependency-injection.md)** - Managing dependencies\n- **[Configuration Management](docs/03-patterns/configuration.md)** - Environment-based config\n- **[Error Handling](docs/03-patterns/error-handling.md)** - Robust error strategies\n- **[Testing Strategies](docs/03-patterns/testing.md)** - Comprehensive testing approaches\n- **[Performance Patterns](docs/03-patterns/performance.md)** - Optimization techniques\n\n### \ud83e\uddf0 Component Guides\n\n#### **Memory Systems**\n- **[Memory Overview](docs/04-components/memory/overview.md)** - Memory system architecture\n- **[Redis Memory](docs/04-components/memory/redis.md)** - Production memory backend\n- **[In-Memory Manager](docs/04-components/memory/in-memory.md)** - Development and testing\n- **[Working Memory](docs/04-components/memory/working-memory.md)** - Conversation context\n- **[Custom Memory](docs/04-components/memory/custom.md)** - Building custom backends\n\n#### **Tools & Integrations**\n- **[Tool Development](docs/04-components/tools/development.md)** - Creating custom tools\n- **[Web Search Tools](docs/04-components/tools/web-search.md)** - SearxNG integration\n- **[Database Tools](docs/04-components/tools/database.md)** - Database connectivity\n- **[API Integration](docs/04-components/tools/api-integration.md)** - External API tools\n- **[MCP Tools](docs/04-components/tools/mcp.md)** - Model Context Protocol\n- **[Background Tasks](docs/04-components/tools/background-tasks.md)** - Fire-and-forget operations\n\n#### **Vector Databases**\n- **[Vector Database Overview](docs/04-components/vector-db/overview.md)** - Vector storage concepts\n- **[Milvus Integration](docs/04-components/vector-db/milvus.md)** - Production vector database\n- **[Hybrid Search](docs/04-components/vector-db/hybrid-search.md)** - Dense + sparse vectors\n- **[Embedding Models](docs/04-components/vector-db/embeddings.md)** - Choosing embedding models\n- **[Performance Tuning](docs/04-components/vector-db/performance.md)** - Optimization strategies\n\n#### **Document Processing**\n- **[Document Ingestion](docs/04-components/documents/ingestion.md)** - Processing various formats\n- **[Chunking Strategies](docs/04-components/documents/chunking.md)** - Content segmentation\n- **[Metadata Extraction](docs/04-components/documents/metadata.md)** - Automatic metadata detection\n- **[Batch Processing](docs/04-components/documents/batch-processing.md)** - Large-scale document handling\n\n### \ud83d\udcca Observability & Monitoring\n- **[Observability Overview](docs/05-observability/overview.md)** - Monitoring philosophy\n- **[OpenTelemetry Integration](docs/05-observability/opentelemetry.md)** - Distributed tracing\n- **[Metrics Collection](docs/05-observability/metrics.md)** - Performance monitoring\n- **[Custom Metrics](docs/05-observability/custom-metrics.md)** - Business-specific tracking\n- **[Alerting Setup](docs/05-observability/alerting.md)** - Proactive monitoring\n- **[Cost Tracking](docs/05-observability/cost-tracking.md)** - LLM cost optimization\n- **[Debugging Guide](docs/05-observability/debugging.md)** - Troubleshooting techniques\n\n### \ud83d\udd04 Migration & Upgrades\n- **[Migration Guide](docs/06-migration/)** - Upgrading from older versions\n- **[Breaking Changes](docs/06-migration/breaking-changes.md)** - Version compatibility\n- **[Legacy Support](docs/06-migration/legacy-support.md)** - Backward compatibility\n- **[Migration Tools](docs/06-migration/tools.md)** - Automated migration utilities\n\n### \ud83e\uddea Testing & Quality Assurance\n- **[Testing Philosophy](docs/07-testing/philosophy.md)** - Arshai testing approach\n- **[Unit Testing](docs/07-testing/unit-testing.md)** - Component-level testing\n- **[Integration Testing](docs/07-testing/integration-testing.md)** - System-level testing\n- **[Load Testing](docs/07-testing/load-testing.md)** - Performance validation\n- **[Mock Strategies](docs/07-testing/mocking.md)** - Testing with mocks\n- **[CI/CD Integration](docs/07-testing/cicd.md)** - Automated testing pipelines\n\n### \ud83e\udd1d Contributing & Extending\n- **[Contributing Guide](docs/08-contributing/)** - How to contribute\n- **[Code Standards](docs/08-contributing/code-standards.md)** - Style and quality guidelines\n- **[Adding LLM Providers](docs/08-contributing/llm-providers.md)** - Extending provider support\n- **[Adding Tools](docs/08-contributing/tools.md)** - Contributing new tools\n- **[Documentation](docs/08-contributing/documentation.md)** - Improving documentation\n- **[Release Process](docs/08-contributing/releases.md)** - How releases are managed\n\n### \ud83d\ude80 Deployment & Production\n- **[Production Deployment](docs/09-deployment/)** - Enterprise deployment guide\n- **[Performance Optimization](docs/09-deployment/performance-optimization.md)** - Production optimization\n- **[Docker Deployment](docs/09-deployment/docker.md)** - Containerized deployment\n- **[Kubernetes Deployment](docs/09-deployment/kubernetes.md)** - Scalable orchestration\n- **[Security Best Practices](docs/09-deployment/security.md)** - Production security\n- **[Monitoring Setup](docs/09-deployment/monitoring.md)** - Production monitoring\n- **[Backup & Recovery](docs/09-deployment/backup-recovery.md)** - Data protection\n- **[Capacity Planning](docs/09-deployment/capacity-planning.md)** - Scaling strategies\n\n### \ud83d\udcd6 API Reference\n- **[Core Interfaces](docs/10-api-reference/interfaces/)** - Framework contracts\n- **[LLM Clients API](docs/10-api-reference/llm-clients/)** - LLM provider APIs\n- **[Agents API](docs/10-api-reference/agents/)** - Agent class references\n- **[Tools API](docs/10-api-reference/tools/)** - Tool development APIs\n- **[Memory API](docs/10-api-reference/memory/)** - Memory system APIs\n- **[Configuration API](docs/10-api-reference/configuration/)** - Settings and config APIs\n\n### \ud83c\udf93 Tutorials & Learning Resources\n- **[Video Tutorials](docs/11-tutorials/videos/)** - Step-by-step video guides\n- **[Interactive Notebooks](docs/11-tutorials/notebooks/)** - Jupyter notebook tutorials\n- **[Use Case Studies](docs/11-tutorials/case-studies/)** - Real-world implementations\n- **[Best Practices](docs/11-tutorials/best-practices/)** - Proven patterns and approaches\n- **[Troubleshooting](docs/11-tutorials/troubleshooting/)** - Common issues and solutions\n\n### \ud83d\udd17 Integration Guides\n- **[FastAPI Integration](docs/12-integrations/fastapi.md)** - Web API development\n- **[Streamlit Integration](docs/12-integrations/streamlit.md)** - Rapid UI development\n- **[Discord Bot](docs/12-integrations/discord.md)** - Chatbot development\n- **[Slack Integration](docs/12-integrations/slack.md)** - Workspace automation\n- **[Database Integration](docs/12-integrations/databases.md)** - Data persistence\n- **[Cloud Services](docs/12-integrations/cloud.md)** - AWS, Azure, GCP integration\n\n## Why Choose Arshai? \ud83c\udf86\n\n### \u2705 **Complete Developer Control**\n- **No Hidden Magic**: Every component creation is explicit and visible\n- **Direct Instantiation**: You create components when and how you want\n- **Dependency Injection**: All dependencies passed through constructors\n- **Architectural Freedom**: Design systems exactly as needed\n- **Zero Lock-in**: Use framework components selectively\n- **Interface-First**: Well-defined contracts make everything testable\n\n### \u2705 **Production-Grade Performance**\n- **Tested at Scale**: Validated for 1000+ concurrent users\n- **Connection Management**: HTTP connection pooling prevents resource exhaustion\n- **Thread Pool Control**: Bounded execution prevents system deadlocks\n- **Memory Optimization**: Automatic cleanup and TTL management\n- **Async Architecture**: Non-blocking I/O throughout the system\n- **Load Balancing**: Built-in patterns for high-availability deployment\n\n### \u2705 **Enterprise-Ready Features**\n- **Multi-Provider Support**: OpenAI, Azure, Gemini, OpenRouter with unified interface\n- **Advanced Function Calling**: Parallel execution with background tasks\n- **Distributed Memory**: Redis-backed memory with automatic failover\n- **Vector Database Integration**: Milvus with hybrid search capabilities\n- **Comprehensive Monitoring**: OpenTelemetry integration with custom metrics\n- **Security**: Built-in security best practices and audit logging\n\n### \u2705 **Developer Experience Excellence**\n- **Minimal Boilerplate**: Get started quickly with sensible defaults\n- **Rich Documentation**: Comprehensive guides, examples, and API references\n- **Easy Testing**: Mock-friendly design with dependency injection\n- **Type Safety**: Full TypeScript-style type hints for Python\n- **Hot Reloading**: Development server with automatic reload\n- **Debugging Tools**: Comprehensive logging and tracing capabilities\n\n### \u2705 **Flexible & Extensible**\n- **Custom LLM Providers**: Add any LLM with standardized interface\n- **Tool Ecosystem**: Create custom tools with simple interfaces\n- **Agent Specialization**: Build domain-specific agents easily\n- **Memory Backends**: Implement custom memory systems\n- **Modular Design**: Use only the components you need\n- **Plugin Architecture**: Extend functionality without core changes\n\n### \u2705 **Cost Optimization**\n- **Token Tracking**: Detailed usage analytics across all providers\n- **Smart Caching**: Reduce redundant API calls automatically\n- **Model Selection**: Use appropriate models for different tasks\n- **Batch Processing**: Efficient processing of multiple requests\n- **Cost Alerts**: Monitor and control spending with custom alerts\n- **Usage Analytics**: Detailed cost breakdown and optimization recommendations\n\n### \u2705 **Real-World Battle Tested**\n```python\n# Production deployments successfully running:\n\n# Customer Support Systems\n# - 500+ simultaneous conversations\n# - 24/7 uptime with Redis failover\n# - Multi-language support with specialized agents\n# - Integration with Zendesk, Salesforce, custom CRMs\n\n# Content Generation Pipelines \n# - Processing 10,000+ articles daily\n# - Multi-stage review with fact-checking\n# - SEO optimization and quality scoring\n# - Integration with CMS systems and publishing workflows\n\n# Data Analysis Platforms\n# - Processing GB-scale datasets\n# - Real-time visualization generation\n# - Statistical analysis with confidence intervals\n# - Integration with Jupyter, Pandas, enterprise BI tools\n\n# E-commerce Assistants\n# - Handling 1000+ concurrent shoppers\n# - Personalized recommendations with vector search\n# - Real-time inventory and pricing integration\n# - Multi-step checkout assistance with payment processing\n```\n\n### \ud83c\udfc6 **Framework Comparison**\n\n| Feature | Arshai | LangChain | LlamaIndex | Custom Build |\n|---------|--------|-----------|------------|-------------|\n| **Developer Control** | \u2705 Complete | \u274c Limited | \u274c Framework-driven | \u2705 Complete |\n| **Production Ready** | \u2705 Enterprise | \u26a0\ufe0f Community | \u26a0\ufe0f Research | \u274c Build yourself |\n| **Performance** | \u2705 Optimized | \u274c Variable | \u274c Single-purpose | \u2753 Depends |\n| **Testing** | \u2705 Easy mocking | \u274c Complex | \u274c Framework bound | \u2705 Full control |\n| **Documentation** | \u2705 Comprehensive | \u26a0\ufe0f Partial | \u26a0\ufe0f Limited | \u274c DIY |\n| **Multi-Provider** | \u2705 Unified API | \u26a0\ufe0f Inconsistent | \u274c Limited | \u274c Manual |\n| **Observability** | \u2705 Built-in | \u274c Manual setup | \u274c Basic | \u274c Custom build |\n| **Scalability** | \u2705 1000+ users | \u26a0\ufe0f Unknown | \u26a0\ufe0f Research only | \u2753 Your effort |\n| **Time to Production** | \u2705 Days | \u26a0\ufe0f Weeks | \u26a0\ufe0f Months | \u274c Months |\n\n### \ud83d\ude80 **Success Stories**\n\n> \"We migrated from LangChain to Arshai and reduced our production issues by 80%. The explicit dependency injection made testing trivial, and the connection pooling solved our resource exhaustion problems.\" \n> \u2014 **Senior Engineer, Fortune 500 Company**\n\n> \"Arshai's three-layer architecture allowed us to build exactly what we needed without framework overhead. Our customer support system handles 500+ concurrent conversations with 99.9% uptime.\" \n> \u2014 **CTO, SaaS Startup**\n\n> \"The developer authority philosophy is game-changing. We can debug issues, optimize performance, and extend functionality without fighting the framework.\" \n> \u2014 **Lead AI Engineer, Enterprise Corp**\n\n## Community & Enterprise Support\n\n### \ud83c\udf10 **Open Source Community**\n- **Documentation**: [Complete documentation](docs/) with guides and API reference\n- **Examples**: [Production-ready examples](examples/) for every use case\n- **Issues**: [GitHub Issues](https://github.com/nimunzn/arshai/issues) for bugs and feature requests\n- **Discussions**: [GitHub Discussions](https://github.com/nimunzn/arshai/discussions) for Q&A and ideas\n- **Contributing**: [Contributing Guide](CONTRIBUTING.md) - help improve Arshai\n\n### \ud83d\udcbc **Enterprise Support Options**\n- **Priority Support**: Direct access to core maintainers\n- **Custom Development**: Tailored features for enterprise needs\n- **Architecture Review**: Expert guidance on system design\n- **Training & Consulting**: Team training and implementation support\n- **SLA Guarantees**: Production support with uptime commitments\n- **Security Reviews**: Enterprise security audits and compliance\n\n### \ud83d\udcc8 **Professional Services**\n- **Migration Services**: Expert migration from other frameworks\n- **Performance Optimization**: Production tuning and scaling\n- **Custom Integrations**: Connect to proprietary systems and APIs\n- **Team Training**: Comprehensive developer training programs\n- **Architecture Design**: System design review and recommendations\n- **24/7 Support**: Round-the-clock production support\n\n**Contact**: [enterprise@arshai.dev](mailto:enterprise@arshai.dev)\n\n## License & Legal\n\n**MIT License** - see [LICENSE](LICENSE) file for complete details.\n\n### \ud83d\udccb **What This Means For You**\n- \u2705 **Commercial Use**: Use Arshai in commercial products\n- \u2705 **Modification**: Modify the code for your needs\n- \u2705 **Distribution**: Distribute your applications using Arshai\n- \u2705 **Private Use**: Use in private/internal projects\n- \u2705 **Patent Grant**: Protection against patent litigation\n\n### \ud83d\udd12 **Enterprise Considerations**\n- **No Copyleft**: MIT license is business-friendly\n- **No Vendor Lock-in**: Open source prevents dependency risks\n- **Enterprise Support**: Commercial support available\n- **Compliance**: Compatible with most enterprise license policies\n- **Auditable Code**: Full source code transparency\n\n### \ud83d\udd0d **Third-Party Licenses**\nArshai includes components under compatible licenses:\n- **Apache 2.0**: Various dependencies (compatible)\n- **BSD**: Some utility libraries (compatible)\n- **MIT**: Core dependencies (same license)\n\nAll dependencies are carefully vetted for license compatibility and security.\n\n---\n\n## Ready to Build? \ud83d\ude80\n\n```bash\n# Install Arshai and get started in 5 minutes\npip install arshai[all]\n\n# Create your first intelligent agent\ntouch my_first_agent.py\n```\n\n```python\n# my_first_agent.py\nfrom arshai.llms.openai import OpenAIClient\nfrom arshai.agents.base import BaseAgent\nfrom arshai.core.interfaces.illm import ILLMConfig\nfrom arshai.core.interfaces.iagent import IAgentInput\n\n# You create the LLM client\nllm_config = ILLMConfig(model=\"gpt-4o-mini\", temperature=0.7)\nllm_client = OpenAIClient(llm_config)\n\n# You create the agent \nagent = BaseAgent(llm_client, \"You are a helpful coding assistant\")\n\n# You control the interaction\nresponse = await agent.process(IAgentInput(\n message=\"How do I handle async/await in Python?\"\n))\n\nprint(response)\n```\n\n### Next Steps\n1. **[Quick Start Guide](docs/01-getting-started/)** - Build your first application\n2. **[Examples Directory](examples/)** - Explore production-ready examples\n3. **[Community Discord](https://discord.gg/arshai)** - Get help and share ideas\n4. **[GitHub Discussions](https://github.com/nimunzn/arshai/discussions)** - Technical discussions\n\n### Join the Community\n- \ud83d\udcac **[Discord Server](https://discord.gg/arshai)** - Real-time help and discussions\n- \ud83d\udce7 **[Newsletter](https://arshai.dev/newsletter)** - Updates and best practices\n- \ud83d\udc26 **[Twitter/X](https://twitter.com/arshaidev)** - News and quick tips\n- \ud83d\udcfa **[YouTube](https://youtube.com/@arshaidev)** - Video tutorials and demos\n- \ud83d\udcdd **[Blog](https://arshai.dev/blog)** - In-depth articles and case studies\n\n---\n\n**Build AI applications your way.** \ud83c\udf86 \n\nArshai provides enterprise-grade building blocks. You architect the solution.",
"bugtrack_url": null,
"license": "MIT",
"summary": "A powerful agent framework for building conversational AI systems",
"version": "1.5.4",
"project_urls": {
"Documentation": "https://arshai.readthedocs.io",
"Homepage": "https://github.com/nimunzn/arshai",
"Repository": "https://github.com/nimunzn/arshai"
},
"split_keywords": [
"ai",
" agents",
" llm",
" workflow",
" rag",
" conversational-ai",
" multi-agent",
" vector-db",
" embeddings"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "6364bd4edf7e097f28d3a39085e0361fbea0b16dbc68faf089caaa08158d84ac",
"md5": "74b203b20fa22c8a3b7b425f2dab70d6",
"sha256": "2475d789d9bdec107e7d354fdf3f0cfcbe79cf8906c00f08bc95b0d02d3705ee"
},
"downloads": -1,
"filename": "arshai-1.5.4-py3-none-any.whl",
"has_sig": false,
"md5_digest": "74b203b20fa22c8a3b7b425f2dab70d6",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<3.13,>=3.11",
"size": 257083,
"upload_time": "2025-09-07T16:18:21",
"upload_time_iso_8601": "2025-09-07T16:18:21.819187Z",
"url": "https://files.pythonhosted.org/packages/63/64/bd4edf7e097f28d3a39085e0361fbea0b16dbc68faf089caaa08158d84ac/arshai-1.5.4-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "fdad2f0422e90a9410dedc6a8a8502b20f4b79190097516e26d2e5fb78e8c64a",
"md5": "20e2e5d9d5f6c6021b0c595d4bf0da4e",
"sha256": "6f75c2342fd316c9029ae2e77c6cb3f04f293d87ae8f4b796723b3e74313ee79"
},
"downloads": -1,
"filename": "arshai-1.5.4.tar.gz",
"has_sig": false,
"md5_digest": "20e2e5d9d5f6c6021b0c595d4bf0da4e",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<3.13,>=3.11",
"size": 223276,
"upload_time": "2025-09-07T16:18:23",
"upload_time_iso_8601": "2025-09-07T16:18:23.053811Z",
"url": "https://files.pythonhosted.org/packages/fd/ad/2f0422e90a9410dedc6a8a8502b20f4b79190097516e26d2e5fb78e8c64a/arshai-1.5.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-09-07 16:18:23",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "nimunzn",
"github_project": "arshai",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "arshai"
}