# bus_queue

- Name: bus_queue
- Version: 0.5.4
- Author: Gonzalo Ayuso
- Requires Python: <4.0,>=3.9
- Uploaded: 2025-01-15 10:01:42
- Requirements: none recorded

## Event Bus in Python with RabbitMQ: Exploring Synchronous and Asynchronous Solutions

Today we're going to build an event bus with Python, tailored to my personal needs. The idea is to create a scalable event bus, with RabbitMQ as the message broker, but easy to swap for another broker such as MQTT or Redis. In fact, I started with a memory-based message broker. I'm never going to use this in-memory broker for real work, but it was a good starting point to understand the basics of the event bus.
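
All the backends below sit behind the same small interface, which is what makes the broker swappable. A minimal sketch of what that `Backend` contract presumably looks like (the real class in bus_queue may differ):

```python
# Hypothetical sketch of the Backend contract used in the examples below;
# the actual bus_queue.Backend may declare these methods differently.
from abc import ABC, abstractmethod
from typing import Any, Callable


class Backend(ABC):
    @abstractmethod
    def publish(self, topic: str, payload: Any) -> None:
        ...

    @abstractmethod
    def broadcast(self, topic: str, payload: Any) -> None:
        ...

    @abstractmethod
    def subscribe(self, topic: str, handler: Callable[[str, Any], None]) -> None:
        ...

    @abstractmethod
    def wait(self) -> None:
        ...
```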

This is the in-memory version:

```python
import logging

from bus_queue.backend.memory.bus import MemoryEventBus as Bus
from bus_queue import EventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


def main():
    backend = Bus()
    bus = EventBus(backend)

    bus.subscribe("test", callback)

    bus.publish("test", dict(hola="Gonzalo"))
    bus.wait()


if __name__ == "__main__":
    main()
```

This in-memory version relies on the following backend implementation:

```python
from time import sleep
from typing import Callable, Dict, List, Any

from bus_queue import Backend


class MemoryEventBus(Backend):
    def __init__(self):
        self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}

    def publish(self, topic: str, message: str) -> None:
        if topic in self.subscribers:
            for callback in self.subscribers[topic]:
                callback(topic, message)

    def broadcast(self, topic: str, payload: Any):
        self.publish(topic, payload)

    def subscribe(self, topic: str, callback: Callable[[str, Any], None]) -> None:
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(callback)

    def wait(self):
        while True:
            sleep(1)
```

This implementation is synchronous. I also want an asynchronous version; here is the equivalent example:

```python
import asyncio
import logging

from bus_queue.backend.memory.assync_bus import AsyncMemoryEventBus as Bus
from bus_queue import AsyncEventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['asyncio', ]:
    logging.getLogger(l).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


async def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


async def main():
    backend = Bus()
    bus = AsyncEventBus(backend)

    await bus.subscribe("test", callback)

    await bus.publish("test", dict(hola="Gonzalo"))
    await bus.wait()


if __name__ == "__main__":
    asyncio.run(main())

```

The async in-memory backend looks like this:

```python
import asyncio
from typing import Callable, Dict, List, Any, Awaitable

from bus_queue import AsyncBackend


class AsyncMemoryEventBus(AsyncBackend):
    def __init__(self):
        self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}

    async def publish(self, topic: str, message: str):
        if topic in self.subscribers:
            tasks = [
                asyncio.create_task(subscriber(topic, message))
                for subscriber in self.subscribers[topic]
            ]
            await asyncio.gather(*tasks)

    async def broadcast(self, topic: str, message: str):
        await self.publish(topic, message)

    async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)

    async def wait(self):
        await asyncio.Event().wait()
```
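
With the in-memory backend, `publish` delivers the event to every subscriber of the topic, and the async version runs the handlers concurrently. A minimal sketch (the handler names are made up) assuming `AsyncEventBus` simply delegates to the backend shown above:

```python
import asyncio

from bus_queue.backend.memory.assync_bus import AsyncMemoryEventBus as Bus
from bus_queue import AsyncEventBus


async def handler_a(topic, msg):
    print(f"A received {msg} on {topic}")


async def handler_b(topic, msg):
    print(f"B received {msg} on {topic}")


async def main():
    bus = AsyncEventBus(Bus())

    # two handlers on the same topic ...
    await bus.subscribe("test", handler_a)
    await bus.subscribe("test", handler_b)

    # ... both receive the event; the backend gathers the handler coroutines
    await bus.publish("test", "hola")


if __name__ == "__main__":
    asyncio.run(main())
```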

But this in-memory version is not what I really need: I want RabbitMQ as the message broker. Again I'm going to create a synchronous and an asynchronous version. This time there are two ways to publish messages: a plain publish and a broadcast. A broadcast sends the message to all subscribers of the topic, while a plain publish sends it to only one subscriber, using a round-robin strategy.

First, the synchronous version. The listener:

```python
import logging

from bus_queue import EventBus
from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['pika', ]:
    logging.getLogger(l).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = EventBus(backend)

    bus.subscribe("test", callback)
    bus.wait()


if __name__ == "__main__":
    main()
```

And the publisher:

```python
import logging

from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus
from bus_queue import EventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['pika',]:
    logging.getLogger(l).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = EventBus(backend)

    bus.publish("test", dict(hola="Gonzalo"))
    bus.broadcast("test", "Hola, broadcast")


if __name__ == "__main__":
    main()
```
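
To make the two delivery modes visible, here is a hypothetical variation of the publisher (not from the original examples): with two copies of the listener above running, each `publish` message should end up on exactly one of them, while the `broadcast` message should reach both.

```python
from bus_queue.backend.rabbit.bus import RabbitEventBus as Bus
from bus_queue import EventBus


def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = EventBus(backend)

    for i in range(4):
        # plain publish: the "test" queue hands each message to a single
        # consumer, round-robin across the running listeners
        bus.publish("test", f"work item {i}")

    # broadcast: the fanout exchange copies the message to every listener
    bus.broadcast("test", "config reloaded")


if __name__ == "__main__":
    main()
```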

The implementation looks like this:

```python
import logging
from typing import Callable, Dict, Any, List

import pika

from bus_queue import Backend

logger = logging.getLogger(__name__)


def get_broadcast_exchange_from_topic(topic: str):
    return f"broadcast_{topic}"


class RabbitEventBus(Backend):
    def __init__(self, rabbitmq_url: str, max_retries: int = 3):
        self.rabbitmq_url = rabbitmq_url
        self.subscribers: Dict[str, List[Callable[[str, Any], None]]] = {}
        self.connection = None
        self.channel = None
        self.max_retries = max_retries

    def connect(self):
        self.connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))
        self.channel = self.connection.channel()

    def broadcast(self, topic: str, payload: Any):
        if self.channel is None:
            self.connect()
        exchange = get_broadcast_exchange_from_topic(topic)
        self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')
        self.channel.basic_publish(exchange=exchange, routing_key='', body=payload.encode())

    def publish(self, topic: str, payload: Any):
        if self.channel is None:
            self.connect()
        self.channel.basic_publish(exchange='', routing_key=topic, body=payload.encode())

    def subscribe(self, topic: str, handler: Callable[[str, Any], None]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)

        if self.channel is None:
            self.connect()

        # shared queue named after the topic: plain publish() messages are
        # delivered round-robin among the consumers of this queue
        self.channel.queue_declare(queue=topic, auto_delete=True)

        # fanout exchange plus an exclusive, per-subscriber queue: broadcast()
        # messages reach every subscriber
        exchange = get_broadcast_exchange_from_topic(topic)
        self.channel.exchange_declare(exchange=exchange, exchange_type='fanout')

        result = self.channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue

        self.channel.queue_bind(exchange=exchange, queue=queue_name)

        def on_message(ch, method, properties, body):
            for subscriber in self.subscribers[topic]:
                try:
                    subscriber(topic, body.decode())
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                except Exception as ex:
                    logger.exception(ex)
                    if method.delivery_tag <= self.max_retries:
                        logger.info(f"Retrying message ({method.delivery_tag}/{self.max_retries})")
                        self.channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                    else:
                        logger.info(f"Max retries reached. Discarding event (max_retries: {self.max_retries})")
                        self.channel.basic_ack(delivery_tag=method.delivery_tag)

        # consume both queues: the shared topic queue (round-robin publish) and
        # the exclusive queue bound to the fanout exchange (broadcast)
        self.channel.basic_consume(queue=topic, on_message_callback=on_message, auto_ack=False)
        self.channel.basic_consume(queue=queue_name, on_message_callback=on_message, auto_ack=False)

    def wait(self):
        if self.channel is None:
            self.connect()
        self.channel.basic_qos(prefetch_count=1)
        self.channel.start_consuming()
```

And the asynchronous version:

The listener:

```python
import asyncio
import logging

from bus_queue import AsyncEventBus
from bus_queue.backend.rabbit.assync_bus import AsyncRabbitEventBus as Bus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

for l in ['asyncio', 'aio-pika']:
    logging.getLogger(l).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


async def callback(topic, msg):
    logger.info(f"Received: topic: {topic} msg: {msg}")


async def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = AsyncEventBus(backend)

    await bus.subscribe("test", callback)
    await bus.wait()


if __name__ == "__main__":
    asyncio.run(main())
```
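
The publisher side of the async version, as a minimal sketch (assuming `AsyncEventBus` exposes the same `publish` and `broadcast` methods as the backend):

```python
import asyncio
import logging

from bus_queue.backend.rabbit.assync_bus import AsyncRabbitEventBus as Bus
from bus_queue import AsyncEventBus

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


async def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")
    bus = AsyncEventBus(backend)

    # mirrors the sync publisher: one round-robin publish and one broadcast
    await bus.publish("test", "Hola, Gonzalo")
    await bus.broadcast("test", "Hola, broadcast")


if __name__ == "__main__":
    asyncio.run(main())
```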

The implementation looks like this:

```python
import asyncio
import logging
from typing import Callable, Dict, List, Any, Awaitable

import aio_pika

from bus_queue import AsyncBackend

logger = logging.getLogger(__name__)


def get_broadcast_exchange_from_topic(topic: str):
    return f"broadcast_{topic}"


class AsyncRabbitEventBus(AsyncBackend):
    def __init__(self, rabbitmq_url: str, max_retries: int = 3):
        self.subscribers: Dict[str, List[Callable[[str, Any], Awaitable[None]]]] = {}
        self.rabbitmq_url = rabbitmq_url
        self.max_retries = max_retries

    async def broadcast(self, topic: str, payload: Any):
        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        exchange_type = aio_pika.ExchangeType.FANOUT
        exchange = get_broadcast_exchange_from_topic(topic)
        async with connection:
            channel = await connection.channel()
            exchange = await channel.declare_exchange(exchange, exchange_type)
            await exchange.publish(
                aio_pika.Message(body=payload.encode()),
                routing_key=topic
            )

    async def publish(self, topic: str, payload: Any):
        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        async with connection:
            channel = await connection.channel()
            await channel.default_exchange.publish(
                aio_pika.Message(body=payload.encode()),
                routing_key=topic
            )

    async def subscribe(self, topic: str, handler: Callable[[str, Any], Awaitable[None]]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(handler)
        exchange = get_broadcast_exchange_from_topic(topic)

        connection = await aio_pika.connect_robust(self.rabbitmq_url)
        async with connection:
            channel = await connection.channel()

            # shared topic queue (round-robin publish) plus an exclusive queue
            # bound to the fanout exchange (broadcast), as in the sync version
            direct_queue = await channel.declare_queue(topic, auto_delete=True)
            broadcast_exchange = await channel.declare_exchange(exchange, aio_pika.ExchangeType.FANOUT)
            broadcast_queue = await channel.declare_queue('', exclusive=True)
            await broadcast_queue.bind(broadcast_exchange)

            async def process_queue(queue_iter):
                async for message in queue_iter:
                    try:
                        await handler(topic, message.body.decode())
                        await message.ack()
                    except Exception as ex:
                        if message.delivery_tag <= self.max_retries:
                            logger.info(f"Retrying message ({message.delivery_tag}/{self.max_retries})")
                            await message.nack(requeue=True)
                        else:
                            logger.exception(ex)
                            logger.info(
                                f"Max retries. Discarding event (max_retries: {self.max_retries})")
                            await message.ack()

            async with direct_queue.iterator() as direct_queue_iter, broadcast_queue.iterator() as broadcast_queue_iter:
                await asyncio.gather(
                    process_queue(direct_queue_iter),
                    process_queue(broadcast_queue_iter)
                )

    async def wait(self):
        await asyncio.Event().wait()
```
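
Note that in this implementation the backend's `subscribe` keeps consuming messages until it is cancelled (the `async for` loops never end on their own). A hypothetical sketch of listening to two topics at the backend level, then, is to run both subscriptions concurrently (the second topic name is made up):

```python
import asyncio

from bus_queue.backend.rabbit.assync_bus import AsyncRabbitEventBus as Bus


async def callback(topic, msg):
    print(f"Received: topic: {topic} msg: {msg}")


async def main():
    backend = Bus("amqp://guest:guest@localhost:5672/")

    # each subscribe() call runs its own consume loop, so gather them
    await asyncio.gather(
        backend.subscribe("test", callback),
        backend.subscribe("other_topic", callback),
    )


if __name__ == "__main__":
    asyncio.run(main())
```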

And that's all. The library can be installed with pip or poetry, with separate extras for the sync and async versions.

For the sync version:

```bash
poetry add bus_queue --extras "sync"
pip install bus_queue[sync]
```

and for the async version:

```bash
poetry add bus_queue --extras "async"
pip install bus_queue[async]
```


            
