:Name: pika
:Version: 1.3.2
:Summary: Pika Python AMQP Client Library
:Upload time: 2023-05-05 14:25:43
:Requires Python: >=3.7
:License: BSD-3-Clause

Pika
====
Pika is a RabbitMQ (AMQP 0-9-1) client library for Python.

|Version| |Python versions| |Actions Status| |Coverage| |License| |Docs|

Introduction
------------
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including
RabbitMQ's extensions.

- Supports Python 3.7+ (``1.1.0`` was the last version to support 2.7)
- Since threads aren't appropriate to every situation, it doesn't require
  threads. Pika core takes care not to forbid them, either. The same goes for
  greenlets, callbacks, continuations, and generators. An instance of Pika's
  built-in connection adapters isn't thread-safe, however.
- People may be using direct sockets, plain old ``select()``, or any of the
  wide variety of ways of getting network events to and from a Python
  application. Pika tries to stay compatible with all of these, and to make
  adapting it to a new environment as simple as possible.

Documentation
-------------
Pika's documentation can be found at https://pika.readthedocs.io.

Example
-------
Here is the simplest usage example, which sends a message with the
``pika.BlockingConnection`` adapter:

.. code :: python

    import pika

    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_publish(exchange='test', routing_key='test',
                          body=b'Test message.')
    connection.close()

And an example of writing a blocking consumer:

.. code :: python

    import pika

    connection = pika.BlockingConnection()
    channel = connection.channel()

    for method_frame, properties, body in channel.consume('test'):
        # Display the message parts and acknowledge the message
        print(method_frame, properties, body)
        channel.basic_ack(method_frame.delivery_tag)

        # Escape out of the loop after 10 messages
        if method_frame.delivery_tag == 10:
            break

    # Cancel the consumer and return any pending messages
    requeued_messages = channel.cancel()
    print('Requeued %i messages' % requeued_messages)
    connection.close()
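
The loop above keeps waiting if the queue never delivers a tenth message. A
minimal sketch of a bounded variant follows. It assumes the
``inactivity_timeout`` argument of ``BlockingChannel.consume()``, which makes
the generator yield ``(None, None, None)`` when no message arrives within that
many seconds. The ``drain()`` helper and its parameter names are hypothetical
and only serve the illustration:

.. code :: python

    def drain(channel, queue, limit=10, inactivity_timeout=5):
        """Consume up to `limit` (at least 1) messages and return their bodies.

        `channel` is expected to be a channel obtained from a
        `pika.BlockingConnection`.
        """
        bodies = []
        for method_frame, properties, body in channel.consume(
                queue, inactivity_timeout=inactivity_timeout):
            if method_frame is None:
                # Nothing arrived within `inactivity_timeout` seconds.
                break
            bodies.append(body)
            channel.basic_ack(method_frame.delivery_tag)
            if len(bodies) >= limit:
                break
        # Cancel the consumer so that pending messages are returned to the queue.
        channel.cancel()
        return bodies

With a broker running, ``drain(connection.channel(), 'test')`` would return the
bodies of at most ten messages and stop early once the queue stays idle.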

Pika provides the following adapters
------------------------------------

- ``pika.adapters.asyncio_connection.AsyncioConnection`` - asynchronous adapter
  for Python 3 `AsyncIO <https://docs.python.org/3/library/asyncio.html>`_'s
  I/O loop.
- ``pika.BlockingConnection`` - synchronous adapter built on top of the library
  for simple usage.
- ``pika.SelectConnection`` - asynchronous adapter without third-party
  dependencies.
- ``pika.adapters.gevent_connection.GeventConnection`` - asynchronous adapter
  for use with `Gevent <http://www.gevent.org>`_'s I/O loop.
- ``pika.adapters.tornado_connection.TornadoConnection`` - asynchronous adapter
  for use with `Tornado <http://tornadoweb.org>`_'s I/O loop.
- ``pika.adapters.twisted_connection.TwistedProtocolConnection`` - asynchronous
  adapter for use with `Twisted <http://twistedmatrix.com>`_'s I/O loop.

Multiple connection parameters
------------------------------
You can also pass multiple ``pika.ConnectionParameters`` instances for
fault-tolerance as in the code snippet below (host names are just examples, of
course). To enable retries, set ``connection_attempts`` and ``retry_delay`` as
needed in the last ``pika.ConnectionParameters`` element of the sequence.
Retries occur after connection attempts using all of the given connection
parameters fail.

.. code :: python

    import pika

    parameters = (
        pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
        pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
                                  connection_attempts=5, retry_delay=1))
    connection = pika.BlockingConnection(parameters)

With non-blocking adapters, such as ``pika.SelectConnection`` and
``pika.adapters.asyncio_connection.AsyncioConnection``, you can request a
connection using multiple connection parameter instances via the connection
adapter's ``create_connection()`` class method.
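
The retry order described at the start of this section can be modelled without
a broker. The sketch below illustrates the documented behaviour only and is not
Pika's implementation: ``connect_with_failover()`` and its ``connect`` and
``sleep`` parameters are hypothetical names, and ``ConnectionError`` stands in
for whatever error a real connection attempt would raise.

.. code :: python

    import time


    def connect_with_failover(configs, connect, connection_attempts=1,
                              retry_delay=0.0, sleep=time.sleep):
        """Try each entry of `configs` in order, for up to `connection_attempts` rounds.

        `connect` is a hypothetical callable that returns a connection or
        raises `ConnectionError`.
        """
        if not configs or connection_attempts < 1:
            raise ValueError('need at least one config and one attempt')
        last_error = None
        for round_number in range(connection_attempts):
            if round_number > 0:
                # A full pass over all configs failed; wait before the next pass.
                sleep(retry_delay)
            for config in configs:
                try:
                    return connect(config)
                except ConnectionError as error:
                    last_error = error
        raise last_error

The point of the model is that the delay applies between complete passes over
the sequence, not between individual hosts.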

Requesting message acknowledgements from another thread
-------------------------------------------------------
The single-threaded usage constraint of an individual Pika connection adapter
instance may result in a dropped AMQP/stream connection due to AMQP heartbeat
timeout in consumers that take a long time to process an incoming message. A
common solution is to delegate processing of the incoming messages to another
thread, while the connection adapter's thread continues to service its I/O
loop's message pump, permitting AMQP heartbeats and other I/O to be serviced in
a timely fashion.

Messages processed in another thread may not be acknowledged directly from that
thread, since all accesses to the connection adapter instance must be from a
single thread, which is the thread running the adapter's I/O loop. Instead, the
worker thread requests a callback to be executed in the adapter's I/O loop
thread, and that callback performs the acknowledgement. For example, the
callback function's implementation might look like this:

.. code :: python

    def ack_message(channel, delivery_tag):
        """Note that `channel` must be the same Pika channel instance via which
        the message being acknowledged was retrieved (AMQP protocol constraint).
        """
        if channel.is_open:
            channel.basic_ack(delivery_tag)
        else:
            # Channel is already closed, so we can't acknowledge this message;
            # log and/or do something that makes sense for your app in this case.
            pass

The code running in the other thread may request the ``ack_message()`` function
to be executed in the connection adapter's I/O loop thread using an
adapter-specific mechanism:

- ``pika.BlockingConnection`` abstracts its I/O loop from the application and
  thus exposes ``pika.BlockingConnection.add_callback_threadsafe()``. Refer to
  this method's docstring for additional information. For example:

  .. code :: python

      import functools

      connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))

- When using a non-blocking connection adapter, such as
  ``pika.adapters.asyncio_connection.AsyncioConnection`` or
  ``pika.SelectConnection``, you use the underlying asynchronous framework's
  native API for requesting an I/O loop-bound callback from another thread. For
  example, ``pika.SelectConnection``'s I/O loop provides
  ``add_callback_threadsafe()``,
  ``pika.adapters.tornado_connection.TornadoConnection``'s I/O loop has
  ``add_callback()``, while
  ``pika.adapters.asyncio_connection.AsyncioConnection``'s I/O loop exposes
  ``call_soon_threadsafe()``.
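
For the asyncio case, the hand-off itself can be demonstrated with the standard
library alone. In this sketch no Pika objects are involved: the hypothetical
``on_done`` callback stands in for ``ack_message()``, and with
``AsyncioConnection`` the loop would be the one the adapter runs on.

.. code :: python

    import asyncio
    import functools
    import threading


    def worker(loop, on_done, payload):
        """Runs on a background thread and hands its result back to the loop."""
        result = payload.upper()  # stand-in for slow message processing
        # call_soon_threadsafe() may be called from a thread other than the loop's.
        loop.call_soon_threadsafe(functools.partial(on_done, result))


    async def main():
        loop = asyncio.get_running_loop()
        loop_thread = threading.get_ident()
        finished = asyncio.Event()
        seen = []

        def on_done(result):
            # Executed by the event loop, so it runs on the loop's own thread.
            seen.append((result, threading.get_ident() == loop_thread))
            finished.set()

        thread = threading.Thread(target=worker,
                                  args=(loop, on_done, 'message'))
        thread.start()
        await finished.wait()
        thread.join()
        return seen

Running ``asyncio.run(main())`` returns the recorded results together with a
flag showing whether the callback ran on the loop's thread.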

This threadsafe callback request mechanism may also be used to delegate
publishing of messages, etc., from a background thread to the connection
adapter's thread.
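
Put together for ``pika.BlockingConnection``, the pattern might look like the
sketch below. It is an outline under stated assumptions rather than a complete
consumer: ``handler`` is a hypothetical application function that processes one
message body, and ``do_work()``, ``on_message()`` and ``consume()`` are
illustrative names.

.. code :: python

    import functools
    import threading


    def ack_message(channel, delivery_tag):
        """Runs on the connection's I/O loop thread."""
        if channel.is_open:
            channel.basic_ack(delivery_tag)


    def do_work(connection, channel, delivery_tag, body, handler):
        """Runs on a worker thread and never touches the channel directly."""
        handler(body)  # hypothetical long-running application function
        connection.add_callback_threadsafe(
            functools.partial(ack_message, channel, delivery_tag))


    def on_message(channel, method_frame, properties, body,
                   connection, handler, threads):
        """Consumer callback: start a worker thread and return immediately."""
        worker = threading.Thread(
            target=do_work,
            args=(connection, channel, method_frame.delivery_tag, body, handler))
        worker.start()
        threads.append(worker)


    def consume(connection, queue, handler):
        """`connection` is expected to be an open `pika.BlockingConnection`."""
        channel = connection.channel()
        # At most one unacknowledged message, hence one busy worker at a time.
        channel.basic_qos(prefetch_count=1)
        threads = []
        channel.basic_consume(
            queue,
            functools.partial(on_message, connection=connection,
                              handler=handler, threads=threads))
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
        for thread in threads:
            thread.join()
        connection.close()

Because ``on_message()`` returns as soon as the worker has started, the I/O
loop stays free to service heartbeats while ``handler`` is still running.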

Connection recovery
-------------------

Some RabbitMQ clients (Bunny, Java, .NET, Objective-C, Swift) provide a way to
automatically recover a connection, its channels and topology (e.g. queues,
bindings and consumers) after a network failure. Others require connection
recovery to be performed by the application code and strive to make it a
straightforward process. Pika falls into the second category.

Pika supports multiple connection adapters. They take different approaches to
connection recovery.

For the ``pika.BlockingConnection`` adapter, exception handling can be used to
detect connection errors. Here is a very basic example:

.. code :: python

    import pika

    while True:
        try:
            connection = pika.BlockingConnection()
            channel = connection.channel()
            channel.basic_consume('test', on_message_callback)
            channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            continue

This example can be found in ``examples/consume_recover.py``.

Generic operation retry libraries such as
`retry <https://github.com/invl/retry>`_ can be used. Decorators make it
possible to configure some additional recovery behaviours, like delays between
retries and limiting the number of retries:

.. code :: python

    import pika
    from retry import retry


    @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
    def consume():
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)

        try:
            channel.start_consuming()
        # Don't recover connections closed by server
        except pika.exceptions.ConnectionClosedByBroker:
            pass


    consume()

This example can be found in ``examples/consume_recover_retry.py``.
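
If a third-party dependency is not wanted, delays and a retry limit can also be
sketched with the standard library. The helper below is a hypothetical
illustration, not part of Pika; the exception type to retry on (for instance
``pika.exceptions.AMQPConnectionError``) is supplied by the caller.

.. code :: python

    import time


    def run_with_backoff(operation, retry_on, max_retries=5, base_delay=1.0,
                         max_delay=30.0, sleep=time.sleep):
        """Call `operation()` until it returns, retrying on `retry_on` errors.

        The delay doubles after every failure and is capped at `max_delay`.
        Once `max_retries` retries are used up, the last error propagates.
        """
        attempt = 0
        while True:
            try:
                return operation()
            except retry_on:
                attempt += 1
                if attempt > max_retries:
                    raise
                sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))

A consumer function such as an undecorated version of ``consume()`` above could
then be started with
``run_with_backoff(consume, pika.exceptions.AMQPConnectionError)``.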

For asynchronous adapters, use ``on_close_callback`` to react to connection
failure events. This callback can be used to clean up and recover the
connection.

An example of recovery using ``on_close_callback`` can be found in
``examples/asynchronous_consumer_example.py``.
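
The shape of such a recovery loop might be sketched as follows. This is a
simplified outline under stated assumptions, not the code from the example
file: ``ReconnectPolicy``, ``run_forever()`` and the ``connect`` factory are
hypothetical names, and the I/O loop is assumed to offer ``start()``,
``stop()`` and ``call_later()`` as the loop of ``pika.SelectConnection`` does.

.. code :: python

    class ReconnectPolicy:
        """Decides what to do when a connection reports that it has closed."""

        def __init__(self, reconnect_delay=5):
            self.reconnect_delay = reconnect_delay
            self.stopping = False  # set to True for an intentional shutdown
            self.should_reconnect = False

        def on_connection_closed(self, connection, reason):
            """Intended to be passed as `on_close_callback`."""
            if self.stopping:
                connection.ioloop.stop()
                return
            # Unexpected close: request a reconnect and stop the loop after
            # a delay, so that the outer loop can build a new connection.
            self.should_reconnect = True
            connection.ioloop.call_later(self.reconnect_delay,
                                         connection.ioloop.stop)


    def run_forever(policy, connect):
        """Create connections with `connect` until no reconnect is requested.

        `connect` is a hypothetical factory that receives the close callback
        and returns a new, not yet running, connection.
        """
        while True:
            policy.should_reconnect = False
            connection = connect(policy.on_connection_closed)
            connection.ioloop.start()  # blocks until the loop is stopped
            if not policy.should_reconnect:
                break

With ``pika.SelectConnection``, the factory could be a function that returns
``pika.SelectConnection(parameters, on_open_callback=..., on_close_callback=on_close)``,
where the open callback sets up channels and consumers again.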

Contributing
------------
To contribute to Pika, please make sure that any new features or changes to
existing functionality **include test coverage**.

*Pull requests that add or change code without adequate test coverage will be
rejected.*

Additionally, please format your code using
`Yapf <http://pypi.python.org/pypi/yapf>`_ with ``google`` style prior to
issuing your pull request. *Note: only format those lines that you have changed
in your pull request. If you format an entire file and change code outside of
the scope of your PR, it will likely be rejected.*

Extending to support additional I/O frameworks
----------------------------------------------
New non-blocking adapters may be implemented in either of the following ways:

- By subclassing ``pika.BaseConnection``, implementing its abstract method and
  passing its constructor an implementation of
  ``pika.adapters.utils.nbio_interface.AbstractIOServices``.
  ``pika.BaseConnection`` implements ``pika.connection.Connection``'s abstract
  methods, including internally-initiated connection logic. For examples, refer
  to the implementations of
  ``pika.adapters.asyncio_connection.AsyncioConnection``,
  ``pika.adapters.gevent_connection.GeventConnection`` and
  ``pika.adapters.tornado_connection.TornadoConnection``.
- By subclassing ``pika.connection.Connection`` and implementing its abstract
  methods. This approach facilitates implementation of custom
  connection-establishment and transport mechanisms. For an example, refer to
  the implementation of
  ``pika.adapters.twisted_connection.TwistedProtocolConnection``.

.. |Version| image:: https://img.shields.io/pypi/v/pika.svg?
   :target: http://badge.fury.io/py/pika

.. |Python versions| image:: https://img.shields.io/pypi/pyversions/pika.svg
   :target: https://pypi.python.org/pypi/pika

.. |Actions Status| image:: https://github.com/pika/pika/actions/workflows/main.yaml/badge.svg
   :target: https://github.com/pika/pika/actions/workflows/main.yaml

.. |Coverage| image:: https://img.shields.io/codecov/c/github/pika/pika.svg?
   :target: https://codecov.io/github/pika/pika?branch=main

.. |License| image:: https://img.shields.io/pypi/l/pika.svg?
   :target: https://pika.readthedocs.io

.. |Docs| image:: https://readthedocs.org/projects/pika/badge/?version=stable
   :target: https://pika.readthedocs.io
   :alt: Documentation Status

            

Release metadata
----------------

:Maintainers: Gavin M. Roy <gavinmroy@gmail.com>, Luke Bakken <lukerbakken@gmail.com>
:Homepage: https://pika.readthedocs.io
:Source: https://github.com/pika/pika
:Wheel: ``pika-1.3.2-py3-none-any.whl``, 155415 bytes, uploaded 2023-05-05T14:25:41,
  SHA-256 ``0779a7c1fafd805672796085560d290213a465e4f6f76a6fb19e378d8041a14f``,
  https://files.pythonhosted.org/packages/f9/f3/f412836ec714d36f0f4ab581b84c491e3f42c6b5b97a6c6ed1817f3c16d0/pika-1.3.2-py3-none-any.whl
:Source distribution: ``pika-1.3.2.tar.gz``, 145029 bytes, uploaded 2023-05-05T14:25:43,
  SHA-256 ``b2a327ddddf8570b4965b3576ac77091b850262d34ce8c1d8cb4e4146aa4145f``,
  https://files.pythonhosted.org/packages/db/db/d4102f356af18f316c67f2cead8ece307f731dd63140e2c71f170ddacf9b/pika-1.3.2.tar.gz
        