- Name: eventrabbit
- Version: 0.1.1
- Home page: https://your-homepage-url.com
- Summary: Event sourcing library for RabbitMQ (aio_pika)
- Upload time: 2025-07-25 17:55:57
- Author: Artem
- Requires Python: `<4.0,>=3.11`
- License: MIT

# eventrabbit

A library for asynchronous work with RabbitMQ, implementing the **Event Sourcing** pattern. It allows you to easily build an event-driven architecture using decorators for producers, consumers, and functions.

## Purpose

Implements the **event sourcing** pattern: all state changes in the system are represented as events, which are sent and processed via the RabbitMQ message queue.

## Message Structure

All messages processed by the library must have the following format:

```json
{
  "action": "<event_type>",
  "data": { ... } // event parameters
}
```

- `action` — a string that defines the type of event/action
- `data` — a dictionary with event parameters
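
A message in this shape can be built and checked with the standard library alone. The sketch below is illustrative: `make_message` and `parse_message` are hypothetical helper names, not part of eventrabbit, and the validation rules are an assumption based on the format described above.

```python
import json
from typing import Any


def make_message(action: str, data: dict[str, Any]) -> bytes:
    """Serialize an event into the {"action": ..., "data": ...} envelope."""
    return json.dumps({"action": action, "data": data}).encode("utf-8")


def parse_message(body: bytes) -> tuple[str, dict[str, Any]]:
    """Decode a message body and check that it matches the expected envelope."""
    payload = json.loads(body)
    if not isinstance(payload, dict):
        raise ValueError("message must be a JSON object")
    action = payload.get("action")
    data = payload.get("data")
    if not isinstance(action, str) or not isinstance(data, dict):
        raise ValueError('message must look like {"action": str, "data": dict}')
    return action, data


# Round trip: build a message, then read it back.
body = make_message("USER_CREATED", {"user_id": 1, "name": "Alice"})
action, data = parse_message(body)
```

Rejecting malformed bodies early, as `parse_message` does here, keeps a bad message from reaching a handler with missing arguments.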

## Quick Start and Setup

```python
from eventrabbit import build_event_dependencies, RetryConfig

# Create decorators and event handler
retry_config = RetryConfig(max_retries=3, retry_delay_seconds=60)
events, handle = build_event_dependencies(
    url="amqp://user:password@localhost:5672/",
    idle_timeout=300,  # connection idle timeout before closing (seconds)
    retry_config=retry_config,  # retry parameters
)
```

- `url` — RabbitMQ connection string
- `idle_timeout` — connection idle timeout before automatic closing
- `retry_config` — retry parameters (default: infinite retries, 5 seconds delay)

## Using Decorators

### 1. @events.consumer

Registers an async function as a handler for incoming messages with a specific action.

```python
@events.consumer(action="USER_CREATED")
async def handle_user_created(user_id: int, name: str):
    # handle user creation event
    ...
```

- The function must accept parameters matching the keys in `data`.
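
The parameter-matching rule can be pictured as a lookup from `action` to a handler, followed by a call that unpacks `data` into keyword arguments. The sketch below is a toy dispatcher written for illustration only; `HANDLERS`, `consumer`, and `dispatch` are hypothetical names, not eventrabbit internals.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Handler = Callable[..., Awaitable[Any]]

# Toy registry: action name -> coroutine function.
HANDLERS: dict[str, Handler] = {}


def consumer(action: str) -> Callable[[Handler], Handler]:
    """Register the decorated coroutine function as the handler for an action."""
    def register(func: Handler) -> Handler:
        HANDLERS[action] = func
        return func
    return register


async def dispatch(message: dict[str, Any]) -> Any:
    """Look up the handler by action and pass data as keyword arguments."""
    handler = HANDLERS[message["action"]]
    return await handler(**message["data"])


@consumer(action="USER_CREATED")
async def handle_user_created(user_id: int, name: str) -> str:
    return f"created {user_id}:{name}"


outcome = asyncio.run(
    dispatch({"action": "USER_CREATED", "data": {"user_id": 7, "name": "Bob"}})
)
```

In this sketch, a key in `data` that the handler does not accept raises `TypeError`, which is why the handler signature and the event payload need to stay in sync.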

### 2. @events.producer

Wraps a function so that its return value is automatically published to the given exchange as an event.

```python
@events.producer(exchange_name="user", action="USER_CREATED")
async def create_user(user_id: int, name: str):
    # user creation logic
    return {"user_id": user_id, "name": name}
```

- `exchange_name` — exchange for publishing the event
- `action` — event type
- `key` (optional) — routing key
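
Conceptually, a producer decorator takes the function's return value, wraps it in the `action`/`data` envelope, and publishes it. The following sketch mimics that with an in-memory list instead of RabbitMQ; `PUBLISHED` and this `producer` function are stand-ins invented for the example, and eventrabbit's real decorator may behave differently.

```python
import asyncio
import functools
import json
from collections.abc import Awaitable, Callable
from typing import Any

# Stand-in for the broker: (exchange_name, routing_key, body) tuples.
PUBLISHED: list[tuple[str, str, bytes]] = []


def producer(exchange_name: str, action: str, key: str = ""):
    """Wrap a coroutine function so its result is recorded as an event."""
    def decorate(func: Callable[..., Awaitable[dict[str, Any]]]):
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> dict[str, Any]:
            result = await func(*args, **kwargs)
            # Wrap the result in the envelope and "publish" it.
            body = json.dumps({"action": action, "data": result}).encode("utf-8")
            PUBLISHED.append((exchange_name, key, body))
            return result
        return wrapper
    return decorate


@producer(exchange_name="user", action="USER_CREATED")
async def create_user(user_id: int, name: str) -> dict[str, Any]:
    return {"user_id": user_id, "name": name}


created = asyncio.run(create_user(1, "Alice"))
exchange, routing_key, body = PUBLISHED[0]
```

The wrapper still returns the original result, so the decorated function can be called like any other coroutine while the event is emitted as a side effect.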

### 3. @events.function

Registers a function as a handler for an incoming event and automatically publishes its return value as a new event.

```python
@events.function(action="SEND_EMAIL", exchange_name="email", action_reply="EMAIL_SENT")
async def send_email(user_id: int, email: str):
    # email sending logic
    return {"user_id": user_id, "email": email, "status": "sent"}
```

- `action` — incoming event type
- `exchange_name` — exchange for publishing the result
- `action_reply` — event type for the reply (by default, same as action)
- `key` (optional) — routing key

## Important

- All messages must be in the format `{ "action": str, "data": dict }` — otherwise, processing will not occur.
- The library automatically manages the connection and retry logic.

## Minimal Configuration

```python
from eventrabbit import build_event_dependencies

events, handle = build_event_dependencies(
    url="amqp://user:password@localhost:5672/"
)
```

- For advanced scenarios, use the `idle_timeout`, `retry_config`, and other parameters.

---

## Queue Setup and Consumption

To consume queues, use the `ConsumeChannel` object, where you specify the queue name, exchange, and exchange type:

```python
import asyncio

from aio_pika import ExchangeType
from eventrabbit.common import ConsumeChannel

channel = ConsumeChannel(
    url="amqp://user:password@localhost:5672/",
    queue_name="my_queue",
    exchange_name="MY_EXCHANGE",
    exchange_type=ExchangeType.FANOUT,  # optional parameter
)


async def main() -> None:
    # `handle` is the object returned by build_event_dependencies (see Quick Start)
    await handle.consume(channel)


asyncio.run(main())
```

- `exchange_type` — **optional** parameter. If not specified, the queue will be bound to the default RabbitMQ exchange.

You can run multiple queues in parallel using `asyncio.gather`:

```python
import asyncio

queues = [
    ConsumeChannel(
        url="amqp://user:password@localhost:5672/",
        queue_name="queue1",
        exchange_name="EX1",
        exchange_type=ExchangeType.FANOUT,
    ),
    ConsumeChannel(
        url="amqp://user:password@localhost:5672/",
        queue_name="queue2",
        exchange_name="EX2",
        exchange_type=ExchangeType.DIRECT,
    ),
    # You can omit exchange_type — the default exchange will be used
    ConsumeChannel(
        url="amqp://user:password@localhost:5672/",
        queue_name="queue3",
    ),
]

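async def main() -> None:
    # One consume() call per queue; gather runs them concurrently
    await asyncio.gather(*(handle.consume(ch) for ch in queues))


asyncio.run(main())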
```

---

## reply_to Support (Response to Messages)

The library supports the **reply_to** mechanism. If an incoming message has the `reply_to` field set, the return value of your function is automatically sent back to the sender through the queue named in `reply_to`.

- The response is built with the **QueueResponse** model, which serializes the data to JSON automatically.
- All return values of your function must be JSON-serializable (e.g., dicts, lists, strings, numbers).
- Response format: `{ "data": <your function result> }`
- This is convenient for implementing RPC over RabbitMQ.
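
The response envelope is easy to reproduce by hand, which helps when writing the calling side of an RPC exchange. The sketch below builds and reads the `{ "data": ... }` body with the standard library; `build_reply` and `read_reply` are hypothetical helpers and do not use the library's `QueueResponse` model.

```python
import json
from typing import Any


def build_reply(result: Any) -> bytes:
    """Wrap a handler result in the {"data": ...} response envelope."""
    try:
        return json.dumps({"data": result}).encode("utf-8")
    except TypeError as exc:
        # json.dumps raises TypeError for objects it cannot serialize.
        raise ValueError(f"handler result is not JSON-serializable: {exc}") from exc


def read_reply(body: bytes) -> Any:
    """Extract the handler result from a response body."""
    return json.loads(body)["data"]


reply = build_reply({"user_id": 1, "status": "sent"})
value = read_reply(reply)
```

Values such as sets or custom objects fail in `build_reply`, which mirrors the requirement that handler results be JSON-serializable.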

---

## Full Example

```python
import asyncio
from aio_pika import ExchangeType
from eventrabbit import build_event_dependencies
from eventrabbit.common import ConsumeChannel

# Initialize dependencies

events, handle = build_event_dependencies(
    url="amqp://user:password@localhost:5672/",
    idle_timeout=300,
)

# Global call counter
count_call = 0

# Handler for TRACKERS_INFO event
@events.consumer(action="TRACKERS_INFO")
async def a1(b: str):
    global count_call
    print(b, "23")
    retro = Retro()
    await retro.abc()
    count_call += 1
    print("count", count_call)
    return b

# Handler for TRACKERS_INFO_1 event
@events.consumer(action="TRACKERS_INFO_1")
async def a2(b: str):
    global count_call
    print(b, "23")
    retro = Retro()
    count_call += 1
    print("count", count_call)
    await retro.abc1()
    return b

# Map of queues and exchanges
QUEUES_EXCHANGES = {
    "calendar_user_sync": "PROFILE_FANOUT_EXCHANGE",
    "calendar_google_sync": "GOOGLE_FANOUT_EXCHANGE",
    "calendar_user_status_sync": "USER_STATUS_FANOUT_EXCHANGE",
}

# Class with producers
class Retro:
    @events.producer(
        exchange_name="GOOGLE_FANOUT_EXCHANGE",
        action="TRACKERS_INFO",
    )
    async def abc(self):
        return {"b": "12"}

    @events.producer(
        exchange_name="PROFILE_FANOUT_EXCHANGE",
        action="TRACKERS_INFO_1",
    )
    async def abc1(self):
        return {"b": "12"}

# Main function to start queue consumption
async def main() -> None:
    tasks = [
        asyncio.create_task(
            handle.consume(ConsumeChannel(
                url="amqp://user:password@localhost:5672/",
                queue_name=queue,
                exchange_name=exchange,
                exchange_type=ExchangeType.FANOUT,
            )),
        )
        for queue, exchange in QUEUES_EXCHANGES.items()
    ]
    # Schedule the extra direct queue the same way as the fanout queues
    tasks.append(
        asyncio.create_task(
            handle.consume(ConsumeChannel(
                url="amqp://user:password@localhost:5672/",
                queue_name="tracker_info",
                exchange_type=ExchangeType.DIRECT,
            )),
        )
    )

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
```

---

The library does not clutter your project with unnecessary abstractions and is suitable for a concise event-driven architecture.

            
