AWS SQS Listener
----------------
.. image:: https://img.shields.io/pypi/v/pySqsListener.svg?style=popout
:alt: PyPI
:target: https://github.com/jegesh/python-sqs-listener
.. image:: https://img.shields.io/pypi/pyversions/pySqsListener.svg?style=popout
:alt: PyPI - Python Version
:target: https://pypi.org/project/pySqsListener/
This package takes care of the boilerplate involved in listening to an SQS
queue, as well as sending messages to a queue. Works with Python 2.7 and 3.6+.
Installation
~~~~~~~~~~~~
``pip install pySqsListener``
Listening to a queue
~~~~~~~~~~~~~~~~~~~~
| Using the listener is very straightforward - just inherit from the
``SqsListener`` class and implement the ``handle_message()`` method.
The queue will be created at runtime if it doesn't already exist.
You can also specify an error queue to automatically push any errors to.
Here is a basic code sample:
**Standard Listener**
::

    from sqs_listener import SqsListener

    class MyListener(SqsListener):
        def handle_message(self, body, attributes, messages_attributes):
            run_my_function(body['param1'], body['param2'])

    listener = MyListener('my-message-queue', error_queue='my-error-queue', region_name='us-east-1')
    listener.listen()
**Error Listener**
::

    from sqs_listener import SqsListener

    class MyErrorListener(SqsListener):
        def handle_message(self, body, attributes, messages_attributes):
            save_to_log(body['exception_type'], body['error_message'])

    error_listener = MyErrorListener('my-error-queue')
    error_listener.listen()
| The options available as ``kwargs`` are as follows (a combined example follows the list):

- error_queue (str) - name of the queue to which errors are pushed.
- force_delete (boolean) - delete the message received from the queue whether or not the handler function succeeds. By default the message is deleted only if the handler function returns without raising an exception.
- interval (int) - number of seconds between polls. Set to 60 by default.
- visibility_timeout (str) - number of seconds the message will be invisible ('in flight') after being read. After this interval the message reappears in the queue if it wasn't deleted in the meantime. Set to '600' (10 minutes) by default.
- error_visibility_timeout (str) - same as the previous argument, but for the error queue. Applicable only if the ``error_queue`` argument is set and the queue doesn't already exist.
- wait_time (int) - number of seconds to wait for a message to arrive (for long polling). Set to 0 by default to provide short polling.
- max_number_of_messages (int) - maximum number of messages to receive from the queue. Set to 1 by default; the maximum is 10.
- message_attribute_names (list) - message attributes by which to filter messages
- attribute_names (list) - attributes by which to filter messages (see boto docs for difference between these two)
- region_name (str) - AWS region name (defaults to ``us-east-1``)
- queue_url (str) - overrides the ``queue`` parameter. Mostly useful for getting around `this bug <https://github.com/aws/aws-cli/issues/1715>`_ in the boto library
- deserializer (function str -> dict) - Deserialization function that will be used to parse the message body. Set to python's ``json.loads`` by default.
- aws_access_key, aws_secret_key (str) - for manually providing AWS credentials
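For reference, the sketch below shows a listener configured with several of the options above. The queue names and the ``process_order`` handler are placeholders, not part of the package:

::

    from sqs_listener import SqsListener

    class OrderListener(SqsListener):
        def handle_message(self, body, attributes, messages_attributes):
            process_order(body)  # placeholder for your own processing logic

    # Keyword arguments correspond to the options listed above.
    listener = OrderListener(
        'order-queue',
        error_queue='order-error-queue',
        region_name='us-east-1',
        interval=10,                # poll every 10 seconds
        wait_time=20,               # long polling, up to 20 seconds per receive call
        max_number_of_messages=10,  # receive up to 10 messages per poll
        visibility_timeout='300',   # messages stay 'in flight' for 5 minutes
        force_delete=False,         # delete only when the handler succeeds (the default)
    )
    listener.listen()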
Running as a Daemon
~~~~~~~~~~~~~~~~~~~
| Typically, in a production environment, you'll want to listen to an SQS queue with a daemonized process.
The simplest way to do this is by running the listener in a detached process. On a typical Linux distribution it might look like this:
|
``nohup python my_listener.py > listener.log &``
| Save the resulting process id for later use (stopping the listener via the ``kill`` command).
|
A more complete implementation can be achieved easily by inheriting from the package's ``Daemon`` class and overriding the ``run()`` method.
|
| The sample_daemon.py file in the source root folder provides a clear example for achieving this. Using this example,
you can run the listener as a daemon with the command ``python sample_daemon.py start``. Similarly, the command
``python sample_daemon.py stop`` will stop the process. You'll most likely need to run the start script using ``sudo``.
|
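As a rough sketch of that approach, assuming the ``Daemon`` class is importable from ``sqs_listener.daemon`` and takes a pid-file path, as in sample_daemon.py (check that file for the authoritative version; ``my_listener`` and the queue names below are placeholders):

::

    import sys

    from sqs_listener.daemon import Daemon   # assumed import path; see sample_daemon.py
    from my_listener import MyListener       # hypothetical module holding your listener class

    class MyDaemon(Daemon):
        def run(self):
            # The daemonized work: build the listener and block on listen()
            listener = MyListener('my-message-queue', error_queue='my-error-queue')
            listener.listen()

    if __name__ == '__main__':
        daemon = MyDaemon('/var/run/sqs_listener.pid')  # assumed pid-file constructor argument
        if len(sys.argv) == 2 and sys.argv[1] == 'start':
            daemon.start()
        elif len(sys.argv) == 2 and sys.argv[1] == 'stop':
            daemon.stop()
        else:
            print('usage: %s start|stop' % sys.argv[0])
            sys.exit(2)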
Logging
~~~~~~~
| The listener and launcher instances push all their messages to a ``logger`` instance, called 'sqs_listener'.
In order to view the messages, the logger needs to be redirected to ``stdout`` or to a log file.
|
| For instance:
::

    import logging

    logger = logging.getLogger('sqs_listener')
    logger.setLevel(logging.INFO)

    sh = logging.StreamHandler()

    formatstr = '[%(asctime)s - %(name)s - %(levelname)s] %(message)s'
    formatter = logging.Formatter(formatstr)

    sh.setFormatter(formatter)
    logger.addHandler(sh)
|
| Or to a log file:
::

    import logging

    logger = logging.getLogger('sqs_listener')
    logger.setLevel(logging.INFO)

    fh = logging.FileHandler('mylog.log')
    fh.setLevel(logging.INFO)

    formatstr = '[%(asctime)s - %(name)s - %(levelname)s] %(message)s'
    formatter = logging.Formatter(formatstr)

    fh.setFormatter(formatter)
    logger.addHandler(fh)
Sending messages
~~~~~~~~~~~~~~~~
| In order to send a message, instantiate an ``SqsLauncher`` with the name of the queue. By default an exception will
  be raised if the queue doesn't exist, but it can be created automatically if the ``create_queue`` parameter is
  set to ``True``. In such a case, there's also an option to set the newly created queue's ``VisibilityTimeout`` via the
  third parameter. It is possible to provide a ``serializer`` function if custom types need to be sent. This function
  expects a dict object and should return a string. If not provided, Python's ``json.dumps`` is used by default.
|
| After instantiation, use the ``launch_message()`` method to send the message. The message body should be a ``dict``,
and additional kwargs can be specified as stated in the `SQS docs
<http://boto3.readthedocs.io/en/latest/reference/services/sqs.html#SQS.Client.send_message>`_.
The method returns the response from SQS.
**Launcher Example**
::

    from sqs_launcher import SqsLauncher

    launcher = SqsLauncher('my-queue')
    response = launcher.launch_message({'param1': 'hello', 'param2': 'world'})
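If the target queue may not exist yet, or custom types need to be serialized, the optional parameters described above can be used. The sketch below assumes ``create_queue`` and ``serializer`` are passed as keyword arguments; the queue name and ``my_serializer`` are placeholders:

::

    import json

    from sqs_launcher import SqsLauncher

    # my_serializer is a placeholder; any function taking a dict and returning a str works.
    def my_serializer(message):
        return json.dumps(message, default=str)

    launcher = SqsLauncher('my-new-queue', create_queue=True, serializer=my_serializer)
    response = launcher.launch_message({'param1': 'hello', 'param2': 'world'})
    print(response['MessageId'])  # the standard SQS send_message response includes the message id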
Important Notes
~~~~~~~~~~~~~~~
- The environment variable ``AWS_ACCOUNT_ID`` must be set. In addition, the
  environment must have valid AWS credentials, either via environment variables,
  a credentials file, or (when running on an AWS EC2 instance) an attached role
  with the required permissions. A short illustration follows this list.
- For both the main queue and the error queue, if the queue doesn’t
exist (in the specified region), it will be created at runtime.
- The error queue receives only two values in the message body: ``exception_type`` and ``error_message``. Both are of type ``str``.
- If the function that the listener executes involves connecting to a database, you should explicitly close the connection at the end of the function. Otherwise, you're likely to get an error like this: ``OperationalError(2006, 'MySQL server has gone away')``
- Either the queue name or the queue URL should be provided. When both are provided, the queue URL is used and the queue name is ignored.
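As a minimal illustration of the credentials note above (the account id and key values are placeholders; in production they usually come from the environment, a credentials file, or an EC2 instance role rather than from code):

::

    import os

    # Placeholder values for illustration only.
    os.environ['AWS_ACCOUNT_ID'] = '123456789012'
    os.environ['AWS_ACCESS_KEY_ID'] = 'AKIA...'
    os.environ['AWS_SECRET_ACCESS_KEY'] = '...'

    from sqs_listener import SqsListener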
Contributing
~~~~~~~~~~~~
Fork the repo and make a pull request.