asimov-agents


- Name: asimov-agents
- Version: 0.4.6
- Summary: A library of primitives for building agentic flows.
- Upload time: 2024-12-07 00:47:25
- Requires Python: >=3.12
- License: Apache-2.0
- Keywords: agent, ai, bismuth, git

[![PyPI - Version](https://img.shields.io/pypi/v/asimov_agents.svg)](https://pypi.org/project/asimov_agents)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/asimov_agents.svg)](https://pypi.org/project/asimov_agents)

[![](https://dcbadge.limes.pink/api/server/https://discord.gg/bismuthai)](https://discord.gg/bismuthai)

👋 Hello AI Tinkerers Group! Welcome to:

# Asimov Agents

A Python framework for building AI agent systems with robust task management in the form of a graph execution engine, inference capabilities, and caching.

We support advanced features like state snapshotting, middleware, agent-directed graph execution, OpenTelemetry integrations, and more.

🔮 Asimov is the foundation of [bismuth.sh](https://waitlist.bismuth.sh), an in-terminal coding agent that can handle many tasks autonomously. Check us out! 🔮

## Quickstart
```bash
pip install asimov_agents
```

Check out [these docs](https://github.com/BismuthCloud/asimov/tree/main/docs), which walk through two basic examples that should be enough to get you experimenting!

Contributions of further documentation are greatly appreciated as PRs!

## System Overview

Asimov Agents is composed of three main components:

1. **Task Graph System**
   - Manages task execution flow and dependencies
   - Supports different task states (WAITING, EXECUTING, COMPLETE, FAILED, PARTIAL)
   - Uses Pydantic models for robust data validation
   - Unique task identification via UUIDs

2. **Inference Clients**
   - Supports multiple LLM providers:
     - Anthropic Claude (via API)
     - AWS Bedrock
     - OpenAI (including local models)
     - Vertex
   - Features:
     - Streaming responses
     - Tool/function calling capabilities
     - Token usage tracking
     - OpenTelemetry instrumentation
     - Prompt caching support

3. **Caching System**
   - Abstract Cache interface with Redis implementation
   - Features:
     - Key-value storage with JSON serialization
     - Prefix/suffix namespacing
     - Pub/sub messaging via mailboxes
     - Bulk operations (get_all, clear)
     - Async interface

## Component Interactions

### Task Management
- Tasks are created and tracked using the `Task` class
- Each task has:
  - Unique ID
  - Type and objective
  - Parameters dictionary
  - Status tracking
  - Result/error storage
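
The task fields listed above can be sketched roughly as follows. This is a minimal illustration using standard-library dataclasses rather than Pydantic; `SketchTask`, `TaskStatus`, and the `complete`/`fail` helpers are hypothetical names chosen for this example and are not the library's actual `Task` API.

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class TaskStatus(Enum):
    # The five states named in the overview above.
    WAITING = "waiting"
    EXECUTING = "executing"
    COMPLETE = "complete"
    FAILED = "failed"
    PARTIAL = "partial"


@dataclass
class SketchTask:
    """Hypothetical stand-in for a task record; not the real Task class."""

    type: str
    objective: str
    params: Dict[str, Any] = field(default_factory=dict)
    id: uuid.UUID = field(default_factory=uuid.uuid4)  # unique per task
    status: TaskStatus = TaskStatus.WAITING
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    def complete(self, result: Dict[str, Any]) -> None:
        # Record a successful outcome.
        self.status = TaskStatus.COMPLETE
        self.result = result

    def fail(self, error: str) -> None:
        # Record a failure and keep the error message for reporting.
        self.status = TaskStatus.FAILED
        self.error = error


task = SketchTask(type="processing", objective="Process data", params={"input": "data"})
task.status = TaskStatus.EXECUTING
task.complete({"output": "done"})
```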

### Graph System Architecture
- **Module Types**
  - `SUBGRAPH`: Nodes composed of other nodes
  - `EXECUTOR`: Task execution modules
  - `FLOW_CONTROL`: Execution flow control modules

- **Node Configuration**
  ```python
  node_config = NodeConfig(
      parallel=True,              # Enable parallel execution
      condition="task.ready",     # Conditional execution
      retry_on_failure=True,      # Enable retry mechanism
      max_retries=3,             # Maximum retry attempts
      max_visits=5,              # Maximum node visits
      inputs=["data"],           # Required inputs
      outputs=["result"]         # Expected outputs
  )
  ```

- **Flow Control Features**
  - Conditional branching based on task state
  - Dynamic node addition during execution
  - Dependency chain management
  - Automatic cleanup of completed nodes
  - Execution state tracking and recovery
  - LLM-directed flow for complex decisions

- **Snapshot System**
  - State preservation modes:
    - `NEVER`: No snapshots
    - `ONCE`: Single snapshot
    - `ALWAYS`: Continuous snapshots
  - Captures:
    - Agent state
    - Cache contents
    - Task status
    - Execution history
  - Configurable storage location via `ASIMOV_SNAPSHOT`

- **Error Handling**
  - Automatic retry mechanisms
  - Partial completion states
  - Failed chain tracking
  - Detailed error reporting
  - Timeout management
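
To make the retry and timeout behaviour above more concrete, here is a minimal sketch of retrying an async action a bounded number of times, with a per-attempt timeout. The helper `run_with_retries` and its parameters are assumptions for illustration; the framework's own retry logic may differ (for example, in how it counts attempts or reports partial completion).

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def run_with_retries(
    action: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    timeout: float = 30.0,
) -> T:
    """Run `action`, retrying up to `max_retries` times after the first failure."""
    last_error: Optional[BaseException] = None
    for _attempt in range(max_retries + 1):
        try:
            # Each attempt gets its own timeout budget.
            return await asyncio.wait_for(action(), timeout=timeout)
        except Exception as exc:  # includes timeouts raised by wait_for
            last_error = exc
    raise RuntimeError(f"action failed after {max_retries} retries") from last_error


attempts = {"count": 0}


async def flaky() -> str:
    # Fails twice, then succeeds, to exercise the retry loop.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ValueError("transient failure")
    return "ok"


result = asyncio.run(run_with_retries(flaky, max_retries=3, timeout=1.0))
```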

### Inference Pipeline
1. Messages are formatted with appropriate roles (SYSTEM, USER, ASSISTANT, TOOL_RESULT)
2. Inference clients handle:
   - Message formatting
   - API communication
   - Response streaming
   - Token accounting
   - Error handling
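
As a rough sketch of the message-formatting step, the snippet below models the four roles and separates system text from the conversational turns, which is the shape some provider APIs expect. `Role`, `Message`, and `split_system` are hypothetical names for this example, not the library's message classes.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Tuple


class Role(Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL_RESULT = "tool_result"


@dataclass
class Message:
    role: Role
    content: str


def split_system(messages: List[Message]) -> Tuple[str, List[Dict[str, str]]]:
    """Return (system_text, turns), with system messages joined by newlines."""
    system_parts = [m.content for m in messages if m.role is Role.SYSTEM]
    turns = [
        {"role": m.role.value, "content": m.content}
        for m in messages
        if m.role is not Role.SYSTEM
    ]
    return "\n".join(system_parts), turns


system, turns = split_system(
    [
        Message(Role.SYSTEM, "You are terse."),
        Message(Role.USER, "Hi"),
        Message(Role.ASSISTANT, "Hello."),
    ]
)
```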

### Caching Layer
- Redis cache provides:
  - Fast key-value storage
  - Message queuing
  - Namespace management
  - Atomic operations
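
The namespacing and JSON serialization ideas can be illustrated without Redis. The sketch below is an in-memory stand-in with an async interface; `InMemoryCache`, its `store` argument, and the `:` key separator are assumptions made for this example and do not describe the `RedisCache` implementation.

```python
import asyncio
import json
from typing import Any, Dict, Optional


class InMemoryCache:
    """Hypothetical in-memory cache with prefix/suffix namespacing."""

    def __init__(
        self,
        prefix: str = "",
        suffix: str = "",
        store: Optional[Dict[str, str]] = None,
    ) -> None:
        self.prefix = prefix
        self.suffix = suffix
        # Use `is not None` so an empty shared dict is still shared.
        self._store: Dict[str, str] = store if store is not None else {}

    def _key(self, key: str) -> str:
        # Join only the non-empty parts so unprefixed keys stay unchanged.
        return ":".join(p for p in (self.prefix, key, self.suffix) if p)

    async def set(self, key: str, value: Any) -> None:
        self._store[self._key(key)] = json.dumps(value)

    async def get(self, key: str, default: Any = None) -> Any:
        raw = self._store.get(self._key(key))
        return default if raw is None else json.loads(raw)


async def demo():
    shared: Dict[str, str] = {}
    a = InMemoryCache(prefix="agent_a", store=shared)
    b = InMemoryCache(prefix="agent_b", store=shared)
    await a.set("plan", {"steps": 2})
    await b.set("plan", {"steps": 5})
    return await a.get("plan"), await b.get("plan"), sorted(shared)


result = asyncio.run(demo())
```

Because both caches write to the same backing store under different prefixes, the same logical key (`plan`) holds independent values for each agent.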

## Agent Primitives

The Asimov Agents framework is built around several core primitives that enable flexible and powerful agent architectures:

### Module Types
The framework supports different types of modules through the `ModuleType` enum:
- `SUBGRAPH`: Nodes composed of other nodes
- `EXECUTOR`: Task execution and action implementation
- `FLOW_CONTROL`: Execution flow and routing control

### Agent Module
The `AgentModule` is the base class for all agent components:
```python
class AgentModule:
    name: str                   # Unique module identifier
    type: ModuleType           # Module type classification
    config: ModuleConfig       # Module configuration
    dependencies: List[str]    # Module dependencies
    input_mailboxes: List[str] # Input communication channels
    output_mailbox: str        # Output communication channel
    trace: bool                # OpenTelemetry tracing flag
```

### Node Configuration
Nodes can be configured with various parameters through `NodeConfig`:
```python
class NodeConfig:
    parallel: bool = False           # Enable parallel execution
    condition: Optional[str] = None  # Execution condition
    retry_on_failure: bool = True    # Auto-retry on failures
    max_retries: int = 3            # Maximum retry attempts
    max_visits: int = 5             # Maximum node visits
    inputs: List[str] = []          # Required inputs
    outputs: List[str] = []         # Expected outputs
```

### Flow Control
Flow control enables dynamic execution paths:
```python
class FlowDecision:
    next_node: str                    # Target node
    condition: Optional[str] = None   # Jump condition
    cleanup_on_jump: bool = False     # Cleanup on transition

class FlowControlConfig:
    decisions: List[FlowDecision]     # Decision rules
    default: Optional[str] = None     # Default node
    cleanup_on_default: bool = True   # Cleanup on default
```
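
One plausible way to read these configuration classes is "take the first decision whose condition holds, otherwise fall back to the default". The sketch below illustrates that reading only. It swaps the library's string conditions for plain Python callables, and `SketchDecision` and `choose_next` are hypothetical names; the actual evaluation order and semantics are not confirmed here.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

State = Dict[str, Any]


@dataclass
class SketchDecision:
    next_node: str
    # A Python predicate standing in for a condition script.
    condition: Optional[Callable[[State], bool]] = None


def choose_next(
    decisions: List[SketchDecision],
    state: State,
    default: Optional[str] = None,
) -> Optional[str]:
    """Return the target of the first matching decision, else the default."""
    for decision in decisions:
        if decision.condition is None or decision.condition(state):
            return decision.next_node
    return default


decisions = [SketchDecision("executor", lambda s: s.get("ready") is True)]
when_ready = choose_next(decisions, {"ready": True}, default="planner")
when_not_ready = choose_next(decisions, {"ready": False}, default="planner")
```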

### Agent Directed Flow Control

Agent Directed Flow Control routes tasks based on LLM decisions rather than fixed conditions. It allows the system to:

- Dynamically route tasks to specialized modules based on content analysis
- Use example-based learning for routing decisions
- Support multiple voters for consensus-based routing
- Handle fallback cases with error handlers

Example configuration:
```python
flow_control = Node(
    name="flow_control",
    type=ModuleType.FLOW_CONTROL,
    modules=[
        AgentDirectedFlowControl(
            name="ContentFlowControl",
            type=ModuleType.FLOW_CONTROL,
            voters=3,  # Number of voters for consensus
            inference_client=inference_client,
            system_description="A system that handles various content creation tasks",
            flow_config=AgentDrivenFlowControlConfig(
                decisions=[
                    AgentDrivenFlowDecision(
                        next_node="blog_writer",
                        metadata={"description": "Writes blog posts on technical topics"},
                        examples=[
                            Example(
                                message="Write a blog post about AI agents",
                                choices=[
                                    {"choice": "blog_writer", "description": "Writes blog posts"},
                                    {"choice": "code_writer", "description": "Writes code"}
                                ],
                                choice="blog_writer",
                                reasoning="The request is specifically for blog content"
                            )
                        ]
                    ),
                    AgentDrivenFlowDecision(
                        next_node="code_writer",
                        metadata={"description": "Writes code examples and tutorials"},
                        examples=[
                            Example(
                                message="Create a Python script for data processing",
                                choices=[
                                    {"choice": "blog_writer", "description": "Writes blog posts"},
                                    {"choice": "code_writer", "description": "Writes code"}
                                ],
                                choice="code_writer",
                                reasoning="The request is for code creation"
                            )
                        ]
                    )
                ],
                default="error_handler"  # Fallback node for unmatched requests
            )
        )
    ]
)
```

Key features:
- Example-based routing decisions with clear reasoning
- Multiple voter support (configurable number of voters) for robust decision making
- Specialized executor modules for different content types (e.g., blog posts, code)
- Metadata-enriched routing configuration for better decision context
- Fallback error handling for unmatched requests
- Cached message passing between nodes using Redis
- Asynchronous execution with semaphore control
- Comprehensive error handling and reporting

For a complete working example of Agent Directed Flow Control, check out the `examples/agent_directed_flow.py` file which demonstrates a content creation system that intelligently routes tasks between blog writing and code generation modules.
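
The multi-voter idea can be sketched as a simple majority tally over the node names each voter returns. This is an illustrative assumption about how consensus might be computed: `tally_votes`, the tie-breaking rule, and the handling of invalid votes are all choices made for this example, not the library's documented behaviour.

```python
from collections import Counter
from typing import Iterable, Set


def tally_votes(votes: Iterable[str], allowed: Set[str], default: str) -> str:
    """Pick the most common valid vote; fall back to `default` on ties or no valid votes."""
    valid = [v for v in votes if v in allowed]
    if not valid:
        return default
    ranked = Counter(valid).most_common()
    # A tie between the top two choices means there is no clear winner.
    if len(ranked) > 1 and ranked[0][1] == ranked[1][1]:
        return default
    return ranked[0][0]


nodes = {"blog_writer", "code_writer"}
winner = tally_votes(["blog_writer", "code_writer", "blog_writer"], nodes, "error_handler")
```

Using an odd number of voters, as in `voters=3` above, makes ties less likely when there are two candidate nodes.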

### Middleware System
Middleware lets you intercept and transform data as it is processed:
```python
class Middleware:
    async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
        return data  # Process or transform data
```
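
A minimal sketch of how several middlewares could be chained, with each one receiving the previous one's output. The base class here mirrors the `process` signature shown above but is a local stand-in (`SketchMiddleware`), the cache is a plain dict, and `apply_middlewares` is a hypothetical helper; how the framework actually invokes middleware is not shown in this document.

```python
import asyncio
from typing import Any, Dict, List


class SketchMiddleware:
    """Local stand-in with the same process() shape as the class above."""

    async def process(self, data: Dict[str, Any], cache: Dict[str, Any]) -> Dict[str, Any]:
        return data


class CountCalls(SketchMiddleware):
    async def process(self, data, cache):
        # Record how many times data passed through, using the cache.
        cache["calls"] = cache.get("calls", 0) + 1
        return data


class Redact(SketchMiddleware):
    async def process(self, data, cache):
        # Return a copy with the sensitive field masked.
        return {k: ("***" if k == "api_key" else v) for k, v in data.items()}


async def apply_middlewares(
    middlewares: List[SketchMiddleware],
    data: Dict[str, Any],
    cache: Dict[str, Any],
) -> Dict[str, Any]:
    # Run each middleware in order, feeding its output to the next.
    for middleware in middlewares:
        data = await middleware.process(data, cache)
    return data


cache: Dict[str, Any] = {}
result = asyncio.run(
    apply_middlewares([CountCalls(), Redact()], {"api_key": "secret", "prompt": "hi"}, cache)
)
```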

### Execution State
The framework maintains execution state through:
```python
class ExecutionState:
    execution_index: int              # Current execution position
    current_plan: ExecutionPlan       # Active execution plan
    execution_history: List[ExecutionPlan]  # Historical plans
    total_iterations: int             # Total execution iterations
```

### Snapshot Control
State persistence is managed through `SnapshotControl`:
- `NEVER`: No snapshots taken
- `ONCE`: Single snapshot capture
- `ALWAYS`: Continuous state capture
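
The three modes can be read as a small policy over "has a snapshot already been taken?". The sketch below encodes that reading; `SnapshotMode`, `Snapshotter`, and the interpretation of `ONCE` as "only the first opportunity" are assumptions for illustration rather than the library's implementation.

```python
from enum import Enum
from typing import Any, Dict, List


class SnapshotMode(Enum):
    NEVER = "never"
    ONCE = "once"
    ALWAYS = "always"


class Snapshotter:
    """Hypothetical helper that decides when to capture state."""

    def __init__(self, mode: SnapshotMode) -> None:
        self.mode = mode
        self.snapshots: List[Dict[str, Any]] = []

    def should_snapshot(self) -> bool:
        if self.mode is SnapshotMode.ALWAYS:
            return True
        if self.mode is SnapshotMode.ONCE:
            return not self.snapshots  # only if none has been taken yet
        return False

    def maybe_snapshot(self, state: Dict[str, Any]) -> bool:
        if not self.should_snapshot():
            return False
        self.snapshots.append(dict(state))  # shallow copy of the state
        return True


counts = {}
for mode in SnapshotMode:
    snapshotter = Snapshotter(mode)
    for step in range(3):
        snapshotter.maybe_snapshot({"step": step})
    counts[mode] = len(snapshotter.snapshots)
```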

## Setup and Configuration

### Redis Cache Setup
```python
cache = RedisCache(
    host="localhost",  # Redis host
    port=6379,        # Redis port
    db=0,             # Database number
    password=None,    # Optional password
    default_prefix="" # Optional key prefix
)
```

### Inference Client Setup
```python
# Anthropic Client
client = AnthropicInferenceClient(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key",
    api_url="https://api.anthropic.com/v1/messages"
)

# AWS Bedrock Client
client = BedrockInferenceClient(
    model="anthropic.claude-3-5-sonnet-20241022-v2:0",
    region_name="us-east-1"
)
```

Setup for the Vertex AI and OpenAI clients is similar.

### Task and Graph Setup
```python
# Create a task
task = Task(
    type="processing",
    objective="Process data",
    params={"input": "data"}
)

# Create nodes with different module types
executor_node = Node(
    name="executor",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    dependencies=["planner"]
)

flow_control = Node(
    name="flow_control",
    type=ModuleType.FLOW_CONTROL,
    modules=[FlowControlModule(
        flow_config=FlowControlConfig(
            decisions=[
                FlowDecision(
                    next_node="executor",
                    condition="task.ready == true"  # Conditions are small Lua scripts evaluated against the current state.
                )
            ],
            default="planner"
        )
    )]
)

# Set up the agent
agent = Agent(
    cache=RedisCache(),
    max_concurrent_tasks=5,
    max_total_iterations=100
)

# Add nodes to the agent
agent.add_multiple_nodes([executor_node, flow_control])

# Run the task (call this from inside an async function)
await agent.run_task(task)
```

## Advanced Features

### Middleware System
```python
class LoggingMiddleware(Middleware):
    async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
        print(f"Processing data: {data}")
        return data

node = Node(
    name="executor",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    config=ModuleConfig(
        middlewares=[LoggingMiddleware()],
        timeout=30.0
    )
)
```

### Execution State Management
- Tracks execution history
- Supports execution plan compilation
- Enables dynamic plan modification
- Provides state restoration capabilities
```python
# Access execution state
current_plan = agent.execution_state.current_plan
execution_history = agent.execution_state.execution_history
total_iterations = agent.execution_state.total_iterations

# Compile execution plans
full_plan = agent.compile_execution_plan()
partial_plan = agent.compile_execution_plan_from("specific_node")

# Restore from snapshot
await agent.run_from_snapshot(snapshot_dir)
```

### OpenTelemetry Integration
- Automatic span creation for nodes
- Execution tracking
- Performance monitoring
- Error tracing
```python
node = Node(
    name="traced_node",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    trace=True  # Enable OpenTelemetry tracing
)
```

## Performance Considerations

### Caching
- Use appropriate key prefixes/suffixes for namespace isolation
- Consider timeout settings for blocking operations
- Monitor Redis memory usage
- Use raw mode when bypassing JSON serialization

### Inference
- Token usage is tracked automatically
- Streaming reduces time-to-first-token
- Tool calls support iteration limits
- Prompt caching can improve response times

### Task Management
- Tasks support partial failure states
- Use UUIDs for practically collision-free task IDs
- Status transitions are atomic

## Development

### Running Tests
```bash
pytest tests/
```

### Required Dependencies
- Redis server (if using caching)
- Python 3.12+
- See requirements.txt for Python packages

## License

Apache-2.0

            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "asimov-agents",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.12",
    "maintainer_email": null,
    "keywords": "agent, ai, bismuth, git",
    "author": null,
    "author_email": "Ian Butler <ian@bismuth.cloud>",
    "download_url": "https://files.pythonhosted.org/packages/f7/f5/7969a76c9067a1b4c3563e0d64d298a0472fbf6881631516e0d19992bf0a/asimov_agents-0.4.6.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "A library of primitives for building agentic flows.",
    "version": "0.4.6",
    "project_urls": {
        "Documentation": "https://github.com/BismuthCloud/asimov",
        "Homepage": "https://github.com/BismuthCloud/asimov",
        "Issues": "https://github.com/BismuthCloud/asimov/issues",
        "Source": "https://github.com/BismuthCloud/asimov",
        "URL": "https://github.com/BismuthCloud/asimov"
    },
    "split_keywords": [
        "agent",
        " ai",
        " bismuth",
        " git"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "7a5394c5416b8cca57c3f2b79f147561ac3b87a3d23e4393fff52ee365bc69a0",
                "md5": "c4e26a8d097e34e6ae3a6c7867b6d89e",
                "sha256": "fbb32c2b549faf61c43056ebd27ba97cc991b9349d87a34ef402f20a635ba3d1"
            },
            "downloads": -1,
            "filename": "asimov_agents-0.4.6-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "c4e26a8d097e34e6ae3a6c7867b6d89e",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.12",
            "size": 35402,
            "upload_time": "2024-12-07T00:47:26",
            "upload_time_iso_8601": "2024-12-07T00:47:26.969436Z",
            "url": "https://files.pythonhosted.org/packages/7a/53/94c5416b8cca57c3f2b79f147561ac3b87a3d23e4393fff52ee365bc69a0/asimov_agents-0.4.6-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "f7f57969a76c9067a1b4c3563e0d64d298a0472fbf6881631516e0d19992bf0a",
                "md5": "6f366b2e1b76d3219c63875ed73cf79c",
                "sha256": "49933c919a1ed72f2b37292a0d54b8162194b39767a9a183595645aee3283c2c"
            },
            "downloads": -1,
            "filename": "asimov_agents-0.4.6.tar.gz",
            "has_sig": false,
            "md5_digest": "6f366b2e1b76d3219c63875ed73cf79c",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.12",
            "size": 35067,
            "upload_time": "2024-12-07T00:47:25",
            "upload_time_iso_8601": "2024-12-07T00:47:25.516415Z",
            "url": "https://files.pythonhosted.org/packages/f7/f5/7969a76c9067a1b4c3563e0d64d298a0472fbf6881631516e0d19992bf0a/asimov_agents-0.4.6.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-12-07 00:47:25",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "BismuthCloud",
    "github_project": "asimov",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "asimov-agents"
}