# a2a-redis

- Version: 0.1.1
- Summary: Redis components for the Agent-to-Agent (A2A) Python SDK
- Upload time: 2025-08-02 00:36:22
- Author: Redis, Inc., Andrew Brookins
- Requires Python: >=3.11
- License: MIT
- Keywords: a2a, agent, redis, queue, task-store
- Repository: https://github.com/redis-developer/a2a-redis

Redis integrations for the Agent-to-Agent (A2A) Python SDK.

This package provides Redis-backed implementations of core A2A components: persistent task storage, reliable event queue management, and push notification configuration storage.

## Features

- **RedisTaskStore & RedisJSONTaskStore**: Redis-backed task storage using hashes or JSON
- **RedisStreamsQueueManager & RedisStreamsEventQueue**: Persistent, reliable event queues with consumer groups
- **RedisPubSubQueueManager & RedisPubSubEventQueue**: Real-time, low-latency event broadcasting
- **RedisPushNotificationConfigStore**: Task-based push notification configuration storage
- **Consumer Group Strategies for Streams**: Flexible load balancing and instance isolation patterns

## Installation

```bash
pip install a2a-redis
```

## Quick Start

```python
from a2a_redis import RedisTaskStore, RedisStreamsQueueManager, RedisPushNotificationConfigStore
from a2a_redis.utils import create_redis_client
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.apps import A2AStarletteApplication

# Create Redis client with connection management
redis_client = create_redis_client(url="redis://localhost:6379/0", max_connections=50)

# Initialize Redis components
task_store = RedisTaskStore(redis_client, prefix="myapp:tasks:")
queue_manager = RedisStreamsQueueManager(redis_client, prefix="myapp:queues:")
push_config_store = RedisPushNotificationConfigStore(redis_client, prefix="myapp:push:")

# Use with A2A request handler
# (YourAgentExecutor is a placeholder for your own AgentExecutor implementation)
request_handler = DefaultRequestHandler(
    agent_executor=YourAgentExecutor(),
    task_store=task_store,
    queue_manager=queue_manager,
    push_config_store=push_config_store,
)

# Create A2A server (your_agent_card is a placeholder for your own AgentCard)
server = A2AStarletteApplication(
    agent_card=your_agent_card,
    http_handler=request_handler
)
```

## Queue Components

The package provides both high-level queue managers and direct queue implementations:

### Queue Managers
- `RedisStreamsQueueManager` - Manages Redis Streams-based queues
- `RedisPubSubQueueManager` - Manages Redis Pub/Sub-based queues
- Both implement the A2A SDK's `QueueManager` interface

### Event Queues
- `RedisStreamsEventQueue` - Direct Redis Streams queue implementation
- `RedisPubSubEventQueue` - Direct Redis Pub/Sub queue implementation
- Both implement the `EventQueue` interface through protocol compliance

## Queue Manager Types: Streams vs Pub/Sub

### RedisStreamsQueueManager

**Key Features:**
- **Persistent storage**: Events remain in streams until explicitly trimmed
- **At-least-once delivery**: Consumer groups keep each message pending until a consumer acknowledges it, so a crash before acknowledgment does not silently drop the message
- **Load balancing**: Multiple consumers can share work via consumer groups
- **Failure recovery**: Unacknowledged messages can be reclaimed by other consumers
- **Event replay**: Historical events can be re-read from any position in the stream, as long as they have not been trimmed
- **Ordering**: Maintains strict insertion order with unique message IDs

**Use Cases:**
- Task event queues requiring reliability
- Audit trails and event history
- Work distribution systems
- Systems requiring failure recovery
- Multi-consumer load balancing

**Trade-offs:**
- Higher memory usage (events persist)
- More complex setup (consumer groups)
- Slightly higher latency than pub/sub
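
The delivery semantics listed above can be illustrated without a Redis server. The following is a minimal in-memory sketch, not the library's implementation: `MiniStream` and its methods are hypothetical names that only loosely mirror what the Redis commands `XADD`, `XREADGROUP`, `XACK`, and `XCLAIM` do for a single consumer group.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MiniStream:
    """Toy model of one stream with a single consumer group (illustrative only)."""

    entries: list = field(default_factory=list)  # (entry_id, event) pairs, never removed
    next_index: int = 0  # position of the next entry that has not been delivered yet
    pending: dict = field(default_factory=dict)  # entry_id -> consumer that holds it

    def add(self, event: dict) -> int:
        """Append an event and return its monotonically increasing id."""
        entry_id = len(self.entries) + 1
        self.entries.append((entry_id, event))
        return entry_id

    def read_group(self, consumer: str) -> Optional[tuple]:
        """Deliver the next undelivered entry to `consumer` and mark it pending."""
        if self.next_index >= len(self.entries):
            return None
        entry_id, event = self.entries[self.next_index]
        self.next_index += 1
        self.pending[entry_id] = consumer
        return entry_id, event

    def ack(self, entry_id: int) -> None:
        """Acknowledge an entry so it is no longer pending."""
        self.pending.pop(entry_id, None)

    def reclaim(self, from_consumer: str, to_consumer: str) -> list:
        """Hand every entry still pending for a failed consumer to another one."""
        ids = [i for i, owner in self.pending.items() if owner == from_consumer]
        for entry_id in ids:
            self.pending[entry_id] = to_consumer
        return ids


stream = MiniStream()
stream.add({"type": "progress", "message": "Task started"})
stream.add({"type": "progress", "message": "50% complete"})

first = stream.read_group("worker-a")   # worker-a takes entry 1
second = stream.read_group("worker-b")  # worker-b takes entry 2 (work is shared)
stream.ack(second[0])                   # worker-b finishes its entry

# worker-a never acknowledged, so another consumer can take over its entry.
reclaimed = stream.reclaim("worker-a", "worker-b")
```

Both entries stay in `entries` after delivery, which is what makes replay possible and is also why a stream uses more memory than pub/sub.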

### RedisPubSubQueueManager

**Key Features:**
- **Real-time delivery**: Events delivered immediately to active subscribers
- **No persistence**: Events not stored, only delivered to active consumers
- **Fire-and-forget**: No acknowledgments or delivery guarantees
- **Broadcasting**: All subscribers receive all events
- **Low latency**: Minimal overhead for immediate delivery
- **Minimal memory usage**: No storage of events

**Use Cases:**
- Live status updates and notifications
- Real-time dashboard updates
- System event broadcasting
- Non-critical event distribution
- Low-latency requirements
- Simple fan-out scenarios

**Not suitable for:**
- Critical event processing requiring guarantees
- Systems requiring event replay or audit trails
- Offline-capable applications
- Work queues requiring load balancing
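
The fire-and-forget behaviour described above can be sketched in plain Python. This is an in-memory illustration under stated assumptions, not the library's code: `MiniPubSub` is a hypothetical class in which each subscriber is an `asyncio.Queue`, and a publish copies the event only to subscribers that exist at that moment.

```python
import asyncio


class MiniPubSub:
    """Toy broadcast channel: events reach only subscribers present at publish time."""

    def __init__(self) -> None:
        self._subscribers = []

    def subscribe(self) -> asyncio.Queue:
        """Register a new subscriber and return its private inbox."""
        inbox: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(inbox)
        return inbox

    def publish(self, event: dict) -> int:
        """Copy the event to every current subscriber; nothing is stored afterwards."""
        for inbox in self._subscribers:
            inbox.put_nowait(event)
        return len(self._subscribers)


async def demo() -> dict:
    channel = MiniPubSub()
    early_a = channel.subscribe()
    early_b = channel.subscribe()

    delivered = channel.publish({"type": "status", "message": "Task started"})

    late = channel.subscribe()  # joins after the publish, so it receives nothing

    return {
        "delivered": delivered,
        "early_a": early_a.qsize(),
        "early_b": early_b.qsize(),
        "late": late.qsize(),
    }


result = asyncio.run(demo())
```

Every early subscriber gets its own copy (broadcast, not load balancing), and the late subscriber has no way to recover the missed event, which is why pub/sub is a poor fit for work queues and replay.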

## Components

### Task Storage

#### RedisTaskStore
Stores task data in Redis using hashes with JSON serialization. Works with any Redis server.

```python
from a2a_redis import RedisTaskStore

task_store = RedisTaskStore(redis_client, prefix="mytasks:")

# A2A TaskStore interface methods
await task_store.save("task123", {"status": "pending", "data": {"key": "value"}})
task = await task_store.get("task123")
success = await task_store.delete("task123")

# List all task IDs (utility method)
task_ids = await task_store.list_task_ids()
```
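
To make "hashes with JSON serialization" concrete, here is a rough sketch of one way such a mapping could work. It assumes each top-level field is JSON-encoded into its own hash field and that keys are formed by concatenating the prefix and the task ID; the actual layout used by `RedisTaskStore` is not documented here, and the helper names are hypothetical.

```python
import json


def task_key(prefix: str, task_id: str) -> str:
    """Build a namespaced key such as 'mytasks:task123' (assumed scheme)."""
    return f"{prefix}{task_id}"


def to_hash_fields(task: dict) -> dict:
    """JSON-encode each top-level value so it fits a flat string-to-string hash."""
    return {name: json.dumps(value) for name, value in task.items()}


def from_hash_fields(fields: dict) -> dict:
    """Reverse to_hash_fields by decoding every field value."""
    return {name: json.loads(value) for name, value in fields.items()}


task = {"status": "pending", "data": {"key": "value"}}
key = task_key("mytasks:", "task123")
fields = to_hash_fields(task)      # nested dict becomes a JSON string
restored = from_hash_fields(fields)
```

Because a Redis hash holds only flat string values, nested structures have to be serialized on write and parsed on read, which is the cost the JSON-module store described next avoids.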

#### RedisJSONTaskStore
Stores task data using Redis's JSON module for native JSON operations and complex nested data.

```python
from a2a_redis import RedisJSONTaskStore

# Requires Redis 8 or RedisJSON module
json_task_store = RedisJSONTaskStore(redis_client, prefix="mytasks:")

# Same interface as RedisTaskStore but with native JSON support
await json_task_store.save("task123", {"complex": {"nested": {"data": "value"}}})
```

### Queue Managers

Both queue managers implement the A2A QueueManager interface with full async support:

```python
import asyncio
from a2a_redis import RedisStreamsQueueManager, RedisPubSubQueueManager
from a2a_redis.streams_consumer_strategy import ConsumerGroupConfig, ConsumerGroupStrategy

# Choose based on your requirements:

# For reliable, persistent processing
streams_manager = RedisStreamsQueueManager(redis_client, prefix="myapp:streams:")

# For real-time, low-latency broadcasting
pubsub_manager = RedisPubSubQueueManager(redis_client, prefix="myapp:pubsub:")

# With custom consumer group configuration (streams only)
config = ConsumerGroupConfig(strategy=ConsumerGroupStrategy.SHARED_LOAD_BALANCING)
streams_manager = RedisStreamsQueueManager(redis_client, consumer_config=config)

async def main():
    # Same interface for both managers
    queue = await streams_manager.create_or_tap("task123")

    # Enqueue events
    await queue.enqueue_event({"type": "progress", "message": "Task started"})
    await queue.enqueue_event({"type": "progress", "message": "50% complete"})

    # Dequeue events
    try:
        event = await queue.dequeue_event(no_wait=True)  # Non-blocking
        print(f"Got event: {event}")
        await queue.task_done()  # Acknowledge the message (streams only)
    except RuntimeError:
        print("No events available")

    # Close queue when done
    await queue.close()

asyncio.run(main())
```
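
A common follow-up to the example above is a loop that drains a queue and acknowledges each event only after it has been handled. The sketch below is hedged: `drain` is a hypothetical helper, `FakeQueue` is a stand-in so the code runs without Redis, and treating `RuntimeError` as the "queue is empty" signal simply follows the example above.

```python
import asyncio
from collections import deque


class FakeQueue:
    """Hypothetical stand-in exposing only the methods the loop relies on."""

    def __init__(self, events) -> None:
        self._events = deque(events)
        self.acked = 0

    async def dequeue_event(self, no_wait: bool = False):
        if not self._events:
            raise RuntimeError("No events available")
        return self._events.popleft()

    async def task_done(self) -> None:
        self.acked += 1


async def drain(queue, handle) -> int:
    """Handle events until the queue reports it is empty; return how many were handled."""
    handled = 0
    while True:
        try:
            event = await queue.dequeue_event(no_wait=True)
        except RuntimeError:
            return handled
        handle(event)
        await queue.task_done()  # acknowledge only after the handler succeeded
        handled += 1


seen = []
fake = FakeQueue([{"message": "Task started"}, {"message": "50% complete"}])
count = asyncio.run(drain(fake, seen.append))
```

Acknowledging after the handler returns means a crash mid-handler leaves the event unacknowledged, so with a streams-backed queue it can be reclaimed later.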

### Consumer Group Strategies

The Streams queue manager supports different consumer group strategies:

```python
from a2a_redis.streams_consumer_strategy import ConsumerGroupStrategy, ConsumerGroupConfig

# Multiple instances share work across a single consumer group
config = ConsumerGroupConfig(strategy=ConsumerGroupStrategy.SHARED_LOAD_BALANCING)

# Each instance gets its own consumer group
config = ConsumerGroupConfig(strategy=ConsumerGroupStrategy.INSTANCE_ISOLATED)

# Custom consumer group name
config = ConsumerGroupConfig(strategy=ConsumerGroupStrategy.CUSTOM, group_name="my_group")

streams_manager = RedisStreamsQueueManager(redis_client, consumer_config=config)
```
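
The practical difference between the strategies is which consumer group name each instance ends up using. The sketch below illustrates that idea only; the `Strategy` enum, `resolve_group_name`, and the naming scheme are assumptions made for this example and are not taken from the library.

```python
import uuid
from enum import Enum
from typing import Optional


class Strategy(Enum):
    SHARED_LOAD_BALANCING = "shared_load_balancing"
    INSTANCE_ISOLATED = "instance_isolated"
    CUSTOM = "custom"


def resolve_group_name(
    strategy: Strategy,
    instance_id: Optional[str] = None,
    group_name: Optional[str] = None,
) -> str:
    """Pick a consumer group name; the naming scheme is illustrative only."""
    if strategy is Strategy.SHARED_LOAD_BALANCING:
        # Every instance joins the same group, so events are split between them.
        return "shared"
    if strategy is Strategy.INSTANCE_ISOLATED:
        # One group per instance, so each instance sees every event.
        return f"instance-{instance_id or uuid.uuid4().hex}"
    if group_name is None:
        raise ValueError("CUSTOM strategy needs an explicit group_name")
    return group_name


shared_a = resolve_group_name(Strategy.SHARED_LOAD_BALANCING, instance_id="a")
shared_b = resolve_group_name(Strategy.SHARED_LOAD_BALANCING, instance_id="b")
isolated_a = resolve_group_name(Strategy.INSTANCE_ISOLATED, instance_id="a")
isolated_b = resolve_group_name(Strategy.INSTANCE_ISOLATED, instance_id="b")
custom = resolve_group_name(Strategy.CUSTOM, group_name="my_group")
```

Two instances that resolve to the same name compete for events; two instances with different names each receive the full stream.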

### RedisPushNotificationConfigStore

Stores push notification configurations per task. Implements the A2A PushNotificationConfigStore interface.

```python
from a2a_redis import RedisPushNotificationConfigStore
from a2a.types import PushNotificationConfig

config_store = RedisPushNotificationConfigStore(redis_client, prefix="myapp:push:")

# Create push notification config
config = PushNotificationConfig(
    url="https://webhook.example.com/notify",
    token="secret_token",
    id="webhook_1"
)

# A2A interface methods
await config_store.set_info("task123", config)

# Get all configs for a task
configs = await config_store.get_info("task123")
for stored_config in configs:  # avoid shadowing the `config` created above
    print(f"Config {stored_config.id}: {stored_config.url}")

# Delete specific config or all configs for a task
await config_store.delete_info("task123", "webhook_1")  # Delete specific
await config_store.delete_info("task123")  # Delete all
```

## Requirements

- Python 3.11+
- redis >= 4.0.0
- a2a-sdk >= 0.2.16 (Agent-to-Agent Python SDK)
- uvicorn >= 0.35.0

## Optional Dependencies

- RedisJSON module for `RedisJSONTaskStore` (enhanced nested data support)
- Redis Stack or Redis with modules for full feature support

## Development

```bash
# Clone the repository
git clone https://github.com/redis-developer/a2a-redis.git
cd a2a-redis

# Create virtual environment and install dependencies
uv venv
source .venv/bin/activate  # or .venv\Scripts\activate on Windows
uv sync --dev

# Run tests with coverage
uv run pytest --cov=a2a_redis --cov-report=term-missing

# Run linting and formatting
uv run ruff check src/ tests/
uv run ruff format src/ tests/
uv run pyright src/

# Install pre-commit hooks
uv run pre-commit install

# Run examples
uv run python examples/basic_usage.py
uv run python examples/redis_travel_agent.py
```

## Testing

Tests use Redis database 15 for isolation and include both mock and real Redis integration tests:

```bash
# Run all tests
uv run pytest

# Run specific test file
uv run pytest tests/test_streams_queue_manager.py -v

# Run with coverage
uv run pytest --cov=a2a_redis --cov-report=term-missing
```

## License

MIT License

            
