| Field | Value |
| --- | --- |
| Name | eventrabbit |
| Version | 0.1.1 |
| home_page | https://your-homepage-url.com |
| Summary | Event sourcing library for RabbitMQ (aio_pika) |
| upload_time | 2025-07-25 17:55:57 |
| maintainer | None |
| docs_url | None |
| author | Artem |
| requires_python | <4.0,>=3.11 |
| license | MIT |
| keywords | |
| VCS | |
| bugtrack_url | |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |
# eventrabbit
An asynchronous RabbitMQ library that implements the **Event Sourcing** pattern. It lets you build an event-driven architecture using decorators for producers, consumers, and functions.
## Purpose
Implements the **event sourcing** pattern: all state changes in the system are represented as events, which are sent and processed via the RabbitMQ message queue.
## Message Structure
All messages processed by the library must have the following format:
```json
{
"action": "<event_type>",
"data": { ... } // event parameters
}
```
- `action` — a string that defines the type of event/action
- `data` — a dictionary with event parameters
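For illustration, a minimal sketch of building and parsing such an envelope with the standard-library `json` module; the `action` and `data` values are made up:

```python
import json

# Build an outgoing event envelope (illustrative values)
message = {
    "action": "USER_CREATED",
    "data": {"user_id": 1, "name": "Alice"},
}
body = json.dumps(message).encode()  # bytes, ready to publish to RabbitMQ

# Parse an incoming message body back into action + parameters
incoming = json.loads(body)
action = incoming["action"]  # "USER_CREATED"
params = incoming["data"]    # {"user_id": 1, "name": "Alice"}
```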
## Quick Start and Setup
```python
from eventrabbit import build_event_dependencies, RetryConfig
# Create decorators and event handler
retry_config = RetryConfig(max_retries=3, retry_delay_seconds=60)
events, handle = build_event_dependencies(
url="amqp://user:password@localhost:5672/",
idle_timeout=300, # connection idle timeout before closing (seconds)
retry_config=retry_config, # retry parameters
)
```
- `url` — RabbitMQ connection string
- `idle_timeout` — connection idle timeout before automatic closing
- `retry_config` — retry parameters (default: infinite retries, 5 seconds delay)
## Using Decorators
### 1. @events.consumer
Registers an async function as a handler for incoming messages with a specific action.
```python
@events.consumer(action="USER_CREATED")
async def handle_user_created(user_id: int, name: str):
    # handle user creation event
    ...
```
- The function must accept parameters matching the keys in `data`.
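For example, an incoming message like the following (illustrative values) would be dispatched to `handle_user_created(user_id=1, name="Alice")`:

```json
{
  "action": "USER_CREATED",
  "data": {"user_id": 1, "name": "Alice"}
}
```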
### 2. @events.producer
Wraps a function so that its result is automatically sent to the queue as an event.
```python
@events.producer(exchange_name="user", action="USER_CREATED")
async def create_user(user_id: int, name: str):
    # user creation logic
    return {"user_id": user_id, "name": name}
```
- `exchange_name` — exchange for publishing the event
- `action` — event type
- `key` (optional) — routing key
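As a hedged usage sketch (the caller below is illustrative, not part of the library): awaiting the decorated function runs its body and, per the description above, publishes the return value to the `user` exchange as a `USER_CREATED` event.

```python
async def register_user() -> None:
    # Runs create_user; per the producer contract above, its return value
    # is also published as the event's data
    result = await create_user(user_id=1, name="Alice")
    # Expected message on the "user" exchange (see "Message Structure"):
    # {"action": "USER_CREATED", "data": {"user_id": 1, "name": "Alice"}}
```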
### 3. @events.function
Registers a function as an event handler and automatically sends the result to the queue.
```python
@events.function(action="SEND_EMAIL", exchange_name="email", action_reply="EMAIL_SENT")
async def send_email(user_id: int, email: str):
    # email sending logic
    return {"user_id": user_id, "email": email, "status": "sent"}
```
- `action` — incoming event type
- `exchange_name` — exchange for publishing the result
- `action_reply` — event type for the reply (by default, same as action)
- `key` (optional) — routing key
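To sketch the flow, an incoming message like this (illustrative values):

```json
{
  "action": "SEND_EMAIL",
  "data": {"user_id": 1, "email": "alice@example.com"}
}
```

would invoke `send_email`, and because `exchange_name="email"` and `action_reply="EMAIL_SENT"` are set above, its return value would be published to the `email` exchange as an `EMAIL_SENT` event, assuming the reply uses the same `{ "action": ..., "data": ... }` envelope described in Message Structure.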
## Important
- All messages must be in the format `{ "action": str, "data": dict }`; messages in any other format will not be processed.
- The library automatically manages the connection and retry logic.
## Minimal Configuration
```python
from eventrabbit import build_event_dependencies
events, handle = build_event_dependencies(
url="amqp://user:password@localhost:5672/"
)
```
- For advanced scenarios, use the `idle_timeout`, `retry_config`, and other parameters.
---
## Queue Setup and Consumption
To consume a queue, create a `ConsumeChannel` object specifying the queue name, exchange name, and exchange type:
```python
from eventrabbit.common import ConsumeChannel
from aio_pika import ExchangeType
channel = ConsumeChannel(
url="amqp://user:password@localhost:5672/",
queue_name="my_queue",
exchange_name="MY_EXCHANGE",
exchange_type=ExchangeType.FANOUT, # optional parameter
)
await handle.consume(channel)
```
- `exchange_type` — **optional** parameter. If not specified, the queue will be bound to the default RabbitMQ exchange.
You can consume multiple queues in parallel using `asyncio.gather`:
```python
import asyncio
queues = [
    ConsumeChannel(
        url="amqp://user:password@localhost:5672/",
        queue_name="queue1",
        exchange_name="EX1",
        exchange_type=ExchangeType.FANOUT,
    ),
    ConsumeChannel(
        url="amqp://user:password@localhost:5672/",
        queue_name="queue2",
        exchange_name="EX2",
        exchange_type=ExchangeType.DIRECT,
    ),
    # You can omit exchange_type — the default exchange will be used
    ConsumeChannel(
        url="amqp://user:password@localhost:5672/",
        queue_name="queue3",
    ),
]
await asyncio.gather(*(handle.consume(ch) for ch in queues))
```
---
## reply_to Support (Response to Messages)
The library supports the **reply_to** mechanism. If an incoming message contains the `reply_to` field, the result of your function is automatically sent back to the sender via the queue named in `reply_to`.
- The response is built with the **QueueResponse** model, which serializes the data to JSON.
- All return values of your function must be JSON-serializable (e.g., dicts, lists, strings, numbers, etc.).
- Response format: `{ "data": <your function result> }`
- This is convenient for implementing RPC over RabbitMQ.
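A minimal caller-side sketch of RPC over RabbitMQ using aio_pika directly. It assumes a consumer built with this library is listening on a queue named `rpc_queue` (hypothetical name) and handles a `SEND_EMAIL` action; the `{"data": ...}` reply shape follows the response format described above.

```python
import asyncio
import json
import uuid

import aio_pika


async def call_send_email(url: str) -> dict:
    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        # Exclusive, auto-named queue that receives the reply
        callback_queue = await channel.declare_queue(exclusive=True)

        request = {
            "action": "SEND_EMAIL",
            "data": {"user_id": 1, "email": "alice@example.com"},
        }
        await channel.default_exchange.publish(
            aio_pika.Message(
                body=json.dumps(request).encode(),
                reply_to=callback_queue.name,
                correlation_id=str(uuid.uuid4()),
            ),
            routing_key="rpc_queue",  # assumed queue consumed by the library
        )

        # Wait for the {"data": <function result>} reply
        async with callback_queue.iterator() as messages:
            async for message in messages:
                async with message.process():
                    return json.loads(message.body)["data"]


if __name__ == "__main__":
    result = asyncio.run(call_send_email("amqp://user:password@localhost:5672/"))
    print(result)
```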
---
## Full Example
```python
import asyncio
from aio_pika import ExchangeType
from eventrabbit import build_event_dependencies
from eventrabbit.common import ConsumeChannel
# Initialize dependencies
events, handle = build_event_dependencies(
    url="amqp://user:password@localhost:5672/",
    idle_timeout=300,
)

# Global call counter
count_call = 0

# Handler for TRACKERS_INFO event
@events.consumer(action="TRACKERS_INFO")
async def a1(b: str):
    global count_call
    print(b, "23")
    retro = Retro()
    await retro.abc()
    count_call += 1
    print("count", count_call)
    return b

# Handler for TRACKERS_INFO_1 event
@events.consumer(action="TRACKERS_INFO_1")
async def a2(b: str):
    global count_call
    print(b, "23")
    retro = Retro()
    count_call += 1
    print("count", count_call)
    await retro.abc1()
    return b

# Map of queues and exchanges
QUEUES_EXCHANGES = {
    "calendar_user_sync": "PROFILE_FANOUT_EXCHANGE",
    "calendar_google_sync": "GOOGLE_FANOUT_EXCHANGE",
    "calendar_user_status_sync": "USER_STATUS_FANOUT_EXCHANGE",
}

# Class with producers
class Retro:
    @events.producer(
        exchange_name="GOOGLE_FANOUT_EXCHANGE",
        action="TRACKERS_INFO",
    )
    async def abc(self):
        return {"b": "12"}

    @events.producer(
        exchange_name="PROFILE_FANOUT_EXCHANGE",
        action="TRACKERS_INFO_1",
    )
    async def abc1(self):
        return {"b": "12"}

# Main function to start queue consumption
async def main() -> None:
    tasks = [
        asyncio.create_task(
            handle.consume(ConsumeChannel(
                url="amqp://user:password@localhost:5672/",
                queue_name=queue,
                exchange_name=exchange,
                exchange_type=ExchangeType.FANOUT,
            ))
        )
        for queue, exchange in QUEUES_EXCHANGES.items()
    ]
    tasks += [handle.consume(ConsumeChannel(
        url="amqp://user:password@localhost:5672/",
        queue_name="tracker_info",
        exchange_type=ExchangeType.DIRECT,
    ))]

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
```
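For completeness, a sketch of how an external publisher (using aio_pika directly, not part of this library) could emit a message that the `a1` handler above would receive. The exchange name and message shape match the example; the function name and payload values are illustrative, and the sketch assumes the exchange already exists on the broker.

```python
import asyncio
import json

import aio_pika


async def publish_trackers_info(url: str) -> None:
    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        # Assumes GOOGLE_FANOUT_EXCHANGE was already declared
        # (e.g. by the consuming side in the example above)
        exchange = await channel.get_exchange("GOOGLE_FANOUT_EXCHANGE")
        message = {"action": "TRACKERS_INFO", "data": {"b": "hello"}}
        await exchange.publish(
            aio_pika.Message(body=json.dumps(message).encode()),
            routing_key="",  # ignored by fanout exchanges
        )


if __name__ == "__main__":
    asyncio.run(publish_trackers_info("amqp://user:password@localhost:5672/"))
```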
---
The library does not clutter your project with unnecessary abstractions and is suitable for a concise event-driven architecture.