# sqsx
[![Tests](https://github.com/allisson/pysqsx/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/allisson/pysqsx/actions/workflows/tests.yml)
![PyPI - Version](https://img.shields.io/pypi/v/sqsx)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/sqsx)
![GitHub License](https://img.shields.io/github/license/allisson/pysqsx)
A simple task processor for Amazon SQS.
## Quickstart
For this demonstration we will run ElasticMQ locally using Docker:
```bash
docker run --name pysqsx-elasticmq -p 9324:9324 -d softwaremill/elasticmq-native
```
Install the package:
```bash
pip install sqsx
```
### Working with sqsx.Queue
We use sqsx.Queue when we need to work with scheduling and consuming tasks.
Now let's create a script that will add a new task and then consume it:
```python
# file script.py
import logging
import boto3
from sqsx import Queue
# configure the logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
# create the sqs_client
queue_url = "http://localhost:9324/000000000000/tests"
queue_name = "tests"
sqs_client = boto3.client(
"sqs",
endpoint_url="http://localhost:9324",
region_name="elasticmq",
aws_secret_access_key="x",
aws_access_key_id="x",
use_ssl=False,
)
# create the new sqs queue
sqs_client.create_queue(QueueName=queue_name)
# create the sqsx.Queue
queue = Queue(url=queue_url, sqs_client=sqs_client)
# add a new task
queue.add_task("my_task", a=1, b=2, c=3)
# create the task handler, which must be a simple function like this
def task_handler(context: dict, a: int, b: int, c: int):
print(f"context={context}, a={a}, b={b}, c={c}")
# add a new task handler
queue.add_task_handler("my_task", task_handler)
# start the consumption of messages, to stop press ctrl+c to exit gracefully
queue.consume_messages()
```
Running the script:
```bash
python script.py
INFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests
context={'queue_url': 'http://localhost:9324/000000000000/tests', 'task_name': 'my_task', 'sqs_message': {'MessageId': '42513c2d-ac93-4701-bb63-83b45e6fe2ca', 'ReceiptHandle': '42513c2d-ac93-4701-bb63-83b45e6fe2ca#6eb5443b-a2eb-454e-8619-86f6d2e67561', 'MD5OfBody': '8087eb7436895841c5d646156a8a469f', 'Body': 'eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319', 'Attributes': {'SentTimestamp': '1702573178736', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702573178740', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '5346f2cd7c539a880febaf9112a86921', 'MessageAttributes': {'TaskName': {'StringValue': 'my_task', 'DataType': 'String'}}}}, a=1, b=2, c=3
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
^CINFO:sqsx.queue:Starting graceful shutdown process
INFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests
```
### Working with sqsx.RawQueue
We use sqsx.RawQueue when we need to work with one handler consuming all the queue messages.
Now let's create a script that will add a new message and then consume it:
```python
# file raw_script.py
import logging
import boto3
from sqsx import RawQueue
# configure the logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
# create the sqs_client
queue_url = "http://localhost:9324/000000000000/tests"
queue_name = "tests"
sqs_client = boto3.client(
"sqs",
endpoint_url="http://localhost:9324",
region_name="elasticmq",
aws_secret_access_key="x",
aws_access_key_id="x",
use_ssl=False,
)
# create the new sqs queue
sqs_client.create_queue(QueueName=queue_name)
# create the message handler, which must be a simple function like this
def message_handler(queue_url: str, sqs_message: dict):
print(f"queue_url={queue_url}, sqs_message={sqs_message}")
# create the sqsx.RawQueue
queue = RawQueue(url=queue_url, message_handler_function=message_handler, sqs_client=sqs_client)
# add a new message
queue.add_message(
message_body="My Message",
message_attributes={"Attr1": {"DataType": "String", "StringValue": "Attr1"}},
)
# start the consumption of messages, to stop press ctrl+c to exit gracefully
queue.consume_messages()
```
Running the script:
```bash
INFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests
queue_url=http://localhost:9324/000000000000/tests, sqs_message={'MessageId': 'fb2ed6cf-9346-4ded-8cfe-4fc297f95928', 'ReceiptHandle': 'fb2ed6cf-9346-4ded-8cfe-4fc297f95928#bd9f27a6-0a73-4d27-9c1e-0947f21d3c02', 'MD5OfBody': '069840f6917e85a02167febb964f0041', 'Body': 'My Message', 'Attributes': {'SentTimestamp': '1702573585302', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702573585306', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '90f34a800b9d242c1b32320e4a3ed630', 'MessageAttributes': {'Attr1': {'StringValue': 'Attr1', 'DataType': 'String'}}}
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
^CINFO:sqsx.queue:Starting graceful shutdown process
INFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests
```
### Working with exceptions
The default behavior is to retry the message when an exception is raised; you can change this behavior using the exceptions sqsx.exceptions.Retry and sqsx.exceptions.NoRetry.
If you want to change the backoff policy, use the sqsx.exceptions.Retry like this:
```python
from sqsx.exceptions import Retry
# to use with sqsx.Queue and change the default backoff policy
def task_handler(context: dict, a: int, b: int, c: int):
raise Retry(min_backoff_seconds=100, max_backoff_seconds=200)
# to use with sqsx.RawQueue and change the default backoff policy
def message_handler(queue_url: str, sqs_message: dict):
raise Retry(min_backoff_seconds=100, max_backoff_seconds=200)
```
If you want to remove the task or message from the queue, use the sqsx.exceptions.NoRetry like this:
```python
from sqsx.exceptions import NoRetry
# to use with sqsx.Queue and remove the task
def task_handler(context: dict, a: int, b: int, c: int):
raise NoRetry()
# to use with sqsx.RawQueue and remove the message
def message_handler(queue_url: str, sqs_message: dict):
raise NoRetry()
```
Raw data
{
"_id": null,
"home_page": "https://github.com/allisson/pysqsx",
"name": "sqsx",
"maintainer": null,
"docs_url": null,
"requires_python": null,
"maintainer_email": null,
"keywords": "aws, sqs",
"author": "Allisson Azevedo",
"author_email": "allisson@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/79/40/93df1c663379733deb7ac0c3a5c2ec4a13488fe731073018b9c8f15ce7a2/sqsx-0.5.1.tar.gz",
"platform": null,
"description": "# sqsx\n[![Tests](https://github.com/allisson/pysqsx/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/allisson/pysqsx/actions/workflows/tests.yml)\n![PyPI - Version](https://img.shields.io/pypi/v/sqsx)\n![PyPI - Python Version](https://img.shields.io/pypi/pyversions/sqsx)\n![GitHub License](https://img.shields.io/github/license/allisson/pysqsx)\n\nA simple task processor for Amazon SQS.\n\n## Quickstart\n\nFor this demonstration we will use elasticmq locally using docker:\n\n```bash\ndocker run --name pysqsx-elasticmq -p 9324:9324 -d softwaremill/elasticmq-native\n```\n\nInstall the package:\n\n```bash\npip install sqsx\n```\n\n### Working with sqsx.Queue\n\nWe use sqsx.Queue when we need to work with scheduling and consuming tasks.\n\nNow let's create a script that will create a new task and we will consume them:\n\n```python\n# file script.py\nimport logging\n\nimport boto3\n\nfrom sqsx import Queue\n\n# configure the logging\nlogging.basicConfig(level=logging.DEBUG)\nlogging.getLogger('botocore').setLevel(logging.CRITICAL)\nlogging.getLogger('urllib3').setLevel(logging.CRITICAL)\n\n# create the sqs_client\nqueue_url = \"http://localhost:9324/000000000000/tests\"\nqueue_name = \"tests\"\nsqs_client = boto3.client(\n \"sqs\",\n endpoint_url=\"http://localhost:9324\",\n region_name=\"elasticmq\",\n aws_secret_access_key=\"x\",\n aws_access_key_id=\"x\",\n use_ssl=False,\n)\n\n# create the new sqs queue\nsqs_client.create_queue(QueueName=queue_name)\n\n# create the sqsx.Queue\nqueue = Queue(url=queue_url, sqs_client=sqs_client)\n\n# add a new task\nqueue.add_task(\"my_task\", a=1, b=2, c=3)\n\n# create the task handler, which must be a simple function like this\ndef task_handler(context: dict, a: int, b: int, c: int):\n print(f\"context={context}, a={a}, b={b}, c={c}\")\n\n# add a new task handler\nqueue.add_task_handler(\"my_task\", task_handler)\n\n# start the consumption of messages, to stop press ctrl+c to exit 
gracefully\nqueue.consume_messages()\n```\n\nRunning the script:\n\n```bash\npython script.py\nINFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests\ncontext={'queue_url': 'http://localhost:9324/000000000000/tests', 'task_name': 'my_task', 'sqs_message': {'MessageId': '42513c2d-ac93-4701-bb63-83b45e6fe2ca', 'ReceiptHandle': '42513c2d-ac93-4701-bb63-83b45e6fe2ca#6eb5443b-a2eb-454e-8619-86f6d2e67561', 'MD5OfBody': '8087eb7436895841c5d646156a8a469f', 'Body': 'eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319', 'Attributes': {'SentTimestamp': '1702573178736', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702573178740', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '5346f2cd7c539a880febaf9112a86921', 'MessageAttributes': {'TaskName': {'StringValue': 'my_task', 'DataType': 'String'}}}}, a=1, b=2, c=3\nDEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests\nDEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests\n^CINFO:sqsx.queue:Starting graceful shutdown process\nINFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests\n```\n\n### Working with sqsx.RawQueue\n\nWe use sqsx.RawQueue when we need to work with one handler consuming all the queue messages.\n\nNow let's create a script that will create a new message and we will consume them:\n\n```python\n# file raw_script.py\nimport logging\n\nimport boto3\n\nfrom sqsx import RawQueue\n\n# configure the logging\nlogging.basicConfig(level=logging.DEBUG)\nlogging.getLogger('botocore').setLevel(logging.CRITICAL)\nlogging.getLogger('urllib3').setLevel(logging.CRITICAL)\n\n# create the sqs_client\nqueue_url = \"http://localhost:9324/000000000000/tests\"\nqueue_name = \"tests\"\nsqs_client = boto3.client(\n \"sqs\",\n endpoint_url=\"http://localhost:9324\",\n 
region_name=\"elasticmq\",\n aws_secret_access_key=\"x\",\n aws_access_key_id=\"x\",\n use_ssl=False,\n)\n\n# create the new sqs queue\nsqs_client.create_queue(QueueName=queue_name)\n\n# create the message handler, which must be a simple function like this\ndef message_handler(queue_url: str, sqs_message: dict):\n print(f\"queue_url={queue_url}, sqs_message={sqs_message}\")\n\n# create the sqsx.Queue\nqueue = RawQueue(url=queue_url, message_handler_function=message_handler, sqs_client=sqs_client)\n\n# add a new message\nqueue.add_message(\n message_body=\"My Message\",\n message_attributes={\"Attr1\": {\"DataType\": \"String\", \"StringValue\": \"Attr1\"}},\n)\n\n# start the consumption of messages, to stop press ctrl+c to exit gracefully\nqueue.consume_messages()\n```\n\nRunning the script:\n\n```bash\nINFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests\nqueue_url=http://localhost:9324/000000000000/tests, sqs_message={'MessageId': 'fb2ed6cf-9346-4ded-8cfe-4fc297f95928', 'ReceiptHandle': 'fb2ed6cf-9346-4ded-8cfe-4fc297f95928#bd9f27a6-0a73-4d27-9c1e-0947f21d3c02', 'MD5OfBody': '069840f6917e85a02167febb964f0041', 'Body': 'My Message', 'Attributes': {'SentTimestamp': '1702573585302', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702573585306', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '90f34a800b9d242c1b32320e4a3ed630', 'MessageAttributes': {'Attr1': {'StringValue': 'Attr1', 'DataType': 'String'}}}\nDEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests\nDEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests\n^CINFO:sqsx.queue:Starting graceful shutdown process\nINFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests\n```\n\n### Working with exceptions\n\nThe default behavior is to retry the message 
when an exception is raised, you can change this behavior using the exceptions sqsx.exceptions.Retry and sqsx.exceptions.NoRetry.\n\nIf you want to change the backoff policy, use the sqsx.exceptions.Retry like this:\n\n```python\nfrom sqsx.exceptions import Retry\n\n# to use with sqsx.Queue and change the default backoff policy\ndef task_handler(context: dict, a: int, b: int, c: int):\n raise Retry(min_backoff_seconds=100, max_backoff_seconds=200)\n\n# to use with sqsx.RawQueue and change the default backoff policy\ndef message_handler(queue_url: str, sqs_message: dict):\n raise Retry(min_backoff_seconds=100, max_backoff_seconds=200)\n```\n\nIf you want to remove the task or message from the queue use the sqsx.exceptions.NoRetry like this:\n\n```python\nfrom sqsx.exceptions import NoRetry\n\n# to use with sqsx.Queue and remove the task\ndef task_handler(context: dict, a: int, b: int, c: int):\n raise NoRetry()\n\n# to use with sqsx.RawQueue and remove the message\ndef message_handler(queue_url: str, sqs_message: dict):\n raise NoRetry()\n```\n",
"bugtrack_url": null,
"license": null,
"summary": "A simple task processor for Amazon SQS",
"version": "0.5.1",
"project_urls": {
"Homepage": "https://github.com/allisson/pysqsx"
},
"split_keywords": [
"aws",
" sqs"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "9da8c225ad267ee4c6c57c26be83d1619ed7da1ce143ed96eb8b2261a11696b1",
"md5": "60f18783bb0c1610e78f61f022395a4c",
"sha256": "4459c40d89d460f5858a518cccb66323042268c00ed72cf18dce181dd53aa90e"
},
"downloads": -1,
"filename": "sqsx-0.5.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "60f18783bb0c1610e78f61f022395a4c",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": null,
"size": 10208,
"upload_time": "2024-04-15T10:19:24",
"upload_time_iso_8601": "2024-04-15T10:19:24.913997Z",
"url": "https://files.pythonhosted.org/packages/9d/a8/c225ad267ee4c6c57c26be83d1619ed7da1ce143ed96eb8b2261a11696b1/sqsx-0.5.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "794093df1c663379733deb7ac0c3a5c2ec4a13488fe731073018b9c8f15ce7a2",
"md5": "8e8f5aa8d975cdfa2a614723e527a642",
"sha256": "a41b6d27e1df1f6f6f09f5bf0d88b15f8952651390859415851322e6cf4add7d"
},
"downloads": -1,
"filename": "sqsx-0.5.1.tar.gz",
"has_sig": false,
"md5_digest": "8e8f5aa8d975cdfa2a614723e527a642",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 10325,
"upload_time": "2024-04-15T10:19:26",
"upload_time_iso_8601": "2024-04-15T10:19:26.749888Z",
"url": "https://files.pythonhosted.org/packages/79/40/93df1c663379733deb7ac0c3a5c2ec4a13488fe731073018b9c8f15ce7a2/sqsx-0.5.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-04-15 10:19:26",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "allisson",
"github_project": "pysqsx",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "sqsx"
}