taskiq-aio-pika


Nametaskiq-aio-pika JSON
Version 0.4.0 PyPI version JSON
download
home_pagehttps://github.com/taskiq-python/taskiq-aio-pika
SummaryRabbitMQ broker for taskiq
upload_time2023-06-11 13:22:56
maintainer
docs_urlNone
authorPavel Kirilin
requires_python>=3.8.1,<4.0.0
license
keywords taskiq tasks distributed async aio-pika
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # AioPika broker for taskiq

This lirary provides you with aio-pika broker for taskiq.

Usage:
```python
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker()

@broker.task
async def test() -> None:
    print("nothing")

```

## Non-obvious things

You can send delayed messages and set priorities to messages using labels.

## Delays

### **Default retries**

To send delayed message, you have to specify
delay label. You can do it with `task` decorator,
or by using kicker.  
In this type of delay we are using additional queue with `expiration` parameter and after with time message will be deleted from `delay` queue and sent to the main taskiq queue.
For example:

```python
broker = AioPikaBroker()

@broker.task(delay=3)
async def delayed_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message will be received by workers
    # After 3 seconds delay.
    await delayed_task.kiq()

    # This message is going to be received after the delay in 4 seconds.
    # Since we overriden the `delay` label using kicker.
    await delayed_task.kicker().with_labels(delay=4).kiq()

    # This message is going to be send immediately. Since we deleted the label.
    await delayed_task.kicker().with_labels(delay=None).kiq()

    # Of course the delay is managed by rabbitmq, so you don't
    # have to wait delay period before message is going to be sent.
```

### **Retries with `rabbitmq-delayed-message-exchange` plugin**

To send delayed message you can install `rabbitmq-delayed-message-exchange`
plugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.

And you need to configure you broker.
There is `delayed_message_exchange_plugin` `AioPikaBroker` parameter and it must be `True` to turn on delayed message functionality.  

The delay plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time.  
For example:

```python
broker = AioPikaBroker(
    delayed_message_exchange_plugin=True,
)

@broker.task(delay=3)
async def delayed_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message will be received by workers
    # After 3 seconds delay.
    await delayed_task.kiq()

    # This message is going to be received after the delay in 4 seconds.
    # Since we overriden the `delay` label using kicker.
    await delayed_task.kicker().with_labels(delay=4).kiq()
```

## Priorities

You can define priorities for messages using `priority` label.
Messages with higher priorities are delivered faster.
But to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init.
This parameter sets maximum priority for the queue and
declares it as the prority queue.

Before doing so please read the [documentation](https://www.rabbitmq.com/priority.html#behaviour) about what
downsides you get by using prioritized queues.


```python
broker = AioPikaBroker(max_priority=10)

# We can define default priority for tasks.
@broker.task(priority=2)
async def prio_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message has priority = 2.
    await prio_task.kiq()

    # This message is going to have priority 4.
    await prio_task.kicker().with_labels(priority=4).kiq()

    # This message is going to have priority 0.
    await prio_task.kicker().with_labels(priority=None).kiq()

```

## Configuration

AioPikaBroker parameters:
* `url` - url to rabbitmq. If None, "amqp://guest:guest@localhost:5672" is used.
* `result_backend` - custom result backend.
* `task_id_generator` - custom task_id genertaor.
* `exchange_name` - name of exchange that used to send messages.
* `exchange_type` - type of the exchange. Used only if `declare_exchange` is True.
* `queue_name` - queue that used to get incoming messages.
* `routing_key` - that used to bind that queue to the exchange.
* `declare_exchange` - whether you want to declare new exchange if it doesn't exist.
* `max_priority` - maximum priority for messages.
* `delay_queue_name` - custom delay queue name.
    This queue is used to deliver messages with delays.
* `dead_letter_queue_name` - custom dead letter queue name.
    This queue is used to receive negatively acknowleged messages from the main queue.
* `qos` - number of messages that worker can prefetch.
* `declare_queues` - whether you want to declare queues even on
    client side. May be useful for message persistance.

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/taskiq-python/taskiq-aio-pika",
    "name": "taskiq-aio-pika",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8.1,<4.0.0",
    "maintainer_email": "",
    "keywords": "taskiq,tasks,distributed,async,aio-pika",
    "author": "Pavel Kirilin",
    "author_email": "win10@list.ru",
    "download_url": "https://files.pythonhosted.org/packages/42/e6/345ed478da3579f62aa45e45d2965609cd2bd9c808d828c8636f18f90c0f/taskiq_aio_pika-0.4.0.tar.gz",
    "platform": null,
    "description": "# AioPika broker for taskiq\n\nThis lirary provides you with aio-pika broker for taskiq.\n\nUsage:\n```python\nfrom taskiq_aio_pika import AioPikaBroker\n\nbroker = AioPikaBroker()\n\n@broker.task\nasync def test() -> None:\n    print(\"nothing\")\n\n```\n\n## Non-obvious things\n\nYou can send delayed messages and set priorities to messages using labels.\n\n## Delays\n\n### **Default retries**\n\nTo send delayed message, you have to specify\ndelay label. You can do it with `task` decorator,\nor by using kicker.  \nIn this type of delay we are using additional queue with `expiration` parameter and after with time message will be deleted from `delay` queue and sent to the main taskiq queue.\nFor example:\n\n```python\nbroker = AioPikaBroker()\n\n@broker.task(delay=3)\nasync def delayed_task() -> int:\n    return 1\n\nasync def main():\n    await broker.startup()\n    # This message will be received by workers\n    # After 3 seconds delay.\n    await delayed_task.kiq()\n\n    # This message is going to be received after the delay in 4 seconds.\n    # Since we overriden the `delay` label using kicker.\n    await delayed_task.kicker().with_labels(delay=4).kiq()\n\n    # This message is going to be send immediately. Since we deleted the label.\n    await delayed_task.kicker().with_labels(delay=None).kiq()\n\n    # Of course the delay is managed by rabbitmq, so you don't\n    # have to wait delay period before message is going to be sent.\n```\n\n### **Retries with `rabbitmq-delayed-message-exchange` plugin**\n\nTo send delayed message you can install `rabbitmq-delayed-message-exchange`\nplugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.\n\nAnd you need to configure you broker.\nThere is `delayed_message_exchange_plugin` `AioPikaBroker` parameter and it must be `True` to turn on delayed message functionality.  \n\nThe delay plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time.  \nFor example:\n\n```python\nbroker = AioPikaBroker(\n    delayed_message_exchange_plugin=True,\n)\n\n@broker.task(delay=3)\nasync def delayed_task() -> int:\n    return 1\n\nasync def main():\n    await broker.startup()\n    # This message will be received by workers\n    # After 3 seconds delay.\n    await delayed_task.kiq()\n\n    # This message is going to be received after the delay in 4 seconds.\n    # Since we overriden the `delay` label using kicker.\n    await delayed_task.kicker().with_labels(delay=4).kiq()\n```\n\n## Priorities\n\nYou can define priorities for messages using `priority` label.\nMessages with higher priorities are delivered faster.\nBut to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init.\nThis parameter sets maximum priority for the queue and\ndeclares it as the prority queue.\n\nBefore doing so please read the [documentation](https://www.rabbitmq.com/priority.html#behaviour) about what\ndownsides you get by using prioritized queues.\n\n\n```python\nbroker = AioPikaBroker(max_priority=10)\n\n# We can define default priority for tasks.\n@broker.task(priority=2)\nasync def prio_task() -> int:\n    return 1\n\nasync def main():\n    await broker.startup()\n    # This message has priority = 2.\n    await prio_task.kiq()\n\n    # This message is going to have priority 4.\n    await prio_task.kicker().with_labels(priority=4).kiq()\n\n    # This message is going to have priority 0.\n    await prio_task.kicker().with_labels(priority=None).kiq()\n\n```\n\n## Configuration\n\nAioPikaBroker parameters:\n* `url` - url to rabbitmq. If None, \"amqp://guest:guest@localhost:5672\" is used.\n* `result_backend` - custom result backend.\n* `task_id_generator` - custom task_id genertaor.\n* `exchange_name` - name of exchange that used to send messages.\n* `exchange_type` - type of the exchange. Used only if `declare_exchange` is True.\n* `queue_name` - queue that used to get incoming messages.\n* `routing_key` - that used to bind that queue to the exchange.\n* `declare_exchange` - whether you want to declare new exchange if it doesn't exist.\n* `max_priority` - maximum priority for messages.\n* `delay_queue_name` - custom delay queue name.\n    This queue is used to deliver messages with delays.\n* `dead_letter_queue_name` - custom dead letter queue name.\n    This queue is used to receive negatively acknowleged messages from the main queue.\n* `qos` - number of messages that worker can prefetch.\n* `declare_queues` - whether you want to declare queues even on\n    client side. May be useful for message persistance.\n",
    "bugtrack_url": null,
    "license": "",
    "summary": "RabbitMQ broker for taskiq",
    "version": "0.4.0",
    "project_urls": {
        "Homepage": "https://github.com/taskiq-python/taskiq-aio-pika",
        "Repository": "https://github.com/taskiq-python/taskiq-aio-pika"
    },
    "split_keywords": [
        "taskiq",
        "tasks",
        "distributed",
        "async",
        "aio-pika"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "c8dbe0a74ced00786d4af7dda150cf0e972fd117eeef25069e846c7f6d31c1e6",
                "md5": "1fd2e6f1c817b68f18c7f5620d393937",
                "sha256": "3d96acf8ee5d9170bdc3f726b7a2f9d29f58e0fbc760fb13a50f40eeaa1368a3"
            },
            "downloads": -1,
            "filename": "taskiq_aio_pika-0.4.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "1fd2e6f1c817b68f18c7f5620d393937",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8.1,<4.0.0",
            "size": 5786,
            "upload_time": "2023-06-11T13:22:55",
            "upload_time_iso_8601": "2023-06-11T13:22:55.257698Z",
            "url": "https://files.pythonhosted.org/packages/c8/db/e0a74ced00786d4af7dda150cf0e972fd117eeef25069e846c7f6d31c1e6/taskiq_aio_pika-0.4.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "42e6345ed478da3579f62aa45e45d2965609cd2bd9c808d828c8636f18f90c0f",
                "md5": "a1e8a41a97796a679e027e62bc45bc9f",
                "sha256": "9295e911ad2c808e10571adee262dcfe51344a2aebba0fbc89249e666bbe44a1"
            },
            "downloads": -1,
            "filename": "taskiq_aio_pika-0.4.0.tar.gz",
            "has_sig": false,
            "md5_digest": "a1e8a41a97796a679e027e62bc45bc9f",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8.1,<4.0.0",
            "size": 5324,
            "upload_time": "2023-06-11T13:22:56",
            "upload_time_iso_8601": "2023-06-11T13:22:56.786666Z",
            "url": "https://files.pythonhosted.org/packages/42/e6/345ed478da3579f62aa45e45d2965609cd2bd9c808d828c8636f18f90c0f/taskiq_aio_pika-0.4.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-06-11 13:22:56",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "taskiq-python",
    "github_project": "taskiq-aio-pika",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "taskiq-aio-pika"
}
        
Elapsed time: 0.07874s