chuk-tool-processor

- Name: chuk-tool-processor
- Version: 0.6.1
- Summary: Async-native framework for registering, discovering, and executing tools referenced in LLM responses
- Upload time: 2025-07-11 16:14:25
- Requires Python: >=3.11
- License: MIT
- Keywords: llm, tools, async, ai, openai, mcp, model-context-protocol, tool-calling, function-calling

# CHUK Tool Processor - Architectural Analysis

## Overview
The CHUK Tool Processor is an async-native framework for registering, discovering, and executing tools referenced in LLM responses. It is built from the ground up for production use, with comprehensive error handling, monitoring, and scalability features, and its modular architecture supports multiple transport mechanisms and pluggable execution strategies.

## Quick Start Example
```python
import asyncio
from chuk_tool_processor import ToolProcessor, register_tool, initialize

# 1. Create a tool
@register_tool(name="calculator", description="Perform basic math operations")
class Calculator:
    async def execute(self, operation: str, a: float, b: float) -> dict:
        operations = {
            "add": a + b,
            "subtract": a - b,
            "multiply": a * b,
            "divide": a / b if b != 0 else None
        }
        
        if operation not in operations:
            raise ValueError(f"Unknown operation: {operation}")
        
        result = operations[operation]
        if result is None:
            raise ValueError("Cannot divide by zero")
            
        return {"operation": operation, "operands": [a, b], "result": result}

async def main():
    # 2. Initialize the system
    await initialize()
    
    # 3. Process LLM output containing tool calls
    processor = ToolProcessor()
    results = await processor.process('''
        <tool name="calculator" args='{"operation": "multiply", "a": 15, "b": 23}'/>
    ''')
    
    # 4. Handle results
    for result in results:
        if result.error:
            print(f"❌ Tool '{result.tool}' failed: {result.error}")
        else:
            print(f"✅ Tool '{result.tool}' result: {result.result}")

asyncio.run(main())
```
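
Because the tool is an ordinary class, it can also be called directly, which is convenient for unit tests. A minimal sketch, assuming `@register_tool` returns the class unchanged:

```python
# Direct call, bypassing the processor entirely
result = asyncio.run(Calculator().execute("multiply", a=15, b=23))
assert result["result"] == 345
```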

## Key Features & Benefits

- **🔄 Async-Native**: Built for `async/await` from the ground up for optimal performance
- **🛡️ Production Ready**: Comprehensive error handling, timeouts, retries, and monitoring
- **📦 Multiple Execution Strategies**: In-process for speed, subprocess for isolation
- **🚀 High Performance**: Built-in caching, rate limiting, and concurrency control
- **📊 Observability**: Structured logging, metrics collection, and request tracing
- **🔗 MCP Integration**: Full Model Context Protocol support (STDIO, SSE, HTTP Streamable)
- **📡 Streaming Support**: Real-time incremental results for long-running operations
- **🔧 Extensible Architecture**: Plugin system for custom parsers and execution strategies
- **🎯 Multiple Input Formats**: XML tags, OpenAI tool_calls, JSON, function_call formats
- **⚡ Zero-Config Start**: Works out of the box with sensible defaults

## Core Architecture

### Installation & Setup

```bash
# From source (recommended for development)
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
pip install -e .

# Or install from PyPI (when available)
pip install chuk-tool-processor
```

### Environment Configuration
```bash
# Optional: Registry provider (default: memory)
export CHUK_TOOL_REGISTRY_PROVIDER=memory

# Optional: Default timeout (default: 30.0)
export CHUK_DEFAULT_TIMEOUT=30.0

# Optional: Enable structured JSON logging
export CHUK_STRUCTURED_LOGGING=true

# MCP Integration (if using external MCP servers)
export MCP_BEARER_TOKEN=your_bearer_token_here
```

### 1. Registry System
- **Interface-driven**: `ToolRegistryInterface` protocol defines the contract
- **Async-native**: All registry operations are async
- **Namespace support**: Tools are organized into namespaces (default: "default")
- **Metadata tracking**: Rich metadata with `ToolMetadata` model
- **Provider pattern**: `ToolRegistryProvider` for singleton management

```python
# Example registry usage
registry = await ToolRegistryProvider.get_registry()
await registry.register_tool(MyTool(), name="my_tool", namespace="custom")
tool = await registry.get_tool("my_tool", "custom")
```

### 2. Tool Development Patterns

#### Simple Function-Based Tools
```python
from chuk_tool_processor.registry.auto_register import register_fn_tool

async def get_current_time(timezone: str = "UTC") -> str:
    """Get the current time in the specified timezone."""
    from datetime import datetime
    import pytz
    
    tz = pytz.timezone(timezone)
    current_time = datetime.now(tz)
    return current_time.strftime("%Y-%m-%d %H:%M:%S %Z")

# Register the function as a tool
await register_fn_tool(get_current_time, namespace="utilities")
```
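
A hedged end-to-end sketch of invoking the registered function, assuming namespaced tools are addressed as `namespace.tool_name` (the same convention the MCP section below uses):

```python
import asyncio
from chuk_tool_processor import ToolProcessor, initialize

async def demo():
    await initialize()  # assumes register_fn_tool(...) above has been awaited
    processor = ToolProcessor()
    results = await processor.process(
        '<tool name="utilities.get_current_time" args=\'{"timezone": "UTC"}\'/>'
    )
    print(results[0].result)

asyncio.run(demo())
```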

#### ValidatedTool (Declarative with Pydantic)
```python
@register_tool(name="weather", namespace="api")
class WeatherTool(ValidatedTool):
    class Arguments(BaseModel):
        location: str = Field(..., description="City name or coordinates")
        units: str = Field("metric", description="Temperature units")
    
    class Result(BaseModel):
        location: str
        temperature: float
        conditions: str
    
    async def _execute(self, location: str, units: str) -> Result:
        # Implementation here
        return self.Result(location=location, temperature=22.5, conditions="Sunny")
```
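
Because `Arguments` and `Result` are ordinary Pydantic models, validation can be exercised on its own before the tool ever runs:

```python
# Defaults and validation come straight from Pydantic
args = WeatherTool.Arguments(location="London")
assert args.units == "metric"

# Missing required fields raise pydantic.ValidationError:
# WeatherTool.Arguments()  # fails - "location" is required
```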

#### StreamingTool (Real-time Results)
```python
@register_tool(name="file_processor")
class FileProcessorTool(StreamingTool):
    class Arguments(BaseModel):
        file_path: str
        operation: str = "count_lines"
    
    class Result(BaseModel):
        line_number: int
        content: str
    
    async def _stream_execute(self, file_path: str, operation: str):
        """Stream results as each line is processed."""
        for i in range(1, 100):
            await asyncio.sleep(0.01)  # Simulate processing
            yield self.Result(line_number=i, content=f"Processed line {i}")
```
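
Because `_stream_execute` is a plain async generator, it can also be consumed directly in tests, without going through the processor:

```python
async def consume_stream():
    tool = FileProcessorTool()
    # Each yielded chunk is a FileProcessorTool.Result instance
    async for chunk in tool._stream_execute("data.txt", "count_lines"):
        print(chunk.line_number, chunk.content)
```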

### 3. Processing LLM Responses

The processor automatically detects and parses multiple input formats:

```python
processor = ToolProcessor()

# 1. XML Tool Tags (most common)
xml_response = """
<tool name="search" args='{"query": "Python programming", "limit": 5}'/>
<tool name="get_current_time" args='{"timezone": "UTC"}'/>
"""

# 2. OpenAI Chat Completions Format
openai_response = {
    "tool_calls": [
        {
            "id": "call_123",
            "type": "function", 
            "function": {
                "name": "search",
                "arguments": '{"query": "Python programming", "limit": 5}'
            }
        }
    ]
}

# 3. Direct ToolCall objects
tool_calls = [
    {"tool": "search", "arguments": {"query": "Python programming", "limit": 5}},
    {"tool": "get_current_time", "arguments": {"timezone": "UTC"}}
]

# Process any format
results1 = await processor.process(xml_response)
results2 = await processor.process(openai_response) 
results3 = await processor.process(tool_calls)
```
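
The feature list also mentions the legacy OpenAI `function_call` format; assuming the processor parses it the same way, the single-call shape would look like this:

```python
# 4. Legacy OpenAI function_call format (single call, not a list)
legacy_response = {
    "function_call": {
        "name": "search",
        "arguments": '{"query": "Python programming", "limit": 5}'
    }
}
results4 = await processor.process(legacy_response)
```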

### 4. Execution Strategies

#### InProcessStrategy (Default - Fast & Efficient)
- **Concurrent execution**: Uses asyncio for parallelism within the same process
- **Semaphore-based limiting**: Optional max_concurrency control
- **True streaming support**: Direct access to `stream_execute` methods
- **Enhanced tool resolution**: Namespace fallback logic with fuzzy matching
- **Proper timeout handling**: Always applies concrete timeouts

#### SubprocessStrategy (Isolation & Safety)
- **Process isolation**: Each tool runs in separate OS process for safety
- **Serialization support**: Handles complex objects and Pydantic models properly
- **Worker pool management**: Concurrent futures with automatic cleanup
- **Enhanced error handling**: Broken pool recovery and restart
- **Timeout coordination**: Safety timeouts prevent worker hangs

```python
# Configure execution strategy
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy

processor = ToolProcessor(
    strategy=SubprocessStrategy(
        registry=await get_default_registry(),
        max_workers=4,
        default_timeout=30.0
    )
)
```
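
The default in-process strategy accepts similar configuration; a sketch assuming its import path mirrors the subprocess strategy's:

```python
from chuk_tool_processor.execution.strategies.inprocess_strategy import InProcessStrategy

processor = ToolProcessor(
    strategy=InProcessStrategy(
        registry=await get_default_registry(),
        max_concurrency=10,     # semaphore-based limit described above
        default_timeout=30.0
    )
)
```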

### 5. Production Features & Wrappers

#### Caching for Performance
```python
from chuk_tool_processor.execution.wrappers.caching import cacheable

@cacheable(ttl=600)  # Cache for 10 minutes
@register_tool(name="expensive_api")
class ExpensiveApiTool(ValidatedTool):
    # Tool implementation
    pass

# Or configure at processor level
processor = ToolProcessor(
    enable_caching=True,
    cache_ttl=300  # 5 minutes default
)
```
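
A quick way to observe the processor-level cache configured above. The `cached` flag is read defensively here, the same way the error-handling example later in this document does, and the call shape is illustrative:

```python
call = '<tool name="expensive_api" args=\'{}\'/>'

first = await processor.process(call)   # executes the tool
second = await processor.process(call)  # served from cache within the TTL
print(getattr(second[0], "cached", False))
```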

#### Rate Limiting
```python
from chuk_tool_processor.execution.wrappers.rate_limiting import rate_limited

@rate_limited(limit=20, period=60.0)  # 20 calls per minute
@register_tool(name="api_tool")
class ApiTool(ValidatedTool):
    # Tool implementation  
    pass

# Or processor-level configuration
processor = ToolProcessor(
    enable_rate_limiting=True,
    global_rate_limit=100,  # 100 requests per minute globally
    tool_rate_limits={
        "expensive_api": (10, 60),    # 10 per minute for specific tool
    }
)
```

#### Automatic Retries
```python
from chuk_tool_processor.execution.wrappers.retry import retryable

@retryable(max_retries=3, base_delay=1.0)
@register_tool(name="unreliable_api")
class UnreliableApiTool(ValidatedTool):
    # Tool implementation
    pass

# Processor-level retry configuration
processor = ToolProcessor(
    enable_retries=True,
    max_retries=3
)
```

### 6. MCP (Model Context Protocol) Integration

Connect to external tool servers using multiple transport protocols:

#### Quick MCP Setup with SSE (Server-Sent Events)
```python
from chuk_tool_processor.mcp import setup_mcp_sse

# Configure external MCP servers
servers = [
    {
        "name": "weather-service",
        "url": "https://weather-mcp.example.com",
        "api_key": "your_weather_api_key"
    },
    {
        "name": "database-service", 
        "url": "https://db-mcp.example.com",
        "api_key": "your_db_api_key"
    }
]

# Initialize with full production configuration
processor, stream_manager = await setup_mcp_sse(
    servers=servers,
    namespace="mcp",           # Tools available as mcp.tool_name
    default_timeout=30.0,
    enable_caching=True,
    enable_retries=True
)

# Use external tools through MCP
results = await processor.process('''
<tool name="mcp.weather" args='{"location": "London"}'/>
<tool name="mcp.database_query" args='{"sql": "SELECT COUNT(*) FROM users"}'/>
''')
```

#### STDIO Transport (Process-based)
```python
import json

from chuk_tool_processor.mcp import setup_mcp_stdio

# MCP config for local processes, written to the file that setup reads below
mcp_config = {
    "weather": {
        "command": "python",
        "args": ["-m", "weather_mcp_server"],
        "env": {"API_KEY": "your_weather_key"}
    }
}
with open("mcp_config.json", "w") as f:
    json.dump(mcp_config, f)

processor, stream_manager = await setup_mcp_stdio(
    config_file="mcp_config.json",
    servers=["weather"],
    namespace="tools"
)
```

#### Supported Transports
- **STDIO**: Process-based communication for local MCP servers
- **SSE**: Server-Sent Events for cloud-based MCP services  
- **HTTP Streamable**: Modern HTTP-based transport (spec 2025-03-26)

### 7. Monitoring & Observability

#### Structured Logging
```python
import logging

from chuk_tool_processor.logging import setup_logging, get_logger, log_context_span

# Setup structured logging
await setup_logging(
    level=logging.INFO,
    structured=True,  # JSON output for production
    log_file="tool_processor.log"
)

# Use contextual logging
logger = get_logger("my_app")

async def process_user_request(user_id: str, request: str):
    async with log_context_span("user_request", {"user_id": user_id}):
        logger.info("Processing user request", extra={
            "request_length": len(request),
            "user_id": user_id
        })
        
        results = await processor.process(request)
        
        logger.info("Request processed successfully", extra={
            "num_tools": len(results),
            "success_rate": sum(1 for r in results if not r.error) / len(results)
        })
```

#### Automatic Metrics Collection
```python
# Metrics are automatically collected for:
# - Tool execution success/failure rates
# - Execution durations and performance
# - Cache hit/miss rates and efficiency  
# - Parser performance and accuracy
# - Registry operations and health

# Access programmatic metrics
from chuk_tool_processor.logging import metrics

# Custom metrics
await metrics.log_tool_execution(
    tool="custom_metric",
    success=True,
    duration=1.5,
    cached=False,
    attempts=1
)
```

### 8. Error Handling & Best Practices

#### Robust Error Handling
```python
async def robust_tool_processing(llm_response: str):
    """Example of production-ready error handling."""
    processor = ToolProcessor(
        default_timeout=30.0,
        enable_retries=True,
        max_retries=3
    )
    
    try:
        results = await processor.process(llm_response, timeout=60.0)
        
        successful_results = []
        failed_results = []
        
        for result in results:
            if result.error:
                failed_results.append(result)
                logger.error(f"Tool {result.tool} failed: {result.error}", extra={
                    "tool": result.tool,
                    "duration": result.duration,
                    "attempts": getattr(result, "attempts", 1)
                })
            else:
                successful_results.append(result)
                logger.info(f"Tool {result.tool} succeeded", extra={
                    "tool": result.tool,
                    "duration": result.duration,
                    "cached": getattr(result, "cached", False)
                })
        
        return {
            "successful": successful_results,
            "failed": failed_results,
            "success_rate": len(successful_results) / len(results) if results else 0
        }
        
    except Exception:
        logger.exception("Failed to process LLM response")
        raise
```

#### Testing Your Tools
```python
import pytest
from chuk_tool_processor import ToolProcessor, initialize

@pytest.mark.asyncio
async def test_calculator_tool():
    await initialize()
    processor = ToolProcessor()
    
    results = await processor.process(
        '<tool name="calculator" args=\'{"operation": "add", "a": 5, "b": 3}\'/>'
    )
    
    assert len(results) == 1
    result = results[0]
    assert result.error is None
    assert result.result["result"] == 8
```
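
If you use pytest-asyncio (implied by the `@pytest.mark.asyncio` marker above), a fixture can share one initialized processor across tests; a sketch reusing the calculator tool from the Quick Start:

```python
import pytest
import pytest_asyncio
from chuk_tool_processor import ToolProcessor, initialize

@pytest_asyncio.fixture
async def processor():
    await initialize()
    return ToolProcessor()

@pytest.mark.asyncio
async def test_divide_by_zero_reports_error(processor):
    results = await processor.process(
        '<tool name="calculator" args=\'{"operation": "divide", "a": 1, "b": 0}\'/>'
    )
    assert results[0].error is not None  # the ValueError surfaces as result.error
```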

## Advanced Configuration

### Production-Ready Setup
```python
from chuk_tool_processor import ToolProcessor
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy

async def create_production_processor():
    """Configure processor for high-throughput production use."""
    
    processor = ToolProcessor(
        # Execution settings
        default_timeout=30.0,
        max_concurrency=20,        # Allow 20 concurrent executions
        
        # Use subprocess strategy for isolation
        strategy=SubprocessStrategy(
            registry=await get_default_registry(),
            max_workers=8,           # 8 worker processes
            default_timeout=30.0
        ),
        
        # Performance optimizations
        enable_caching=True,
        cache_ttl=900,             # 15-minute cache
        
        # Rate limiting to prevent abuse
        enable_rate_limiting=True,
        global_rate_limit=500,     # 500 requests per minute globally
        tool_rate_limits={
            "expensive_api": (10, 60),    # 10 per minute
            "file_processor": (5, 60),    # 5 per minute
        },
        
        # Reliability features
        enable_retries=True,
        max_retries=3,
        
        # Input parsing
        parser_plugins=["xml_tool", "openai_tool", "json_tool"]
    )
    
    await processor.initialize()
    return processor
```

### Performance Optimization
```python
# Concurrent batch processing
async def process_batch(requests: list[str]):
    """Process multiple LLM responses concurrently."""
    processor = await create_production_processor()
    
    tasks = [processor.process(request) for request in requests]
    all_results = await asyncio.gather(*tasks, return_exceptions=True)
    
    successful = []
    failed = []
    
    for i, result in enumerate(all_results):
        if isinstance(result, Exception):
            failed.append({"request_index": i, "error": str(result)})
        else:
            successful.append({"request_index": i, "results": result})
    
    return {"successful": successful, "failed": failed}

# Memory management for long-running applications
async def maintenance_task():
    """Periodic maintenance for production deployments."""
    while True:
        await asyncio.sleep(3600)  # Every hour
        
        # Clear old cache entries
        if hasattr(processor.executor, 'cache'):
            await processor.executor.cache.clear()
            logger.info("Cache cleared for memory management")
```
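
For very large batches it can also help to bound the number of in-flight `process()` calls on the client side; a minimal sketch using a plain asyncio semaphore (the `limit` value is an arbitrary example):

```python
import asyncio

async def process_batch_bounded(processor, requests: list[str], limit: int = 10):
    """Bound concurrent process() calls with a client-side semaphore."""
    sem = asyncio.Semaphore(limit)

    async def run_one(request: str):
        async with sem:
            return await processor.process(request)

    return await asyncio.gather(
        *(run_one(r) for r in requests), return_exceptions=True
    )
```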

## Key Design Patterns

1. **Async-First Design**: All core operations use async/await with proper timeout handling, graceful cancellation support, and comprehensive resource cleanup via context managers.

2. **Strategy Pattern**: Pluggable execution strategies (InProcess vs Subprocess), composable wrapper chains, and interface-driven design for maximum flexibility.

3. **Registry Pattern**: Centralized tool management with namespace isolation, rich metadata tracking, and lazy initialization for optimal resource usage.

4. **Plugin Architecture**: Discoverable parsers for different input formats, transport abstractions for MCP integration, and extensible validation systems.

5. **Producer-Consumer**: Queue-based streaming architecture for real-time results, with proper backpressure handling and timeout coordination.

6. **Decorator Pattern**: Composable execution wrappers (caching, retries, rate limiting) that can be stacked and configured independently, as the sketch below shows.
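
A hedged sketch combining the wrapper decorators from section 5, following the same stacking order those examples use (`register_tool` and `ValidatedTool` are imported as in earlier snippets):

```python
from chuk_tool_processor.execution.wrappers.caching import cacheable
from chuk_tool_processor.execution.wrappers.rate_limiting import rate_limited
from chuk_tool_processor.execution.wrappers.retry import retryable

@cacheable(ttl=600)                   # cache successful results for 10 minutes
@rate_limited(limit=20, period=60.0)  # at most 20 calls per minute
@retryable(max_retries=3)             # retry transient failures
@register_tool(name="hardened_api")
class HardenedApiTool(ValidatedTool):
    # Tool implementation
    pass
```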

## Configuration Reference

### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `CHUK_TOOL_REGISTRY_PROVIDER` | `memory` | Registry backend (memory, redis, etc.) |
| `CHUK_DEFAULT_TIMEOUT` | `30.0` | Default tool execution timeout (seconds) |
| `CHUK_LOG_LEVEL` | `INFO` | Logging level (DEBUG, INFO, WARNING, ERROR) |
| `CHUK_STRUCTURED_LOGGING` | `true` | Enable JSON structured logging |
| `CHUK_MAX_CONCURRENCY` | `10` | Default max concurrent executions |
| `MCP_BEARER_TOKEN` | - | Bearer token for MCP SSE authentication |

### ToolProcessor Options
```python
processor = ToolProcessor(
    # Core execution
    default_timeout=30.0,              # Default timeout per tool
    max_concurrency=10,                # Max concurrent executions
    
    # Strategy selection
    strategy=InProcessStrategy(...),   # Fast, shared memory
    # strategy=SubprocessStrategy(...), # Isolated, safer for untrusted code
    
    # Performance features
    enable_caching=True,               # Result caching
    cache_ttl=300,                     # Cache TTL in seconds
    enable_rate_limiting=False,        # Rate limiting
    enable_retries=True,               # Automatic retries
    max_retries=3,                     # Max retry attempts
    
    # Input processing
    parser_plugins=["xml_tool", "openai_tool", "json_tool"]
)
```

## Why Choose CHUK Tool Processor?

### Built for Production
- **Battle-tested**: Comprehensive error handling, timeout management, and resource cleanup
- **Scalable**: Support for high-throughput concurrent execution with configurable limits
- **Observable**: Built-in structured logging, metrics collection, and request tracing
- **Reliable**: Automatic retries, circuit breakers, and graceful degradation

### Developer Experience
- **Zero-config start**: Works out of the box with sensible defaults
- **Type-safe**: Full Pydantic integration for argument and result validation  
- **Multiple paradigms**: Support for functions, classes, and streaming tools
- **Flexible inputs**: Handles XML tags, OpenAI format, JSON, and direct objects

### Enterprise Ready
- **Process isolation**: Subprocess strategy for running untrusted code safely
- **Rate limiting**: Global and per-tool rate limiting with sliding window algorithm
- **Caching layer**: Intelligent caching with TTL and invalidation strategies  
- **MCP integration**: Connect to external tool servers using industry standards

### Performance Optimized
- **Async-native**: Built from ground up for `async/await` with proper concurrency
- **Streaming support**: Real-time incremental results for long-running operations
- **Resource efficient**: Lazy initialization, connection pooling, and memory management
- **Configurable strategies**: Choose between speed (in-process) and safety (subprocess)

## Getting Started

### 1. Installation
```bash
# From source (recommended for development)
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
pip install -e .

# Or install from PyPI (when available)
pip install chuk-tool-processor
```

### 2. Quick Example
```python
import asyncio
from chuk_tool_processor import ToolProcessor, register_tool, initialize

@register_tool(name="hello")
class HelloTool:
    async def execute(self, name: str) -> str:
        return f"Hello, {name}!"

async def main():
    await initialize()
    processor = ToolProcessor()
    
    results = await processor.process(
        '<tool name="hello" args=\'{"name": "World"}\'/>'
    )
    
    print(results[0].result)  # Output: Hello, World!

asyncio.run(main())
```

### 3. Next Steps
- Review the [Architecture Guide](docs/architecture.md) for deeper understanding
- Check out [Tool Development Guide](docs/tools.md) for advanced patterns
- Explore [MCP Integration](docs/mcp.md) for external tool servers
- See [Production Deployment](docs/deployment.md) for scaling considerations

## Contributing & Support

- **GitHub**: [chrishayuk/chuk-tool-processor](https://github.com/chrishayuk/chuk-tool-processor)
- **Issues**: [Report bugs and request features](https://github.com/chrishayuk/chuk-tool-processor/issues)
- **Discussions**: [Community discussions](https://github.com/chrishayuk/chuk-tool-processor/discussions)
- **License**: MIT - see [LICENSE](LICENSE) file for details

Built with ❤️ by the CHUK AI team for the LLM tool integration community.

            
