taskiq-aio-sqs

Name: taskiq-aio-sqs
Version: 0.2.0
Summary: SQS/S3 Broker for TaskIQ using Aiobotocore
Upload time: 2024-09-17 23:17:44
Requires Python: >=3.10
License: MIT License, Copyright (c) 2022-2024 Jesse Constante
Keywords: taskiq, broker, aws, sqs, s3, aiobotocore, taskiq-broker, queue
Source: https://github.com/vonsteer/taskiq-aio-sqs
# TaskIQ SQS/S3 aiobotocore

[![PyPI](https://img.shields.io/pypi/v/taskiq-aio-sqs)](https://pypi.org/project/taskiq-aio-sqs/)
[![Python Versions](https://img.shields.io/pypi/pyversions/taskiq-aio-sqs)](https://pypi.org/project/taskiq-aio-sqs/)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
[![Coverage Status](./coverage-badge.svg?dummy=8484744)](./coverage.xml)

This library provides you with a fully asynchronous SQS broker and S3 backend for TaskIQ using aiobotocore.
Inspired by the [taskiq-sqs](https://github.com/ApeWorX/taskiq-sqs) broker.

Besides the SQS broker, this library also provides an S3 backend for results, which is useful when results are too large for SQS.
Additionally, the broker itself can be configured to use S3 + SQS for messages that are too large for SQS,
replicating the behaviour of the [Amazon Extended Client Library](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-managing-large-messages.html).

## Installation

```bash
pip install taskiq-aio-sqs
```

## General Usage:
Here is an example of how to use the SQS broker with the S3 backend:

```python
# broker.py
import asyncio
from taskiq_aio_sqs import SQSBroker, S3Backend

s3_result_backend = S3Backend(
    endpoint_url="http://localhost:4566",
    bucket_name="response-bucket",  # bucket must exist
)

broker = SQSBroker(
    endpoint_url="http://localhost:4566",
    result_backend=s3_result_backend,
    sqs_queue_name="my-queue",
)


@broker.task
async def i_love_aws() -> None:
    """I hope my cloud bill doesn't get too high!"""
    await asyncio.sleep(5.5)
    print("Hello there!")


async def main():
    task = await i_love_aws.kiq()
    print(await task.wait_result())


if __name__ == "__main__":
    asyncio.run(main())

```
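To actually execute tasks, a worker process needs to consume the queue. Assuming the standard TaskIQ CLI is available and the broker above lives in `broker.py`, starting a worker typically looks like this:

```bash
taskiq worker broker:broker
```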
### Delayed Tasks:

Delayed tasks can be created in 3 ways:
 - by using the `delay` parameter in the task decorator
 - by using the kicker with the `delay` label
 - by setting the `delay_seconds` parameter in the broker, which will apply to all tasks processed by the broker.

Here's an example of how to use delayed tasks:

```python
from taskiq_aio_sqs import SQSBroker

broker = SQSBroker(
    endpoint_url="http://localhost:4566",
    delay_seconds=3,
    sqs_queue_name="my-queue",
)

@broker.task()
async def general_task() -> int:
    return 1

@broker.task(delay=7)
async def delayed_task() -> int:
    return 1

async def main():
    await broker.startup()
    # This message will be received by workers after 3 seconds
    # delay using the delay_seconds parameter in the broker init.
    await general_task.kiq()

    # This message will be received by workers after 7 seconds delay.
    await delayed_task.kiq()

    # This message will be received after a 4 second delay,
    # since we overrode the `delay` label using the kicker.
    await delayed_task.kicker().with_labels(delay=4).kiq()

```

### Extended Messages with S3:

You can also use S3 to store messages that are too large for SQS. To do this, you need to set the `s3_extended_bucket_name` parameter in the broker configuration.

Here's an example of this behaviour:
```python
from taskiq_aio_sqs import SQSBroker

pub_broker = SQSBroker(
    endpoint_url="http://localhost:4566",
    sqs_queue_name="my-queue",
    s3_extended_bucket_name="response-bucket",
)

sub_broker = SQSBroker(
    endpoint_url="http://localhost:4566",
    sqs_queue_name="my-queue",
    s3_extended_bucket_name="response-bucket",
)

LARGE_MESSAGE = b"x" * (256 * 1024 + 1)  # just over the 256 KB SQS message size limit

@pub_broker.task()
async def large_task() -> bytes:
    return LARGE_MESSAGE


async def main():
    await pub_broker.startup()
    await sub_broker.startup()
    # This message will store data in S3 and send a reference to SQS
    # This reference will include the S3 bucket and key.
    await large_task.kiq()

    async for msg in sub_broker.listen():
        message = msg
        break  # Stop after receiving one message

    # The message will be automatically retrieved from S3
    # and the full data will be available in the message.
    assert message.data == LARGE_MESSAGE


```

## Configuration:

SQS Broker parameters (a configuration sketch follows this list):
* `endpoint_url` - URL used to access SQS; this is particularly useful if running on ECS.
* `sqs_queue_name` - name of the SQS queue.
* `region_name` - region name, defaults to `us-east-1`.
* `aws_access_key_id` - AWS access key id (Optional).
* `aws_secret_access_key` - AWS secret access key (Optional).
* `use_task_id_for_deduplication` - use the task_id for deduplication; useful when using a FIFO queue without content-based deduplication, defaults to False.
* `wait_time_seconds` - wait time in seconds for long polling, defaults to 0.
* `max_number_of_messages` - maximum number of messages to receive per poll, defaults to 1 (max 10).
* `s3_extended_bucket_name` - S3 bucket used for extended messages; setting this allows the broker to kick messages that are too large for SQS by storing them in S3 (the listen function retrieves them automatically), defaults to None.
* `task_id_generator` - custom task_id generator (Optional).
* `result_backend` - custom result backend (Optional).
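
As an illustration, a broker combining several of these options might be configured like the sketch below. The endpoint, queue name and bucket name are placeholders, and whether you want `use_task_id_for_deduplication` depends on how your FIFO queue is set up:

```python
from taskiq_aio_sqs import SQSBroker

# Hypothetical configuration combining several of the options listed above.
broker = SQSBroker(
    endpoint_url="http://localhost:4566",      # e.g. LocalStack; omit to use the default AWS endpoint
    sqs_queue_name="my-queue.fifo",            # placeholder FIFO queue name
    region_name="us-east-1",
    use_task_id_for_deduplication=True,        # for FIFO queues without content-based deduplication
    wait_time_seconds=20,                      # enable long polling
    max_number_of_messages=10,                 # receive up to 10 messages per poll
    s3_extended_bucket_name="extended-bucket", # placeholder bucket for oversized payloads
)
```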


S3 Result Backend parameters (a configuration sketch follows this list):
* `bucket_name` - name of the S3 bucket.
* `base_path` - base path (key prefix) for the S3 objects, defaults to "".
* `endpoint_url` - URL used to access S3; this is particularly useful if running on ECS.
* `region_name` - region name, defaults to `us-east-1`.
* `aws_access_key_id` - AWS access key id (Optional).
* `aws_secret_access_key` - AWS secret access key (Optional).
* `serializer` - custom serializer, defaults to `OrjsonSerializer`.
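
For example, a result backend that stores results under a key prefix might look like the following sketch (the bucket name and prefix are placeholders, and the bucket must already exist):

```python
from taskiq_aio_sqs import S3Backend

# Hypothetical result backend configuration; the bucket must already exist.
s3_result_backend = S3Backend(
    endpoint_url="http://localhost:4566",  # e.g. LocalStack; omit to use the default AWS endpoint
    bucket_name="task-results-bucket",     # placeholder bucket name
    base_path="results/",                  # objects are stored under this key prefix
    region_name="us-east-1",
)
```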

# Local Development:
We use make to handle the project's commands; you can see the available commands by running this in the root directory:
```bash
make
```

## Setup
To set up the project, run the following command:
```bash
make install
```
This will install the project's required dependencies using pip.

## Linting
We use pre-commit for local linting; it is included in the dev dependencies.
We use ruff for linting and formatting, and pyright for static type checking.
To install the pre-commit hooks, run the following command:
```bash
pre-commit install
```
If you, for some reason, hate pre-commit, you can run the following command to lint the code:
```bash
make check
```

## Testing
To run the tests, use the following command:
```bash
make test
```
In the background this will set up LocalStack to replicate the AWS services and run the tests.
It will also generate the coverage report and the badge.

            
