Name | bus_queue JSON |
Version |
0.5.4
JSON |
| download |
home_page | None |
Summary | None |
upload_time | 2025-01-15 10:01:42 |
maintainer | None |
docs_url | None |
author | Gonzalo Ayuso |
requires_python | <4.0,>=3.9 |
license | None |
keywords |
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
## Event Bus in Python with RabbitMQ: Exploring Synchronous and Asynchronous Solutions
Today we're going to build an event bus with Python. It's an event bus according to my personal needs. The idea is to create a scalable event bus, with RabbitMQ as the message broker but easy to replace with another message broker such as MQTT or Redis. In fact, I've started with a memomry-based message broker. I'm not going to use never this on-memory message broker, but it was a good start to understand the basics of the event bus.
That's the on memory version:
```python
import logging
from bus_queue.backend.memory.bus import MemoryEventBus as Bus
from bus_queue import EventBus
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s',
level='INFO',
datefmt='%d/%m/%Y %X')
logger = logging.getLogger(__name__)
def callback(topic, msg):
logger.info(f"Received: topic: {topic} msg: {msg}")
def main():
backend = Bus()
bus = EventBus(backend)
bus.subscribe("test", callback)
bus.publish("test", dict(hola="Gonzalo"))
bus.wait()
if __name__ == "__main__":
main()
```
This on-memory version uses this implementation:
```python
from time import sleep
from typing import Callable, Dict, List, Any
from bus_queue import Backend
class MemoryEventBus(Backend):
def __init__(self):
self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}
def publish(self, topic: str, message: str) -> None:
if topic in self.subscribers:
for callback in self.subscribers[topic]:
callback(topic, message)
def broadcast(self, topic: str, payload: Any):
self.publish(topic, payload)
def subscribe(self, topic: str, callback: Callable[[str, Any], None]) -> None:
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(callback)
def wait(self):
while True:
sleep(1)
```
This implementation is a synchronous version. I also want to create an asynchronous version.
```python
import asyncio
import logging
from bus_queue.backend.memory.assync_bus import AsyncMemoryEventBus as Bus
from bus_queue import AsyncEventBus
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s',
level='INFO',
datefmt='%d/%m/%Y %X')
for l in ['asyncio', ]:
logging.getLogger(l).setLevel(logging. WARNING)
logger = logging.getLogger(__name__)
async def callback(topic, msg):
logger.info(f"Received: topic: {topic} msg: {msg}")
async def main():
backend = Bus()
bus = AsyncEventBus(backend)
await bus.subscribe("test", callback)
await bus.publish("test", dict(hola="Gonzalo"))
await bus.wait()
if __name__ == "__main__":
asyncio.run(main())
```
```python
import asyncio
from typing import Callable, Dict, List, Any, Awaitable
from bus_queue import AsyncBackend
class AsyncMemoryEventBus(AsyncBackend):
def __init__(self):
self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}
async def publish(self, topic: str, message: str):
if topic in self.subscribers:
tasks = [
asyncio.create_task(subscriber(topic, message))
for subscriber in self.subscribers[topic]
]
await asyncio.gather(*tasks)
async def broadcast(self, topic: str, message: str):
await self.publish(topic, message)
async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(handler)
async def wait(self):
await asyncio.Event().wait()
```
But this on-memory version is not useful for me. I want to use RabbitMQ as the message broker. I'm going to create also a synchronous and an asynchronous version also. In this version I´m going to create two kind of ways to publish messages. One way is a simple publish, and the other way is a broadcast. The broadcast is going to send the message to all the subscribers of the topic, and the publishing is going to send the message to only one subscriber, using a round-robin strategy.
The synchronous version:
The listener:
```python
import logging
from bus_queue import EventBus
from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s',
level='INFO',
datefmt='%d/%m/%Y %X')
for l in ['pika', ]:
logging.getLogger(l).setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
def callback(topic, msg):
logger.info(f"Received: topic: {topic} msg: {msg}")
def main():
backend = Bus("amqp://guest:guest@localhost:5672/")
bus = EventBus(backend)
bus.subscribe("test", callback)
bus.wait()
if __name__ == "__main__":
main()
```
And the publisher:
```python
import logging
from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus
from bus_queue import EventBus
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s',
level='INFO',
datefmt='%d/%m/%Y %X')
for l in ['pika',]:
logging.getLogger(l).setLevel(logging. WARNING)
logger = logging.getLogger(__name__)
def main():
backend = Bus("amqp://guest:guest@localhost:5672/")
bus = EventBus(backend)
bus.publish("test", dict(hola="Gonzalo"))
bus.broadcast("test", "Hola, broadcast")
if __name__ == "__main__":
main()
```
The implementation is like that:
```python
import logging
from typing import Callable, Dict, Any, List
import pika
from bus_queue import Backend
logger = logging.getLogger(__name__)
def get_broadcast_exchange_from_topic(topic: str):
return f"broadcast_{topic}"
class RabbitEventBus(Backend):
def __init__(self, rabbitmq_url: str, max_retries: int = 3):
self.rabbitmq_url = rabbitmq_url
self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}
self.connection = None
self.channel = None
self.max_retries = max_retries
def connect(self):
self.connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))
self.channel = self.connection.channel()
def broadcast(self, topic: str, payload: Any):
if self.channel is None:
self.connect()
exchange = get_broadcast_exchange_from_topic(topic)
self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')
self.channel.basic_publish(exchange=exchange, routing_key='', body=payload.encode())
def publish(self, topic: str, payload: Any):
if self.channel is None:
self.connect()
self.channel.basic_publish(exchange='', routing_key=topic, body=payload.encode())
def subscribe(self, topic: str, handler: Callable[[str, Any], None]):
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(handler)
if self.channel is None:
self.connect()
self.channel.queue_declare(queue=topic, auto_delete=True)
exchange = get_broadcast_exchange_from_topic(topic)
self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')
result = self.channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
self.channel.queue_bind(exchange=exchange, queue=queue_name)
def on_message(ch, method, properties, body):
for subscriber in self.subscribers[topic]:
try:
subscriber(topic, body.decode())
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as ex:
logger.exception(ex)
if method.delivery_tag <= self.max_retries:
logger.info(f"Retrying message ({method.delivery_tag}/{self.max_retries})")
self.channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
else:
logger.info(f"Max retries. max_retries: {self.max_retries})")
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(queue=topic, on_message_callback=on_message, auto_ack=False)
self.channel.basic_consume(queue=queue_name, on_message_callback=on_message, auto_ack=False)
def wait(self):
if self.channel is None:
self.connect()
self.channel.basic_qos(prefetch_count=1)
self.channel.start_consuming()
```
And the asynchronous version:
The listener:
```python
import asyncio
import logging
from bus_queue import AsyncEventBus
from bus_queue.backend.rabbit.assync_bus import AsyncRabbitEventBus as Bus
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s',
level='INFO',
datefmt='%d/%m/%Y %X')
for l in ['asyncio', 'aio-pika']:
logging.getLogger(l).setLevel(logging. WARNING)
logger = logging.getLogger(__name__)
async def callback(topic, msg):
logger.info(f"Received: topic: {topic} msg: {msg}")
async def main():
backend = Bus("amqp://guest:guest@localhost:5672/")
bus = AsyncEventBus(backend)
await bus.subscribe("test", callback)
await bus.wait()
if __name__ == "__main__":
asyncio.run(main())
```
The implementation is like that:
```python
import asyncio
import logging
from typing import Callable, Dict, List, Any, Awaitable
import aio_pika
from bus_queue import AsyncBackend
logger = logging.getLogger(__name__)
def get_broadcast_exchange_from_topic(topic: str):
return f"broadcast_{topic}"
class AsyncRabbitEventBus(AsyncBackend):
def __init__(self, rabbitmq_url: str, max_retries: int = 3):
self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}
self.rabbitmq_url = rabbitmq_url
self.max_retries = max_retries
async def broadcast(self, topic: str, payload: Any):
connection = await aio_pika.connect_robust(self.rabbitmq_url)
exchange_type = aio_pika.ExchangeType.FANOUT
exchange = get_broadcast_exchange_from_topic(topic)
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(exchange, exchange_type)
await exchange.publish(
aio_pika.Message(body=payload.encode()),
routing_key=topic
)
async def publish(self, topic: str, payload: Any):
connection = await aio_pika.connect_robust(self.rabbitmq_url)
async with connection:
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(body=payload.encode()),
routing_key=topic
)
async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(handler)
exchange = get_broadcast_exchange_from_topic(topic)
connection = await aio_pika.connect_robust(self.rabbitmq_url)
async with connection:
channel = await connection.channel()
direct_queue = await channel.declare_queue(topic, auto_delete=True)
broadcast_exchange = await channel.declare_exchange(exchange, aio_pika.ExchangeType.FANOUT)
broadcast_queue = await channel.declare_queue('', exclusive=True)
await broadcast_queue.bind(broadcast_exchange)
async def process_queue(queue_iter):
async for message in queue_iter:
try:
await handler(topic, message.body.decode())
await message.ack()
except Exception as ex:
if message.delivery_tag <= self.max_retries:
logger.info(f"Retrying message ({message.delivery_tag}/{self.max_retries})")
await message.nack(requeue=True)
else:
logger.exception(ex)
logger.info(
f"Max retries. Discarding event (max_retries: {self.max_retries})")
await message.ack()
async with direct_queue.iterator() as direct_queue_iter, broadcast_queue.iterator() as broadcast_queue_iter:
await asyncio.gather(
process_queue(direct_queue_iter),
process_queue(broadcast_queue_iter)
)
async def wait(self):
await asyncio.Event().wait()
```
And that's all. The library can be installed with poetry in both versions: async and sync. You can use pip or poetry to install the library.
```bash
For the sync version:
```bash
poetry add bus_queue --extras "sync"
pip install bus_queue[sync]
```
and for the async version:
```bash
poetry add bus_queue --extras "async"
pip install bus_queue[async]
```
Raw data
{
"_id": null,
"home_page": null,
"name": "bus_queue",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.9",
"maintainer_email": null,
"keywords": null,
"author": "Gonzalo Ayuso",
"author_email": "gonzalo123@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/fc/ca/c77bd07eaee8a34137dda43cc19550bb44f817e268c5605f9470a4037da0/bus_queue-0.5.4.tar.gz",
"platform": null,
"description": "## Event Bus in Python with RabbitMQ: Exploring Synchronous and Asynchronous Solutions\n\nToday we're going to build an event bus with Python. It's an event bus according to my personal needs. The idea is to create a scalable event bus, with RabbitMQ as the message broker but easy to replace with another message broker such as MQTT or Redis. In fact, I've started with a memomry-based message broker. I'm not going to use never this on-memory message broker, but it was a good start to understand the basics of the event bus.\n\nThat's the on memory version:\n\n```python\nimport logging\n\nfrom bus_queue.backend.memory.bus import MemoryEventBus as Bus\nfrom bus_queue import EventBus\n\nlogging.basicConfig(\n format='%(asctime)s [%(levelname)s] %(message)s',\n level='INFO',\n datefmt='%d/%m/%Y %X')\n\nlogger = logging.getLogger(__name__)\n\n\ndef callback(topic, msg):\n logger.info(f\"Received: topic: {topic} msg: {msg}\")\n\n\ndef main():\n backend = Bus()\n bus = EventBus(backend)\n\n bus.subscribe(\"test\", callback)\n\n bus.publish(\"test\", dict(hola=\"Gonzalo\"))\n bus.wait()\n\n\nif __name__ == \"__main__\":\n main()\n```\n\nThis on-memory version uses this implementation:\n\n```python\nfrom time import sleep\nfrom typing import Callable, Dict, List, Any\n\nfrom bus_queue import Backend\n\n\nclass MemoryEventBus(Backend):\n def __init__(self):\n self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}\n\n def publish(self, topic: str, message: str) -> None:\n if topic in self.subscribers:\n for callback in self.subscribers[topic]:\n callback(topic, message)\n\n def broadcast(self, topic: str, payload: Any):\n self.publish(topic, payload)\n\n def subscribe(self, topic: str, callback: Callable[[str, Any], None]) -> None:\n if topic not in self.subscribers:\n self.subscribers[topic] = []\n self.subscribers[topic].append(callback)\n\n def wait(self):\n while True:\n sleep(1)\n```\n\nThis implementation is a synchronous version. I also want to create an asynchronous version. \n\n```python\nimport asyncio\nimport logging\n\nfrom bus_queue.backend.memory.assync_bus import AsyncMemoryEventBus as Bus\nfrom bus_queue import AsyncEventBus\n\nlogging.basicConfig(\n format='%(asctime)s [%(levelname)s] %(message)s',\n level='INFO',\n datefmt='%d/%m/%Y %X')\n\nfor l in ['asyncio', ]:\n logging.getLogger(l).setLevel(logging. WARNING)\n\nlogger = logging.getLogger(__name__)\n\n\nasync def callback(topic, msg):\n logger.info(f\"Received: topic: {topic} msg: {msg}\")\n\n\nasync def main():\n backend = Bus()\n bus = AsyncEventBus(backend)\n\n await bus.subscribe(\"test\", callback)\n\n await bus.publish(\"test\", dict(hola=\"Gonzalo\"))\n await bus.wait()\n\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n\n```\n\n```python\nimport asyncio\nfrom typing import Callable, Dict, List, Any, Awaitable\n\nfrom bus_queue import AsyncBackend\n\n\nclass AsyncMemoryEventBus(AsyncBackend):\n def __init__(self):\n self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}\n\n async def publish(self, topic: str, message: str):\n if topic in self.subscribers:\n tasks = [\n asyncio.create_task(subscriber(topic, message))\n for subscriber in self.subscribers[topic]\n ]\n await asyncio.gather(*tasks)\n\n async def broadcast(self, topic: str, message: str):\n await self.publish(topic, message)\n\n async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):\n if topic not in self.subscribers:\n self.subscribers[topic] = []\n self.subscribers[topic].append(handler)\n\n async def wait(self):\n await asyncio.Event().wait()\n```\n\nBut this on-memory version is not useful for me. I want to use RabbitMQ as the message broker. I'm going to create also a synchronous and an asynchronous version also. In this version I\u00b4m going to create two kind of ways to publish messages. One way is a simple publish, and the other way is a broadcast. The broadcast is going to send the message to all the subscribers of the topic, and the publishing is going to send the message to only one subscriber, using a round-robin strategy.\n\nThe synchronous version:\nThe listener:\n\n```python\nimport logging\n\nfrom bus_queue import EventBus\nfrom bus_queue.backend.rabbit.bus import RabbitEventBus as Bus\n\nlogging.basicConfig(\n format='%(asctime)s [%(levelname)s] %(message)s',\n level='INFO',\n datefmt='%d/%m/%Y %X')\n\nfor l in ['pika', ]:\n logging.getLogger(l).setLevel(logging.WARNING)\n\nlogger = logging.getLogger(__name__)\n\n\ndef callback(topic, msg):\n logger.info(f\"Received: topic: {topic} msg: {msg}\")\n\n\ndef main():\n backend = Bus(\"amqp://guest:guest@localhost:5672/\")\n bus = EventBus(backend)\n\n bus.subscribe(\"test\", callback)\n bus.wait()\n\n\nif __name__ == \"__main__\":\n main()\n```\n\nAnd the publisher:\n\n```python\nimport logging\n\nfrom bus_queue.backend.rabbit.bus import RabbitEventBus as Bus\nfrom bus_queue import EventBus\n\nlogging.basicConfig(\n format='%(asctime)s [%(levelname)s] %(message)s',\n level='INFO',\n datefmt='%d/%m/%Y %X')\n\nfor l in ['pika',]:\n logging.getLogger(l).setLevel(logging. WARNING)\n\nlogger = logging.getLogger(__name__)\n\n\ndef main():\n backend = Bus(\"amqp://guest:guest@localhost:5672/\")\n bus = EventBus(backend)\n\n bus.publish(\"test\", dict(hola=\"Gonzalo\"))\n bus.broadcast(\"test\", \"Hola, broadcast\")\n\n\nif __name__ == \"__main__\":\n main()\n```\n\nThe implementation is like that:\n\n```python\nimport logging\nfrom typing import Callable, Dict, Any, List\n\nimport pika\n\nfrom bus_queue import Backend\n\nlogger = logging.getLogger(__name__)\n\n\ndef get_broadcast_exchange_from_topic(topic: str):\n return f\"broadcast_{topic}\"\n\n\nclass RabbitEventBus(Backend):\n def __init__(self, rabbitmq_url: str, max_retries: int = 3):\n self.rabbitmq_url = rabbitmq_url\n self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}\n self.connection = None\n self.channel = None\n self.max_retries = max_retries\n\n def connect(self):\n self.connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))\n self.channel = self.connection.channel()\n\n def broadcast(self, topic: str, payload: Any):\n if self.channel is None:\n self.connect()\n exchange = get_broadcast_exchange_from_topic(topic)\n self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')\n self.channel.basic_publish(exchange=exchange, routing_key='', body=payload.encode())\n\n def publish(self, topic: str, payload: Any):\n if self.channel is None:\n self.connect()\n self.channel.basic_publish(exchange='', routing_key=topic, body=payload.encode())\n\n def subscribe(self, topic: str, handler: Callable[[str, Any], None]):\n if topic not in self.subscribers:\n self.subscribers[topic] = []\n self.subscribers[topic].append(handler)\n\n if self.channel is None:\n self.connect()\n\n self.channel.queue_declare(queue=topic, auto_delete=True)\n exchange = get_broadcast_exchange_from_topic(topic)\n self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')\n\n result = self.channel.queue_declare(queue='', exclusive=True)\n queue_name = result.method.queue\n\n self.channel.queue_bind(exchange=exchange, queue=queue_name)\n\n def on_message(ch, method, properties, body):\n for subscriber in self.subscribers[topic]:\n try:\n subscriber(topic, body.decode())\n ch.basic_ack(delivery_tag=method.delivery_tag)\n except Exception as ex:\n logger.exception(ex)\n if method.delivery_tag <= self.max_retries:\n logger.info(f\"Retrying message ({method.delivery_tag}/{self.max_retries})\")\n self.channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)\n else:\n logger.info(f\"Max retries. max_retries: {self.max_retries})\")\n self.channel.basic_ack(delivery_tag=method.delivery_tag)\n\n self.channel.basic_consume(queue=topic, on_message_callback=on_message, auto_ack=False)\n self.channel.basic_consume(queue=queue_name, on_message_callback=on_message, auto_ack=False)\n\n def wait(self):\n if self.channel is None:\n self.connect()\n self.channel.basic_qos(prefetch_count=1)\n self.channel.start_consuming()\n```\n\nAnd the asynchronous version:\n\nThe listener:\n\n```python\nimport asyncio\nimport logging\n\nfrom bus_queue import AsyncEventBus\nfrom bus_queue.backend.rabbit.assync_bus import AsyncRabbitEventBus as Bus\n\nlogging.basicConfig(\n format='%(asctime)s [%(levelname)s] %(message)s',\n level='INFO',\n datefmt='%d/%m/%Y %X')\n\nfor l in ['asyncio', 'aio-pika']:\n logging.getLogger(l).setLevel(logging. WARNING)\n\nlogger = logging.getLogger(__name__)\n\n\nasync def callback(topic, msg):\n logger.info(f\"Received: topic: {topic} msg: {msg}\")\n\n\nasync def main():\n backend = Bus(\"amqp://guest:guest@localhost:5672/\")\n bus = AsyncEventBus(backend)\n\n await bus.subscribe(\"test\", callback)\n await bus.wait()\n\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\nThe implementation is like that:\n\n```python\nimport asyncio\nimport logging\nfrom typing import Callable, Dict, List, Any, Awaitable\n\nimport aio_pika\n\nfrom bus_queue import AsyncBackend\n\nlogger = logging.getLogger(__name__)\n\n\ndef get_broadcast_exchange_from_topic(topic: str):\n return f\"broadcast_{topic}\"\n\n\nclass AsyncRabbitEventBus(AsyncBackend):\n def __init__(self, rabbitmq_url: str, max_retries: int = 3):\n self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}\n self.rabbitmq_url = rabbitmq_url\n self.max_retries = max_retries\n\n async def broadcast(self, topic: str, payload: Any):\n connection = await aio_pika.connect_robust(self.rabbitmq_url)\n exchange_type = aio_pika.ExchangeType.FANOUT\n exchange = get_broadcast_exchange_from_topic(topic)\n async with connection:\n channel = await connection.channel()\n exchange = await channel.declare_exchange(exchange, exchange_type)\n await exchange.publish(\n aio_pika.Message(body=payload.encode()),\n routing_key=topic\n )\n\n async def publish(self, topic: str, payload: Any):\n connection = await aio_pika.connect_robust(self.rabbitmq_url)\n async with connection:\n channel = await connection.channel()\n await channel.default_exchange.publish(\n aio_pika.Message(body=payload.encode()),\n routing_key=topic\n )\n\n async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):\n if topic not in self.subscribers:\n self.subscribers[topic] = []\n self.subscribers[topic].append(handler)\n exchange = get_broadcast_exchange_from_topic(topic)\n\n connection = await aio_pika.connect_robust(self.rabbitmq_url)\n async with connection:\n channel = await connection.channel()\n\n direct_queue = await channel.declare_queue(topic, auto_delete=True)\n broadcast_exchange = await channel.declare_exchange(exchange, aio_pika.ExchangeType.FANOUT)\n broadcast_queue = await channel.declare_queue('', exclusive=True)\n await broadcast_queue.bind(broadcast_exchange)\n\n async def process_queue(queue_iter):\n async for message in queue_iter:\n try:\n await handler(topic, message.body.decode())\n await message.ack()\n except Exception as ex:\n if message.delivery_tag <= self.max_retries:\n logger.info(f\"Retrying message ({message.delivery_tag}/{self.max_retries})\")\n await message.nack(requeue=True)\n else:\n logger.exception(ex)\n logger.info(\n f\"Max retries. Discarding event (max_retries: {self.max_retries})\")\n await message.ack()\n\n async with direct_queue.iterator() as direct_queue_iter, broadcast_queue.iterator() as broadcast_queue_iter:\n await asyncio.gather(\n process_queue(direct_queue_iter),\n process_queue(broadcast_queue_iter)\n )\n\n async def wait(self):\n await asyncio.Event().wait()\n```\n\nAnd that's all. The library can be installed with poetry in both versions: async and sync. You can use pip or poetry to install the library. \n\n```bash\n\nFor the sync version:\n```bash\npoetry add bus_queue --extras \"sync\"\npip install bus_queue[sync]\n```\n\nand for the async version:\n\n```bash\npoetry add bus_queue --extras \"async\"\npip install bus_queue[async]\n```\n\n",
"bugtrack_url": null,
"license": null,
"summary": null,
"version": "0.5.4",
"project_urls": null,
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "013ebe1bc759eca9e76f313317a2c584c38ddcbcc7bd514763cb6d537c1a62de",
"md5": "3da1eb5aabfe50306601a515669363dd",
"sha256": "157580c649375f41afb99b7e35e28f461ca591c306b1415757a7871183ad2193"
},
"downloads": -1,
"filename": "bus_queue-0.5.4-py3-none-any.whl",
"has_sig": false,
"md5_digest": "3da1eb5aabfe50306601a515669363dd",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.9",
"size": 8039,
"upload_time": "2025-01-15T10:01:41",
"upload_time_iso_8601": "2025-01-15T10:01:41.039801Z",
"url": "https://files.pythonhosted.org/packages/01/3e/be1bc759eca9e76f313317a2c584c38ddcbcc7bd514763cb6d537c1a62de/bus_queue-0.5.4-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "fccac77bd07eaee8a34137dda43cc19550bb44f817e268c5605f9470a4037da0",
"md5": "01c167c9a782f685d12da7d4205b65da",
"sha256": "e66258baa81b4f82f62d29380f6c298be492f33b029c90f504eff45318b91095"
},
"downloads": -1,
"filename": "bus_queue-0.5.4.tar.gz",
"has_sig": false,
"md5_digest": "01c167c9a782f685d12da7d4205b65da",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.9",
"size": 5337,
"upload_time": "2025-01-15T10:01:42",
"upload_time_iso_8601": "2025-01-15T10:01:42.191990Z",
"url": "https://files.pythonhosted.org/packages/fc/ca/c77bd07eaee8a34137dda43cc19550bb44f817e268c5605f9470a4037da0/bus_queue-0.5.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-01-15 10:01:42",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "bus_queue"
}