# CHUK Tool Processor
[PyPI](https://pypi.org/project/chuk-tool-processor/) · [License](LICENSE)
**The missing link between LLM tool calls and reliable execution.**
CHUK Tool Processor is a focused, production-ready framework that solves one problem exceptionally well: **processing tool calls from LLM outputs**. It's not a chatbot framework or LLM orchestration platform—it's the glue layer that bridges LLM responses and actual tool execution.
## The Problem
When you build LLM applications, you face a gap:
1. **LLM generates tool calls** in various formats (XML tags, OpenAI `tool_calls`, JSON)
2. **??? Mystery step ???** where you need to:
- Parse those calls reliably
- Handle timeouts, retries, failures
- Cache expensive results
- Rate limit API calls
- Run untrusted code safely
- Connect to external tool servers
- Log everything for debugging
3. **Get results back** to continue the LLM conversation
Most frameworks give you steps 1 and 3, but step 2 is where the complexity lives. CHUK Tool Processor **is** step 2.
## Why chuk-tool-processor?
### It's a Building Block, Not a Framework
Unlike full-fledged LLM frameworks (LangChain, LlamaIndex, etc.), CHUK Tool Processor:
- ✅ **Does one thing well**: Process tool calls reliably
- ✅ **Plugs into any LLM app**: Works with any framework or no framework
- ✅ **Composable by design**: Stack strategies and wrappers like middleware
- ✅ **No opinions about your LLM**: Bring your own OpenAI, Anthropic, local model
- ❌ **Doesn't manage conversations**: That's your job
- ❌ **Doesn't do prompt engineering**: Use whatever prompting you want
- ❌ **Doesn't bundle an LLM client**: Use any client library you prefer
### It's Built for Production
The difference between research code and production code is how you handle the edges:
- **Timeouts**: Every tool execution has proper timeout handling
- **Retries**: Automatic retry with exponential backoff and deadline awareness
- **Rate Limiting**: Global and per-tool rate limits with sliding windows
- **Caching**: Intelligent result caching with TTL and idempotency key support
- **Circuit Breakers**: Prevent cascading failures with automatic fault detection
- **Error Handling**: Machine-readable error codes with structured details
- **Observability**: Structured logging, metrics, request tracing
- **Safety**: Subprocess isolation for untrusted code
- **Type Safety**: Pydantic validation with LLM-friendly argument coercion
- **Tool Discovery**: Formal schema export (OpenAI, Anthropic, MCP formats)
### It's About Stacks
CHUK Tool Processor uses a **composable stack architecture**:
```
┌─────────────────────────────────┐
│ Your LLM Application │
│ (handles prompts, responses) │
└────────────┬────────────────────┘
│ tool calls
▼
┌─────────────────────────────────┐
│ Caching Wrapper │ ← Cache expensive results (idempotency keys)
├─────────────────────────────────┤
│ Rate Limiting Wrapper │ ← Prevent API abuse
├─────────────────────────────────┤
│ Retry Wrapper │ ← Handle transient failures (exponential backoff)
├─────────────────────────────────┤
│ Circuit Breaker Wrapper │ ← Prevent cascading failures (CLOSED/OPEN/HALF_OPEN)
├─────────────────────────────────┤
│ Execution Strategy │ ← How to run tools
│ • InProcess (fast) │
│ • Subprocess (isolated) │
├─────────────────────────────────┤
│ Tool Registry │ ← Your registered tools
└─────────────────────────────────┘
```
Each layer is **optional** and **configurable**. Mix and match what you need.
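A minimal sketch of toggling these layers via `ToolProcessor` flags (the full option set is covered under [Configuration](#configuration)):
```python
from chuk_tool_processor.core.processor import ToolProcessor

# Each wrapper layer is toggled independently; omit what you don't need.
processor = ToolProcessor(
    enable_caching=True,          # Caching wrapper
    enable_rate_limiting=True,    # Rate limiting wrapper
    enable_retries=True,          # Retry wrapper
    enable_circuit_breaker=True,  # Circuit breaker wrapper
)
```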
## Compatibility Matrix
| Component | Supported Versions | Notes |
|-----------|-------------------|-------|
| **Python** | 3.11, 3.12, 3.13 | Python 3.11+ required |
| **Operating Systems** | macOS, Linux, Windows | All platforms fully supported |
| **LLM Providers** | OpenAI, Anthropic, Local models | Any LLM that outputs tool calls |
| **MCP Transports** | HTTP Streamable, STDIO, SSE | All MCP 1.0 transports |
| **MCP Servers** | Notion, SQLite, Atlassian, Echo, Custom | Any MCP-compliant server |
**Tested Configurations:**
- ✅ macOS 14+ (Apple Silicon & Intel)
- ✅ Ubuntu 20.04+ / Debian 11+
- ✅ Windows 10+ (native & WSL2)
- ✅ Python 3.11.0+, 3.12.0+, 3.13.0+
- ✅ OpenAI GPT-4, GPT-4 Turbo
- ✅ Anthropic Claude 3 (Opus, Sonnet, Haiku)
- ✅ Local models (Ollama, LM Studio)
## Quick Start
### Installation
**Prerequisites:** Python 3.11+ • Works on macOS, Linux, Windows
```bash
# Using pip
pip install chuk-tool-processor
# Using uv (recommended)
uv pip install chuk-tool-processor
# Or from source
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
uv pip install -e .
```
## 60-Second Quick Start
**Absolutely minimal example** → See `examples/hello_tool.py`:
```bash
python examples/hello_tool.py
```
Single file that demonstrates:
- Registering a tool
- Parsing OpenAI & Anthropic formats
- Executing and getting results
Takes 60 seconds to understand, 3 minutes to master.
### 3-Minute Example
Copy-paste this into a file and run it:
```python
import asyncio
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.registry import initialize, register_tool
# Step 1: Define a tool
@register_tool(name="calculator")
class Calculator:
async def execute(self, operation: str, a: float, b: float) -> dict:
ops = {"add": a + b, "multiply": a * b, "subtract": a - b}
if operation not in ops:
raise ValueError(f"Unsupported operation: {operation}")
return {"result": ops[operation]}
# Step 2: Process LLM output
async def main():
await initialize()
processor = ToolProcessor()
# Your LLM returned this tool call
llm_output = '<tool name="calculator" args=\'{"operation": "multiply", "a": 15, "b": 23}\'/>'
# Process it
results = await processor.process(llm_output)
# Each result is a ToolExecutionResult with: tool, args, result, error, duration, cached
# results[0].result contains the tool output
# results[0].error contains any error message (None if successful)
if results[0].error:
print(f"Error: {results[0].error}")
else:
print(results[0].result) # {'result': 345}
asyncio.run(main())
```
**That's it.** You now have production-ready tool execution with timeouts, retries, and caching.
> **Why not just use OpenAI tool calls?**
> OpenAI's function calling is great for parsing, but you still need: parsing multiple formats (Anthropic XML, etc.), timeouts, retries, rate limits, caching, subprocess isolation, and connecting to external MCP servers. CHUK Tool Processor **is** that missing middle layer.
## Documentation Quick Reference
| Document | What It Covers |
|----------|----------------|
| 📘 [CONFIGURATION.md](docs/CONFIGURATION.md) | **All config knobs & defaults**: ToolProcessor options, timeouts, retry policy, rate limits, circuit breakers, caching, environment variables |
| 🚨 [ERRORS.md](docs/ERRORS.md) | **Error taxonomy**: All error codes, exception classes, error details structure, handling patterns, retryability guide |
| 📊 [OBSERVABILITY.md](docs/OBSERVABILITY.md) | **Metrics & tracing**: OpenTelemetry setup, Prometheus metrics, spans reference, PromQL queries |
| 🔌 [examples/hello_tool.py](examples/hello_tool.py) | **60-second starter**: Single-file, copy-paste-and-run example |
| 🎯 [examples/](examples/) | **20+ working examples**: MCP integration, OAuth flows, streaming, production patterns |
## Choose Your Path
| Your Goal | What You Need | Where to Look |
|-----------|---------------|---------------|
| ☕ **Just process LLM tool calls** | Basic tool registration + processor | [60-Second Quick Start](#60-second-quick-start) |
| 🔌 **Connect to external tools** | MCP integration (HTTP/STDIO/SSE) | [MCP Integration](#5-mcp-integration-external-tools) |
| 🛡️ **Production deployment** | Timeouts, retries, rate limits, caching | [CONFIGURATION.md](docs/CONFIGURATION.md) |
| 🔒 **Run untrusted code safely** | Subprocess isolation strategy | [Subprocess Strategy](#using-subprocess-strategy) |
| 📊 **Monitor and observe** | OpenTelemetry + Prometheus | [OBSERVABILITY.md](docs/OBSERVABILITY.md) |
| 🌊 **Stream incremental results** | StreamingTool pattern | [StreamingTool](#streamingtool-real-time-results) |
| 🚨 **Handle errors reliably** | Error codes & taxonomy | [ERRORS.md](docs/ERRORS.md) |
### Real-World Quick Start
Here are the most common patterns you'll use:
**Pattern 1: Local tools only**
```python
import asyncio
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.registry import initialize, register_tool
@register_tool(name="my_tool")
class MyTool:
async def execute(self, arg: str) -> dict:
return {"result": f"Processed: {arg}"}
async def main():
await initialize()
processor = ToolProcessor()
llm_output = '<tool name="my_tool" args=\'{"arg": "hello"}\'/>'
results = await processor.process(llm_output)
print(results[0].result) # {'result': 'Processed: hello'}
asyncio.run(main())
```
**Pattern 2: Mix local + remote MCP tools (Notion)**
```python
import asyncio
from chuk_tool_processor.registry import initialize, register_tool
from chuk_tool_processor.mcp import setup_mcp_http_streamable
@register_tool(name="local_calculator")
class Calculator:
async def execute(self, a: int, b: int) -> int:
return a + b
async def main():
# Register local tools first
await initialize()
    # Then add Notion MCP tools (requires an OAuth access token)
    access_token = "..."  # placeholder: obtained via your OAuth flow (see examples/notion_oauth.py)
    processor, manager = await setup_mcp_http_streamable(
servers=[{
"name": "notion",
"url": "https://mcp.notion.com/mcp",
"headers": {"Authorization": f"Bearer {access_token}"}
}],
namespace="notion",
initialization_timeout=120.0
)
# Now you have both local and remote tools!
results = await processor.process('''
<tool name="local_calculator" args='{"a": 5, "b": 3}'/>
<tool name="notion.search_pages" args='{"query": "project docs"}'/>
''')
print(f"Local result: {results[0].result}")
print(f"Notion result: {results[1].result}")
asyncio.run(main())
```
See `examples/notion_oauth.py` for the complete OAuth flow.
**Pattern 3: Local SQLite database via STDIO**
```python
import asyncio
import json
from chuk_tool_processor.mcp import setup_mcp_stdio
async def main():
# Configure SQLite MCP server (runs locally)
config = {
"mcpServers": {
"sqlite": {
"command": "uvx",
"args": ["mcp-server-sqlite", "--db-path", "./app.db"],
"transport": "stdio"
}
}
}
with open("mcp_config.json", "w") as f:
json.dump(config, f)
processor, manager = await setup_mcp_stdio(
config_file="mcp_config.json",
servers=["sqlite"],
namespace="db",
initialization_timeout=120.0 # First run downloads the package
)
# Query your local database via MCP
results = await processor.process(
'<tool name="db.query" args=\'{"sql": "SELECT * FROM users LIMIT 10"}\'/>'
)
print(results[0].result)
asyncio.run(main())
```
See `examples/stdio_sqlite.py` for a complete working example.
## Core Concepts
### 1. Tool Registry
The **registry** is where you register tools for execution. Tools can be:
- **Simple classes** with an `async execute()` method
- **ValidatedTool** subclasses with Pydantic validation
- **StreamingTool** for real-time incremental results
- **Functions** registered via `register_fn_tool()`
```python
from chuk_tool_processor.registry import register_tool
from chuk_tool_processor.models.validated_tool import ValidatedTool
from pydantic import BaseModel, Field
@register_tool(name="weather")
class WeatherTool(ValidatedTool):
class Arguments(BaseModel):
location: str = Field(..., description="City name")
units: str = Field("celsius", description="Temperature units")
class Result(BaseModel):
temperature: float
conditions: str
async def _execute(self, location: str, units: str) -> Result:
# Your weather API logic here
return self.Result(temperature=22.5, conditions="Sunny")
```
### 2. Execution Strategies
**Strategies** determine *how* tools run:
| Strategy | Use Case | Trade-offs |
|----------|----------|------------|
| **InProcessStrategy** | Fast, trusted tools | Speed ✅, Isolation ❌ |
| **SubprocessStrategy** | Untrusted or risky code | Isolation ✅, Speed ❌ |
```python
import asyncio
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy
from chuk_tool_processor.registry import get_default_registry
async def main():
registry = await get_default_registry()
processor = ToolProcessor(
strategy=SubprocessStrategy(
registry=registry,
max_workers=4,
default_timeout=30.0
)
)
# Use processor...
asyncio.run(main())
```
### 3. Execution Wrappers (Middleware)
**Wrappers** add production features as composable layers:
```python
processor = ToolProcessor(
enable_caching=True, # Cache expensive calls
cache_ttl=600, # 10 minutes
enable_rate_limiting=True, # Prevent abuse
global_rate_limit=100, # 100 req/min globally
enable_retries=True, # Auto-retry failures
max_retries=3 # Up to 3 attempts
)
```
The processor stacks them automatically: **Cache → Rate Limit → Retry → Strategy → Tool**
### 4. Input Parsers (Plugins)
**Parsers** extract tool calls from various LLM output formats:
**XML Tags (Anthropic-style)**
```xml
<tool name="search" args='{"query": "Python"}'/>
```
**OpenAI `tool_calls` (JSON)**
```json
{
"tool_calls": [
{
"type": "function",
"function": {
"name": "search",
"arguments": "{\"query\": \"Python\"}"
}
}
]
}
```
**Direct JSON (array of calls)**
```json
[
{ "tool": "search", "arguments": { "query": "Python" } }
]
```
All formats work automatically—no configuration needed.
**Input Format Compatibility:**
| Format | Example | Use Case |
|--------|---------|----------|
| **XML Tool Tag** | `<tool name="search" args='{"q":"Python"}'/>`| Anthropic Claude, XML-based LLMs |
| **OpenAI tool_calls** | JSON object (above) | OpenAI GPT-4 function calling |
| **Direct JSON** | `[{"tool": "search", "arguments": {"q": "Python"}}]` | Generic API integrations |
| **Single dict** | `{"tool": "search", "arguments": {"q": "Python"}}` | Programmatic calls |
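As a minimal sketch (assuming a `search` tool is registered, and that `process()` accepts the dict form directly, per the table above), the same call can arrive in any of these formats:
```python
xml_style = '<tool name="search" args=\'{"query": "Python"}\'/>'
openai_style = {
    "tool_calls": [
        {
            "type": "function",
            "function": {"name": "search", "arguments": '{"query": "Python"}'},
        }
    ]
}

# Both inputs parse to the same tool call and execute identically.
results = await processor.process(xml_style)
results = await processor.process(openai_style)
```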
### 5. MCP Integration (External Tools)
Connect to **remote tool servers** using the [Model Context Protocol](https://modelcontextprotocol.io). CHUK Tool Processor supports three transport mechanisms for different use cases:
#### HTTP Streamable (⭐ Recommended for Cloud Services)
Modern HTTP streaming transport for cloud-based MCP servers like Notion:
```python
from chuk_tool_processor.mcp import setup_mcp_http_streamable
# Connect to Notion MCP with OAuth
servers = [
{
"name": "notion",
"url": "https://mcp.notion.com/mcp",
"headers": {"Authorization": f"Bearer {access_token}"}
}
]
processor, manager = await setup_mcp_http_streamable(
servers=servers,
namespace="notion",
initialization_timeout=120.0, # Some services need time to initialize
enable_caching=True,
enable_retries=True
)
# Use Notion tools through MCP
results = await processor.process(
'<tool name="notion.search_pages" args=\'{"query": "meeting notes"}\'/>'
)
```
#### STDIO (Best for Local/On-Device Tools)
For running local MCP servers as subprocesses—great for databases, file systems, and local tools:
```python
from chuk_tool_processor.mcp import setup_mcp_stdio
import json
# Configure SQLite MCP server
config = {
"mcpServers": {
"sqlite": {
"command": "uvx",
"args": ["mcp-server-sqlite", "--db-path", "/path/to/database.db"],
"env": {"MCP_SERVER_NAME": "sqlite"},
"transport": "stdio"
}
}
}
# Save config to file
with open("mcp_config.json", "w") as f:
json.dump(config, f)
# Connect to local SQLite server
processor, manager = await setup_mcp_stdio(
config_file="mcp_config.json",
servers=["sqlite"],
namespace="db",
initialization_timeout=120.0 # First run downloads packages
)
# Query your local database via MCP
results = await processor.process(
'<tool name="db.query" args=\'{"sql": "SELECT * FROM users LIMIT 10"}\'/>'
)
```
#### SSE (Legacy Support)
For backward compatibility with older MCP servers using Server-Sent Events:
```python
from chuk_tool_processor.mcp import setup_mcp_sse
# Connect to Atlassian with OAuth via SSE
servers = [
{
"name": "atlassian",
"url": "https://mcp.atlassian.com/v1/sse",
"headers": {"Authorization": f"Bearer {access_token}"}
}
]
processor, manager = await setup_mcp_sse(
servers=servers,
namespace="atlassian",
initialization_timeout=120.0
)
```
**Transport Comparison:**
| Transport | Use Case | Real Examples |
|-----------|----------|---------------|
| **HTTP Streamable** | Cloud APIs, SaaS services | Notion (`mcp.notion.com`) |
| **STDIO** | Local tools, databases | SQLite (`mcp-server-sqlite`), Echo (`chuk-mcp-echo`) |
| **SSE** | Legacy cloud services | Atlassian (`mcp.atlassian.com`) |
**Relationship with [chuk-mcp](https://github.com/chrishayuk/chuk-mcp):**
- `chuk-mcp` is a low-level MCP protocol client (handles transports, protocol negotiation)
- `chuk-tool-processor` wraps `chuk-mcp` to integrate external tools into your execution pipeline
- You can use local tools, remote MCP tools, or both in the same processor
## Getting Started
### Creating Tools
CHUK Tool Processor supports multiple patterns for defining tools:
#### Simple Function-Based Tools
```python
from chuk_tool_processor.registry.auto_register import register_fn_tool
from datetime import datetime
from zoneinfo import ZoneInfo
def get_current_time(timezone: str = "UTC") -> str:
"""Get the current time in the specified timezone."""
now = datetime.now(ZoneInfo(timezone))
return now.strftime("%Y-%m-%d %H:%M:%S %Z")
# Register the function as a tool (sync — no await needed)
register_fn_tool(get_current_time, namespace="utilities")
```
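A sketch of invoking it afterwards (assuming namespaced tools are addressed as `namespace.name`, as in the MCP examples):
```python
# Hypothetical call format; adjust the name if your registry addresses
# namespaced tools differently.
results = await processor.process(
    '<tool name="utilities.get_current_time" args=\'{"timezone": "Europe/London"}\'/>'
)
print(results[0].result)  # e.g. "2025-01-15 10:30:45 GMT"
```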
#### ValidatedTool (Pydantic Type Safety)
For production tools, use Pydantic validation:
```python
from pydantic import BaseModel, Field
from chuk_tool_processor.models.validated_tool import ValidatedTool
from chuk_tool_processor.registry import register_tool

@register_tool(name="weather")
class WeatherTool(ValidatedTool):
class Arguments(BaseModel):
location: str = Field(..., description="City name")
units: str = Field("celsius", description="Temperature units")
class Result(BaseModel):
temperature: float
conditions: str
async def _execute(self, location: str, units: str) -> Result:
return self.Result(temperature=22.5, conditions="Sunny")
```
#### StreamingTool (Real-time Results)
For long-running operations that produce incremental results:
```python
from pydantic import BaseModel
from chuk_tool_processor.models import StreamingTool
from chuk_tool_processor.registry import register_tool

@register_tool(name="file_processor")
class FileProcessor(StreamingTool):
class Arguments(BaseModel):
file_path: str
class Result(BaseModel):
line: int
content: str
async def _stream_execute(self, file_path: str):
with open(file_path) as f:
for i, line in enumerate(f, 1):
yield self.Result(line=i, content=line.strip())
```
**Consuming streaming results:**
```python
import asyncio
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.registry import initialize
async def main():
await initialize()
processor = ToolProcessor()
async for event in processor.astream('<tool name="file_processor" args=\'{"file_path":"README.md"}\'/>'):
# 'event' is a streamed chunk (either your Result model instance or a dict)
line = event["line"] if isinstance(event, dict) else getattr(event, "line", None)
content = event["content"] if isinstance(event, dict) else getattr(event, "content", None)
print(f"Line {line}: {content}")
asyncio.run(main())
```
### Using the Processor
#### Basic Usage
Call `await initialize()` once at startup to load your registry.
```python
import asyncio
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.registry import initialize
async def main():
await initialize()
processor = ToolProcessor()
llm_output = '<tool name="calculator" args=\'{"operation":"add","a":2,"b":3}\'/>'
results = await processor.process(llm_output)
for result in results:
if result.error:
print(f"Error: {result.error}")
else:
print(f"Success: {result.result}")
asyncio.run(main())
```
#### Production Configuration
```python
from chuk_tool_processor.core.processor import ToolProcessor
processor = ToolProcessor(
# Execution settings
default_timeout=30.0,
max_concurrency=20,
# Production features
enable_caching=True,
cache_ttl=600,
enable_rate_limiting=True,
global_rate_limit=100,
enable_retries=True,
max_retries=3
)
```
### Advanced Production Features
Beyond basic configuration, CHUK Tool Processor includes several advanced features for production environments:
#### Circuit Breaker Pattern
Prevent cascading failures by automatically opening circuits for failing tools:
```python
from chuk_tool_processor.core.processor import ToolProcessor
processor = ToolProcessor(
enable_circuit_breaker=True,
circuit_breaker_threshold=5, # Open after 5 failures
circuit_breaker_timeout=60.0, # Try recovery after 60s
)
# Circuit states: CLOSED → OPEN → HALF_OPEN → CLOSED
# - CLOSED: Normal operation
# - OPEN: Blocking requests (too many failures)
# - HALF_OPEN: Testing recovery with limited requests
```
**How it works:**
1. Tool fails repeatedly (hits threshold)
2. Circuit opens → requests blocked immediately
3. After timeout, circuit enters HALF_OPEN
4. If test requests succeed → circuit closes
5. If test requests fail → back to OPEN
**Benefits:**
- Prevents wasting resources on failing services
- Fast-fail for better UX
- Automatic recovery detection
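A sketch of the resulting fail-fast behavior (continuing from the snippet above, and assuming a hypothetical registered `flaky_api` tool that is currently failing):
```python
for i in range(8):
    results = await processor.process('<tool name="flaky_api" args=\'{}\'/>')
    # After the threshold (5 failures) the circuit opens; subsequent calls
    # fail fast with an error instead of waiting on the broken tool.
    print(i, results[0].error)
```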
#### Idempotency Keys
Automatically deduplicate LLM tool calls using SHA256-based keys:
```python
from chuk_tool_processor.models.tool_call import ToolCall
# Idempotency keys are auto-generated
call1 = ToolCall(tool="search", arguments={"query": "Python"})
call2 = ToolCall(tool="search", arguments={"query": "Python"})
# Same arguments = same idempotency key
assert call1.idempotency_key == call2.idempotency_key
# Used automatically by caching layer
processor = ToolProcessor(enable_caching=True)
results1 = await processor.execute([call1]) # Executes
results2 = await processor.execute([call2]) # Cache hit!
```
**Benefits:**
- Prevents duplicate executions from LLM retries
- Deterministic cache keys
- No manual key management needed
#### Tool Schema Export
Export tool definitions to multiple formats for LLM prompting:
```python
from pydantic import BaseModel, Field
from chuk_tool_processor.models.tool_spec import ToolSpec, ToolCapability
from chuk_tool_processor.models.validated_tool import ValidatedTool
from chuk_tool_processor.registry import register_tool
@register_tool(name="weather")
class WeatherTool(ValidatedTool):
"""Get current weather for a location."""
class Arguments(BaseModel):
location: str = Field(..., description="City name")
class Result(BaseModel):
temperature: float
conditions: str
# Generate tool spec
spec = ToolSpec.from_validated_tool(WeatherTool)
# Export to different formats
openai_format = spec.to_openai() # For OpenAI function calling
anthropic_format = spec.to_anthropic() # For Claude tools
mcp_format = spec.to_mcp() # For MCP servers
# Example OpenAI format:
# {
# "type": "function",
# "function": {
# "name": "weather",
# "description": "Get current weather for a location.",
# "parameters": {...} # JSON Schema
# }
# }
```
**Use cases:**
- Generate tool definitions for LLM system prompts
- Documentation generation
- API contract validation
- Cross-platform tool sharing
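For example, the OpenAI export can be passed straight into function calling. A sketch using the official `openai` client (model name illustrative; adapt to your LLM client):
```python
from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=[spec.to_openai()],  # spec from ToolSpec.from_validated_tool(WeatherTool)
)
```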
#### Machine-Readable Error Codes
Structured error handling with error codes for programmatic responses:
```python
from chuk_tool_processor.core.exceptions import (
ErrorCode,
ToolNotFoundError,
ToolTimeoutError,
ToolCircuitOpenError,
)
try:
results = await processor.process(llm_output)
except ToolNotFoundError as e:
if e.code == ErrorCode.TOOL_NOT_FOUND:
# Suggest available tools to LLM
available = e.details.get("available_tools", [])
print(f"Try one of: {available}")
except ToolTimeoutError as e:
if e.code == ErrorCode.TOOL_TIMEOUT:
# Inform LLM to use faster alternative
timeout = e.details["timeout"]
print(f"Tool timed out after {timeout}s")
except ToolCircuitOpenError as e:
if e.code == ErrorCode.TOOL_CIRCUIT_OPEN:
# Tell LLM this service is temporarily down
reset_time = e.details.get("reset_timeout")
print(f"Service unavailable, retry in {reset_time}s")
# All errors include .to_dict() for logging
error_dict = e.to_dict()
# {
# "error": "ToolCircuitOpenError",
# "code": "TOOL_CIRCUIT_OPEN",
# "message": "Tool 'api_tool' circuit breaker is open...",
# "details": {"tool_name": "api_tool", "failure_count": 5, ...}
# }
```
**Available error codes:**
- `TOOL_NOT_FOUND` - Tool doesn't exist in registry
- `TOOL_EXECUTION_FAILED` - Tool execution error
- `TOOL_TIMEOUT` - Tool exceeded timeout
- `TOOL_CIRCUIT_OPEN` - Circuit breaker is open
- `TOOL_RATE_LIMITED` - Rate limit exceeded
- `TOOL_VALIDATION_ERROR` - Argument validation failed
- `MCP_CONNECTION_FAILED` - MCP server unreachable
- Plus 11 more for comprehensive error handling
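Because every error carries a `code` and a `to_dict()` payload, a generic fallback handler is easy to write. A minimal sketch (the helper name is ours, not part of the library):
```python
def error_to_llm_message(exc) -> str:
    """Turn any structured tool error into text an LLM can act on."""
    info = exc.to_dict()  # {"error": ..., "code": ..., "message": ..., "details": {...}}
    return f"Tool call failed ({info['code']}): {info['message']}"
```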
#### LLM-Friendly Argument Coercion
Automatically coerce LLM outputs to correct types:
```python
from pydantic import BaseModel
from chuk_tool_processor.models.validated_tool import ValidatedTool
class SearchTool(ValidatedTool):
class Arguments(BaseModel):
query: str
limit: int = 10
category: str = "all"
# Pydantic config for LLM outputs:
# - str_strip_whitespace=True → Remove accidental whitespace
# - extra="ignore" → Ignore unknown fields
# - use_enum_values=True → Convert enums to values
# - coerce_numbers_to_str=False → Keep type strictness
# LLM outputs often have quirks:
llm_output = {
"query": " Python tutorials ", # Extra whitespace
"limit": "5", # String instead of int
"unknown_field": "ignored" # Extra field
}
# ValidatedTool automatically coerces and validates
tool = SearchTool()
result = await tool.execute(**llm_output)
# ✅ Works! Whitespace stripped, "5" → 5, extra field ignored
```
## Advanced Topics
### Using Subprocess Strategy
Use `SubprocessStrategy` for isolation and safety when running untrusted, third-party, or potentially unsafe code that shouldn't share a process with your main application:
```python
import asyncio
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy
from chuk_tool_processor.registry import get_default_registry
async def main():
registry = await get_default_registry()
processor = ToolProcessor(
strategy=SubprocessStrategy(
registry=registry,
max_workers=4,
default_timeout=30.0
)
)
# Use processor...
asyncio.run(main())
```
### Real-World MCP Examples
#### Example 1: Notion Integration with OAuth
Complete OAuth flow connecting to Notion's MCP server:
```python
from chuk_tool_processor.mcp import setup_mcp_http_streamable
# After completing OAuth flow (see examples/notion_oauth.py for full flow)
processor, manager = await setup_mcp_http_streamable(
servers=[{
"name": "notion",
"url": "https://mcp.notion.com/mcp",
"headers": {"Authorization": f"Bearer {access_token}"}
}],
namespace="notion",
initialization_timeout=120.0
)
# Get available Notion tools
tools = manager.get_all_tools()
print(f"Available tools: {[t['name'] for t in tools]}")
# Use Notion tools in your LLM workflow
results = await processor.process(
'<tool name="notion.search_pages" args=\'{"query": "Q4 planning"}\'/>'
)
```
#### Example 2: Local SQLite Database Access
Run SQLite MCP server locally for database operations:
```python
from chuk_tool_processor.mcp import setup_mcp_stdio
import json
# Configure SQLite server
config = {
"mcpServers": {
"sqlite": {
"command": "uvx",
"args": ["mcp-server-sqlite", "--db-path", "./data/app.db"],
"transport": "stdio"
}
}
}
with open("mcp_config.json", "w") as f:
json.dump(config, f)
# Connect to local database
processor, manager = await setup_mcp_stdio(
config_file="mcp_config.json",
servers=["sqlite"],
namespace="db",
initialization_timeout=120.0 # First run downloads mcp-server-sqlite
)
# Query your database via LLM
results = await processor.process(
'<tool name="db.query" args=\'{"sql": "SELECT COUNT(*) FROM users"}\'/>'
)
```
#### Example 3: Simple STDIO Echo Server
Minimal example for testing STDIO transport:
```python
from chuk_tool_processor.mcp import setup_mcp_stdio
import json
# Configure echo server (great for testing)
config = {
"mcpServers": {
"echo": {
"command": "uvx",
"args": ["chuk-mcp-echo", "stdio"],
"transport": "stdio"
}
}
}
with open("echo_config.json", "w") as f:
json.dump(config, f)
processor, manager = await setup_mcp_stdio(
config_file="echo_config.json",
servers=["echo"],
namespace="echo",
initialization_timeout=60.0
)
# Test echo functionality
results = await processor.process(
'<tool name="echo.echo" args=\'{"message": "Hello MCP!"}\'/>'
)
```
See `examples/notion_oauth.py`, `examples/stdio_sqlite.py`, and `examples/stdio_echo.py` for complete working implementations.
#### OAuth Token Refresh
For MCP servers that use OAuth authentication, CHUK Tool Processor supports automatic token refresh when access tokens expire. This prevents your tools from failing due to expired tokens during long-running sessions.
**How it works:**
1. A tool call fails with an OAuth-related error (e.g., "invalid_token", "expired token", "unauthorized")
2. The processor automatically calls your refresh callback
3. The authentication headers are updated with the new token
4. The tool call is retried with fresh credentials
**Setup with HTTP Streamable:**
```python
from chuk_tool_processor.mcp import setup_mcp_http_streamable
async def refresh_oauth_token():
"""Called automatically when tokens expire."""
# Your token refresh logic here
# Return dict with new Authorization header
new_token = await your_refresh_logic()
return {"Authorization": f"Bearer {new_token}"}
processor, manager = await setup_mcp_http_streamable(
servers=[{
"name": "notion",
"url": "https://mcp.notion.com/mcp",
"headers": {"Authorization": f"Bearer {initial_access_token}"}
}],
namespace="notion",
oauth_refresh_callback=refresh_oauth_token # Enable auto-refresh
)
```
**Setup with SSE:**
```python
from chuk_tool_processor.mcp import setup_mcp_sse
async def refresh_oauth_token():
"""Refresh expired OAuth token."""
# Exchange refresh token for new access token
new_access_token = await exchange_refresh_token(refresh_token)
return {"Authorization": f"Bearer {new_access_token}"}
processor, manager = await setup_mcp_sse(
servers=[{
"name": "atlassian",
"url": "https://mcp.atlassian.com/v1/sse",
"headers": {"Authorization": f"Bearer {initial_token}"}
}],
namespace="atlassian",
oauth_refresh_callback=refresh_oauth_token
)
```
**OAuth errors detected automatically:**
- `invalid_token`
- `expired token`
- `OAuth validation failed`
- `unauthorized`
- `token expired`
- `authentication failed`
- `invalid access token`
**Important notes:**
- The refresh callback must return a dict with an `Authorization` key
- If refresh fails or returns invalid headers, the original error is returned
- Token refresh is attempted only once per tool call (no infinite retry loops)
- After successful refresh, the updated headers are used for all subsequent calls
See `examples/notion_oauth.py` for a complete OAuth 2.1 implementation with PKCE and automatic token refresh.
### Observability
#### Structured Logging
Enable JSON logging for production observability:
```python
import asyncio
from chuk_tool_processor.logging import setup_logging, get_logger
async def main():
await setup_logging(
level="INFO",
structured=True, # JSON output (structured=False for human-readable)
log_file="tool_processor.log"
)
logger = get_logger("my_app")
logger.info("logging ready")
asyncio.run(main())
```
When `structured=True`, logs are output as JSON. When `structured=False`, they're human-readable text.
Example JSON log output:
```json
{
"timestamp": "2025-01-15T10:30:45.123Z",
"level": "INFO",
"tool": "calculator",
"status": "success",
"duration_ms": 4.2,
"cached": false,
"attempts": 1
}
```
#### Automatic Metrics
Metrics are automatically collected for:
- ✅ Tool execution (success/failure rates, duration)
- ✅ Cache performance (hit/miss rates)
- ✅ Parser accuracy (which parsers succeeded)
- ✅ Retry attempts (how many retries per tool)
Access metrics programmatically:
```python
import asyncio
from chuk_tool_processor.logging import metrics
async def main():
    # Metrics are recorded automatically, but you can also log them manually
await metrics.log_tool_execution(
tool="custom_tool",
success=True,
duration=1.5,
cached=False,
attempts=1
)
asyncio.run(main())
```
#### OpenTelemetry & Prometheus (Drop-in Observability)
**3-Line Setup:**
```python
from chuk_tool_processor.observability import setup_observability
setup_observability(
service_name="my-tool-service",
enable_tracing=True, # → OpenTelemetry traces
enable_metrics=True, # → Prometheus metrics at :9090/metrics
metrics_port=9090
)
# That's it! Every tool execution is now automatically traced and metered.
```
**What you get automatically:**
- ✅ Distributed traces (Jaeger, Zipkin, any OTLP collector)
- ✅ Prometheus metrics (error rate, latency P50/P95/P99, cache hit rate)
- ✅ Circuit breaker state monitoring
- ✅ Retry attempt tracking
- ✅ Zero code changes to your tools
**Why Telemetry Matters**: In production, you need to know *what* your tools are doing, *how long* they take, *when* they fail, and *why*. CHUK Tool Processor provides **enterprise-grade telemetry** that operations teams expect—with zero manual instrumentation.
**What You Get (Automatically)**
✅ **Distributed Traces** - Understand exactly what happened in each tool call
- See the complete execution timeline for every tool
- Track retries, cache hits, circuit breaker state changes
- Correlate failures across your system
- Export to Jaeger, Zipkin, or any OTLP-compatible backend
✅ **Production Metrics** - Monitor health and performance in real-time
- Track error rates, latency percentiles (P50/P95/P99)
- Monitor cache hit rates and retry attempts
- Alert on circuit breaker opens and rate limit hits
- Export to Prometheus, Grafana, or any metrics backend
✅ **Zero Configuration** - Works out of the box
- No manual instrumentation needed
- No code changes to existing tools
- Gracefully degrades if packages not installed
- Standard OTEL and Prometheus formats
**Installation**
```bash
# Install observability dependencies
pip install chuk-tool-processor[observability]
# Or manually
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp prometheus-client
# Or with uv (recommended)
uv pip install chuk-tool-processor --group observability
```
> **⚠️ SRE Note**: Observability packages are **optional**. If not installed, all observability calls are no-ops—your tools run normally without tracing/metrics. Zero crashes, zero warnings. Safe to deploy without observability dependencies.
**Quick Start: See Your Tools in Action**
```python
import asyncio
from chuk_tool_processor.observability import setup_observability
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.registry import initialize, register_tool
@register_tool(name="weather_api")
class WeatherTool:
async def execute(self, location: str) -> dict:
# Simulating API call
return {"temperature": 72, "conditions": "sunny", "location": location}
async def main():
# 1. Enable observability (one line!)
setup_observability(
service_name="weather-service",
enable_tracing=True,
enable_metrics=True,
metrics_port=9090
)
# 2. Create processor with production features
await initialize()
processor = ToolProcessor(
enable_caching=True, # Cache expensive API calls
enable_retries=True, # Auto-retry on failures
enable_circuit_breaker=True, # Prevent cascading failures
enable_rate_limiting=True, # Prevent API abuse
)
# 3. Execute tools - automatically traced and metered
results = await processor.process(
'<tool name="weather_api" args=\'{"location": "San Francisco"}\'/>'
)
print(f"Result: {results[0].result}")
print(f"Duration: {results[0].duration}s")
print(f"Cached: {results[0].cached}")
asyncio.run(main())
```
**View Your Data**
```bash
# Start Jaeger for trace visualization
docker run -d -p 4317:4317 -p 16686:16686 jaegertracing/all-in-one:latest
# Start your application
python your_app.py
# View distributed traces
open http://localhost:16686
# View Prometheus metrics
curl http://localhost:9090/metrics | grep tool_
```
**What Gets Traced (Automatic Spans)**
Every execution layer creates standardized OpenTelemetry spans:
| Span Name | When Created | Key Attributes |
|-----------|--------------|----------------|
| `tool.execute` | Every tool execution | `tool.name`, `tool.namespace`, `tool.duration_ms`, `tool.cached`, `tool.error`, `tool.success` |
| `tool.cache.lookup` | Cache lookup | `cache.hit` (true/false), `cache.operation=lookup` |
| `tool.cache.set` | Cache write | `cache.ttl`, `cache.operation=set` |
| `tool.retry.attempt` | Each retry | `retry.attempt`, `retry.max_attempts`, `retry.success` |
| `tool.circuit_breaker.check` | Circuit state check | `circuit.state` (CLOSED/OPEN/HALF_OPEN) |
| `tool.rate_limit.check` | Rate limit check | `rate_limit.allowed` (true/false) |
**Example trace hierarchy:**
```
tool.execute (weather_api)
├── tool.cache.lookup (miss)
├── tool.retry.attempt (0)
│ └── tool.execute (actual API call)
├── tool.retry.attempt (1) [if first failed]
└── tool.cache.set (store result)
```
**What Gets Metered (Automatic Metrics)**
Standard Prometheus metrics exposed at `/metrics`:
| Metric | Type | Labels | Use For |
|--------|------|--------|---------|
| `tool_executions_total` | Counter | `tool`, `namespace`, `status` | Error rate, request volume |
| `tool_execution_duration_seconds` | Histogram | `tool`, `namespace` | P50/P95/P99 latency |
| `tool_cache_operations_total` | Counter | `tool`, `operation`, `result` | Cache hit rate |
| `tool_retry_attempts_total` | Counter | `tool`, `attempt`, `success` | Retry frequency |
| `tool_circuit_breaker_state` | Gauge | `tool` | Circuit health (0=CLOSED, 1=OPEN, 2=HALF_OPEN) |
| `tool_circuit_breaker_failures_total` | Counter | `tool` | Failure count |
| `tool_rate_limit_checks_total` | Counter | `tool`, `allowed` | Rate limit hits |
**Useful PromQL Queries**
```promql
# Error rate per tool (last 5 minutes)
rate(tool_executions_total{status="error"}[5m])
/ rate(tool_executions_total[5m])
# P95 latency
histogram_quantile(0.95, rate(tool_execution_duration_seconds_bucket[5m]))
# Cache hit rate
rate(tool_cache_operations_total{result="hit"}[5m])
/ rate(tool_cache_operations_total{operation="lookup"}[5m])
# Tools currently circuit broken
tool_circuit_breaker_state == 1
# Retry rate (how often tools need retries)
rate(tool_retry_attempts_total{attempt!="0"}[5m])
/ rate(tool_executions_total[5m])
```
**Configuration**
Configure via environment variables:
```bash
# OTLP endpoint (where traces are sent)
export OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
# Service name (shown in traces)
export OTEL_SERVICE_NAME=production-api
# Sampling (reduce overhead in high-traffic scenarios)
export OTEL_TRACES_SAMPLER=traceidratio
export OTEL_TRACES_SAMPLER_ARG=0.1 # Sample 10% of traces
```
Or in code:
```python
status = setup_observability(
service_name="my-service",
enable_tracing=True,
enable_metrics=True,
metrics_port=9090,
metrics_host="0.0.0.0" # Allow external Prometheus scraping
)
# Check status
if status["tracing_enabled"]:
print("Traces exporting to OTLP endpoint")
if status["metrics_server_started"]:
print("Metrics available at http://localhost:9090/metrics")
```
**Production Integration**
**With Grafana + Prometheus:**
```yaml
# prometheus.yml
scrape_configs:
- job_name: 'chuk-tool-processor'
scrape_interval: 15s
static_configs:
- targets: ['app:9090']
```
**With OpenTelemetry Collector:**
```yaml
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
exporters:
jaeger:
endpoint: jaeger:14250
prometheus:
endpoint: 0.0.0.0:8889
service:
pipelines:
traces:
receivers: [otlp]
exporters: [jaeger]
```
**With Cloud Providers:**
```bash
# AWS X-Ray
export OTEL_TRACES_SAMPLER=xray
# Google Cloud Trace
export OTEL_EXPORTER_OTLP_ENDPOINT=https://cloudtrace.googleapis.com/v1/projects/PROJECT_ID/traces
# Datadog
export OTEL_EXPORTER_OTLP_ENDPOINT=http://datadog-agent:4317
```
**Why This Matters**
❌ **Without telemetry:**
- "Why is this tool slow?" → No idea
- "Is caching helping?" → Guessing
- "Did that retry work?" → Check logs manually
- "Is the circuit breaker working?" → Hope so
- "Which tool is failing?" → Debug blindly
✅ **With telemetry:**
- See exact execution timeline in Jaeger
- Monitor cache hit rate in Grafana
- Alert when retry rate spikes
- Dashboard shows circuit breaker states
- Metrics pinpoint the failing tool immediately
**Learn More**
📖 **Complete Guide**: See [`docs/OBSERVABILITY.md`](docs/OBSERVABILITY.md) for:
- Complete span and metric specifications
- Architecture and implementation details
- Integration guides (Jaeger, Grafana, OTEL Collector)
- Testing observability features
- Environment variable configuration
🎯 **Working Example**: See `examples/observability_demo.py` for a complete demonstration with retries, caching, and circuit breakers
**Benefits**
✅ **Drop-in** - One function call, zero code changes
✅ **Automatic** - All execution layers instrumented
✅ **Standard** - OTEL + Prometheus (works with existing tools)
✅ **Production-ready** - Ops teams get exactly what they expect
✅ **Optional** - Gracefully degrades if packages not installed
✅ **Zero-overhead** - No performance impact when disabled
### Error Handling
```python
results = await processor.process(llm_output)
for result in results:
if result.error:
print(f"Tool '{result.tool}' failed: {result.error}")
print(f"Duration: {result.duration}s")
else:
print(f"Tool '{result.tool}' succeeded: {result.result}")
```
### Testing Tools
```python
import pytest
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.registry import initialize
@pytest.mark.asyncio
async def test_calculator():
await initialize()
processor = ToolProcessor()
results = await processor.process(
'<tool name="calculator" args=\'{"operation": "add", "a": 5, "b": 3}\'/>'
)
assert results[0].result["result"] == 8
```
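The error path can be tested the same way; per the error-handling section, failures surface as `result.error` rather than raising. A sketch using the `Calculator` tool from the 3-minute example:
```python
@pytest.mark.asyncio
async def test_calculator_unsupported_operation():
    await initialize()
    processor = ToolProcessor()

    results = await processor.process(
        '<tool name="calculator" args=\'{"operation": "divide", "a": 1, "b": 2}\'/>'
    )

    # Calculator raises ValueError for unsupported operations; the processor
    # contains the failure and reports it on the result instead of crashing.
    assert results[0].error is not None
```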
## Configuration
### Timeout Configuration
CHUK Tool Processor uses a unified timeout configuration system that applies to all MCP transports (HTTP Streamable, SSE, STDIO) and the StreamManager. Instead of managing dozens of individual timeout values, there are just **4 logical timeout categories**:
```python
from chuk_tool_processor.mcp.transport import TimeoutConfig
# Create custom timeout configuration
timeout_config = TimeoutConfig(
connect=30.0, # Connection establishment, initialization, session discovery
operation=30.0, # Normal operations (tool calls, listing tools/resources/prompts)
quick=5.0, # Fast health checks and pings
shutdown=2.0 # Cleanup and shutdown operations
)
```
**Using timeout configuration with StreamManager:**
```python
from chuk_tool_processor.mcp.stream_manager import StreamManager
from chuk_tool_processor.mcp.transport import TimeoutConfig
# Create StreamManager with custom timeouts
timeout_config = TimeoutConfig(
connect=60.0, # Longer for slow initialization
operation=45.0, # Longer for heavy operations
quick=3.0, # Faster health checks
shutdown=5.0 # More time for cleanup
)
manager = StreamManager(timeout_config=timeout_config)
```
**Timeout categories explained:**
| Category | Default | Used For | Examples |
|----------|---------|----------|----------|
| `connect` | 30.0s | Connection setup, initialization, discovery | HTTP connection, SSE session discovery, STDIO subprocess launch |
| `operation` | 30.0s | Normal tool operations | Tool calls, listing tools/resources/prompts, get_tools() |
| `quick` | 5.0s | Fast health/status checks | Ping operations, health checks |
| `shutdown` | 2.0s | Cleanup and teardown | Transport close, connection cleanup |
**Why this matters:**
- ✅ **Simple**: 4 timeout values instead of 20+
- ✅ **Consistent**: Same timeout behavior across all transports
- ✅ **Configurable**: Adjust timeouts based on your environment (slow networks, large datasets, etc.)
- ✅ **Type-safe**: Pydantic validation ensures correct values
**Example: Adjusting for slow environments**
```python
from chuk_tool_processor.mcp import setup_mcp_stdio
from chuk_tool_processor.mcp.transport import TimeoutConfig
# For slow network or resource-constrained environments
slow_timeouts = TimeoutConfig(
connect=120.0, # Allow more time for package downloads
operation=60.0, # Allow more time for heavy operations
quick=10.0, # Be patient with health checks
shutdown=10.0 # Allow thorough cleanup
)
processor, manager = await setup_mcp_stdio(
config_file="mcp_config.json",
servers=["sqlite"],
namespace="db",
initialization_timeout=120.0
)
# Set custom timeouts on the manager
manager.timeout_config = slow_timeouts
```
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `CHUK_TOOL_REGISTRY_PROVIDER` | `memory` | Registry backend |
| `CHUK_DEFAULT_TIMEOUT` | `30.0` | Default timeout (seconds) |
| `CHUK_LOG_LEVEL` | `INFO` | Logging level |
| `CHUK_STRUCTURED_LOGGING` | `true` | Enable JSON logging |
| `MCP_BEARER_TOKEN` | - | Bearer token for MCP SSE |
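For example, to raise the default timeout and switch to debug-level, human-readable logs:
```bash
export CHUK_DEFAULT_TIMEOUT=60.0
export CHUK_LOG_LEVEL=DEBUG
export CHUK_STRUCTURED_LOGGING=false
```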
### ToolProcessor Options
```python
processor = ToolProcessor(
default_timeout=30.0, # Timeout per tool
max_concurrency=10, # Max concurrent executions
enable_caching=True, # Result caching
cache_ttl=300, # Cache TTL (seconds)
enable_rate_limiting=False, # Rate limiting
global_rate_limit=None, # (requests per minute) global cap
enable_retries=True, # Auto-retry failures
max_retries=3, # Max retry attempts
# Optional per-tool rate limits: {"tool.name": (requests, per_seconds)}
tool_rate_limits=None
)
```
### Performance & Tuning
| Parameter | Default | When to Adjust |
|-----------|---------|----------------|
| `default_timeout` | `30.0` | Increase for slow tools (e.g., AI APIs) |
| `max_concurrency` | `10` | Increase for I/O-bound tools, decrease for CPU-bound |
| `enable_caching` | `True` | Keep on for deterministic tools |
| `cache_ttl` | `300` | Longer for stable data, shorter for real-time |
| `enable_rate_limiting` | `False` | Enable when hitting API rate limits |
| `global_rate_limit` | `None` | Set a global requests/min cap across all tools |
| `enable_retries` | `True` | Disable for non-idempotent operations |
| `max_retries` | `3` | Increase for flaky external APIs |
| `tool_rate_limits` | `None` | Dict mapping tool name → (max_requests, window_seconds). Overrides `global_rate_limit` per tool |
**Per-tool rate limiting example:**
```python
processor = ToolProcessor(
enable_rate_limiting=True,
global_rate_limit=100, # 100 requests/minute across all tools
tool_rate_limits={
"notion.search_pages": (10, 60), # 10 requests per 60 seconds
"expensive_api": (5, 60), # 5 requests per minute
"local_tool": (1000, 60), # 1000 requests per minute (local is fast)
}
)
```
### Security Model
CHUK Tool Processor provides multiple layers of safety:
| Concern | Protection | Configuration |
|---------|------------|---------------|
| **Timeouts** | Every tool has a timeout | `default_timeout=30.0` |
| **Process Isolation** | Run tools in separate processes | `strategy=SubprocessStrategy()` |
| **Rate Limiting** | Prevent abuse and API overuse | `enable_rate_limiting=True` |
| **Input Validation** | Pydantic validation on arguments | Use `ValidatedTool` |
| **Error Containment** | Failures don't crash the processor | Built-in exception handling |
| **Retry Limits** | Prevent infinite retry loops | `max_retries=3` |
**Important Security Notes:**
- **Environment Variables**: Subprocess strategy inherits the parent process environment by default. For stricter isolation, use container-level controls (Docker, cgroups).
- **Network Access**: Tools inherit network access from the host. For network isolation, use OS-level sandboxing (containers, network namespaces, firewalls).
- **Resource Limits**: For hard CPU/memory caps, use OS-level controls (cgroups on Linux, Job Objects on Windows, or Docker resource limits).
- **Secrets**: Never injected automatically. Pass secrets explicitly via tool arguments or environment variables, and prefer scoped env vars for subprocess tools to minimize exposure.
Example security-focused setup for untrusted code:
```python
import asyncio
from chuk_tool_processor.core.processor import ToolProcessor
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy
from chuk_tool_processor.registry import get_default_registry
async def create_secure_processor():
# Maximum isolation for untrusted code
# Runs each tool in a separate process
registry = await get_default_registry()
processor = ToolProcessor(
strategy=SubprocessStrategy(
registry=registry,
max_workers=4,
default_timeout=10.0
),
default_timeout=10.0,
enable_rate_limiting=True,
global_rate_limit=50, # 50 requests/minute
max_retries=2
)
return processor
# For even stricter isolation:
# - Run the entire processor inside a Docker container with resource limits
# - Use network policies to restrict outbound connections
# - Use read-only filesystems where possible
```
## Architecture Principles
1. **Composability**: Stack strategies and wrappers like middleware
2. **Async-First**: Built for `async/await` from the ground up
3. **Production-Ready**: Timeouts, retries, caching, rate limiting—all built-in
4. **Pluggable**: Parsers, strategies, transports—swap components as needed
5. **Observable**: Structured logging and metrics collection throughout
## Examples
Check out the [`examples/`](examples/) directory for complete working examples:
### Getting Started
- **60-second hello**: `examples/hello_tool.py` - Absolute minimal example (copy-paste-run)
- **Quick start**: `examples/quickstart_demo.py` - Basic tool registration and execution
- **Execution strategies**: `examples/execution_strategies_demo.py` - InProcess vs Subprocess
- **Production wrappers**: `examples/wrappers_demo.py` - Caching, retries, rate limiting
- **Streaming tools**: `examples/streaming_demo.py` - Real-time incremental results
- **Streaming tool calls**: `examples/streaming_tool_calls_demo.py` - Handle partial tool calls from streaming LLMs
- **Schema helper**: `examples/schema_helper_demo.py` - Auto-generate schemas from typed tools (Pydantic → OpenAI/Anthropic/MCP)
- **Observability**: `examples/observability_demo.py` - OpenTelemetry + Prometheus integration
### MCP Integration (Real-World)
- **Notion + OAuth**: `examples/notion_oauth.py` - Complete OAuth 2.1 flow with HTTP Streamable
- Shows: Authorization Server discovery, client registration, PKCE flow, token exchange
- **SQLite Local**: `examples/stdio_sqlite.py` - Local database access via STDIO
- Shows: Command/args passing, environment variables, file paths, initialization timeouts
- **Echo Server**: `examples/stdio_echo.py` - Minimal STDIO transport example
- Shows: Simplest possible MCP integration for testing
- **Atlassian + OAuth**: `examples/atlassian_sse.py` - OAuth with SSE transport (legacy)
### Advanced MCP
- **HTTP Streamable**: `examples/mcp_http_streamable_example.py`
- **STDIO**: `examples/mcp_stdio_example.py`
- **SSE**: `examples/mcp_sse_example.py`
- **Plugin system**: `examples/plugins_builtins_demo.py`, `examples/plugins_custom_parser_demo.py`
## FAQ
**Q: What happens if a tool takes too long?**
A: The tool is cancelled after `default_timeout` seconds and returns an error result. The processor continues with other tools.
**Q: Can I mix local and remote (MCP) tools?**
A: Yes! Register local tools first, then use `setup_mcp_*` to add remote tools. They all work in the same processor.
**Q: How do I handle malformed LLM outputs?**
A: The processor is resilient—invalid tool calls are logged and return error results without crashing.
**Q: What about API rate limits?**
A: Use `enable_rate_limiting=True` and set `tool_rate_limits` per tool or `global_rate_limit` for all tools.
**Q: Can tools return files or binary data?**
A: Yes—tools can return any JSON-serializable data including base64-encoded files, URLs, or structured data.
**Q: How do I test my tools?**
A: Use pytest with `@pytest.mark.asyncio`. See [Testing Tools](#testing-tools) for examples.
**Q: Does this work with streaming LLM responses?**
A: Yes—as tool calls appear in the stream, extract and process them. The processor handles partial/incremental tool call lists.
**Q: What's the difference between InProcess and Subprocess strategies?**
A: InProcess is faster (same process), Subprocess is safer (isolated process). Use InProcess for trusted code, Subprocess for untrusted.
## Comparison with Other Tools
| Feature | chuk-tool-processor | LangChain Tools | OpenAI Tools | MCP SDK |
|---------|-------------------|-----------------|--------------|---------|
| **Async-native** | ✅ | ⚠️ Partial | ✅ | ✅ |
| **Process isolation** | ✅ SubprocessStrategy | ❌ | ❌ | ⚠️ |
| **Built-in retries** | ✅ | ❌ † | ❌ | ❌ |
| **Rate limiting** | ✅ | ❌ † | ⚠️ ‡ | ❌ |
| **Caching** | ✅ | ⚠️ † | ❌ ‡ | ❌ |
| **Multiple parsers** | ✅ (XML, OpenAI, JSON) | ⚠️ | ✅ | ✅ |
| **Streaming tools** | ✅ | ⚠️ | ⚠️ | ✅ |
| **MCP integration** | ✅ All transports | ❌ | ❌ | ✅ (protocol only) |
| **Zero-config start** | ✅ | ❌ | ✅ | ⚠️ |
| **Production-ready** | ✅ Timeouts, metrics | ⚠️ | ⚠️ | ⚠️ |
**Notes:**
- † LangChain offers caching and rate-limiting through separate libraries (`langchain-cache`, external rate limiters), but they're not core features.
- ‡ OpenAI Tools can be combined with external rate limiters and caches, but tool execution itself doesn't include these features.
**When to use chuk-tool-processor:**
- You need production-ready tool execution (timeouts, retries, caching)
- You want to connect to MCP servers (local or remote)
- You need to run untrusted code safely (subprocess isolation)
- You're building a custom LLM application (not using a framework)
**When to use alternatives:**
- **LangChain**: You want a full-featured LLM framework with chains, agents, and memory
- **OpenAI Tools**: You only use OpenAI and don't need advanced execution features
- **MCP SDK**: You're building an MCP server, not a client
## Related Projects
- **[chuk-mcp](https://github.com/chrishayuk/chuk-mcp)**: Low-level Model Context Protocol client
- Powers the MCP transport layer in chuk-tool-processor
- Use directly if you need protocol-level control
- Use chuk-tool-processor if you want high-level tool execution
## Development & Publishing
### For Contributors
Development setup:
```bash
# Clone repository
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
# Install development dependencies
uv sync --dev
# Run tests
make test
# Run all quality checks
make check
```
### For Maintainers: Publishing Releases
The project uses **fully automated CI/CD** for releases. Publishing is as simple as:
```bash
# 1. Bump version
make bump-patch # or bump-minor, bump-major
# 2. Commit version change
git add pyproject.toml
git commit -m "version X.Y.Z"
git push
# 3. Create release (automated)
make publish
```
This will:
- Create and push a git tag
- Trigger GitHub Actions to create a release with auto-generated changelog
- Run tests across all platforms and Python versions
- Build and publish to PyPI automatically
For detailed release documentation, see:
- **[RELEASING.md](RELEASING.md)** - Complete release process guide
- **[docs/CI-CD.md](docs/CI-CD.md)** - Full CI/CD pipeline documentation
## Contributing & Support
- **GitHub**: [chrishayuk/chuk-tool-processor](https://github.com/chrishayuk/chuk-tool-processor)
- **Issues**: [Report bugs and request features](https://github.com/chrishayuk/chuk-tool-processor/issues)
- **Discussions**: [Community discussions](https://github.com/chrishayuk/chuk-tool-processor/discussions)
- **License**: MIT
---
**Remember**: CHUK Tool Processor is the missing link between LLM outputs and reliable tool execution. It's not trying to be everything—it's trying to be the best at one thing: processing tool calls in production.
Built with ❤️ by the CHUK AI team for the LLM tool integration community.
CHUK Tool Processor **is** that missing middle layer.\n\n## Documentation Quick Reference\n\n| Document | What It Covers |\n|----------|----------------|\n| \ud83d\udcd8 [CONFIGURATION.md](docs/CONFIGURATION.md) | **All config knobs & defaults**: ToolProcessor options, timeouts, retry policy, rate limits, circuit breakers, caching, environment variables |\n| \ud83d\udea8 [ERRORS.md](docs/ERRORS.md) | **Error taxonomy**: All error codes, exception classes, error details structure, handling patterns, retryability guide |\n| \ud83d\udcca [OBSERVABILITY.md](docs/OBSERVABILITY.md) | **Metrics & tracing**: OpenTelemetry setup, Prometheus metrics, spans reference, PromQL queries |\n| \ud83d\udd0c [examples/hello_tool.py](examples/hello_tool.py) | **60-second starter**: Single-file, copy-paste-and-run example |\n| \ud83c\udfaf [examples/](examples/) | **20+ working examples**: MCP integration, OAuth flows, streaming, production patterns |\n\n## Choose Your Path\n\n| Your Goal | What You Need | Where to Look |\n|-----------|---------------|---------------|\n| \u2615 **Just process LLM tool calls** | Basic tool registration + processor | [60-Second Quick Start](#60-second-quick-start) |\n| \ud83d\udd0c **Connect to external tools** | MCP integration (HTTP/STDIO/SSE) | [MCP Integration](#5-mcp-integration-external-tools) |\n| \ud83d\udee1\ufe0f **Production deployment** | Timeouts, retries, rate limits, caching | [CONFIGURATION.md](docs/CONFIGURATION.md) |\n| \ud83d\udd12 **Run untrusted code safely** | Subprocess isolation strategy | [Subprocess Strategy](#using-subprocess-strategy) |\n| \ud83d\udcca **Monitor and observe** | OpenTelemetry + Prometheus | [OBSERVABILITY.md](docs/OBSERVABILITY.md) |\n| \ud83c\udf0a **Stream incremental results** | StreamingTool pattern | [StreamingTool](#streamingtool-real-time-results) |\n| \ud83d\udea8 **Handle errors reliably** | Error codes & taxonomy | [ERRORS.md](docs/ERRORS.md) |\n\n### Real-World Quick Start\n\nHere are the most common patterns you'll use:\n\n**Pattern 1: Local tools only**\n```python\nimport asyncio\nfrom chuk_tool_processor.core.processor import ToolProcessor\nfrom chuk_tool_processor.registry import initialize, register_tool\n\n@register_tool(name=\"my_tool\")\nclass MyTool:\n async def execute(self, arg: str) -> dict:\n return {\"result\": f\"Processed: {arg}\"}\n\nasync def main():\n await initialize()\n processor = ToolProcessor()\n\n llm_output = '<tool name=\"my_tool\" args=\\'{\"arg\": \"hello\"}\\'/>'\n results = await processor.process(llm_output)\n print(results[0].result) # {'result': 'Processed: hello'}\n\nasyncio.run(main())\n```\n\n**Pattern 2: Mix local + remote MCP tools (Notion)**\n```python\nimport asyncio\nfrom chuk_tool_processor.registry import initialize, register_tool\nfrom chuk_tool_processor.mcp import setup_mcp_http_streamable\n\n@register_tool(name=\"local_calculator\")\nclass Calculator:\n async def execute(self, a: int, b: int) -> int:\n return a + b\n\nasync def main():\n # Register local tools first\n await initialize()\n\n # Then add Notion MCP tools (requires OAuth token)\n processor, manager = await setup_mcp_http_streamable(\n servers=[{\n \"name\": \"notion\",\n \"url\": \"https://mcp.notion.com/mcp\",\n \"headers\": {\"Authorization\": f\"Bearer {access_token}\"}\n }],\n namespace=\"notion\",\n initialization_timeout=120.0\n )\n\n # Now you have both local and remote tools!\n results = await processor.process('''\n <tool name=\"local_calculator\" args='{\"a\": 5, \"b\": 3}'/>\n <tool 
name=\"notion.search_pages\" args='{\"query\": \"project docs\"}'/>\n ''')\n print(f\"Local result: {results[0].result}\")\n print(f\"Notion result: {results[1].result}\")\n\nasyncio.run(main())\n```\n\nSee `examples/notion_oauth.py` for complete OAuth flow.\n\n**Pattern 3: Local SQLite database via STDIO**\n```python\nimport asyncio\nimport json\nfrom chuk_tool_processor.mcp import setup_mcp_stdio\n\nasync def main():\n # Configure SQLite MCP server (runs locally)\n config = {\n \"mcpServers\": {\n \"sqlite\": {\n \"command\": \"uvx\",\n \"args\": [\"mcp-server-sqlite\", \"--db-path\", \"./app.db\"],\n \"transport\": \"stdio\"\n }\n }\n }\n\n with open(\"mcp_config.json\", \"w\") as f:\n json.dump(config, f)\n\n processor, manager = await setup_mcp_stdio(\n config_file=\"mcp_config.json\",\n servers=[\"sqlite\"],\n namespace=\"db\",\n initialization_timeout=120.0 # First run downloads the package\n )\n\n # Query your local database via MCP\n results = await processor.process(\n '<tool name=\"db.query\" args=\\'{\"sql\": \"SELECT * FROM users LIMIT 10\"}\\'/>'\n )\n print(results[0].result)\n\nasyncio.run(main())\n```\n\nSee `examples/stdio_sqlite.py` for complete working example.\n\n## Core Concepts\n\n### 1. Tool Registry\n\nThe **registry** is where you register tools for execution. Tools can be:\n\n- **Simple classes** with an `async execute()` method\n- **ValidatedTool** subclasses with Pydantic validation\n- **StreamingTool** for real-time incremental results\n- **Functions** registered via `register_fn_tool()`\n\n```python\nfrom chuk_tool_processor.registry import register_tool\nfrom chuk_tool_processor.models.validated_tool import ValidatedTool\nfrom pydantic import BaseModel, Field\n\n@register_tool(name=\"weather\")\nclass WeatherTool(ValidatedTool):\n class Arguments(BaseModel):\n location: str = Field(..., description=\"City name\")\n units: str = Field(\"celsius\", description=\"Temperature units\")\n\n class Result(BaseModel):\n temperature: float\n conditions: str\n\n async def _execute(self, location: str, units: str) -> Result:\n # Your weather API logic here\n return self.Result(temperature=22.5, conditions=\"Sunny\")\n```\n\n### 2. Execution Strategies\n\n**Strategies** determine *how* tools run:\n\n| Strategy | Use Case | Trade-offs |\n|----------|----------|------------|\n| **InProcessStrategy** | Fast, trusted tools | Speed \u2705, Isolation \u274c |\n| **SubprocessStrategy** | Untrusted or risky code | Isolation \u2705, Speed \u274c |\n\n```python\nimport asyncio\nfrom chuk_tool_processor.core.processor import ToolProcessor\nfrom chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy\nfrom chuk_tool_processor.registry import get_default_registry\n\nasync def main():\n registry = await get_default_registry()\n processor = ToolProcessor(\n strategy=SubprocessStrategy(\n registry=registry,\n max_workers=4,\n default_timeout=30.0\n )\n )\n # Use processor...\n\nasyncio.run(main())\n```\n\n### 3. Execution Wrappers (Middleware)\n\n**Wrappers** add production features as composable layers:\n\n```python\nprocessor = ToolProcessor(\n enable_caching=True, # Cache expensive calls\n cache_ttl=600, # 10 minutes\n enable_rate_limiting=True, # Prevent abuse\n global_rate_limit=100, # 100 req/min globally\n enable_retries=True, # Auto-retry failures\n max_retries=3 # Up to 3 attempts\n)\n```\n\nThe processor stacks them automatically: **Cache \u2192 Rate Limit \u2192 Retry \u2192 Strategy \u2192 Tool**\n\n### 4. 
Input Parsers (Plugins)\n\n**Parsers** extract tool calls from various LLM output formats:\n\n**XML Tags (Anthropic-style)**\n```xml\n<tool name=\"search\" args='{\"query\": \"Python\"}'/>\n```\n\n**OpenAI `tool_calls` (JSON)**\n```json\n{\n \"tool_calls\": [\n {\n \"type\": \"function\",\n \"function\": {\n \"name\": \"search\",\n \"arguments\": \"{\\\"query\\\": \\\"Python\\\"}\"\n }\n }\n ]\n}\n```\n\n**Direct JSON (array of calls)**\n```json\n[\n { \"tool\": \"search\", \"arguments\": { \"query\": \"Python\" } }\n]\n```\n\nAll formats work automatically\u2014no configuration needed.\n\n**Input Format Compatibility:**\n\n| Format | Example | Use Case |\n|--------|---------|----------|\n| **XML Tool Tag** | `<tool name=\"search\" args='{\"q\":\"Python\"}'/>`| Anthropic Claude, XML-based LLMs |\n| **OpenAI tool_calls** | JSON object (above) | OpenAI GPT-4 function calling |\n| **Direct JSON** | `[{\"tool\": \"search\", \"arguments\": {\"q\": \"Python\"}}]` | Generic API integrations |\n| **Single dict** | `{\"tool\": \"search\", \"arguments\": {\"q\": \"Python\"}}` | Programmatic calls |\n\n### 5. MCP Integration (External Tools)\n\nConnect to **remote tool servers** using the [Model Context Protocol](https://modelcontextprotocol.io). CHUK Tool Processor supports three transport mechanisms for different use cases:\n\n#### HTTP Streamable (\u2b50 Recommended for Cloud Services)\n\nModern HTTP streaming transport for cloud-based MCP servers like Notion:\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_http_streamable\n\n# Connect to Notion MCP with OAuth\nservers = [\n {\n \"name\": \"notion\",\n \"url\": \"https://mcp.notion.com/mcp\",\n \"headers\": {\"Authorization\": f\"Bearer {access_token}\"}\n }\n]\n\nprocessor, manager = await setup_mcp_http_streamable(\n servers=servers,\n namespace=\"notion\",\n initialization_timeout=120.0, # Some services need time to initialize\n enable_caching=True,\n enable_retries=True\n)\n\n# Use Notion tools through MCP\nresults = await processor.process(\n '<tool name=\"notion.search_pages\" args=\\'{\"query\": \"meeting notes\"}\\'/>'\n)\n```\n\n#### STDIO (Best for Local/On-Device Tools)\n\nFor running local MCP servers as subprocesses\u2014great for databases, file systems, and local tools:\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_stdio\nimport json\n\n# Configure SQLite MCP server\nconfig = {\n \"mcpServers\": {\n \"sqlite\": {\n \"command\": \"uvx\",\n \"args\": [\"mcp-server-sqlite\", \"--db-path\", \"/path/to/database.db\"],\n \"env\": {\"MCP_SERVER_NAME\": \"sqlite\"},\n \"transport\": \"stdio\"\n }\n }\n}\n\n# Save config to file\nwith open(\"mcp_config.json\", \"w\") as f:\n json.dump(config, f)\n\n# Connect to local SQLite server\nprocessor, manager = await setup_mcp_stdio(\n config_file=\"mcp_config.json\",\n servers=[\"sqlite\"],\n namespace=\"db\",\n initialization_timeout=120.0 # First run downloads packages\n)\n\n# Query your local database via MCP\nresults = await processor.process(\n '<tool name=\"db.query\" args=\\'{\"sql\": \"SELECT * FROM users LIMIT 10\"}\\'/>'\n)\n```\n\n#### SSE (Legacy Support)\n\nFor backward compatibility with older MCP servers using Server-Sent Events:\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_sse\n\n# Connect to Atlassian with OAuth via SSE\nservers = [\n {\n \"name\": \"atlassian\",\n \"url\": \"https://mcp.atlassian.com/v1/sse\",\n \"headers\": {\"Authorization\": f\"Bearer {access_token}\"}\n }\n]\n\nprocessor, manager = await setup_mcp_sse(\n 
servers=servers,\n namespace=\"atlassian\",\n initialization_timeout=120.0\n)\n```\n\n**Transport Comparison:**\n\n| Transport | Use Case | Real Examples |\n|-----------|----------|---------------|\n| **HTTP Streamable** | Cloud APIs, SaaS services | Notion (`mcp.notion.com`) |\n| **STDIO** | Local tools, databases | SQLite (`mcp-server-sqlite`), Echo (`chuk-mcp-echo`) |\n| **SSE** | Legacy cloud services | Atlassian (`mcp.atlassian.com`) |\n\n**Relationship with [chuk-mcp](https://github.com/chrishayuk/chuk-mcp):**\n- `chuk-mcp` is a low-level MCP protocol client (handles transports, protocol negotiation)\n- `chuk-tool-processor` wraps `chuk-mcp` to integrate external tools into your execution pipeline\n- You can use local tools, remote MCP tools, or both in the same processor\n\n## Getting Started\n\n### Creating Tools\n\nCHUK Tool Processor supports multiple patterns for defining tools:\n\n#### Simple Function-Based Tools\n```python\nfrom chuk_tool_processor.registry.auto_register import register_fn_tool\nfrom datetime import datetime\nfrom zoneinfo import ZoneInfo\n\ndef get_current_time(timezone: str = \"UTC\") -> str:\n \"\"\"Get the current time in the specified timezone.\"\"\"\n now = datetime.now(ZoneInfo(timezone))\n return now.strftime(\"%Y-%m-%d %H:%M:%S %Z\")\n\n# Register the function as a tool (sync \u2014 no await needed)\nregister_fn_tool(get_current_time, namespace=\"utilities\")\n```\n\n#### ValidatedTool (Pydantic Type Safety)\n\nFor production tools, use Pydantic validation:\n\n```python\n@register_tool(name=\"weather\")\nclass WeatherTool(ValidatedTool):\n class Arguments(BaseModel):\n location: str = Field(..., description=\"City name\")\n units: str = Field(\"celsius\", description=\"Temperature units\")\n\n class Result(BaseModel):\n temperature: float\n conditions: str\n\n async def _execute(self, location: str, units: str) -> Result:\n return self.Result(temperature=22.5, conditions=\"Sunny\")\n```\n\n#### StreamingTool (Real-time Results)\n\nFor long-running operations that produce incremental results:\n\n```python\nfrom chuk_tool_processor.models import StreamingTool\n\n@register_tool(name=\"file_processor\")\nclass FileProcessor(StreamingTool):\n class Arguments(BaseModel):\n file_path: str\n\n class Result(BaseModel):\n line: int\n content: str\n\n async def _stream_execute(self, file_path: str):\n with open(file_path) as f:\n for i, line in enumerate(f, 1):\n yield self.Result(line=i, content=line.strip())\n```\n\n**Consuming streaming results:**\n\n```python\nimport asyncio\nfrom chuk_tool_processor.core.processor import ToolProcessor\nfrom chuk_tool_processor.registry import initialize\n\nasync def main():\n await initialize()\n processor = ToolProcessor()\n async for event in processor.astream('<tool name=\"file_processor\" args=\\'{\"file_path\":\"README.md\"}\\'/>'):\n # 'event' is a streamed chunk (either your Result model instance or a dict)\n line = event[\"line\"] if isinstance(event, dict) else getattr(event, \"line\", None)\n content = event[\"content\"] if isinstance(event, dict) else getattr(event, \"content\", None)\n print(f\"Line {line}: {content}\")\n\nasyncio.run(main())\n```\n\n### Using the Processor\n\n#### Basic Usage\n\nCall `await initialize()` once at startup to load your registry.\n\n```python\nimport asyncio\nfrom chuk_tool_processor.core.processor import ToolProcessor\nfrom chuk_tool_processor.registry import initialize\n\nasync def main():\n await initialize()\n processor = ToolProcessor()\n llm_output = '<tool 
name=\"calculator\" args=\\'{\"operation\":\"add\",\"a\":2,\"b\":3}\\'/>'\n results = await processor.process(llm_output)\n for result in results:\n if result.error:\n print(f\"Error: {result.error}\")\n else:\n print(f\"Success: {result.result}\")\n\nasyncio.run(main())\n```\n\n#### Production Configuration\n\n```python\nfrom chuk_tool_processor.core.processor import ToolProcessor\n\nprocessor = ToolProcessor(\n # Execution settings\n default_timeout=30.0,\n max_concurrency=20,\n\n # Production features\n enable_caching=True,\n cache_ttl=600,\n enable_rate_limiting=True,\n global_rate_limit=100,\n enable_retries=True,\n max_retries=3\n)\n```\n\n### Advanced Production Features\n\nBeyond basic configuration, CHUK Tool Processor includes several advanced features for production environments:\n\n#### Circuit Breaker Pattern\n\nPrevent cascading failures by automatically opening circuits for failing tools:\n\n```python\nfrom chuk_tool_processor.core.processor import ToolProcessor\n\nprocessor = ToolProcessor(\n enable_circuit_breaker=True,\n circuit_breaker_threshold=5, # Open after 5 failures\n circuit_breaker_timeout=60.0, # Try recovery after 60s\n)\n\n# Circuit states: CLOSED \u2192 OPEN \u2192 HALF_OPEN \u2192 CLOSED\n# - CLOSED: Normal operation\n# - OPEN: Blocking requests (too many failures)\n# - HALF_OPEN: Testing recovery with limited requests\n```\n\n**How it works:**\n1. Tool fails repeatedly (hits threshold)\n2. Circuit opens \u2192 requests blocked immediately\n3. After timeout, circuit enters HALF_OPEN\n4. If test requests succeed \u2192 circuit closes\n5. If test requests fail \u2192 back to OPEN\n\n**Benefits:**\n- Prevents wasting resources on failing services\n- Fast-fail for better UX\n- Automatic recovery detection\n\n#### Idempotency Keys\n\nAutomatically deduplicate LLM tool calls using SHA256-based keys:\n\n```python\nfrom chuk_tool_processor.models.tool_call import ToolCall\n\n# Idempotency keys are auto-generated\ncall1 = ToolCall(tool=\"search\", arguments={\"query\": \"Python\"})\ncall2 = ToolCall(tool=\"search\", arguments={\"query\": \"Python\"})\n\n# Same arguments = same idempotency key\nassert call1.idempotency_key == call2.idempotency_key\n\n# Used automatically by caching layer\nprocessor = ToolProcessor(enable_caching=True)\nresults1 = await processor.execute([call1]) # Executes\nresults2 = await processor.execute([call2]) # Cache hit!\n```\n\n**Benefits:**\n- Prevents duplicate executions from LLM retries\n- Deterministic cache keys\n- No manual key management needed\n\n#### Tool Schema Export\n\nExport tool definitions to multiple formats for LLM prompting:\n\n```python\nfrom chuk_tool_processor.models.tool_spec import ToolSpec, ToolCapability\nfrom chuk_tool_processor.models.validated_tool import ValidatedTool\n\n@register_tool(name=\"weather\")\nclass WeatherTool(ValidatedTool):\n \"\"\"Get current weather for a location.\"\"\"\n\n class Arguments(BaseModel):\n location: str = Field(..., description=\"City name\")\n\n class Result(BaseModel):\n temperature: float\n conditions: str\n\n# Generate tool spec\nspec = ToolSpec.from_validated_tool(WeatherTool)\n\n# Export to different formats\nopenai_format = spec.to_openai() # For OpenAI function calling\nanthropic_format = spec.to_anthropic() # For Claude tools\nmcp_format = spec.to_mcp() # For MCP servers\n\n# Example OpenAI format:\n# {\n# \"type\": \"function\",\n# \"function\": {\n# \"name\": \"weather\",\n# \"description\": \"Get current weather for a location.\",\n# \"parameters\": {...} # JSON 
Schema\n# }\n# }\n```\n\n**Use cases:**\n- Generate tool definitions for LLM system prompts\n- Documentation generation\n- API contract validation\n- Cross-platform tool sharing\n\n#### Machine-Readable Error Codes\n\nStructured error handling with error codes for programmatic responses:\n\n```python\nfrom chuk_tool_processor.core.exceptions import (\n ErrorCode,\n ToolNotFoundError,\n ToolTimeoutError,\n ToolCircuitOpenError,\n)\n\ntry:\n results = await processor.process(llm_output)\nexcept ToolNotFoundError as e:\n if e.code == ErrorCode.TOOL_NOT_FOUND:\n # Suggest available tools to LLM\n available = e.details.get(\"available_tools\", [])\n print(f\"Try one of: {available}\")\nexcept ToolTimeoutError as e:\n if e.code == ErrorCode.TOOL_TIMEOUT:\n # Inform LLM to use faster alternative\n timeout = e.details[\"timeout\"]\n print(f\"Tool timed out after {timeout}s\")\nexcept ToolCircuitOpenError as e:\n if e.code == ErrorCode.TOOL_CIRCUIT_OPEN:\n # Tell LLM this service is temporarily down\n reset_time = e.details.get(\"reset_timeout\")\n print(f\"Service unavailable, retry in {reset_time}s\")\n\n# All errors include .to_dict() for logging\nerror_dict = e.to_dict()\n# {\n# \"error\": \"ToolCircuitOpenError\",\n# \"code\": \"TOOL_CIRCUIT_OPEN\",\n# \"message\": \"Tool 'api_tool' circuit breaker is open...\",\n# \"details\": {\"tool_name\": \"api_tool\", \"failure_count\": 5, ...}\n# }\n```\n\n**Available error codes:**\n- `TOOL_NOT_FOUND` - Tool doesn't exist in registry\n- `TOOL_EXECUTION_FAILED` - Tool execution error\n- `TOOL_TIMEOUT` - Tool exceeded timeout\n- `TOOL_CIRCUIT_OPEN` - Circuit breaker is open\n- `TOOL_RATE_LIMITED` - Rate limit exceeded\n- `TOOL_VALIDATION_ERROR` - Argument validation failed\n- `MCP_CONNECTION_FAILED` - MCP server unreachable\n- Plus 11 more for comprehensive error handling\n\n#### LLM-Friendly Argument Coercion\n\nAutomatically coerce LLM outputs to correct types:\n\n```python\nfrom chuk_tool_processor.models.validated_tool import ValidatedTool\n\nclass SearchTool(ValidatedTool):\n class Arguments(BaseModel):\n query: str\n limit: int = 10\n category: str = \"all\"\n\n # Pydantic config for LLM outputs:\n # - str_strip_whitespace=True \u2192 Remove accidental whitespace\n # - extra=\"ignore\" \u2192 Ignore unknown fields\n # - use_enum_values=True \u2192 Convert enums to values\n # - coerce_numbers_to_str=False \u2192 Keep type strictness\n\n# LLM outputs often have quirks:\nllm_output = {\n \"query\": \" Python tutorials \", # Extra whitespace\n \"limit\": \"5\", # String instead of int\n \"unknown_field\": \"ignored\" # Extra field\n}\n\n# ValidatedTool automatically coerces and validates\ntool = SearchTool()\nresult = await tool.execute(**llm_output)\n# \u2705 Works! 
Whitespace stripped, \"5\" \u2192 5, extra field ignored\n```\n\n## Advanced Topics\n\n### Using Subprocess Strategy\n\nUse `SubprocessStrategy` when running untrusted, third-party, or potentially unsafe code that shouldn't share the same process as your main app.\n\nFor isolation and safety when running untrusted code:\n\n```python\nimport asyncio\nfrom chuk_tool_processor.core.processor import ToolProcessor\nfrom chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy\nfrom chuk_tool_processor.registry import get_default_registry\n\nasync def main():\n registry = await get_default_registry()\n processor = ToolProcessor(\n strategy=SubprocessStrategy(\n registry=registry,\n max_workers=4,\n default_timeout=30.0\n )\n )\n # Use processor...\n\nasyncio.run(main())\n```\n\n### Real-World MCP Examples\n\n#### Example 1: Notion Integration with OAuth\n\nComplete OAuth flow connecting to Notion's MCP server:\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_http_streamable\n\n# After completing OAuth flow (see examples/notion_oauth.py for full flow)\nprocessor, manager = await setup_mcp_http_streamable(\n servers=[{\n \"name\": \"notion\",\n \"url\": \"https://mcp.notion.com/mcp\",\n \"headers\": {\"Authorization\": f\"Bearer {access_token}\"}\n }],\n namespace=\"notion\",\n initialization_timeout=120.0\n)\n\n# Get available Notion tools\ntools = manager.get_all_tools()\nprint(f\"Available tools: {[t['name'] for t in tools]}\")\n\n# Use Notion tools in your LLM workflow\nresults = await processor.process(\n '<tool name=\"notion.search_pages\" args=\\'{\"query\": \"Q4 planning\"}\\'/>'\n)\n```\n\n#### Example 2: Local SQLite Database Access\n\nRun SQLite MCP server locally for database operations:\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_stdio\nimport json\n\n# Configure SQLite server\nconfig = {\n \"mcpServers\": {\n \"sqlite\": {\n \"command\": \"uvx\",\n \"args\": [\"mcp-server-sqlite\", \"--db-path\", \"./data/app.db\"],\n \"transport\": \"stdio\"\n }\n }\n}\n\nwith open(\"mcp_config.json\", \"w\") as f:\n json.dump(config, f)\n\n# Connect to local database\nprocessor, manager = await setup_mcp_stdio(\n config_file=\"mcp_config.json\",\n servers=[\"sqlite\"],\n namespace=\"db\",\n initialization_timeout=120.0 # First run downloads mcp-server-sqlite\n)\n\n# Query your database via LLM\nresults = await processor.process(\n '<tool name=\"db.query\" args=\\'{\"sql\": \"SELECT COUNT(*) FROM users\"}\\'/>'\n)\n```\n\n#### Example 3: Simple STDIO Echo Server\n\nMinimal example for testing STDIO transport:\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_stdio\nimport json\n\n# Configure echo server (great for testing)\nconfig = {\n \"mcpServers\": {\n \"echo\": {\n \"command\": \"uvx\",\n \"args\": [\"chuk-mcp-echo\", \"stdio\"],\n \"transport\": \"stdio\"\n }\n }\n}\n\nwith open(\"echo_config.json\", \"w\") as f:\n json.dump(config, f)\n\nprocessor, manager = await setup_mcp_stdio(\n config_file=\"echo_config.json\",\n servers=[\"echo\"],\n namespace=\"echo\",\n initialization_timeout=60.0\n)\n\n# Test echo functionality\nresults = await processor.process(\n '<tool name=\"echo.echo\" args=\\'{\"message\": \"Hello MCP!\"}\\'/>'\n)\n```\n\nSee `examples/notion_oauth.py`, `examples/stdio_sqlite.py`, and `examples/stdio_echo.py` for complete working implementations.\n\n#### OAuth Token Refresh\n\nFor MCP servers that use OAuth authentication, CHUK Tool Processor supports automatic token refresh when access tokens expire. 
This prevents your tools from failing due to expired tokens during long-running sessions.\n\n**How it works:**\n1. When a tool call receives an OAuth-related error (e.g., \"invalid_token\", \"expired token\", \"unauthorized\")\n2. The processor automatically calls your refresh callback\n3. Updates the authentication headers with the new token\n4. Retries the tool call with fresh credentials\n\n**Setup with HTTP Streamable:**\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_http_streamable\n\nasync def refresh_oauth_token():\n \"\"\"Called automatically when tokens expire.\"\"\"\n # Your token refresh logic here\n # Return dict with new Authorization header\n new_token = await your_refresh_logic()\n return {\"Authorization\": f\"Bearer {new_token}\"}\n\nprocessor, manager = await setup_mcp_http_streamable(\n servers=[{\n \"name\": \"notion\",\n \"url\": \"https://mcp.notion.com/mcp\",\n \"headers\": {\"Authorization\": f\"Bearer {initial_access_token}\"}\n }],\n namespace=\"notion\",\n oauth_refresh_callback=refresh_oauth_token # Enable auto-refresh\n)\n```\n\n**Setup with SSE:**\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_sse\n\nasync def refresh_oauth_token():\n \"\"\"Refresh expired OAuth token.\"\"\"\n # Exchange refresh token for new access token\n new_access_token = await exchange_refresh_token(refresh_token)\n return {\"Authorization\": f\"Bearer {new_access_token}\"}\n\nprocessor, manager = await setup_mcp_sse(\n servers=[{\n \"name\": \"atlassian\",\n \"url\": \"https://mcp.atlassian.com/v1/sse\",\n \"headers\": {\"Authorization\": f\"Bearer {initial_token}\"}\n }],\n namespace=\"atlassian\",\n oauth_refresh_callback=refresh_oauth_token\n)\n```\n\n**OAuth errors detected automatically:**\n- `invalid_token`\n- `expired token`\n- `OAuth validation failed`\n- `unauthorized`\n- `token expired`\n- `authentication failed`\n- `invalid access token`\n\n**Important notes:**\n- The refresh callback must return a dict with an `Authorization` key\n- If refresh fails or returns invalid headers, the original error is returned\n- Token refresh is attempted only once per tool call (no infinite retry loops)\n- After successful refresh, the updated headers are used for all subsequent calls\n\nSee `examples/notion_oauth.py` for a complete OAuth 2.1 implementation with PKCE and automatic token refresh.\n\n### Observability\n\n#### Structured Logging\n\nEnable JSON logging for production observability:\n\n```python\nimport asyncio\nfrom chuk_tool_processor.logging import setup_logging, get_logger\n\nasync def main():\n await setup_logging(\n level=\"INFO\",\n structured=True, # JSON output (structured=False for human-readable)\n log_file=\"tool_processor.log\"\n )\n logger = get_logger(\"my_app\")\n logger.info(\"logging ready\")\n\nasyncio.run(main())\n```\n\nWhen `structured=True`, logs are output as JSON. 
When `structured=False`, they're human-readable text.\n\nExample JSON log output:\n\n```json\n{\n \"timestamp\": \"2025-01-15T10:30:45.123Z\",\n \"level\": \"INFO\",\n \"tool\": \"calculator\",\n \"status\": \"success\",\n \"duration_ms\": 4.2,\n \"cached\": false,\n \"attempts\": 1\n}\n```\n\n#### Automatic Metrics\n\nMetrics are automatically collected for:\n- \u2705 Tool execution (success/failure rates, duration)\n- \u2705 Cache performance (hit/miss rates)\n- \u2705 Parser accuracy (which parsers succeeded)\n- \u2705 Retry attempts (how many retries per tool)\n\nAccess metrics programmatically:\n\n```python\nimport asyncio\nfrom chuk_tool_processor.logging import metrics\n\nasync def main():\n # Metrics are logged automatically, but you can also access them\n await metrics.log_tool_execution(\n tool=\"custom_tool\",\n success=True,\n duration=1.5,\n cached=False,\n attempts=1\n )\n\nasyncio.run(main())\n```\n\n#### OpenTelemetry & Prometheus (Drop-in Observability)\n\n**3-Line Setup:**\n\n```python\nfrom chuk_tool_processor.observability import setup_observability\n\nsetup_observability(\n service_name=\"my-tool-service\",\n enable_tracing=True, # \u2192 OpenTelemetry traces\n enable_metrics=True, # \u2192 Prometheus metrics at :9090/metrics\n metrics_port=9090\n)\n# That's it! Every tool execution is now automatically traced and metered.\n```\n\n**What you get automatically:**\n- \u2705 Distributed traces (Jaeger, Zipkin, any OTLP collector)\n- \u2705 Prometheus metrics (error rate, latency P50/P95/P99, cache hit rate)\n- \u2705 Circuit breaker state monitoring\n- \u2705 Retry attempt tracking\n- \u2705 Zero code changes to your tools\n\n**Why Telemetry Matters**: In production, you need to know *what* your tools are doing, *how long* they take, *when* they fail, and *why*. CHUK Tool Processor provides **enterprise-grade telemetry** that operations teams expect\u2014with zero manual instrumentation.\n\n**What You Get (Automatically)**\n\n\u2705 **Distributed Traces** - Understand exactly what happened in each tool call\n- See the complete execution timeline for every tool\n- Track retries, cache hits, circuit breaker state changes\n- Correlate failures across your system\n- Export to Jaeger, Zipkin, or any OTLP-compatible backend\n\n\u2705 **Production Metrics** - Monitor health and performance in real-time\n- Track error rates, latency percentiles (P50/P95/P99)\n- Monitor cache hit rates and retry attempts\n- Alert on circuit breaker opens and rate limit hits\n- Export to Prometheus, Grafana, or any metrics backend\n\n\u2705 **Zero Configuration** - Works out of the box\n- No manual instrumentation needed\n- No code changes to existing tools\n- Gracefully degrades if packages not installed\n- Standard OTEL and Prometheus formats\n\n**Installation**\n\n```bash\n# Install observability dependencies\npip install chuk-tool-processor[observability]\n\n# Or manually\npip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp prometheus-client\n\n# Or with uv (recommended)\nuv pip install chuk-tool-processor --group observability\n```\n\n> **\u26a0\ufe0f SRE Note**: Observability packages are **optional**. If not installed, all observability calls are no-ops\u2014your tools run normally without tracing/metrics. Zero crashes, zero warnings. 
Safe to deploy without observability dependencies.\n\n**Quick Start: See Your Tools in Action**\n\n```python\nimport asyncio\nfrom chuk_tool_processor.observability import setup_observability\nfrom chuk_tool_processor.core.processor import ToolProcessor\nfrom chuk_tool_processor.registry import initialize, register_tool\n\n@register_tool(name=\"weather_api\")\nclass WeatherTool:\n async def execute(self, location: str) -> dict:\n # Simulating API call\n return {\"temperature\": 72, \"conditions\": \"sunny\", \"location\": location}\n\nasync def main():\n # 1. Enable observability (one line!)\n setup_observability(\n service_name=\"weather-service\",\n enable_tracing=True,\n enable_metrics=True,\n metrics_port=9090\n )\n\n # 2. Create processor with production features\n await initialize()\n processor = ToolProcessor(\n enable_caching=True, # Cache expensive API calls\n enable_retries=True, # Auto-retry on failures\n enable_circuit_breaker=True, # Prevent cascading failures\n enable_rate_limiting=True, # Prevent API abuse\n )\n\n # 3. Execute tools - automatically traced and metered\n results = await processor.process(\n '<tool name=\"weather_api\" args=\\'{\"location\": \"San Francisco\"}\\'/>'\n )\n\n print(f\"Result: {results[0].result}\")\n print(f\"Duration: {results[0].duration}s\")\n print(f\"Cached: {results[0].cached}\")\n\nasyncio.run(main())\n```\n\n**View Your Data**\n\n```bash\n# Start Jaeger for trace visualization\ndocker run -d -p 4317:4317 -p 16686:16686 jaegertracing/all-in-one:latest\n\n# Start your application\npython your_app.py\n\n# View distributed traces\nopen http://localhost:16686\n\n# View Prometheus metrics\ncurl http://localhost:9090/metrics | grep tool_\n```\n\n**What Gets Traced (Automatic Spans)**\n\nEvery execution layer creates standardized OpenTelemetry spans:\n\n| Span Name | When Created | Key Attributes |\n|-----------|--------------|----------------|\n| `tool.execute` | Every tool execution | `tool.name`, `tool.namespace`, `tool.duration_ms`, `tool.cached`, `tool.error`, `tool.success` |\n| `tool.cache.lookup` | Cache lookup | `cache.hit` (true/false), `cache.operation=lookup` |\n| `tool.cache.set` | Cache write | `cache.ttl`, `cache.operation=set` |\n| `tool.retry.attempt` | Each retry | `retry.attempt`, `retry.max_attempts`, `retry.success` |\n| `tool.circuit_breaker.check` | Circuit state check | `circuit.state` (CLOSED/OPEN/HALF_OPEN) |\n| `tool.rate_limit.check` | Rate limit check | `rate_limit.allowed` (true/false) |\n\n**Example trace hierarchy:**\n```\ntool.execute (weather_api)\n\u251c\u2500\u2500 tool.cache.lookup (miss)\n\u251c\u2500\u2500 tool.retry.attempt (0)\n\u2502 \u2514\u2500\u2500 tool.execute (actual API call)\n\u251c\u2500\u2500 tool.retry.attempt (1) [if first failed]\n\u2514\u2500\u2500 tool.cache.set (store result)\n```\n\n**What Gets Metered (Automatic Metrics)**\n\nStandard Prometheus metrics exposed at `/metrics`:\n\n| Metric | Type | Labels | Use For |\n|--------|------|--------|---------|\n| `tool_executions_total` | Counter | `tool`, `namespace`, `status` | Error rate, request volume |\n| `tool_execution_duration_seconds` | Histogram | `tool`, `namespace` | P50/P95/P99 latency |\n| `tool_cache_operations_total` | Counter | `tool`, `operation`, `result` | Cache hit rate |\n| `tool_retry_attempts_total` | Counter | `tool`, `attempt`, `success` | Retry frequency |\n| `tool_circuit_breaker_state` | Gauge | `tool` | Circuit health (0=CLOSED, 1=OPEN, 2=HALF_OPEN) |\n| `tool_circuit_breaker_failures_total` | Counter | `tool` | 
Failure count |\n| `tool_rate_limit_checks_total` | Counter | `tool`, `allowed` | Rate limit hits |\n\n**Useful PromQL Queries**\n\n```promql\n# Error rate per tool (last 5 minutes)\nrate(tool_executions_total{status=\"error\"}[5m])\n/ rate(tool_executions_total[5m])\n\n# P95 latency\nhistogram_quantile(0.95, rate(tool_execution_duration_seconds_bucket[5m]))\n\n# Cache hit rate\nrate(tool_cache_operations_total{result=\"hit\"}[5m])\n/ rate(tool_cache_operations_total{operation=\"lookup\"}[5m])\n\n# Tools currently circuit broken\ntool_circuit_breaker_state == 1\n\n# Retry rate (how often tools need retries)\nrate(tool_retry_attempts_total{attempt!=\"0\"}[5m])\n/ rate(tool_executions_total[5m])\n```\n\n**Configuration**\n\nConfigure via environment variables:\n\n```bash\n# OTLP endpoint (where traces are sent)\nexport OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317\n\n# Service name (shown in traces)\nexport OTEL_SERVICE_NAME=production-api\n\n# Sampling (reduce overhead in high-traffic scenarios)\nexport OTEL_TRACES_SAMPLER=traceidratio\nexport OTEL_TRACES_SAMPLER_ARG=0.1 # Sample 10% of traces\n```\n\nOr in code:\n\n```python\nstatus = setup_observability(\n service_name=\"my-service\",\n enable_tracing=True,\n enable_metrics=True,\n metrics_port=9090,\n metrics_host=\"0.0.0.0\" # Allow external Prometheus scraping\n)\n\n# Check status\nif status[\"tracing_enabled\"]:\n print(\"Traces exporting to OTLP endpoint\")\nif status[\"metrics_server_started\"]:\n print(\"Metrics available at http://localhost:9090/metrics\")\n```\n\n**Production Integration**\n\n**With Grafana + Prometheus:**\n```yaml\n# prometheus.yml\nscrape_configs:\n - job_name: 'chuk-tool-processor'\n scrape_interval: 15s\n static_configs:\n - targets: ['app:9090']\n```\n\n**With OpenTelemetry Collector:**\n```yaml\n# otel-collector-config.yaml\nreceivers:\n otlp:\n protocols:\n grpc:\n endpoint: 0.0.0.0:4317\n\nexporters:\n jaeger:\n endpoint: jaeger:14250\n prometheus:\n endpoint: 0.0.0.0:8889\n\nservice:\n pipelines:\n traces:\n receivers: [otlp]\n exporters: [jaeger]\n```\n\n**With Cloud Providers:**\n```bash\n# AWS X-Ray\nexport OTEL_TRACES_SAMPLER=xray\n\n# Google Cloud Trace\nexport OTEL_EXPORTER_OTLP_ENDPOINT=https://cloudtrace.googleapis.com/v1/projects/PROJECT_ID/traces\n\n# Datadog\nexport OTEL_EXPORTER_OTLP_ENDPOINT=http://datadog-agent:4317\n```\n\n**Why This Matters**\n\n\u274c **Without telemetry:**\n- \"Why is this tool slow?\" \u2192 No idea\n- \"Is caching helping?\" \u2192 Guessing\n- \"Did that retry work?\" \u2192 Check logs manually\n- \"Is the circuit breaker working?\" \u2192 Hope so\n- \"Which tool is failing?\" \u2192 Debug blindly\n\n\u2705 **With telemetry:**\n- See exact execution timeline in Jaeger\n- Monitor cache hit rate in Grafana\n- Alert when retry rate spikes\n- Dashboard shows circuit breaker states\n- Metrics pinpoint the failing tool immediately\n\n**Learn More**\n\n\ud83d\udcd6 **Complete Guide**: See [`OBSERVABILITY.md`](OBSERVABILITY.md) for:\n- Complete span and metric specifications\n- Architecture and implementation details\n- Integration guides (Jaeger, Grafana, OTEL Collector)\n- Testing observability features\n- Environment variable configuration\n\n\ud83c\udfaf **Working Example**: See `examples/observability_demo.py` for a complete demonstration with retries, caching, and circuit breakers\n\n**Benefits**\n\n\u2705 **Drop-in** - One function call, zero code changes\n\u2705 **Automatic** - All execution layers instrumented\n\u2705 **Standard** - OTEL + Prometheus (works 
with existing tools)\n\u2705 **Production-ready** - Ops teams get exactly what they expect\n\u2705 **Optional** - Gracefully degrades if packages not installed\n\u2705 **Zero-overhead** - No performance impact when disabled\n\n### Error Handling\n\n```python\nresults = await processor.process(llm_output)\n\nfor result in results:\n if result.error:\n print(f\"Tool '{result.tool}' failed: {result.error}\")\n print(f\"Duration: {result.duration}s\")\n else:\n print(f\"Tool '{result.tool}' succeeded: {result.result}\")\n```\n\n### Testing Tools\n\n```python\nimport pytest\nfrom chuk_tool_processor.core.processor import ToolProcessor\nfrom chuk_tool_processor.registry import initialize\n\n@pytest.mark.asyncio\nasync def test_calculator():\n await initialize()\n processor = ToolProcessor()\n\n results = await processor.process(\n '<tool name=\"calculator\" args=\\'{\"operation\": \"add\", \"a\": 5, \"b\": 3}\\'/>'\n )\n\n assert results[0].result[\"result\"] == 8\n```\n\n## Configuration\n\n### Timeout Configuration\n\nCHUK Tool Processor uses a unified timeout configuration system that applies to all MCP transports (HTTP Streamable, SSE, STDIO) and the StreamManager. Instead of managing dozens of individual timeout values, there are just **4 logical timeout categories**:\n\n```python\nfrom chuk_tool_processor.mcp.transport import TimeoutConfig\n\n# Create custom timeout configuration\ntimeout_config = TimeoutConfig(\n connect=30.0, # Connection establishment, initialization, session discovery\n operation=30.0, # Normal operations (tool calls, listing tools/resources/prompts)\n quick=5.0, # Fast health checks and pings\n shutdown=2.0 # Cleanup and shutdown operations\n)\n```\n\n**Using timeout configuration with StreamManager:**\n\n```python\nfrom chuk_tool_processor.mcp.stream_manager import StreamManager\nfrom chuk_tool_processor.mcp.transport import TimeoutConfig\n\n# Create StreamManager with custom timeouts\ntimeout_config = TimeoutConfig(\n connect=60.0, # Longer for slow initialization\n operation=45.0, # Longer for heavy operations\n quick=3.0, # Faster health checks\n shutdown=5.0 # More time for cleanup\n)\n\nmanager = StreamManager(timeout_config=timeout_config)\n```\n\n**Timeout categories explained:**\n\n| Category | Default | Used For | Examples |\n|----------|---------|----------|----------|\n| `connect` | 30.0s | Connection setup, initialization, discovery | HTTP connection, SSE session discovery, STDIO subprocess launch |\n| `operation` | 30.0s | Normal tool operations | Tool calls, listing tools/resources/prompts, get_tools() |\n| `quick` | 5.0s | Fast health/status checks | Ping operations, health checks |\n| `shutdown` | 2.0s | Cleanup and teardown | Transport close, connection cleanup |\n\n**Why this matters:**\n- \u2705 **Simple**: 4 timeout values instead of 20+\n- \u2705 **Consistent**: Same timeout behavior across all transports\n- \u2705 **Configurable**: Adjust timeouts based on your environment (slow networks, large datasets, etc.)\n- \u2705 **Type-safe**: Pydantic validation ensures correct values\n\n**Example: Adjusting for slow environments**\n\n```python\nfrom chuk_tool_processor.mcp import setup_mcp_stdio\nfrom chuk_tool_processor.mcp.transport import TimeoutConfig\n\n# For slow network or resource-constrained environments\nslow_timeouts = TimeoutConfig(\n connect=120.0, # Allow more time for package downloads\n operation=60.0, # Allow more time for heavy operations\n quick=10.0, # Be patient with health checks\n shutdown=10.0 # Allow thorough 
cleanup\n)\n\nprocessor, manager = await setup_mcp_stdio(\n config_file=\"mcp_config.json\",\n servers=[\"sqlite\"],\n namespace=\"db\",\n initialization_timeout=120.0\n)\n\n# Set custom timeouts on the manager\nmanager.timeout_config = slow_timeouts\n```\n\n### Environment Variables\n\n| Variable | Default | Description |\n|----------|---------|-------------|\n| `CHUK_TOOL_REGISTRY_PROVIDER` | `memory` | Registry backend |\n| `CHUK_DEFAULT_TIMEOUT` | `30.0` | Default timeout (seconds) |\n| `CHUK_LOG_LEVEL` | `INFO` | Logging level |\n| `CHUK_STRUCTURED_LOGGING` | `true` | Enable JSON logging |\n| `MCP_BEARER_TOKEN` | - | Bearer token for MCP SSE |\n\n### ToolProcessor Options\n\n```python\nprocessor = ToolProcessor(\n default_timeout=30.0, # Timeout per tool\n max_concurrency=10, # Max concurrent executions\n enable_caching=True, # Result caching\n cache_ttl=300, # Cache TTL (seconds)\n enable_rate_limiting=False, # Rate limiting\n global_rate_limit=None, # (requests per minute) global cap\n enable_retries=True, # Auto-retry failures\n max_retries=3, # Max retry attempts\n # Optional per-tool rate limits: {\"tool.name\": (requests, per_seconds)}\n tool_rate_limits=None\n)\n```\n\n### Performance & Tuning\n\n| Parameter | Default | When to Adjust |\n|-----------|---------|----------------|\n| `default_timeout` | `30.0` | Increase for slow tools (e.g., AI APIs) |\n| `max_concurrency` | `10` | Increase for I/O-bound tools, decrease for CPU-bound |\n| `enable_caching` | `True` | Keep on for deterministic tools |\n| `cache_ttl` | `300` | Longer for stable data, shorter for real-time |\n| `enable_rate_limiting` | `False` | Enable when hitting API rate limits |\n| `global_rate_limit` | `None` | Set a global requests/min cap across all tools |\n| `enable_retries` | `True` | Disable for non-idempotent operations |\n| `max_retries` | `3` | Increase for flaky external APIs |\n| `tool_rate_limits` | `None` | Dict mapping tool name \u2192 (max_requests, window_seconds). Overrides `global_rate_limit` per tool |\n\n**Per-tool rate limiting example:**\n\n```python\nprocessor = ToolProcessor(\n enable_rate_limiting=True,\n global_rate_limit=100, # 100 requests/minute across all tools\n tool_rate_limits={\n \"notion.search_pages\": (10, 60), # 10 requests per 60 seconds\n \"expensive_api\": (5, 60), # 5 requests per minute\n \"local_tool\": (1000, 60), # 1000 requests per minute (local is fast)\n }\n)\n```\n\n### Security Model\n\nCHUK Tool Processor provides multiple layers of safety:\n\n| Concern | Protection | Configuration |\n|---------|------------|---------------|\n| **Timeouts** | Every tool has a timeout | `default_timeout=30.0` |\n| **Process Isolation** | Run tools in separate processes | `strategy=SubprocessStrategy()` |\n| **Rate Limiting** | Prevent abuse and API overuse | `enable_rate_limiting=True` |\n| **Input Validation** | Pydantic validation on arguments | Use `ValidatedTool` |\n| **Error Containment** | Failures don't crash the processor | Built-in exception handling |\n| **Retry Limits** | Prevent infinite retry loops | `max_retries=3` |\n\n**Important Security Notes:**\n- **Environment Variables**: Subprocess strategy inherits the parent process environment by default. For stricter isolation, use container-level controls (Docker, cgroups).\n- **Network Access**: Tools inherit network access from the host. 
For network isolation, use OS-level sandboxing (containers, network namespaces, firewalls).\n- **Resource Limits**: For hard CPU/memory caps, use OS-level controls (cgroups on Linux, Job Objects on Windows, or Docker resource limits).\n- **Secrets**: Never injected automatically. Pass secrets explicitly via tool arguments or environment variables, and prefer scoped env vars for subprocess tools to minimize exposure.\n\nExample security-focused setup for untrusted code:\n\n```python\nimport asyncio\nfrom chuk_tool_processor.core.processor import ToolProcessor\nfrom chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy\nfrom chuk_tool_processor.registry import get_default_registry\n\nasync def create_secure_processor():\n # Maximum isolation for untrusted code\n # Runs each tool in a separate process\n registry = await get_default_registry()\n\n processor = ToolProcessor(\n strategy=SubprocessStrategy(\n registry=registry,\n max_workers=4,\n default_timeout=10.0\n ),\n default_timeout=10.0,\n enable_rate_limiting=True,\n global_rate_limit=50, # 50 requests/minute\n max_retries=2\n )\n return processor\n\n# For even stricter isolation:\n# - Run the entire processor inside a Docker container with resource limits\n# - Use network policies to restrict outbound connections\n# - Use read-only filesystems where possible\n```\n\n## Architecture Principles\n\n1. **Composability**: Stack strategies and wrappers like middleware\n2. **Async-First**: Built for `async/await` from the ground up\n3. **Production-Ready**: Timeouts, retries, caching, rate limiting\u2014all built-in\n4. **Pluggable**: Parsers, strategies, transports\u2014swap components as needed\n5. **Observable**: Structured logging and metrics collection throughout\n\n## Examples\n\nCheck out the [`examples/`](examples/) directory for complete working examples:\n\n### Getting Started\n- **60-second hello**: `examples/hello_tool.py` - Absolute minimal example (copy-paste-run)\n- **Quick start**: `examples/quickstart_demo.py` - Basic tool registration and execution\n- **Execution strategies**: `examples/execution_strategies_demo.py` - InProcess vs Subprocess\n- **Production wrappers**: `examples/wrappers_demo.py` - Caching, retries, rate limiting\n- **Streaming tools**: `examples/streaming_demo.py` - Real-time incremental results\n- **Streaming tool calls**: `examples/streaming_tool_calls_demo.py` - Handle partial tool calls from streaming LLMs\n- **Schema helper**: `examples/schema_helper_demo.py` - Auto-generate schemas from typed tools (Pydantic \u2192 OpenAI/Anthropic/MCP)\n- **Observability**: `examples/observability_demo.py` - OpenTelemetry + Prometheus integration\n\n### MCP Integration (Real-World)\n- **Notion + OAuth**: `examples/notion_oauth.py` - Complete OAuth 2.1 flow with HTTP Streamable\n - Shows: Authorization Server discovery, client registration, PKCE flow, token exchange\n- **SQLite Local**: `examples/stdio_sqlite.py` - Local database access via STDIO\n - Shows: Command/args passing, environment variables, file paths, initialization timeouts\n- **Echo Server**: `examples/stdio_echo.py` - Minimal STDIO transport example\n - Shows: Simplest possible MCP integration for testing\n- **Atlassian + OAuth**: `examples/atlassian_sse.py` - OAuth with SSE transport (legacy)\n\n### Advanced MCP\n- **HTTP Streamable**: `examples/mcp_http_streamable_example.py`\n- **STDIO**: `examples/mcp_stdio_example.py`\n- **SSE**: `examples/mcp_sse_example.py`\n- **Plugin system**: `examples/plugins_builtins_demo.py`, 
`examples/plugins_custom_parser_demo.py`\n\n## FAQ\n\n**Q: What happens if a tool takes too long?**\nA: The tool is cancelled after `default_timeout` seconds and returns an error result. The processor continues with other tools.\n\n**Q: Can I mix local and remote (MCP) tools?**\nA: Yes! Register local tools first, then use `setup_mcp_*` to add remote tools. They all work in the same processor.\n\n**Q: How do I handle malformed LLM outputs?**\nA: The processor is resilient\u2014invalid tool calls are logged and return error results without crashing.\n\n**Q: What about API rate limits?**\nA: Use `enable_rate_limiting=True` and set `tool_rate_limits` per tool or `global_rate_limit` for all tools.\n\n**Q: Can tools return files or binary data?**\nA: Yes\u2014tools can return any JSON-serializable data including base64-encoded files, URLs, or structured data.\n\n**Q: How do I test my tools?**\nA: Use pytest with `@pytest.mark.asyncio`. See [Testing Tools](#testing-tools) for examples.\n\n**Q: Does this work with streaming LLM responses?**\nA: Yes\u2014as tool calls appear in the stream, extract and process them. The processor handles partial/incremental tool call lists.\n\n**Q: What's the difference between InProcess and Subprocess strategies?**\nA: InProcess is faster (same process), Subprocess is safer (isolated process). Use InProcess for trusted code, Subprocess for untrusted.\n\n## Comparison with Other Tools\n\n| Feature | chuk-tool-processor | LangChain Tools | OpenAI Tools | MCP SDK |\n|---------|-------------------|-----------------|--------------|---------|\n| **Async-native** | \u2705 | \u26a0\ufe0f Partial | \u2705 | \u2705 |\n| **Process isolation** | \u2705 SubprocessStrategy | \u274c | \u274c | \u26a0\ufe0f |\n| **Built-in retries** | \u2705 | \u274c \u2020 | \u274c | \u274c |\n| **Rate limiting** | \u2705 | \u274c \u2020 | \u26a0\ufe0f \u2021 | \u274c |\n| **Caching** | \u2705 | \u26a0\ufe0f \u2020 | \u274c \u2021 | \u274c |\n| **Multiple parsers** | \u2705 (XML, OpenAI, JSON) | \u26a0\ufe0f | \u2705 | \u2705 |\n| **Streaming tools** | \u2705 | \u26a0\ufe0f | \u26a0\ufe0f | \u2705 |\n| **MCP integration** | \u2705 All transports | \u274c | \u274c | \u2705 (protocol only) |\n| **Zero-config start** | \u2705 | \u274c | \u2705 | \u26a0\ufe0f |\n| **Production-ready** | \u2705 Timeouts, metrics | \u26a0\ufe0f | \u26a0\ufe0f | \u26a0\ufe0f |\n\n**Notes:**\n- \u2020 LangChain offers caching and rate-limiting through separate libraries (`langchain-cache`, external rate limiters), but they're not core features.\n- \u2021 OpenAI Tools can be combined with external rate limiters and caches, but tool execution itself doesn't include these features.\n\n**When to use chuk-tool-processor:**\n- You need production-ready tool execution (timeouts, retries, caching)\n- You want to connect to MCP servers (local or remote)\n- You need to run untrusted code safely (subprocess isolation)\n- You're building a custom LLM application (not using a framework)\n\n**When to use alternatives:**\n- **LangChain**: You want a full-featured LLM framework with chains, agents, and memory\n- **OpenAI Tools**: You only use OpenAI and don't need advanced execution features\n- **MCP SDK**: You're building an MCP server, not a client\n\n## Related Projects\n\n- **[chuk-mcp](https://github.com/chrishayuk/chuk-mcp)**: Low-level Model Context Protocol client\n - Powers the MCP transport layer in chuk-tool-processor\n - Use directly if you need protocol-level control\n - Use chuk-tool-processor if you want high-level tool 
execution\n\n## Development & Publishing\n\n### For Contributors\n\nDevelopment setup:\n\n```bash\n# Clone repository\ngit clone https://github.com/chrishayuk/chuk-tool-processor.git\ncd chuk-tool-processor\n\n# Install development dependencies\nuv sync --dev\n\n# Run tests\nmake test\n\n# Run all quality checks\nmake check\n```\n\n### For Maintainers: Publishing Releases\n\nThe project uses **fully automated CI/CD** for releases. Publishing is as simple as:\n\n```bash\n# 1. Bump version\nmake bump-patch # or bump-minor, bump-major\n\n# 2. Commit version change\ngit add pyproject.toml\ngit commit -m \"version X.Y.Z\"\ngit push\n\n# 3. Create release (automated)\nmake publish\n```\n\nThis will:\n- Create and push a git tag\n- Trigger GitHub Actions to create a release with auto-generated changelog\n- Run tests across all platforms and Python versions\n- Build and publish to PyPI automatically\n\nFor detailed release documentation, see:\n- **[RELEASING.md](RELEASING.md)** - Complete release process guide\n- **[docs/CI-CD.md](docs/CI-CD.md)** - Full CI/CD pipeline documentation\n\n## Contributing & Support\n\n- **GitHub**: [chrishayuk/chuk-tool-processor](https://github.com/chrishayuk/chuk-tool-processor)\n- **Issues**: [Report bugs and request features](https://github.com/chrishayuk/chuk-tool-processor/issues)\n- **Discussions**: [Community discussions](https://github.com/chrishayuk/chuk-tool-processor/discussions)\n- **License**: MIT\n\n---\n\n**Remember**: CHUK Tool Processor is the missing link between LLM outputs and reliable tool execution. It's not trying to be everything\u2014it's trying to be the best at one thing: processing tool calls in production.\n\nBuilt with \u2764\ufe0f by the CHUK AI team for the LLM tool integration community.\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Async-native framework for registering, discovering, and executing tools referenced in LLM responses",
"version": "0.9.7",
"project_urls": null,
"split_keywords": [
"llm",
" tools",
" async",
" ai",
" openai",
" mcp",
" model-context-protocol",
" tool-calling",
" function-calling"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "e181588f03d01abe71445175f40f8193c93753801d637e26fad91d11e169c16d",
"md5": "471f52ea807095552ecab19856583a12",
"sha256": "0f543e410d037eb3cd15229408c87e76de9c783427dd978aae4fbf6605412df8"
},
"downloads": -1,
"filename": "chuk_tool_processor-0.9.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "471f52ea807095552ecab19856583a12",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 155555,
"upload_time": "2025-11-02T15:35:52",
"upload_time_iso_8601": "2025-11-02T15:35:52.236495Z",
"url": "https://files.pythonhosted.org/packages/e1/81/588f03d01abe71445175f40f8193c93753801d637e26fad91d11e169c16d/chuk_tool_processor-0.9.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "c96e8ad269c72743aa9b4926c7baffa1c0c79aaa52b4f81ea0af0886ca7f0316",
"md5": "685cdf2d998d4f664bc7f65000812dac",
"sha256": "33d69ed56c69ddba36639d87d983c548b408d76d74473548bee0cfbd009354bf"
},
"downloads": -1,
"filename": "chuk_tool_processor-0.9.7.tar.gz",
"has_sig": false,
"md5_digest": "685cdf2d998d4f664bc7f65000812dac",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 156810,
"upload_time": "2025-11-02T15:35:54",
"upload_time_iso_8601": "2025-11-02T15:35:54.221395Z",
"url": "https://files.pythonhosted.org/packages/c9/6e/8ad269c72743aa9b4926c7baffa1c0c79aaa52b4f81ea0af0886ca7f0316/chuk_tool_processor-0.9.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-11-02 15:35:54",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "chuk-tool-processor"
}