eve-bus


Nameeve-bus JSON
Version 0.1.2 PyPI version JSON
download
home_pagehttps://github.com/DotNetAge/eve-bus
SummaryA lightweight event bus implementation using Redis
upload_time2025-08-21 16:27:39
maintainerNone
docs_urlNone
authorRay
requires_python>=3.8
licenseNone
keywords
VCS
bugtrack_url
requirements redis pydantic dependency-injector python-dotenv
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Eve Bus

A lightweight, reliable event bus implementation using Redis as the message broker. This library provides a simple interface for publishing and subscribing to events in distributed systems, enabling efficient communication between components in microservice architectures.

## Features

- **Simple yet powerful API**: Intuitive interface for publishing and subscribing to events
- **Redis-backed**: Leverages Redis' pub/sub functionality for robust message delivery
- **Thread-safe implementation**: Safe for use in multi-threaded environments
- **Multiple handlers per event**: Support for registering multiple functions to handle the same event
- **Automatic retry mechanism**: Built-in support for retrying failed event handlers
- **Clean shutdown**: Proper resource management and graceful shutdown
- **Dependency injection ready**: Easy integration with popular DI frameworks
- **Distributed system friendly**: Ideal for communication between services in microservice architectures
- **Type-safe events**: Leverages Python's type hinting for event data structures
- **Minimal dependencies**: Lightweight with few external dependencies

## Architecture Highlights

Eve Bus follows a clean architecture approach with clear separation of concerns:

- **Domain layer**: Defines core event models
- **Ports layer**: Specifies abstract interfaces for event publishing and subscription
- **Adapters layer**: Implements the ports using Redis as the underlying technology

This architecture ensures loose coupling between your application code and the event bus implementation, making it easy to swap out the underlying message broker if needed.

## Installation

```bash
pip install eve-bus
```

## Usage

### Basic Setup

```python
from redis import Redis
from eve.core import RedisEventBus, subscribe, publish, set_event_bus, Event

# Create a Redis client
redis_client = Redis(host='localhost', port=6379, db=0)

# Create an event bus instance
event_bus = RedisEventBus(redis_client)

# Set the global event bus instance (required for using @subscribe decorator)
set_event_bus(event_bus)

# Define an event class
class UserCreated(Event):
    user_id: str
    username: str
    email: str

# Subscribe to an event
@subscribe('UserCreated')
def handle_user_created(event_data):
    print(f"New user created: {event_data['username']} ({event_data['email']})")
    print(f"User ID: {event_data['user_id']}")

# Publish an event
user_created_event = UserCreated(user_id='123', username='john_doe', email='john@example.com')
publish(user_created_event)

# When done, shutdown the event bus to clean up resources
event_bus.shutdown()
```

### Using with Dependency Injection

Eve Bus integrates seamlessly with dependency injection frameworks like `dependency_injector`:

```python
from dependency_injector import containers, providers
from redis import Redis
from eve.core import RedisEventBus, subscribe, set_event_bus, Event

class Container(containers.DeclarativeContainer):
    # Redis client provider
    redis_client = providers.Singleton(
        Redis,
        host='localhost',
        port=6379,
        db=0
    )

    # Event bus provider
    event_bus = providers.Singleton(RedisEventBus, redis_client=redis_client)

# Define a service that uses the event bus
class UserService:
    def __init__(self, event_bus: RedisEventBus):
        self.event_bus = event_bus
    
    def create_user(self, user_id: str, username: str, email: str):
        # Create user logic here
        print(f"Creating user: {username}")
        
        # Publish user created event
        event = UserCreated(user_id=user_id, username=username, email=email)
        self.event_bus.publish(event)
        
        return {"user_id": user_id, "username": username}

# Add the service to the container
Container.user_service = providers.Factory(UserService, event_bus=Container.event_bus)

# Create a container instance
container = Container()

# Get the event bus instance and set it globally
set_event_bus(container.event_bus())

# Get the user service
user_service = container.user_service()

# Use the service
user_service.create_user("456", "jane_doe", "jane@example.com")
```

### Integration with FastAPI

Here's how to integrate Eve Bus with FastAPI:

```python
from fastapi import FastAPI, Depends, BackgroundTasks
from redis import Redis
from eve.core import RedisEventBus, subscribe, publish, set_event_bus, Event
from pydantic import BaseModel
import uvicorn

# Define event models
class OrderCreated(Event):
    order_id: str
    user_id: str
    total_amount: float

class PaymentReceived(Event):
    payment_id: str
    order_id: str
    amount: float

# FastAPI app instance
app = FastAPI(title="Eve Bus with FastAPI")

# Redis and Event Bus setup
class EventBusManager:
    _instance: RedisEventBus = None
    
    @classmethod
    def get_event_bus(cls):
        if cls._instance is None:
            redis_client = Redis(host='localhost', port=6379, db=0)
            cls._instance = RedisEventBus(redis_client)
            set_event_bus(cls._instance)
        return cls._instance

    @classmethod
    def shutdown(cls):
        if cls._instance:
            cls._instance.shutdown()
            cls._instance = None

    # Dependency to get the event bus
    @classmethod
    def get_event_bus(cls):
        return cls._instance

# Pydantic models for API requests
class CreateOrderRequest(BaseModel):
    user_id: str
    items: list
    total_amount: float

# API endpoint to create an order
@app.post("/orders/")
async def create_order(
    request: CreateOrderRequest,
    event_bus: RedisEventBus = Depends(get_event_bus)
):
    # Generate order ID (in real app, use a proper ID generation strategy)
    order_id = f"order_{hash(request.user_id + str(request.total_amount))}"
    
    # Create and publish the order created event
    event = OrderCreated(
        order_id=order_id,
        user_id=request.user_id,
        total_amount=request.total_amount
    )
    event_bus.publish(event)
    
    return {"order_id": order_id, "status": "created"}

# API endpoint to process payment
@app.post("/payments/")
async def process_payment(
    order_id: str,
    amount: float,
    background_tasks: BackgroundTasks,
    event_bus: RedisEventBus = Depends(get_event_bus)
):
    # Process payment logic here
    payment_id = f"payment_{hash(order_id + str(amount))}"
    
    # Publish payment received event using background task
    background_tasks.add_task(
        event_bus.publish,
        PaymentReceived(payment_id=payment_id, order_id=order_id, amount=amount)
    )
    
    return {"payment_id": payment_id, "status": "processing"}

# Event handler for order creation
@subscribe("OrderCreated")
def handle_order_created(event_data):
    print(f"New order received: {event_data['order_id']}")
    print(f"User ID: {event_data['user_id']}")
    print(f"Total amount: ${event_data['total_amount']}")
    # Here you could update inventory, send notifications, etc.

# Event handler for payment processing
@subscribe("PaymentReceived")
def handle_payment_received(event_data):
    print(f"Payment received: {event_data['payment_id']}")
    print(f"For order: {event_data['order_id']}")
    print(f"Amount: ${event_data['amount']}")
    # Here you could update order status, generate invoice, etc.

# Shutdown event bus on app exit
@app.on_event("shutdown")
async def shutdown_event_bus():
    event_bus = EventBusManager.get_event_bus()
    event_bus.shutdown()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```

## Configuration

Eve Bus can be configured using environment variables. Create a `.env` file in your project root or set these variables in your environment:

### Core Configuration

- `EVENT_CHANNEL`: Prefix for Redis channels (default: 'event')
  This prefix is used when creating Redis channels for event communication.

### Redis Configuration

- `REDIS_HOST`: Redis server hostname or IP address (default: 'localhost')
- `REDIS_PORT`: Redis server port (default: 6379)
- `REDIS_DB`: Redis database number to use (default: 0)
- `REDIS_PASSWORD`: Redis server password (optional, default: None)

### Example .env File

```env
# Eve Bus Configuration
EVENT_CHANNEL=my_app_events

# Redis Configuration
REDIS_HOST=redis.example.com
REDIS_PORT=6379
REDIS_DB=2
REDIS_PASSWORD=my_secure_password
```

For advanced usage, you can also configure the Redis client directly when creating the `RedisEventBus` instance, overriding any environment variables:

## Best Practices

When using Eve Bus in your applications, consider these best practices:

1. **Define clear event structures**: Create well-defined event classes that clearly represent the data being communicated
2. **Use meaningful event names**: Choose descriptive names for your events to make the system easier to understand
3. **Handle failures gracefully**: Implement error handling in your event handlers
4. **Keep handlers focused**: Each event handler should perform a single responsibility
5. **Monitor event processing**: Add logging to track event publishing and handling
6. **Always shutdown**: Call `event_bus.shutdown()` when your application exits to clean up resources

## Advanced Usage

### Batch Processing
Eve Bus includes support for batching events, which can improve performance in high-throughput scenarios.

### Error Handling and Retries
You can implement custom retry logic for failed event handlers to improve system resilience.

### Event Filtering
Implement event filtering to process only the events that are relevant to your application components.

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

This project is licensed under the MIT License - see the LICENSE file for details.

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/DotNetAge/eve-bus",
    "name": "eve-bus",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": null,
    "author": "Ray",
    "author_email": "Ray <ray@rayainfo.cn>",
    "download_url": "https://files.pythonhosted.org/packages/22/e7/689b1d3c3a95a061a52a7ea092e6410147ea78da05df981b63cb483af690/eve_bus-0.1.2.tar.gz",
    "platform": null,
    "description": "# Eve Bus\n\nA lightweight, reliable event bus implementation using Redis as the message broker. This library provides a simple interface for publishing and subscribing to events in distributed systems, enabling efficient communication between components in microservice architectures.\n\n## Features\n\n- **Simple yet powerful API**: Intuitive interface for publishing and subscribing to events\n- **Redis-backed**: Leverages Redis' pub/sub functionality for robust message delivery\n- **Thread-safe implementation**: Safe for use in multi-threaded environments\n- **Multiple handlers per event**: Support for registering multiple functions to handle the same event\n- **Automatic retry mechanism**: Built-in support for retrying failed event handlers\n- **Clean shutdown**: Proper resource management and graceful shutdown\n- **Dependency injection ready**: Easy integration with popular DI frameworks\n- **Distributed system friendly**: Ideal for communication between services in microservice architectures\n- **Type-safe events**: Leverages Python's type hinting for event data structures\n- **Minimal dependencies**: Lightweight with few external dependencies\n\n## Architecture Highlights\n\nEve Bus follows a clean architecture approach with clear separation of concerns:\n\n- **Domain layer**: Defines core event models\n- **Ports layer**: Specifies abstract interfaces for event publishing and subscription\n- **Adapters layer**: Implements the ports using Redis as the underlying technology\n\nThis architecture ensures loose coupling between your application code and the event bus implementation, making it easy to swap out the underlying message broker if needed.\n\n## Installation\n\n```bash\npip install eve-bus\n```\n\n## Usage\n\n### Basic Setup\n\n```python\nfrom redis import Redis\nfrom eve.core import RedisEventBus, subscribe, publish, set_event_bus, Event\n\n# Create a Redis client\nredis_client = Redis(host='localhost', port=6379, db=0)\n\n# Create an event bus instance\nevent_bus = RedisEventBus(redis_client)\n\n# Set the global event bus instance (required for using @subscribe decorator)\nset_event_bus(event_bus)\n\n# Define an event class\nclass UserCreated(Event):\n    user_id: str\n    username: str\n    email: str\n\n# Subscribe to an event\n@subscribe('UserCreated')\ndef handle_user_created(event_data):\n    print(f\"New user created: {event_data['username']} ({event_data['email']})\")\n    print(f\"User ID: {event_data['user_id']}\")\n\n# Publish an event\nuser_created_event = UserCreated(user_id='123', username='john_doe', email='john@example.com')\npublish(user_created_event)\n\n# When done, shutdown the event bus to clean up resources\nevent_bus.shutdown()\n```\n\n### Using with Dependency Injection\n\nEve Bus integrates seamlessly with dependency injection frameworks like `dependency_injector`:\n\n```python\nfrom dependency_injector import containers, providers\nfrom redis import Redis\nfrom eve.core import RedisEventBus, subscribe, set_event_bus, Event\n\nclass Container(containers.DeclarativeContainer):\n    # Redis client provider\n    redis_client = providers.Singleton(\n        Redis,\n        host='localhost',\n        port=6379,\n        db=0\n    )\n\n    # Event bus provider\n    event_bus = providers.Singleton(RedisEventBus, redis_client=redis_client)\n\n# Define a service that uses the event bus\nclass UserService:\n    def __init__(self, event_bus: RedisEventBus):\n        self.event_bus = event_bus\n    \n    def create_user(self, user_id: str, username: str, email: str):\n        # Create user logic here\n        print(f\"Creating user: {username}\")\n        \n        # Publish user created event\n        event = UserCreated(user_id=user_id, username=username, email=email)\n        self.event_bus.publish(event)\n        \n        return {\"user_id\": user_id, \"username\": username}\n\n# Add the service to the container\nContainer.user_service = providers.Factory(UserService, event_bus=Container.event_bus)\n\n# Create a container instance\ncontainer = Container()\n\n# Get the event bus instance and set it globally\nset_event_bus(container.event_bus())\n\n# Get the user service\nuser_service = container.user_service()\n\n# Use the service\nuser_service.create_user(\"456\", \"jane_doe\", \"jane@example.com\")\n```\n\n### Integration with FastAPI\n\nHere's how to integrate Eve Bus with FastAPI:\n\n```python\nfrom fastapi import FastAPI, Depends, BackgroundTasks\nfrom redis import Redis\nfrom eve.core import RedisEventBus, subscribe, publish, set_event_bus, Event\nfrom pydantic import BaseModel\nimport uvicorn\n\n# Define event models\nclass OrderCreated(Event):\n    order_id: str\n    user_id: str\n    total_amount: float\n\nclass PaymentReceived(Event):\n    payment_id: str\n    order_id: str\n    amount: float\n\n# FastAPI app instance\napp = FastAPI(title=\"Eve Bus with FastAPI\")\n\n# Redis and Event Bus setup\nclass EventBusManager:\n    _instance: RedisEventBus = None\n    \n    @classmethod\n    def get_event_bus(cls):\n        if cls._instance is None:\n            redis_client = Redis(host='localhost', port=6379, db=0)\n            cls._instance = RedisEventBus(redis_client)\n            set_event_bus(cls._instance)\n        return cls._instance\n\n    @classmethod\n    def shutdown(cls):\n        if cls._instance:\n            cls._instance.shutdown()\n            cls._instance = None\n\n    # Dependency to get the event bus\n    @classmethod\n    def get_event_bus(cls):\n        return cls._instance\n\n# Pydantic models for API requests\nclass CreateOrderRequest(BaseModel):\n    user_id: str\n    items: list\n    total_amount: float\n\n# API endpoint to create an order\n@app.post(\"/orders/\")\nasync def create_order(\n    request: CreateOrderRequest,\n    event_bus: RedisEventBus = Depends(get_event_bus)\n):\n    # Generate order ID (in real app, use a proper ID generation strategy)\n    order_id = f\"order_{hash(request.user_id + str(request.total_amount))}\"\n    \n    # Create and publish the order created event\n    event = OrderCreated(\n        order_id=order_id,\n        user_id=request.user_id,\n        total_amount=request.total_amount\n    )\n    event_bus.publish(event)\n    \n    return {\"order_id\": order_id, \"status\": \"created\"}\n\n# API endpoint to process payment\n@app.post(\"/payments/\")\nasync def process_payment(\n    order_id: str,\n    amount: float,\n    background_tasks: BackgroundTasks,\n    event_bus: RedisEventBus = Depends(get_event_bus)\n):\n    # Process payment logic here\n    payment_id = f\"payment_{hash(order_id + str(amount))}\"\n    \n    # Publish payment received event using background task\n    background_tasks.add_task(\n        event_bus.publish,\n        PaymentReceived(payment_id=payment_id, order_id=order_id, amount=amount)\n    )\n    \n    return {\"payment_id\": payment_id, \"status\": \"processing\"}\n\n# Event handler for order creation\n@subscribe(\"OrderCreated\")\ndef handle_order_created(event_data):\n    print(f\"New order received: {event_data['order_id']}\")\n    print(f\"User ID: {event_data['user_id']}\")\n    print(f\"Total amount: ${event_data['total_amount']}\")\n    # Here you could update inventory, send notifications, etc.\n\n# Event handler for payment processing\n@subscribe(\"PaymentReceived\")\ndef handle_payment_received(event_data):\n    print(f\"Payment received: {event_data['payment_id']}\")\n    print(f\"For order: {event_data['order_id']}\")\n    print(f\"Amount: ${event_data['amount']}\")\n    # Here you could update order status, generate invoice, etc.\n\n# Shutdown event bus on app exit\n@app.on_event(\"shutdown\")\nasync def shutdown_event_bus():\n    event_bus = EventBusManager.get_event_bus()\n    event_bus.shutdown()\n\nif __name__ == \"__main__\":\n    uvicorn.run(app, host=\"0.0.0.0\", port=8000)\n```\n\n## Configuration\n\nEve Bus can be configured using environment variables. Create a `.env` file in your project root or set these variables in your environment:\n\n### Core Configuration\n\n- `EVENT_CHANNEL`: Prefix for Redis channels (default: 'event')\n  This prefix is used when creating Redis channels for event communication.\n\n### Redis Configuration\n\n- `REDIS_HOST`: Redis server hostname or IP address (default: 'localhost')\n- `REDIS_PORT`: Redis server port (default: 6379)\n- `REDIS_DB`: Redis database number to use (default: 0)\n- `REDIS_PASSWORD`: Redis server password (optional, default: None)\n\n### Example .env File\n\n```env\n# Eve Bus Configuration\nEVENT_CHANNEL=my_app_events\n\n# Redis Configuration\nREDIS_HOST=redis.example.com\nREDIS_PORT=6379\nREDIS_DB=2\nREDIS_PASSWORD=my_secure_password\n```\n\nFor advanced usage, you can also configure the Redis client directly when creating the `RedisEventBus` instance, overriding any environment variables:\n\n## Best Practices\n\nWhen using Eve Bus in your applications, consider these best practices:\n\n1. **Define clear event structures**: Create well-defined event classes that clearly represent the data being communicated\n2. **Use meaningful event names**: Choose descriptive names for your events to make the system easier to understand\n3. **Handle failures gracefully**: Implement error handling in your event handlers\n4. **Keep handlers focused**: Each event handler should perform a single responsibility\n5. **Monitor event processing**: Add logging to track event publishing and handling\n6. **Always shutdown**: Call `event_bus.shutdown()` when your application exits to clean up resources\n\n## Advanced Usage\n\n### Batch Processing\nEve Bus includes support for batching events, which can improve performance in high-throughput scenarios.\n\n### Error Handling and Retries\nYou can implement custom retry logic for failed event handlers to improve system resilience.\n\n### Event Filtering\nImplement event filtering to process only the events that are relevant to your application components.\n\n## Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n\n## License\n\nThis project is licensed under the MIT License - see the LICENSE file for details.\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "A lightweight event bus implementation using Redis",
    "version": "0.1.2",
    "project_urls": {
        "Bug Tracker": "https://github.com/DotNetAge/eve-bus/issues",
        "Homepage": "https://github.com/DotNetAge/eve-bus"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "3b2fe19221bb71fb8bbd4b1abb2c895884b063cd41ea90d0203ff0381a4fbd7a",
                "md5": "4447a6e8a750a200c312f3622c034ea7",
                "sha256": "7c0a591b5e7e54f5ddb33dfe711f1fc979d68d3a1e5b40eef4a8015204bd18a5"
            },
            "downloads": -1,
            "filename": "eve_bus-0.1.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4447a6e8a750a200c312f3622c034ea7",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 12995,
            "upload_time": "2025-08-21T16:27:37",
            "upload_time_iso_8601": "2025-08-21T16:27:37.599258Z",
            "url": "https://files.pythonhosted.org/packages/3b/2f/e19221bb71fb8bbd4b1abb2c895884b063cd41ea90d0203ff0381a4fbd7a/eve_bus-0.1.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "22e7689b1d3c3a95a061a52a7ea092e6410147ea78da05df981b63cb483af690",
                "md5": "1ecb9ba34f41492a68e1e41afac92c20",
                "sha256": "a9c13479f299cd536b981fab1a7403710cf37af7a1412472c6627d3abe9100d4"
            },
            "downloads": -1,
            "filename": "eve_bus-0.1.2.tar.gz",
            "has_sig": false,
            "md5_digest": "1ecb9ba34f41492a68e1e41afac92c20",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 16250,
            "upload_time": "2025-08-21T16:27:39",
            "upload_time_iso_8601": "2025-08-21T16:27:39.149276Z",
            "url": "https://files.pythonhosted.org/packages/22/e7/689b1d3c3a95a061a52a7ea092e6410147ea78da05df981b63cb483af690/eve_bus-0.1.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-21 16:27:39",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "DotNetAge",
    "github_project": "eve-bus",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "requirements": [
        {
            "name": "redis",
            "specs": [
                [
                    ">=",
                    "5.0.0"
                ]
            ]
        },
        {
            "name": "pydantic",
            "specs": [
                [
                    ">=",
                    "2.0.0"
                ]
            ]
        },
        {
            "name": "dependency-injector",
            "specs": [
                [
                    ">=",
                    "4.41.0"
                ]
            ]
        },
        {
            "name": "python-dotenv",
            "specs": [
                [
                    "==",
                    "1.0.1"
                ]
            ]
        }
    ],
    "lcname": "eve-bus"
}
        
Ray
Elapsed time: 0.42946s