PyRMQ


- Name: PyRMQ
- Version: 3.4.4
- Home page: https://pyrmq.readthedocs.io
- Upload time: 2023-11-03 03:23:15
- Maintainer: Jasper Sibayan
- Author: Alexandre Gerona
- Requires Python: >=3.8
- License: MIT
- Keywords: rabbitmq, pika, consumer, publisher, queue, messages

            <!--suppress HtmlDeprecatedAttribute -->
<div align="center">
  <h1>PyRMQ</h1>
  <a href="https://github.com/first-digital-finance/pyrmq"><img alt="GitHub Workflow Status" src="https://img.shields.io/github/actions/workflow/status/first-digital-finance/pyrmq/testing.yml?branch=master&style=for-the-badge"></a>
  <a href="https://pypi.org/project/PyRMQ/"><img alt="PyPI" src="https://img.shields.io/pypi/v/pyrmq?style=for-the-badge"></a>
  <a href="https://pyrmq.readthedocs.io"><img src='https://readthedocs.org/projects/pyrmq/badge/?version=latest&style=for-the-badge' alt='Documentation Status' /></a>
  <a href="https://codecov.io/gh/first-digital-finance/pyrmq"><img alt="Codecov" src="https://img.shields.io/codecov/c/github/first-digital-finance/pyrmq/master.svg?style=for-the-badge"></a>
  <a href="https://pypi.org/project/PyRMQ/"><img alt="Supports Python >= 3.8" src="https://img.shields.io/pypi/pyversions/pyrmq?style=for-the-badge"/></a>
  <a href="https://mit-license.org" target="_blank"><img src="https://img.shields.io/badge/license-MIT-blue.svg?longCache=true&style=for-the-badge" alt="License"></a> 
  <a href="https://github.com/psf/black"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg?longCache=true&style=for-the-badge"></a>
  <a href="https://github.com/PyCQA/isort"><img alt="Imports: isort" src="https://img.shields.io/badge/%20imports-isort-%231674b1?style=for-the-badge&labelColor=ef8336"></a>
  <p>Python with RabbitMQ—simplified so you won't have to.</p>
</div>

## Features
Stop worrying about boilerplate and retry logic for your queues. PyRMQ already
handles both for you.
- Use out-of-the-box `Consumer` and `Publisher` classes built on `pika` in your projects and tests.
- Custom DLX-DLK-based retry logic for message consumption.
- Message priorities
- Works with Python 3.
- Production ready

## Getting Started
### Installation
PyRMQ is available on PyPI.
```shell script
pip install pyrmq
```
### Usage
#### Publishing
Instantiate the class you need with its settings.
PyRMQ works out of the box with RabbitMQ's [default initialization settings](https://hub.docker.com/_/rabbitmq).
```python
from pyrmq import Publisher
publisher = Publisher(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
)
publisher.publish({"pyrmq": "My first message"})
```
#### Publish message with priorities
To enable message priorities, declare your queue with the queue
argument `x-max-priority`. It takes an integer that sets the highest priority
the queue supports; a higher number means higher priority. Then publish your
message with the `priority` argument. Any priority above the configured maximum
is treated as the maximum.
Read more about message priorities [here](https://www.rabbitmq.com/priority.html).
```python
from pyrmq import Publisher
publisher = Publisher(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    queue_args={"x-max-priority": 3},
)
publisher.publish({"pyrmq": "My first message"}, priority=1)
```
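
The ordering and clamping rules described above can be illustrated without a broker. The following is a minimal stdlib-only sketch, not PyRMQ or RabbitMQ code: `effective_priority` and `delivery_order` are hypothetical helpers, and real delivery order also depends on when consumers are ready to receive.

```python
import heapq
from itertools import count

MAX_PRIORITY = 3  # mirrors queue_args={"x-max-priority": 3} above


def effective_priority(priority, max_priority=MAX_PRIORITY):
    """Clamp a requested priority into the range the queue supports."""
    return max(0, min(priority, max_priority))


def delivery_order(messages, max_priority=MAX_PRIORITY):
    """Return message bodies in the order a priority queue would hand them out."""
    arrival = count()  # tie-breaker: equal priorities keep publish order
    heap = []
    for body, priority in messages:
        clamped = effective_priority(priority, max_priority)
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(heap, (-clamped, next(arrival), body))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


published = [("low", 1), ("over the max", 10), ("at the max", 3), ("default", 0)]
print(delivery_order(published))
# ['over the max', 'at the max', 'low', 'default']
```

Here the message published with priority 10 is handled exactly like one published with priority 3, which is why the two keep their publish order relative to each other.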

| :warning: Warning                                                                                  |
|:---------------------------------------------------------------------------------------------------|
| Arguments cannot be added to an existing queue. To add queue arguments, either delete the existing queue and recreate it with the arguments, or create a new queue with them. |

#### Consuming
A `Consumer` runs in its own thread once you call `start()`, making it
non-blocking by default. If you run it after the publishing example above, it
should receive the published message.
```python
from pyrmq import Consumer

def callback(data):
    print(f"Received {data}!")

consumer = Consumer(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    callback=callback
)
consumer.start()
```
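
To see what "non-blocking" means in practice, here is a minimal stdlib-only sketch of the same pattern: a worker thread hands messages to a callback while the main thread carries on. It is an illustration under assumptions, not PyRMQ's implementation; `consume`, `inbox`, and the `None` stop sentinel are hypothetical names, and `queue.Queue` stands in for the RabbitMQ queue.

```python
import queue
import threading

received = []


def callback(data):
    received.append(data)


def consume(inbox, callback):
    """Hand each message to the callback until a None sentinel arrives."""
    while True:
        data = inbox.get()
        if data is None:
            break
        callback(data)


inbox = queue.Queue()  # stands in for the RabbitMQ queue
worker = threading.Thread(target=consume, args=(inbox, callback), daemon=True)
worker.start()  # returns immediately; the main thread is not blocked

inbox.put({"pyrmq": "My first message"})
inbox.put(None)  # tell the worker to stop
worker.join(timeout=5)
print(received)
```

Because the worker is a daemon thread in this sketch, a script that does nothing else after starting it would exit right away, so a long-running process needs something else to keep the main thread alive.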

#### DLX-DLK Retry Logic
What if you want to retry a message whose processing failed? PyRMQ offers a custom solution that keeps your message
in queues while retrying periodically a set number of times.

This approach uses [dead letter exchanges and queues](https://www.rabbitmq.com/dlx.html) to republish a message to your
original queue once it has expired. PyRMQ creates this "retry" queue for you, named by default by
appending `.retry` to your original queue name.

```python
from pyrmq import Consumer

def callback(data):
    print(f"Received {data}!")
    raise Exception

consumer = Consumer(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    callback=callback,
    is_dlk_retry_enabled=True,
)
consumer.start()
```

This will start a loop of passing your message between the original queue and the retry queue until it reaches
the default number of `max_retries`.
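
The loop described above can be modeled with two in-memory queues. This is a simplified sketch of the idea only, not PyRMQ's code: `run_with_retries` is a hypothetical helper, the `MAX_RETRIES` value is an assumption chosen for illustration (check the documentation for the real default), and the TTL wait in the retry queue is skipped.

```python
from collections import deque

MAX_RETRIES = 3  # assumed value for illustration, not necessarily PyRMQ's default


def run_with_retries(message, callback, max_retries=MAX_RETRIES):
    """Move a failing message between two queues until retries run out."""
    original, retry = deque([(message, 0)]), deque()
    log = []
    while original:
        body, retries = original.popleft()
        try:
            callback(body)
            log.append(("ok", retries))
        except Exception:
            if retries >= max_retries:
                log.append(("gave up", retries))
                continue
            # Failure: park the message in the retry queue with a bumped count.
            retry.append((body, retries + 1))
            log.append(("retry queued", retries + 1))
            # Once its TTL expires, the broker dead-letters it back to the original queue.
            original.append(retry.popleft())
    return log


def always_fails(data):
    raise Exception("boom")


print(run_with_retries({"pyrmq": "My first message"}, always_fails))
# [('retry queued', 1), ('retry queued', 2), ('retry queued', 3), ('gave up', 3)]
```

In this model the callback runs once for the initial delivery and once per retry, so a callback that always raises is invoked four times before the message is dropped.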

##### DLX-DLK Retry backoff vs Periodic retries
[RabbitMQ does not remove expired messages that aren't at the head of the queue](https://www.rabbitmq.com/ttl.html#per-message-ttl-caveats).
With backoff-style delays, the retry queue becomes congested behind an unexpired message
at its head. As such, as of 3.3.0, PyRMQ uses a simple periodic retry.
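
The head-of-queue caveat is easy to reproduce in a toy model. The sketch below is an illustration under assumptions, not broker code: `expiry_times` is a hypothetical helper, all messages are assumed to be enqueued at time 0, and the TTL values are arbitrary seconds.

```python
from collections import deque


def expiry_times(ttls):
    """Return when each message actually leaves a queue that only expires its head.

    All messages are assumed to be enqueued at time 0, in the given order.
    """
    pending = deque(ttls)
    now = 0
    left_at = []
    while pending:
        ttl = pending.popleft()
        # A message cannot leave before its own TTL, nor before the one ahead of it.
        now = max(now, ttl)
        left_at.append(now)
    return left_at


backoff = [60, 5, 5]   # a long delay sits at the head of the queue
periodic = [5, 5, 5]   # every message uses the same delay

print(expiry_times(backoff))   # [60, 60, 60]
print(expiry_times(periodic))  # [5, 5, 5]
```

With mixed delays the two 5-second messages wait a full 60 seconds behind the head, while a uniform delay lets every message leave on time.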

#### Using other exchange types
You can use another exchange type by specifying it in the `Publisher` class. The default is
`direct`.

```python
from pyrmq import Publisher

queue_args = {"routing.sample": "sample", "x-match": "all"}

publisher = Publisher(
    exchange_name="exchange_name",
    exchange_type="headers",
    queue_args=queue_args
)

message_properties = {"headers": {"routing.sample": "sample"}}
publisher.publish({"pyrmq": "My first message"}, message_properties=message_properties)
```

This example publishes a message to a headers exchange, which routes it
based on the message headers rather than the routing key.
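
How a headers exchange decides whether a message matches a binding can be sketched in a few lines. This is a simplified model of the standard `x-match` values `all` and `any`, not code from PyRMQ or RabbitMQ; `headers_match` is a hypothetical helper.

```python
def headers_match(binding_args, message_headers):
    """Return True if a message's headers satisfy a headers-exchange binding."""
    mode = binding_args.get("x-match", "all")
    # Keys starting with "x-" configure the binding and are not matched against.
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [message_headers.get(key) == value for key, value in wanted.items()]
    return any(hits) if mode == "any" else all(hits)


binding = {"routing.sample": "sample", "x-match": "all"}

print(headers_match(binding, {"routing.sample": "sample"}))  # True
print(headers_match(binding, {"routing.sample": "other"}))   # False
```

With `"x-match": "any"`, a binding that lists several headers accepts a message as soon as one of them matches.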

#### Binding an exchange to another exchange
By default, the `exchange_name` you pass when initializing a `Consumer` is declared and bound to the passed
`queue_name`. What if you also want to bind this exchange to another exchange?

This is done with `bound_exchange`. This parameter accepts a dictionary with two keys: the `name` of the other exchange and its
`type`. Let's take a look at an example to see this in action.

```python
from pyrmq import Consumer

def callback(data):
    print(f"Received {data}!")
    raise Exception

consumer = Consumer(
    exchange_name="direct_exchange",
    queue_name="direct_queue",
    routing_key="routing_key",
    bound_exchange={"name": "headers_exchange_name", "type": "headers"},
    callback=callback,
    is_dlk_retry_enabled=True,
)
consumer.start()
```

In the example above, we want to consume from an exchange called `direct_exchange` that is directly bound to the queue
`direct_queue`. We want `direct_exchange` to get its messages from another exchange called `headers_exchange_name` of
type `headers`. With `bound_exchange`, PyRMQ _first_ declares `direct_exchange` and `direct_queue`, along with any queue or
exchange arguments you may have, then declares the bound exchange and binds the two together. This removes
the need to declare your bound exchange manually.
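
The routing path in this setup can be modeled as two hops. The sketch below is a simplified illustration of AMQP routing under assumptions, not PyRMQ code: `reaches_queue` is a hypothetical helper, the binding headers are example values, and `x-match` is assumed to be `all`.

```python
def reaches_queue(message, e2e_binding_headers, queue_binding_key):
    """Model headers_exchange_name -> direct_exchange -> direct_queue routing."""
    wanted = {k: v for k, v in e2e_binding_headers.items() if not k.startswith("x-")}
    # Hop 1: the headers exchange forwards on a header match (x-match "all" assumed).
    forwarded = all(message["headers"].get(k) == v for k, v in wanted.items())
    # Hop 2: the direct exchange compares the message's routing key
    # with the key the queue was bound with.
    return forwarded and message["routing_key"] == queue_binding_key


binding = {"routing.sample": "sample", "x-match": "all"}
message = {"headers": {"routing.sample": "sample"}, "routing_key": "routing_key"}

print(reaches_queue(message, binding, "routing_key"))  # True
print(reaches_queue({**message, "routing_key": "other"}, binding, "routing_key"))  # False
```

The second call shows why the routing key still matters: the headers exchange forwards the message, but the direct exchange drops it because the key does not match the queue binding.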

| :warning: Important                                                                                |
|:---------------------------------------------------------------------------------------------------|
| Since this method uses [e2e bindings](https://www.rabbitmq.com/e2e.html), if you bind your consumer to a headers exchange, your consumer, the bound exchange, _and_ your publisher must all use the same routing key for messages to route properly. This is not needed for exchange-to-queue bindings, where the routing key is optional. |

## Documentation
Visit https://pyrmq.readthedocs.io for the most up-to-date documentation.


## Testing
For development, just run:
```shell script
pytest
```
To test for all the supported Python versions:
```shell script
pip install tox
tox
```
To test for a specific Python version:
```shell script
tox -e py38
```



            
