Rejected
========

Rejected is an AMQP consumer daemon and message processing framework. It allows
for rapid development of message processing consumers by handling the core
functionality of communicating with RabbitMQ and managing consumer processes.

Rejected runs as a master process with multiple consumer configurations, each
of which runs in an isolated process. It can collect statistical data from the
consumer processes and report on it.

Rejected supports Python 2.7 and 3.4+.

|Version| |Status| |Coverage| |License|

Features
--------
- Automatic exception handling including connection management and consumer restarting
- Smart consumer classes that can automatically decode and deserialize message bodies based upon message headers
- Metrics logging and submission to statsd and InfluxDB
- Built-in profiling of consumer code
- Ability to write asynchronous code in consumers allowing for parallel communication with external resources
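
The header-driven decoding mentioned in the list above can be pictured with a
small standalone sketch. It is not Rejected's implementation: ``decode_body``
and its parameters are hypothetical names, only gzip and JSON are handled, and
it assumes Python 3.

.. code:: python

    import gzip
    import json


    def decode_body(body, content_type=None, content_encoding=None):
        """Turn raw message bytes into a Python value (hypothetical helper)."""
        # Undo the transfer encoding first, then deserialize by content type
        if content_encoding == 'gzip':
            body = gzip.decompress(body)
        if content_type == 'application/json':
            return json.loads(body.decode('utf-8'))
        if content_type and content_type.startswith('text/'):
            return body.decode('utf-8')
        return body  # Unknown type: hand back the raw bytes


    raw = gzip.compress(json.dumps({'id': 1}).encode('utf-8'))
    decoded = decode_body(raw, 'application/json', 'gzip')

In this sketch an unrecognized content type falls through untouched, so the
caller still receives the raw bytes.
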

Documentation
-------------

https://rejected.readthedocs.io

Example Consumers
-----------------

.. code:: python

    import logging

    from rejected import consumer

    LOGGER = logging.getLogger(__name__)


    class Test(consumer.Consumer):

        def process(self):
            # The message body is read from the consumer instance
            LOGGER.debug('In Test.process: %s', self.body)

Async Consumer
^^^^^^^^^^^^^^

To make a consumer async, decorate the
`Consumer.prepare <http://rejected.readthedocs.org/en/latest/api_consumer.html#rejected.consumer.Consumer.prepare>`_
and `Consumer.process <http://rejected.readthedocs.org/en/latest/api_consumer.html#rejected.consumer.Consumer.process>`_
methods with Tornado's
`@gen.coroutine <http://www.tornadoweb.org/en/stable/gen.html#tornado.gen.coroutine>`_.

Asynchronous consumers do not process multiple messages concurrently in the
same process. Instead, they let you use asynchronous clients such as
`Tornado's <http://tornadoweb.org>`_
`AsyncHTTPClient <http://www.tornadoweb.org/en/stable/httpclient.html>`_ and the
`Queries <http://queries.readthedocs.org/en/latest/tornado_session.html>`_
PostgreSQL library to perform tasks in parallel using coroutines while processing a single message.

.. code:: python

    import logging

    from rejected import consumer

    from tornado import gen
    from tornado import httpclient

    LOGGER = logging.getLogger(__name__)


    class AsyncExampleConsumer(consumer.Consumer):

        @gen.coroutine
        def process(self):
            LOGGER.debug('Message: %r', self.body)
            http_client = httpclient.AsyncHTTPClient()
            # Yielding a list of futures waits on both requests in parallel
            results = yield [http_client.fetch('http://www.github.com'),
                             http_client.fetch('http://www.reddit.com')]
            LOGGER.info('Length: %r', [len(r.body) for r in results])

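
The same idea, several awaitables running in parallel while a single message is
handled, can be tried without RabbitMQ or Tornado. The sketch below swaps in the
standard library's ``asyncio`` for Tornado coroutines and assumes Python 3.7+;
``fetch`` and ``process_message`` are hypothetical stand-ins, not part of the
Rejected API.

.. code:: python

    import asyncio


    async def fetch(name, delay):
        """Stand-in for an asynchronous client call (hypothetical helper)."""
        await asyncio.sleep(delay)
        return '%s done' % name


    async def process_message(body):
        # Both awaitables run concurrently while this one message is handled
        results = await asyncio.gather(fetch('http', 0.05), fetch('db', 0.05))
        return body, results


    body, results = asyncio.run(process_message('example'))

``asyncio.gather`` returns results in the order the awaitables were passed, much
as yielding a list does in the Tornado example above.
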
Example Configuration
---------------------
.. code:: yaml

    %YAML 1.2
    ---
    Application:
      poll_interval: 10.0
      stats:
        log: True
        influxdb:
          enabled: True
          scheme: http
          host: localhost
          port: 8086
          user: username
          password: password
          database: dbname
        statsd:
          enabled: True
          host: localhost
          port: 8125
          prefix: applications.rejected
      Connections:
        rabbitmq:
          host: localhost
          port: 5672
          user: guest
          pass: guest
          ssl: False
          vhost: /
          heartbeat_interval: 300
      Consumers:
        example:
          consumer: rejected.example.Consumer
          sentry_dsn: https://[YOUR-SENTRY-DSN]
          connections:
            - name: rabbitmq
              consume: True
              drop_exchange: dlxname
          qty: 2
          queue: generated_messages
          qos_prefetch: 100
          ack: True
          max_errors: 100
          config:
            foo: True
            bar: baz

    Daemon:
      user: rejected
      group: daemon
      pidfile: /var/run/rejected/example.%(pid)s.pid

    Logging:
      version: 1
      formatters:
        verbose:
          format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -25s %(name) -20s %(funcName) -25s: %(message)s"
          datefmt: "%Y-%m-%d %H:%M:%S"
        verbose_correlation:
          format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -25s %(name) -20s %(funcName) -25s: %(message)s {CID %(correlation_id)s}"
          datefmt: "%Y-%m-%d %H:%M:%S"
        syslog:
          format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s: %(message)s"
        syslog_correlation:
          format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s: %(message)s {CID %(correlation_id)s}"
      filters:
        correlation:
          '()': rejected.log.CorrelationFilter
          'exists': True
        no_correlation:
          '()': rejected.log.CorrelationFilter
          'exists': False
      handlers:
        console:
          class: logging.StreamHandler
          formatter: verbose
          debug_only: false
          filters: [no_correlation]
        console_correlation:
          class: logging.StreamHandler
          formatter: verbose_correlation
          debug_only: false
          filters: [correlation]
        syslog:
          class: logging.handlers.SysLogHandler
          facility: daemon
          address: /var/run/syslog
          formatter: syslog
          filters: [no_correlation]
        syslog_correlation:
          class: logging.handlers.SysLogHandler
          facility: daemon
          address: /var/run/syslog
          formatter: syslog_correlation
          filters: [correlation]
      loggers:
        helper:
          level: INFO
          propagate: true
          handlers: [console, console_correlation, syslog, syslog_correlation]
        rejected:
          level: INFO
          propagate: true
          handlers: [console, console_correlation, syslog, syslog_correlation]
        tornado:
          level: INFO
          propagate: true
          handlers: [console, console_correlation, syslog, syslog_correlation]
      disable_existing_loggers: true
      incremental: false

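
The ``Logging`` section follows the dictionary schema of Python's
``logging.config``, and the paired ``correlation`` / ``no_correlation`` filters
route each record to exactly one handler of a pair. A minimal sketch of that
routing, assuming Python 3, is shown below. ``CorrelationIdFilter`` is a
hypothetical stand-in written for this example, not the
``rejected.log.CorrelationFilter`` class itself, and the in-memory streams
replace the console and syslog handlers.

.. code:: python

    import io
    import logging
    import logging.config


    class CorrelationIdFilter(logging.Filter):
        """Hypothetical filter: pass records by presence of correlation_id."""

        def __init__(self, exists=True):
            super().__init__()
            self.exists = exists

        def filter(self, record):
            return hasattr(record, 'correlation_id') == self.exists


    plain, correlated = io.StringIO(), io.StringIO()
    logging.config.dictConfig({
        'version': 1,
        'formatters': {
            'plain': {'format': '%(message)s'},
            'cid': {'format': '%(message)s {CID %(correlation_id)s}'},
        },
        'filters': {
            'correlation': {'()': CorrelationIdFilter, 'exists': True},
            'no_correlation': {'()': CorrelationIdFilter, 'exists': False},
        },
        'handlers': {
            'plain': {'class': 'logging.StreamHandler', 'stream': plain,
                      'formatter': 'plain', 'filters': ['no_correlation']},
            'cid': {'class': 'logging.StreamHandler', 'stream': correlated,
                    'formatter': 'cid', 'filters': ['correlation']},
        },
        'loggers': {'demo': {'level': 'INFO', 'handlers': ['plain', 'cid'],
                             'propagate': False}},
    })

    log = logging.getLogger('demo')
    log.info('starting')
    log.info('processing', extra={'correlation_id': 'abc123'})

Splitting the handlers this way keeps ``%(correlation_id)s`` out of any format
string that would be applied to a record lacking that attribute.
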

Version History
---------------

Available at https://rejected.readthedocs.org/en/latest/history.html

.. |Version| image:: https://img.shields.io/pypi/v/rejected.svg?
   :target: https://pypi.python.org/pypi/rejected

.. |Status| image:: https://img.shields.io/travis/gmr/rejected.svg?
   :target: https://travis-ci.org/gmr/rejected

.. |Coverage| image:: https://img.shields.io/codecov/c/github/gmr/rejected.svg?
   :target: https://codecov.io/github/gmr/rejected?branch=master

.. |License| image:: https://img.shields.io/pypi/l/rejected.svg?
   :target: https://rejected.readthedocs.org

Raw data
{
"_id": null,
"home_page": "https://github.com/gmr/rejected",
"name": "rejected",
"maintainer": null,
"docs_url": "https://pythonhosted.org/rejected/",
"requires_python": null,
"maintainer_email": null,
"keywords": "amqp rabbitmq",
"author": "Gavin M. Roy",
"author_email": "gavinmroy@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/69/18/353da913990d9f60352f8a888a2f1b08c6efd6a0c925ef251faabcb013fc/rejected-3.23.1.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "BSD",
"summary": "Rejected is a Python RabbitMQ Consumer Framework and Controller Daemon",
"version": "3.23.1",
"project_urls": {
"Homepage": "https://github.com/gmr/rejected"
},
"split_keywords": [
"amqp",
"rabbitmq"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "56a17d9ed7c2c1c945726eb0d5c4a1a385b5f8a6a0e215fb0ec62163212190c3",
"md5": "45d494207aeabf812a586ce31ab5e9ab",
"sha256": "33aaa373cb492c689444d526360a1e4c9e78d7971e1e65bc60562a5dfc2523fa"
},
"downloads": -1,
"filename": "rejected-3.23.1-py2.py3-none-any.whl",
"has_sig": false,
"md5_digest": "45d494207aeabf812a586ce31ab5e9ab",
"packagetype": "bdist_wheel",
"python_version": "py2.py3",
"requires_python": null,
"size": 48199,
"upload_time": "2024-05-06T21:36:20",
"upload_time_iso_8601": "2024-05-06T21:36:20.238370Z",
"url": "https://files.pythonhosted.org/packages/56/a1/7d9ed7c2c1c945726eb0d5c4a1a385b5f8a6a0e215fb0ec62163212190c3/rejected-3.23.1-py2.py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "6918353da913990d9f60352f8a888a2f1b08c6efd6a0c925ef251faabcb013fc",
"md5": "5749a90906c8a1bdd196e034f98d2b2d",
"sha256": "d56047a549037463b9a3a64e8c75a3f0e08c3209ece3a0ed53b8e4131b4d3678"
},
"downloads": -1,
"filename": "rejected-3.23.1.tar.gz",
"has_sig": false,
"md5_digest": "5749a90906c8a1bdd196e034f98d2b2d",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 53651,
"upload_time": "2024-05-06T21:36:29",
"upload_time_iso_8601": "2024-05-06T21:36:29.612862Z",
"url": "https://files.pythonhosted.org/packages/69/18/353da913990d9f60352f8a888a2f1b08c6efd6a0c925ef251faabcb013fc/rejected-3.23.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-05-06 21:36:29",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "gmr",
"github_project": "rejected",
"travis_ci": true,
"coveralls": false,
"github_actions": false,
"lcname": "rejected"
}