Name | asimov-agents |
Version | 0.4.2 |
home_page | None |
Summary | A library of primitives for building agentic flows. |
upload_time | 2024-11-21 22:51:07 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.12 |
license | Apache-2.0 |
keywords | agent, ai, bismuth, git |
VCS | |
bugtrack_url | |
requirements | No requirements were recorded. |
Travis-CI | No Travis. |
coveralls test coverage | No coveralls. |
[![PyPI - Version](https://img.shields.io/pypi/v/asimov_agents.svg)](https://pypi.org/project/asimov_agents)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/asimov_agents.svg)](https://pypi.org/project/asimov_agents)
# Asimov Agents
A Python framework for building AI agent systems with robust task management, inference capabilities, and caching.
🔮 Asimov is the foundation of [bismuth.sh](https://waitlist.bismuth.sh), an in-terminal coding agent that can handle many tasks autonomously. Check us out! 🔮
## Quickstart
Check out [these docs](https://github.com/BismuthCloud/asimov/tree/main/docs), which walk through two basic examples that should be enough to get you experimenting!
Further documentation is greatly appreciated in PRs!
## System Overview
Asimov Agents is composed of three main components:
1. **Task Graph System**
   - Manages task execution flow and dependencies
   - Supports different task states (WAITING, EXECUTING, COMPLETE, FAILED, PARTIAL)
   - Uses Pydantic models for robust data validation
   - Unique task identification via UUIDs
2. **Inference Clients**
   - Supports multiple LLM providers:
     - Anthropic Claude (via API)
     - AWS Bedrock
     - OpenAI (Including local models)
     - Vertex
   - Features:
     - Streaming responses
     - Tool/function calling capabilities
     - Token usage tracking
     - OpenTelemetry instrumentation
     - Prompt caching support
3. **Caching System**
   - Abstract Cache interface with Redis implementation
   - Features:
     - Key-value storage with JSON serialization
     - Prefix/suffix namespacing
     - Pub/sub messaging via mailboxes
     - Bulk operations (get_all, clear)
     - Async interface
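
The namespacing and JSON-serialization features listed for the caching system can be pictured with a small in-memory stand-in. This is a minimal sketch, not the library's `Cache` or `RedisCache` code: the class `InMemoryCache`, its `prefix` and `suffix` arguments, the `:` separator, and the `set`/`get` method names are assumptions made for illustration. Only `get_all` and `clear` are names taken from the list above.

```python
import asyncio
import json
from typing import Any, Dict, Optional


class InMemoryCache:
    """Hypothetical stand-in for an async key-value cache with namespacing."""

    def __init__(self, prefix: str = "", suffix: str = "") -> None:
        self.prefix = prefix
        self.suffix = suffix
        self._store: Dict[str, str] = {}

    def _full_key(self, key: str) -> str:
        # Join only the non-empty parts, so an empty prefix or suffix adds nothing.
        return ":".join(part for part in (self.prefix, key, self.suffix) if part)

    async def set(self, key: str, value: Any) -> None:
        # Values are stored as JSON text.
        self._store[self._full_key(key)] = json.dumps(value)

    async def get(self, key: str, default: Optional[Any] = None) -> Any:
        raw = self._store.get(self._full_key(key))
        return default if raw is None else json.loads(raw)

    async def get_all(self) -> Dict[str, Any]:
        return {k: json.loads(v) for k, v in self._store.items()}

    async def clear(self) -> None:
        self._store.clear()


async def namespacing_demo() -> Dict[str, Any]:
    cache = InMemoryCache(prefix="agent-a")
    await cache.set("plan", {"steps": 3})
    return await cache.get_all()
```

Running `asyncio.run(namespacing_demo())` shows the stored key carrying the `agent-a` prefix, which is what keeps two agents sharing one backing store from overwriting each other's entries.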
## Component Interactions
### Task Management
- Tasks are created and tracked using the `Task` class
- Each task has:
  - Unique ID
  - Type and objective
  - Parameters dictionary
  - Status tracking
  - Result/error storage
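
To make the field list above concrete, here is a minimal sketch of a task record. It is not the library's `Task` class (which the overview says is a Pydantic model); it uses a standard-library dataclass so it runs without extra dependencies. The name `TaskSketch`, the `TaskStatus` enum values, and the exact field names other than `type`, `objective`, and `params` are assumptions.

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class TaskStatus(Enum):
    # State names come from the overview; the string values are assumed.
    WAITING = "waiting"
    EXECUTING = "executing"
    COMPLETE = "complete"
    FAILED = "failed"
    PARTIAL = "partial"


@dataclass
class TaskSketch:
    """Hypothetical stand-in for the fields listed above; not the real Task."""

    type: str
    objective: str
    params: Dict[str, Any] = field(default_factory=dict)
    # uuid4 gives each task its own random identifier.
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    status: TaskStatus = TaskStatus.WAITING
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


task = TaskSketch(type="processing", objective="Process data", params={"input": "data"})
task.status = TaskStatus.EXECUTING
```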
### Graph System Architecture
- **Module Types**
  - `SUBGRAPH`: Nodes composed of other nodes
  - `EXECUTOR`: Task execution modules
  - `FLOW_CONTROL`: Execution flow control modules
- **Node Configuration**
  ```python
  node_config = NodeConfig(
      parallel=True,            # Enable parallel execution
      condition="task.ready",   # Conditional execution
      retry_on_failure=True,    # Enable retry mechanism
      max_retries=3,            # Maximum retry attempts
      max_visits=5,             # Maximum node visits
      inputs=["data"],          # Required inputs
      outputs=["result"]        # Expected outputs
  )
  ```
- **Flow Control Features**
  - Conditional branching based on task state
  - Dynamic node addition during execution
  - Dependency chain management
  - Automatic cleanup of completed nodes
  - Execution state tracking and recovery
- **Snapshot System**
  - State preservation modes:
    - `NEVER`: No snapshots
    - `ONCE`: Single snapshot
    - `ALWAYS`: Continuous snapshots
  - Captures:
    - Agent state
    - Cache contents
    - Task status
    - Execution history
  - Configurable storage location via `ASIMOV_SNAPSHOT`
- **Error Handling**
  - Automatic retry mechanisms
  - Partial completion states
  - Failed chain tracking
  - Detailed error reporting
  - Timeout management
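
The retry and timeout behavior listed under Error Handling can be sketched with plain `asyncio`. This is an illustration of the general pattern, not Asimov's internal logic: the helper `run_with_retries` is hypothetical, and reading `max_retries` as "retries after the first attempt" is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retries(
    step: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    timeout: float = 30.0,
) -> T:
    """Run `step`, retrying up to `max_retries` times after the first attempt."""
    last_error: Exception = RuntimeError("step was never attempted")
    for _ in range(max_retries + 1):
        try:
            # wait_for cancels the step and raises a timeout error if it runs too long.
            return await asyncio.wait_for(step(), timeout=timeout)
        except Exception as exc:  # includes timeouts; cancellation is not caught
            last_error = exc
    raise last_error


attempts = 0


async def flaky() -> str:
    """Fails twice, then succeeds, to exercise the retry loop."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ValueError("transient failure")
    return "ok"


result = asyncio.run(run_with_retries(flaky, max_retries=3, timeout=1.0))
```

A production version would usually add a delay between attempts and distinguish retryable from fatal errors; both are left out here to keep the pattern visible.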
### Inference Pipeline
1. Messages are formatted with appropriate roles (SYSTEM, USER, ASSISTANT, TOOL_RESULT)
2. Inference clients handle:
   - Message formatting
   - API communication
   - Response streaming
   - Token accounting
   - Error handling
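
The first step of the pipeline, tagging each message with a role, might look like the sketch below. The role names come from the list above; the `Role` enum values, the `Message` dataclass, and the `to_payload` helper are assumptions, and real providers each expect their own wire format.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class Role(Enum):
    # Role names are from the README; the string values are assumed.
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL_RESULT = "tool_result"


@dataclass
class Message:
    role: Role
    content: str


def to_payload(messages: List[Message]) -> List[Dict[str, str]]:
    """Convert messages to plain dicts, as a client might before an API call."""
    return [{"role": m.role.value, "content": m.content} for m in messages]


payload = to_payload([
    Message(Role.SYSTEM, "You are a helpful assistant."),
    Message(Role.USER, "Summarize the task graph."),
])
```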
### Caching Layer
- Redis cache provides:
  - Fast key-value storage
  - Message queuing
  - Namespace management
  - Atomic operations
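
Message queuing between modules can be pictured as named mailboxes that one side publishes to and another reads from. The sketch below uses in-process `asyncio.Queue` objects rather than Redis, and the class `MailboxHub` with its `publish` and `get_message` methods is hypothetical, not the library's interface.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict


class MailboxHub:
    """Hypothetical in-memory stand-in for named mailboxes."""

    def __init__(self) -> None:
        # One queue per mailbox name, created on first use.
        self._queues: DefaultDict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    async def publish(self, mailbox: str, message: Any) -> None:
        await self._queues[mailbox].put(message)

    async def get_message(self, mailbox: str, timeout: float = 1.0) -> Any:
        # Block until a message arrives or the timeout elapses.
        return await asyncio.wait_for(self._queues[mailbox].get(), timeout=timeout)


async def mailbox_demo() -> Any:
    hub = MailboxHub()
    await hub.publish("executor_input", {"task": "process"})
    return await hub.get_message("executor_input")


received = asyncio.run(mailbox_demo())
```

The timeout on the blocking read is the same concern the Performance Considerations section raises for blocking cache operations.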
## Agent Primitives
The Asimov Agents framework is built around several core primitives that enable flexible and powerful agent architectures:
### Module Types
The framework supports different types of modules through the `ModuleType` enum:
- `SUBGRAPH`: Nodes composed of other nodes
- `EXECUTOR`: Task execution and action implementation
- `FLOW_CONTROL`: Execution flow and routing control
### Agent Module
The `AgentModule` is the base class for all agent components:
```python
class AgentModule:
    name: str                    # Unique module identifier
    type: ModuleType             # Module type classification
    config: ModuleConfig         # Module configuration
    dependencies: List[str]      # Module dependencies
    input_mailboxes: List[str]   # Input communication channels
    output_mailbox: str          # Output communication channel
    trace: bool                  # OpenTelemetry tracing flag
```
### Node Configuration
Nodes can be configured with various parameters through `NodeConfig`:
```python
class NodeConfig:
    parallel: bool = False            # Enable parallel execution
    condition: Optional[str] = None   # Execution condition
    retry_on_failure: bool = True     # Auto-retry on failures
    max_retries: int = 3              # Maximum retry attempts
    max_visits: int = 5               # Maximum node visits
    inputs: List[str] = []            # Required inputs
    outputs: List[str] = []           # Expected outputs
```
### Flow Control
Flow control enables dynamic execution paths:
```python
class FlowDecision:
    next_node: str                    # Target node
    condition: Optional[str] = None   # Jump condition
    cleanup_on_jump: bool = False     # Cleanup on transition

class FlowControlConfig:
    decisions: List[FlowDecision]     # Decision rules
    default: Optional[str] = None     # Default node
    cleanup_on_default: bool = True   # Cleanup on default
```
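
One plausible way to read these two classes is "take the first decision whose condition holds, otherwise fall back to the default". The sketch below implements that reading. It is an assumption about the semantics, not the framework's code, and it uses Python callables in place of the Lua condition scripts the README mentions so that it runs without a Lua runtime. `DecisionSketch`, `FlowSketch`, and `choose_next` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

State = Dict[str, Any]


@dataclass
class DecisionSketch:
    next_node: str
    # Stand-in for a Lua condition script: a predicate over the current state.
    condition: Optional[Callable[[State], bool]] = None


@dataclass
class FlowSketch:
    decisions: List[DecisionSketch] = field(default_factory=list)
    default: Optional[str] = None


def choose_next(flow: FlowSketch, state: State) -> Optional[str]:
    """Return the first decision whose condition holds, else the default."""
    for decision in flow.decisions:
        # A decision without a condition is treated as always matching.
        if decision.condition is None or decision.condition(state):
            return decision.next_node
    return flow.default


flow = FlowSketch(
    decisions=[DecisionSketch("executor", lambda s: s.get("ready") is True)],
    default="planner",
)
```

With this flow, `choose_next(flow, {"ready": True})` routes to `"executor"`, and any other state falls through to `"planner"`.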
### Middleware System
Middleware intercepts data as it is processed, so it can be inspected or transformed along the way:
```python
class Middleware:
    async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
        return data  # Process or transform data
```
### Execution State
The framework maintains execution state through:
```python
class ExecutionState:
    execution_index: int                     # Current execution position
    current_plan: ExecutionPlan              # Active execution plan
    execution_history: List[ExecutionPlan]   # Historical plans
    total_iterations: int                    # Total execution iterations
```
### Snapshot Control
State persistence is managed through `SnapshotControl`:
- `NEVER`: No snapshots taken
- `ONCE`: Single snapshot capture
- `ALWAYS`: Continuous state capture
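
The three modes can be read as a simple policy over how many snapshots have already been taken. The sketch below encodes that reading. The enum `SnapshotMode`, the helpers `should_snapshot` and `snapshot_dir`, and the fallback path are all hypothetical; only the mode names and the `ASIMOV_SNAPSHOT` environment variable come from this README.

```python
import os
from enum import Enum
from pathlib import Path


class SnapshotMode(Enum):
    NEVER = "never"
    ONCE = "once"
    ALWAYS = "always"


def should_snapshot(mode: SnapshotMode, snapshots_taken: int) -> bool:
    """Decide whether to capture state, given how many snapshots already exist."""
    if mode is SnapshotMode.NEVER:
        return False
    if mode is SnapshotMode.ONCE:
        return snapshots_taken == 0
    return True  # ALWAYS


def snapshot_dir(default: str = "/tmp/asimov_snapshot") -> Path:
    # ASIMOV_SNAPSHOT is named in the README; the fallback path is made up.
    return Path(os.environ.get("ASIMOV_SNAPSHOT", default))
```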
## Setup and Configuration
### Redis Cache Setup
```python
cache = RedisCache(
    host="localhost",     # Redis host
    port=6379,            # Redis port
    db=0,                 # Database number
    password=None,        # Optional password
    default_prefix=""     # Optional key prefix
)
```
### Inference Client Setup
```python
# Anthropic Client
client = AnthropicInferenceClient(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key",
    api_url="https://api.anthropic.com/v1/messages"
)

# AWS Bedrock Client
client = BedrockInferenceClient(
    model="anthropic.claude-3-5-sonnet-20241022-v2:0",
    region_name="us-east-1"
)
```
Setup is similar for the Vertex AI and OpenAI clients.
### Task and Graph Setup
```python
# Create a task
task = Task(
    type="processing",
    objective="Process data",
    params={"input": "data"}
)

# Create nodes with different module types
executor_node = Node(
    name="executor",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    dependencies=["planner"]
)

flow_control = Node(
    name="flow_control",
    type=ModuleType.FLOW_CONTROL,
    modules=[FlowControlModule(
        flow_config=FlowControlConfig(
            decisions=[
                FlowDecision(
                    next_node="executor",
                    # Conditions are small Lua scripts that are run against the current state.
                    condition="task.ready == true"
                )
            ],
            default="planner"
        )
    )]
)

# Set up the agent
agent = Agent(
    cache=RedisCache(),
    max_concurrent_tasks=5,
    max_total_iterations=100
)

# Add nodes to the agent
agent.add_multiple_nodes([executor_node, flow_control])

# Run the task
await agent.run_task(task)
```
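
The `dependencies` argument above implies an ordering: a node should run only after the nodes it depends on. How Asimov compiles that ordering is not shown here, but the general idea can be sketched with the standard library's `graphlib`. The dependency map below is a hypothetical graph, not one taken from the example above.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# Node name -> names of the nodes it depends on (hypothetical graph).
dependencies: Dict[str, List[str]] = {
    "planner": [],
    "executor": ["planner"],
    "flow_control": ["executor"],
}

# static_order() yields each node only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
```

For this linear chain the order is `planner`, then `executor`, then `flow_control`. A graph with a cycle would make `static_order()` raise `graphlib.CycleError`.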
## Advanced Features
### Middleware System
```python
class LoggingMiddleware(Middleware):
    async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
        print(f"Processing data: {data}")
        return data

node = Node(
    name="executor",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    config=ModuleConfig(
        middlewares=[LoggingMiddleware()],
        timeout=30.0
    )
)
```
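
When several middlewares are configured, a natural reading is that each one receives the output of the previous one. The sketch below shows that chaining pattern in isolation. It is an assumption about ordering, not the framework's implementation, and it drops the `cache` argument from `process` to stay self-contained. `MiddlewareSketch`, `TagSource`, `Redact`, and `apply_chain` are hypothetical names.

```python
import asyncio
from typing import Any, Dict, List


class MiddlewareSketch:
    """Simplified base class: the real signature also receives a cache."""

    async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return data


class TagSource(MiddlewareSketch):
    async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Return a new dict rather than mutating the input.
        return {**data, "source": "executor"}


class Redact(MiddlewareSketch):
    async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return {k: ("***" if k == "api_key" else v) for k, v in data.items()}


async def apply_chain(
    middlewares: List[MiddlewareSketch], data: Dict[str, Any]
) -> Dict[str, Any]:
    # Each middleware sees the previous middleware's output.
    for middleware in middlewares:
        data = await middleware.process(data)
    return data


result = asyncio.run(
    apply_chain([TagSource(), Redact()], {"api_key": "secret", "input": "data"})
)
```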
### Execution State Management
- Tracks execution history
- Supports execution plan compilation
- Enables dynamic plan modification
- Provides state restoration capabilities
```python
# Access execution state
current_plan = agent.execution_state.current_plan
execution_history = agent.execution_state.execution_history
total_iterations = agent.execution_state.total_iterations
# Compile execution plans
full_plan = agent.compile_execution_plan()
partial_plan = agent.compile_execution_plan_from("specific_node")
# Restore from snapshot
await agent.run_from_snapshot(snapshot_dir)
```
### OpenTelemetry Integration
- Automatic span creation for nodes
- Execution tracking
- Performance monitoring
- Error tracing
```python
node = Node(
    name="traced_node",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    trace=True  # Enable OpenTelemetry tracing
)
```
## Performance Considerations
### Caching
- Use appropriate key prefixes/suffixes for namespace isolation
- Consider timeout settings for blocking operations
- Monitor Redis memory usage
- Use raw mode when bypassing JSON serialization
### Inference
- Token usage is tracked automatically
- Streaming reduces time-to-first-token
- Tool calls support iteration limits
- Prompt caching can improve response times
### Task Management
- Tasks support partial failure states
- Use UUIDs for guaranteed uniqueness
- Status transitions are atomic
## Development
### Running Tests
```bash
pytest tests/
```
### Required Dependencies
- Redis server (if using caching)
- Python 3.12+
- See requirements.txt for Python packages
## License
Apache-2.0
Raw data
{
"_id": null,
"home_page": null,
"name": "asimov-agents",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.12",
"maintainer_email": null,
"keywords": "agent, ai, bismuth, git",
"author": null,
"author_email": "Ian Butler <ian@bismuth.cloud>",
"download_url": "https://files.pythonhosted.org/packages/e8/6b/5856795498078057beaa7177980f01f2921bdf569791572781bc11311dbc/asimov_agents-0.4.2.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "A library of primitives for building agentic flows.",
"version": "0.4.2",
"project_urls": {
"Documentation": "https://github.com/BismuthCloud/asimov",
"Homepage": "https://github.com/BismuthCloud/asimov",
"Issues": "https://github.com/BismuthCloud/asimov/issues",
"Source": "https://github.com/BismuthCloud/asimov",
"URL": "https://github.com/BismuthCloud/asimov"
},
"split_keywords": [
"agent",
" ai",
" bismuth",
" git"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "fd115cac16ddeab2b5b4910129da6377f991f6163dbb6cd6a4d19b0f631ebcac",
"md5": "b3a9255d7a9db1375e8b262b188178bc",
"sha256": "51e6be868546c74efa45ad2f6cf90a00c3e3226d222ee91e9919a864479c9036"
},
"downloads": -1,
"filename": "asimov_agents-0.4.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "b3a9255d7a9db1375e8b262b188178bc",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.12",
"size": 32844,
"upload_time": "2024-11-21T22:51:09",
"upload_time_iso_8601": "2024-11-21T22:51:09.162858Z",
"url": "https://files.pythonhosted.org/packages/fd/11/5cac16ddeab2b5b4910129da6377f991f6163dbb6cd6a4d19b0f631ebcac/asimov_agents-0.4.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "e86b5856795498078057beaa7177980f01f2921bdf569791572781bc11311dbc",
"md5": "8d4cb12a50c1ae2c62f2d19270af0ebd",
"sha256": "ed911d9c4fb4a56213781689ed0c8f4fd9ed614f2c48a71e7ca40e04359c2abf"
},
"downloads": -1,
"filename": "asimov_agents-0.4.2.tar.gz",
"has_sig": false,
"md5_digest": "8d4cb12a50c1ae2c62f2d19270af0ebd",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.12",
"size": 32544,
"upload_time": "2024-11-21T22:51:07",
"upload_time_iso_8601": "2024-11-21T22:51:07.542427Z",
"url": "https://files.pythonhosted.org/packages/e8/6b/5856795498078057beaa7177980f01f2921bdf569791572781bc11311dbc/asimov_agents-0.4.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-11-21 22:51:07",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "BismuthCloud",
"github_project": "asimov",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "asimov-agents"
}