# CHUK Tool Processor - Architectural Analysis
## Overview
The CHUK Tool Processor is a sophisticated async-native framework for registering, discovering, and executing tools referenced in LLM responses. Built from the ground up for production use, it features a modular architecture with multiple transport mechanisms, pluggable execution strategies, and comprehensive error handling, monitoring, and scalability features.
## Quick Start Example
```python
import asyncio
from chuk_tool_processor import ToolProcessor, register_tool, initialize
# 1. Create a tool
@register_tool(name="calculator", description="Perform basic math operations")
class Calculator:
    async def execute(self, operation: str, a: float, b: float) -> dict:
        operations = {
            "add": a + b,
            "subtract": a - b,
            "multiply": a * b,
            "divide": a / b if b != 0 else None
        }

        if operation not in operations:
            raise ValueError(f"Unknown operation: {operation}")

        result = operations[operation]
        if result is None:
            raise ValueError("Cannot divide by zero")

        return {"operation": operation, "operands": [a, b], "result": result}

async def main():
    # 2. Initialize the system
    await initialize()

    # 3. Process LLM output containing tool calls
    processor = ToolProcessor()
    results = await processor.process('''
    <tool name="calculator" args='{"operation": "multiply", "a": 15, "b": 23}'/>
    ''')

    # 4. Handle results
    for result in results:
        if result.error:
            print(f"❌ Tool '{result.tool}' failed: {result.error}")
        else:
            print(f"✅ Tool '{result.tool}' result: {result.result}")
asyncio.run(main())
```
## Key Features & Benefits
- **🔄 Async-Native**: Built for `async/await` from the ground up for optimal performance
- **🛡️ Production Ready**: Comprehensive error handling, timeouts, retries, and monitoring
- **📦 Multiple Execution Strategies**: In-process for speed, subprocess for isolation
- **🚀 High Performance**: Built-in caching, rate limiting, and concurrency control
- **📊 Observability**: Structured logging, metrics collection, and request tracing
- **🔗 MCP Integration**: Full Model Context Protocol support (STDIO, SSE, HTTP Streamable)
- **📡 Streaming Support**: Real-time incremental results for long-running operations
- **🔧 Extensible Architecture**: Plugin system for custom parsers and execution strategies
- **🎯 Multiple Input Formats**: XML tags, OpenAI tool_calls, JSON, function_call formats
- **⚡ Zero-Config Start**: Works out of the box with sensible defaults
## Core Architecture
### Installation & Setup
```bash
# From source (recommended for development)
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
pip install -e .
# Or install from PyPI
pip install chuk-tool-processor
```
### Environment Configuration
```bash
# Optional: Registry provider (default: memory)
export CHUK_TOOL_REGISTRY_PROVIDER=memory
# Optional: Default timeout (default: 30.0)
export CHUK_DEFAULT_TIMEOUT=30.0
# Optional: Enable structured JSON logging
export CHUK_STRUCTURED_LOGGING=true
# MCP Integration (if using external MCP servers)
export MCP_BEARER_TOKEN=your_bearer_token_here
```
### 1. Registry System
- **Interface-driven**: `ToolRegistryInterface` protocol defines the contract
- **Async-native**: All registry operations are async
- **Namespace support**: Tools are organized into namespaces (default: "default")
- **Metadata tracking**: Rich metadata with `ToolMetadata` model
- **Provider pattern**: `ToolRegistryProvider` for singleton management
```python
# Example registry usage
registry = await ToolRegistryProvider.get_registry()
await registry.register_tool(MyTool(), name="my_tool", namespace="custom")
tool = await registry.get_tool("my_tool", "custom")
```
### 2. Tool Development Patterns
#### Simple Function-Based Tools
```python
from chuk_tool_processor.registry.auto_register import register_fn_tool
async def get_current_time(timezone: str = "UTC") -> str:
    """Get the current time in the specified timezone."""
    from datetime import datetime
    import pytz

    tz = pytz.timezone(timezone)
    current_time = datetime.now(tz)
    return current_time.strftime("%Y-%m-%d %H:%M:%S %Z")

# Register the function as a tool (run this from within an async context)
await register_fn_tool(get_current_time, namespace="utilities")
```
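Once registered, the function is callable like any other tool. A short usage sketch, invoking it through the XML format covered in section 3 (the printed timestamp is illustrative):

```python
processor = ToolProcessor()
results = await processor.process(
    '<tool name="get_current_time" args=\'{"timezone": "Europe/London"}\'/>'
)
print(results[0].result)  # e.g. "2025-07-11 17:14:25 BST"
```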
#### ValidatedTool (Declarative with Pydantic)
```python
@register_tool(name="weather", namespace="api")
class WeatherTool(ValidatedTool):
class Arguments(BaseModel):
location: str = Field(..., description="City name or coordinates")
units: str = Field("metric", description="Temperature units")
class Result(BaseModel):
location: str
temperature: float
conditions: str
async def _execute(self, location: str, units: str) -> Result:
# Implementation here
return self.Result(location=location, temperature=22.5, conditions="Sunny")
```
#### StreamingTool (Real-time Results)
```python
@register_tool(name="file_processor")
class FileProcessorTool(StreamingTool):
class Arguments(BaseModel):
file_path: str
operation: str = "count_lines"
class Result(BaseModel):
line_number: int
content: str
async def _stream_execute(self, file_path: str, operation: str):
"""Stream results as each line is processed."""
for i in range(1, 100):
await asyncio.sleep(0.01) # Simulate processing
yield self.Result(line_number=i, content=f"Processed line {i}")
```
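Streaming results can be consumed incrementally rather than waiting for the whole run. A minimal sketch, assuming `StreamingTool` exposes a public `stream_execute` async generator (referenced under InProcessStrategy in section 4); the exact method name and call signature are assumptions:

```python
import asyncio

async def consume_stream():
    tool = FileProcessorTool()
    # Assumption: stream_execute wraps _stream_execute and yields Result objects
    async for partial in tool.stream_execute(file_path="data.txt", operation="count_lines"):
        # Each result arrives as soon as it is yielded, not after the full run
        print(f"line {partial.line_number}: {partial.content}")

asyncio.run(consume_stream())
```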
### 3. Processing LLM Responses
The processor automatically detects and parses multiple input formats:
```python
processor = ToolProcessor()
# 1. XML Tool Tags (most common)
xml_response = """
<tool name="search" args='{"query": "Python programming", "limit": 5}'/>
<tool name="get_current_time" args='{"timezone": "UTC"}'/>
"""
# 2. OpenAI Chat Completions Format
openai_response = {
    "tool_calls": [
        {
            "id": "call_123",
            "type": "function",
            "function": {
                "name": "search",
                "arguments": '{"query": "Python programming", "limit": 5}'
            }
        }
    ]
}

# 3. Direct ToolCall objects
tool_calls = [
    {"tool": "search", "arguments": {"query": "Python programming", "limit": 5}},
    {"tool": "get_current_time", "arguments": {"timezone": "UTC"}}
]
# Process any format
results1 = await processor.process(xml_response)
results2 = await processor.process(openai_response)
results3 = await processor.process(tool_calls)
```
### 4. Execution Strategies
#### InProcessStrategy (Default - Fast & Efficient)
- **Concurrent execution**: Uses asyncio for parallelism within the same process
- **Semaphore-based limiting**: Optional max_concurrency control
- **True streaming support**: Direct access to `stream_execute` methods
- **Enhanced tool resolution**: Namespace fallback logic with fuzzy matching
- **Proper timeout handling**: Always applies concrete timeouts
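For symmetry with the subprocess example below, here is a sketch of configuring the default strategy explicitly. The module path and constructor parameters are assumptions modeled on the `SubprocessStrategy` example:

```python
# Assumed import path, mirroring the subprocess_strategy module below
from chuk_tool_processor.execution.strategies.inprocess_strategy import InProcessStrategy

processor = ToolProcessor(
    strategy=InProcessStrategy(
        registry=await get_default_registry(),
        max_concurrency=10,   # semaphore-based concurrency limit
        default_timeout=30.0
    )
)
```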
#### SubprocessStrategy (Isolation & Safety)
- **Process isolation**: Each tool runs in separate OS process for safety
- **Serialization support**: Handles complex objects and Pydantic models properly
- **Worker pool management**: Concurrent futures with automatic cleanup
- **Enhanced error handling**: Broken pool recovery and restart
- **Timeout coordination**: Safety timeouts prevent worker hangs
```python
# Configure execution strategy
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy
processor = ToolProcessor(
    strategy=SubprocessStrategy(
        registry=await get_default_registry(),
        max_workers=4,
        default_timeout=30.0
    )
)
```
### 5. Production Features & Wrappers
#### Caching for Performance
```python
from chuk_tool_processor.execution.wrappers.caching import cacheable
@cacheable(ttl=600) # Cache for 10 minutes
@register_tool(name="expensive_api")
class ExpensiveApiTool(ValidatedTool):
    # Tool implementation
    pass

# Or configure at processor level
processor = ToolProcessor(
    enable_caching=True,
    cache_ttl=300  # 5 minutes default
)
```
#### Rate Limiting
```python
from chuk_tool_processor.execution.wrappers.rate_limiting import rate_limited
@rate_limited(limit=20, period=60.0) # 20 calls per minute
@register_tool(name="api_tool")
class ApiTool(ValidatedTool):
    # Tool implementation
    pass

# Or processor-level configuration
processor = ToolProcessor(
    enable_rate_limiting=True,
    global_rate_limit=100,  # 100 requests per minute globally
    tool_rate_limits={
        "expensive_api": (10, 60),  # 10 calls per 60 seconds for this tool
    }
)
```
#### Automatic Retries
```python
from chuk_tool_processor.execution.wrappers.retry import retryable
@retryable(max_retries=3, base_delay=1.0)
@register_tool(name="unreliable_api")
class UnreliableApiTool(ValidatedTool):
    # Tool implementation
    pass

# Processor-level retry configuration
processor = ToolProcessor(
    enable_retries=True,
    max_retries=3
)
```
### 6. MCP (Model Context Protocol) Integration
Connect to external tool servers using multiple transport protocols:
#### Quick MCP Setup with SSE (Server-Sent Events)
```python
from chuk_tool_processor.mcp import setup_mcp_sse
# Configure external MCP servers
servers = [
    {
        "name": "weather-service",
        "url": "https://weather-mcp.example.com",
        "api_key": "your_weather_api_key"
    },
    {
        "name": "database-service",
        "url": "https://db-mcp.example.com",
        "api_key": "your_db_api_key"
    }
]

# Initialize with full production configuration
processor, stream_manager = await setup_mcp_sse(
    servers=servers,
    namespace="mcp",        # Tools available as mcp.tool_name
    default_timeout=30.0,
    enable_caching=True,
    enable_retries=True
)
# Use external tools through MCP
results = await processor.process('''
<tool name="mcp.weather" args='{"location": "London"}'/>
<tool name="mcp.database_query" args='{"sql": "SELECT COUNT(*) FROM users"}'/>
''')
```
#### STDIO Transport (Process-based)
```python
from chuk_tool_processor.mcp import setup_mcp_stdio
# Create MCP config for local processes
mcp_config = {
    "weather": {
        "command": "python",
        "args": ["-m", "weather_mcp_server"],
        "env": {"API_KEY": "your_weather_key"}
    }
}

# Write the config to disk so it can be loaded via config_file below
# (the exact on-disk schema is assumed here)
import json
with open("mcp_config.json", "w") as f:
    json.dump(mcp_config, f)

processor, stream_manager = await setup_mcp_stdio(
    config_file="mcp_config.json",
    servers=["weather"],
    namespace="tools"
)
```
#### Supported Transports
- **STDIO**: Process-based communication for local MCP servers
- **SSE**: Server-Sent Events for cloud-based MCP services
- **HTTP Streamable**: Modern HTTP-based transport (spec 2025-03-26)
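An HTTP Streamable setup presumably mirrors the SSE helper above. The sketch below is hypothetical: the `setup_mcp_http_streamable` name and signature are assumptions modeled on `setup_mcp_sse`, not confirmed API.

```python
# Hypothetical: helper name and parameters are assumptions, not confirmed API
from chuk_tool_processor.mcp import setup_mcp_http_streamable  # assumed to exist

processor, stream_manager = await setup_mcp_http_streamable(
    servers=[{"name": "weather-service", "url": "https://weather-mcp.example.com"}],
    namespace="mcp",
    default_timeout=30.0
)
```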
### 7. Monitoring & Observability
#### Structured Logging
```python
import logging
from chuk_tool_processor.logging import setup_logging, get_logger, log_context_span

# Setup structured logging
await setup_logging(
    level=logging.INFO,
    structured=True,  # JSON output for production
    log_file="tool_processor.log"
)

# Use contextual logging
logger = get_logger("my_app")

async def process_user_request(user_id: str, request: str):
    async with log_context_span("user_request", {"user_id": user_id}):
        logger.info("Processing user request", extra={
            "request_length": len(request),
            "user_id": user_id
        })

        results = await processor.process(request)

        logger.info("Request processed successfully", extra={
            "num_tools": len(results),
            "success_rate": sum(1 for r in results if not r.error) / len(results)
        })
```
#### Automatic Metrics Collection
```python
# Metrics are automatically collected for:
# - Tool execution success/failure rates
# - Execution durations and performance
# - Cache hit/miss rates and efficiency
# - Parser performance and accuracy
# - Registry operations and health
# Access programmatic metrics
from chuk_tool_processor.logging import metrics
# Custom metrics
await metrics.log_tool_execution(
    tool="custom_metric",
    success=True,
    duration=1.5,
    cached=False,
    attempts=1
)
```
### 8. Error Handling & Best Practices
#### Robust Error Handling
```python
async def robust_tool_processing(llm_response: str):
    """Example of production-ready error handling."""
    processor = ToolProcessor(
        default_timeout=30.0,
        enable_retries=True,
        max_retries=3
    )

    try:
        results = await processor.process(llm_response, timeout=60.0)

        successful_results = []
        failed_results = []

        for result in results:
            if result.error:
                failed_results.append(result)
                logger.error(f"Tool {result.tool} failed: {result.error}", extra={
                    "tool": result.tool,
                    "duration": result.duration,
                    "attempts": getattr(result, "attempts", 1)
                })
            else:
                successful_results.append(result)
                logger.info(f"Tool {result.tool} succeeded", extra={
                    "tool": result.tool,
                    "duration": result.duration,
                    "cached": getattr(result, "cached", False)
                })

        return {
            "successful": successful_results,
            "failed": failed_results,
            "success_rate": len(successful_results) / len(results) if results else 0
        }

    except Exception:
        logger.exception("Failed to process LLM response")
        raise
```
#### Testing Your Tools
```python
import pytest
from chuk_tool_processor import ToolProcessor, initialize
@pytest.mark.asyncio
async def test_calculator_tool():
    await initialize()
    processor = ToolProcessor()

    results = await processor.process(
        '<tool name="calculator" args=\'{"operation": "add", "a": 5, "b": 3}\'/>'
    )

    assert len(results) == 1
    result = results[0]
    assert result.error is None
    assert result.result["result"] == 8
```
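Error paths deserve tests too. Since failures surface on `result.error` rather than raising (see the Quick Start), a divide-by-zero test for the same calculator tool might look like this, continuing the test module above:

```python
@pytest.mark.asyncio
async def test_calculator_divide_by_zero():
    await initialize()
    processor = ToolProcessor()

    results = await processor.process(
        '<tool name="calculator" args=\'{"operation": "divide", "a": 1, "b": 0}\'/>'
    )

    assert len(results) == 1
    # The ValueError raised inside the tool is surfaced as an error result
    assert results[0].error is not None
```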
## Advanced Configuration
### Production-Ready Setup
```python
from chuk_tool_processor import ToolProcessor
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy
async def create_production_processor():
    """Configure processor for high-throughput production use."""
    processor = ToolProcessor(
        # Execution settings
        default_timeout=30.0,
        max_concurrency=20,  # Allow 20 concurrent executions

        # Use subprocess strategy for isolation
        strategy=SubprocessStrategy(
            registry=await get_default_registry(),
            max_workers=8,  # 8 worker processes
            default_timeout=30.0
        ),

        # Performance optimizations
        enable_caching=True,
        cache_ttl=900,  # 15-minute cache

        # Rate limiting to prevent abuse
        enable_rate_limiting=True,
        global_rate_limit=500,  # 500 requests per minute globally
        tool_rate_limits={
            "expensive_api": (10, 60),   # 10 per minute
            "file_processor": (5, 60),   # 5 per minute
        },

        # Reliability features
        enable_retries=True,
        max_retries=3,

        # Input parsing
        parser_plugins=["xml_tool", "openai_tool", "json_tool"]
    )

    await processor.initialize()
    return processor
```
### Performance Optimization
```python
# Concurrent batch processing
async def process_batch(requests: list[str]):
    """Process multiple LLM responses concurrently."""
    processor = await create_production_processor()

    tasks = [processor.process(request) for request in requests]
    all_results = await asyncio.gather(*tasks, return_exceptions=True)

    successful = []
    failed = []

    for i, result in enumerate(all_results):
        if isinstance(result, Exception):
            failed.append({"request_index": i, "error": str(result)})
        else:
            successful.append({"request_index": i, "results": result})

    return {"successful": successful, "failed": failed}

# Memory management for long-running applications
async def maintenance_task():
    """Periodic maintenance for production deployments."""
    while True:
        await asyncio.sleep(3600)  # Every hour

        # Clear old cache entries ('processor' is the long-lived
        # instance created at application startup)
        if hasattr(processor.executor, 'cache'):
            await processor.executor.cache.clear()
            logger.info("Cache cleared for memory management")
```
## Key Design Patterns
1. **Async-First Design**: All core operations use async/await with proper timeout handling, graceful cancellation support, and comprehensive resource cleanup via context managers.
2. **Strategy Pattern**: Pluggable execution strategies (InProcess vs Subprocess), composable wrapper chains, and interface-driven design for maximum flexibility.
3. **Registry Pattern**: Centralized tool management with namespace isolation, rich metadata tracking, and lazy initialization for optimal resource usage.
4. **Plugin Architecture**: Discoverable parsers for different input formats, transport abstractions for MCP integration, and extensible validation systems.
5. **Producer-Consumer**: Queue-based streaming architecture for real-time results, with proper backpressure handling and timeout coordination.
6. **Decorator Pattern**: Composable execution wrappers (caching, retries, rate limiting) that can be stacked and configured independently.
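In practice, the decorator wrappers from section 5 can be stacked on a single tool. A sketch building on the snippets above; the stacking order shown is an assumption rather than a documented canonical order:

```python
from chuk_tool_processor.execution.wrappers.caching import cacheable
from chuk_tool_processor.execution.wrappers.rate_limiting import rate_limited
from chuk_tool_processor.execution.wrappers.retry import retryable

@cacheable(ttl=300)                         # serve repeat calls from cache
@retryable(max_retries=3, base_delay=1.0)   # retry transient failures
@rate_limited(limit=20, period=60.0)        # cap at 20 calls per minute
@register_tool(name="resilient_api")
class ResilientApiTool(ValidatedTool):
    # Tool implementation
    pass
```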
## Configuration Reference
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `CHUK_TOOL_REGISTRY_PROVIDER` | `memory` | Registry backend (memory, redis, etc.) |
| `CHUK_DEFAULT_TIMEOUT` | `30.0` | Default tool execution timeout (seconds) |
| `CHUK_LOG_LEVEL` | `INFO` | Logging level (DEBUG, INFO, WARNING, ERROR) |
| `CHUK_STRUCTURED_LOGGING` | `true` | Enable JSON structured logging |
| `CHUK_MAX_CONCURRENCY` | `10` | Default max concurrent executions |
| `MCP_BEARER_TOKEN` | - | Bearer token for MCP SSE authentication |
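For illustration, the defaults in the table resolve roughly as below; this is a sketch of the contract, not the library's actual loading code:

```python
import os

registry_provider = os.environ.get("CHUK_TOOL_REGISTRY_PROVIDER", "memory")
default_timeout = float(os.environ.get("CHUK_DEFAULT_TIMEOUT", "30.0"))
log_level = os.environ.get("CHUK_LOG_LEVEL", "INFO")
structured_logging = os.environ.get("CHUK_STRUCTURED_LOGGING", "true").lower() == "true"
max_concurrency = int(os.environ.get("CHUK_MAX_CONCURRENCY", "10"))
```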
### ToolProcessor Options
```python
processor = ToolProcessor(
    # Core execution
    default_timeout=30.0,               # Default timeout per tool
    max_concurrency=10,                 # Max concurrent executions

    # Strategy selection
    strategy=InProcessStrategy(...),    # Fast, shared memory
    # strategy=SubprocessStrategy(...), # Isolated, safer for untrusted code

    # Performance features
    enable_caching=True,                # Result caching
    cache_ttl=300,                      # Cache TTL in seconds
    enable_rate_limiting=False,         # Rate limiting
    enable_retries=True,                # Automatic retries
    max_retries=3,                      # Max retry attempts

    # Input processing
    parser_plugins=["xml_tool", "openai_tool", "json_tool"]
)
```
## Why Choose CHUK Tool Processor?
### Built for Production
- **Battle-tested**: Comprehensive error handling, timeout management, and resource cleanup
- **Scalable**: Support for high-throughput concurrent execution with configurable limits
- **Observable**: Built-in structured logging, metrics collection, and request tracing
- **Reliable**: Automatic retries, circuit breakers, and graceful degradation
### Developer Experience
- **Zero-config start**: Works out of the box with sensible defaults
- **Type-safe**: Full Pydantic integration for argument and result validation
- **Multiple paradigms**: Support for functions, classes, and streaming tools
- **Flexible inputs**: Handles XML tags, OpenAI format, JSON, and direct objects
### Enterprise Ready
- **Process isolation**: Subprocess strategy for running untrusted code safely
- **Rate limiting**: Global and per-tool rate limiting with sliding window algorithm
- **Caching layer**: Intelligent caching with TTL and invalidation strategies
- **MCP integration**: Connect to external tool servers using industry standards
### Performance Optimized
- **Async-native**: Built from ground up for `async/await` with proper concurrency
- **Streaming support**: Real-time incremental results for long-running operations
- **Resource efficient**: Lazy initialization, connection pooling, and memory management
- **Configurable strategies**: Choose between speed (in-process) and safety (subprocess)
## Getting Started
### 1. Installation
```bash
# From source (recommended for development)
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
pip install -e .
# Or install from PyPI
pip install chuk-tool-processor
```
### 2. Quick Example
```python
import asyncio
from chuk_tool_processor import ToolProcessor, register_tool, initialize
@register_tool(name="hello")
class HelloTool:
async def execute(self, name: str) -> str:
return f"Hello, {name}!"
async def main():
await initialize()
processor = ToolProcessor()
results = await processor.process(
'<tool name="hello" args=\'{"name": "World"}\'/>'
)
print(results[0].result) # Output: Hello, World!
asyncio.run(main())
```
### 3. Next Steps
- Review the [Architecture Guide](docs/architecture.md) for deeper understanding
- Check out [Tool Development Guide](docs/tools.md) for advanced patterns
- Explore [MCP Integration](docs/mcp.md) for external tool servers
- See [Production Deployment](docs/deployment.md) for scaling considerations
## Contributing & Support
- **GitHub**: [chrishayuk/chuk-tool-processor](https://github.com/chrishayuk/chuk-tool-processor)
- **Issues**: [Report bugs and request features](https://github.com/chrishayuk/chuk-tool-processor/issues)
- **Discussions**: [Community discussions](https://github.com/chrishayuk/chuk-tool-processor/discussions)
- **License**: MIT - see [LICENSE](LICENSE) file for details
Built with ❤️ by the CHUK AI team for the LLM tool integration community.