aiormq


Nameaiormq JSON
Version 6.8.0 PyPI version JSON
download
home_pagehttps://github.com/mosquito/aiormq
SummaryPure python AMQP asynchronous client library
upload_time2024-01-15 11:27:29
maintainer
docs_urlNone
authorDmitry Orlov
requires_python>=3.8,<4.0
licenseApache-2.0
keywords rabbitmq asyncio amqp amqp 0.9.1 driver pamqp
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage
            ======
AIORMQ
======

.. image:: https://coveralls.io/repos/github/mosquito/aiormq/badge.svg?branch=master
   :target: https://coveralls.io/github/mosquito/aiormq?branch=master
   :alt: Coveralls

.. image:: https://img.shields.io/pypi/status/aiormq.svg
   :target: https://github.com/mosquito/aiormq
   :alt: Status

.. image:: https://github.com/mosquito/aiormq/workflows/tests/badge.svg
   :target: https://github.com/mosquito/aiormq/actions?query=workflow%3Atests
   :alt: Build status

.. image:: https://img.shields.io/pypi/v/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/
   :alt: Latest Version

.. image:: https://img.shields.io/pypi/wheel/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/

.. image:: https://img.shields.io/pypi/pyversions/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/

.. image:: https://img.shields.io/pypi/l/aiormq.svg
   :target: https://github.com/mosquito/aiormq/blob/master/LICENSE.md


aiormq is a pure python AMQP client library.

.. contents:: Table of contents

Status
======

* 3.x.x branch - Production/Stable
* 4.x.x branch - Unstable (Experimental)
* 5.x.x and greater is only Production/Stable releases.

Features
========

* Connecting by URL

 * amqp example: **amqp://user:password@server.host/vhost**
 * secure amqp example: **amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0**

* Buffered queue for received frames
* Only `PLAIN`_ auth mechanism support
* `Publisher confirms`_ support
* `Transactions`_ support
* Channel based asynchronous locks

  .. note::
      AMQP 0.9.1 requires serialize sending for some frame types
      on the channel. e.g. Content body must be following after
      content header. But frames might be sent asynchronously
      on another channels.

* Tracking unroutable messages
  (Use **connection.channel(on_return_raises=False)** for disabling)
* Full SSL/TLS support, using your choice of:
    * ``amqps://`` url query parameters:
        * ``cafile=`` - string contains path to ca certificate file
        * ``capath=`` - string contains path to ca certificates
        * ``cadata=`` - base64 encoded ca certificate data
        * ``keyfile=`` - string contains path to key file
        * ``certfile=`` - string contains path to certificate file
        * ``no_verify_ssl`` - boolean disables certificates validation
    * ``context=`` `SSLContext`_ keyword argument to ``connect()``.
* Python `type hints`_
* Uses `pamqp`_ as an AMQP 0.9.1 frame encoder/decoder


.. _Publisher confirms: https://www.rabbitmq.com/confirms.html
.. _Transactions: https://www.rabbitmq.com/semantics.html
.. _PLAIN: https://www.rabbitmq.com/authentication.html
.. _type hints: https://docs.python.org/3/library/typing.html
.. _pamqp: https://pypi.org/project/pamqp/
.. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext

Tutorial
========

Introduction
------------

Simple consumer
***************

.. code-block:: python

    import asyncio
    import aiormq

    async def on_message(message):
        """
        on_message doesn't necessarily have to be defined as async.
        Here it is to show that it's possible.
        """
        print(f" [x] Received message {message!r}")
        print(f"Message body is: {message.body!r}")
        print("Before sleep!")
        await asyncio.sleep(5)   # Represents async I/O operations
        print("After sleep!")


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        declare_ok = await channel.queue_declare('helo')
        consume_ok = await channel.basic_consume(
            declare_ok.queue, on_message, no_ack=True
        )


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()


Simple publisher
****************

.. code-block:: python
    :name: test_simple_publisher

    import asyncio
    from typing import Optional

    import aiormq
    from aiormq.abc import DeliveredMessage


    MESSAGE: Optional[DeliveredMessage] = None


    async def main():
        global MESSAGE

        body = b'Hello World!'

        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost//")

        # Creating a channel
        channel = await connection.channel()

        declare_ok = await channel.queue_declare("hello", auto_delete=True)

        # Sending the message
        await channel.basic_publish(body, routing_key='hello')
        print(f" [x] Sent {body}")

        MESSAGE = await channel.basic_get(declare_ok.queue)
        print(f" [x] Received message from {declare_ok.queue!r}")


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    assert MESSAGE is not None
    assert MESSAGE.routing_key == "hello"
    assert MESSAGE.body == b'Hello World!'


Work Queues
-----------

Create new task
***************

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        body = b' '.join(sys.argv[1:]) or b"Hello World!"

        # Sending the message
        await channel.basic_publish(
            body,
            routing_key='task_queue',
            properties=aiormq.spec.Basic.Properties(
                delivery_mode=1,
            )
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


Simple worker
*************

.. code-block:: python

    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f" [x] Received message {message!r}")
        print(f"     Message body is: {message.body!r}")


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")


        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        # Declaring queue
        declare_ok = await channel.queue_declare('task_queue', durable=True)

        # Start listening the queue with name 'task_queue'
        await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # we enter a never-ending loop that waits for data and runs
    # callbacks whenever necessary.
    print(" [*] Waiting for messages. To exit press CTRL+C")
    loop.run_forever()


Publish Subscribe
-----------------

Publisher
*********

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        await channel.exchange_declare(
            exchange='logs', exchange_type='fanout'
        )

        body = b' '.join(sys.argv[1:]) or b"Hello World!"

        # Sending the message
        await channel.basic_publish(
            body, routing_key='info', exchange='logs'
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


Subscriber
**********

.. code-block:: python

    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f"[x] {message.body!r}")

        await message.channel.basic_ack(
            message.delivery.delivery_tag
        )


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        await channel.exchange_declare(
            exchange='logs', exchange_type='fanout'
        )

        # Declaring queue
        declare_ok = await channel.queue_declare(exclusive=True)

        # Binding the queue to the exchange
        await channel.queue_bind(declare_ok.queue, 'logs')

        # Start listening the queue with name 'task_queue'
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.create_task(main())

    # we enter a never-ending loop that waits for data
    # and runs callbacks whenever necessary.
    print(' [*] Waiting for logs. To exit press CTRL+C')
    loop.run_forever()


Routing
-------

Direct consumer
***************

.. code-block:: python

    import sys
    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f" [x] {message.delivery.routing_key!r}:{message.body!r}"
        await message.channel.basic_ack(
            message.delivery.delivery_tag
        )


    async def main():
        # Perform connection
        connection = aiormq.Connection("amqp://guest:guest@localhost/")
        await connection.connect()

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        severities = sys.argv[1:]

        if not severities:
            sys.stderr.write(f"Usage: {sys.argv[0]} [info] [warning] [error]\n")
            sys.exit(1)

        # Declare an exchange
        await channel.exchange_declare(
            exchange='logs', exchange_type='direct'
        )

        # Declaring random queue
        declare_ok = await channel.queue_declare(durable=True, auto_delete=True)

        for severity in severities:
            await channel.queue_bind(
                declare_ok.queue, 'logs', routing_key=severity
            )

        # Start listening the random queue
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # we enter a never-ending loop that waits for data
    # and runs callbacks whenever necessary.
    print(" [*] Waiting for messages. To exit press CTRL+C")
    loop.run_forever()


Emitter
*******

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        await channel.exchange_declare(
            exchange='logs', exchange_type='direct'
        )

        body = (
            b' '.join(arg.encode() for arg in sys.argv[2:])
            or
            b"Hello World!"
        )

        # Sending the message
        routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'

        await channel.basic_publish(
            body, exchange='logs', routing_key=routing_key,
            properties=aiormq.spec.Basic.Properties(
                delivery_mode=1
            )
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Topics
------

Publisher
*********

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        await channel.exchange_declare('topic_logs', exchange_type='topic')

        routing_key = (
            sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
        )

        body = (
            b' '.join(arg.encode() for arg in sys.argv[2:])
            or
            b"Hello World!"
        )

        # Sending the message
        await channel.basic_publish(
            body, exchange='topic_logs', routing_key=routing_key,
            properties=aiormq.spec.Basic.Properties(
                delivery_mode=1
            )
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Consumer
********

.. code-block:: python

    import asyncio
    import sys
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f" [x] {message.delivery.routing_key!r}:{message.body!r}")
        await message.channel.basic_ack(
            message.delivery.delivery_tag
        )


    async def main():
        # Perform connection
        connection = await aiormq.connect(
            "amqp://guest:guest@localhost/", loop=loop
        )

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        # Declare an exchange
        await channel.exchange_declare('topic_logs', exchange_type='topic')

        # Declaring queue
        declare_ok = await channel.queue_declare('task_queue', durable=True)

        binding_keys = sys.argv[1:]

        if not binding_keys:
            sys.stderr.write(
                f"Usage: {sys.argv[0]} [binding_key]...\n"
            )
            sys.exit(1)

        for binding_key in binding_keys:
            await channel.queue_bind(
                declare_ok.queue, 'topic_logs', routing_key=binding_key
            )

        # Start listening the queue with name 'task_queue'
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.create_task(main())

    # we enter a never-ending loop that waits for
    # data and runs callbacks whenever necessary.
    print(" [*] Waiting for messages. To exit press CTRL+C")
    loop.run_forever()

Remote procedure call (RPC)
---------------------------

RPC server
**********

.. code-block:: python

    import asyncio
    import aiormq
    import aiormq.abc


    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)


    async def on_message(message:aiormq.abc.DeliveredMessage):
        n = int(message.body.decode())

        print(f" [.] fib({n})")
        response = str(fib(n)).encode()

        await message.channel.basic_publish(
            response, routing_key=message.header.properties.reply_to,
            properties=aiormq.spec.Basic.Properties(
                correlation_id=message.header.properties.correlation_id
            ),

        )

        await message.channel.basic_ack(message.delivery.delivery_tag)
        print('Request complete')


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        declare_ok = await channel.queue_declare('rpc_queue')

        # Start listening the queue with name 'hello'
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.create_task(main())

    # we enter a never-ending loop that waits for data
    # and runs callbacks whenever necessary.
    print(" [x] Awaiting RPC requests")
    loop.run_forever()


RPC client
**********

.. code-block:: python

    import asyncio
    import uuid
    import aiormq
    import aiormq.abc


    class FibonacciRpcClient:
        def __init__(self):
            self.connection = None      # type: aiormq.Connection
            self.channel = None         # type: aiormq.Channel
            self.callback_queue = ''
            self.futures = {}
            self.loop = loop

        async def connect(self):
            self.connection = await aiormq.connect("amqp://guest:guest@localhost/")

            self.channel = await self.connection.channel()
            declare_ok = await self.channel.queue_declare(
                exclusive=True, auto_delete=True
            )

            await self.channel.basic_consume(declare_ok.queue, self.on_response)

            self.callback_queue = declare_ok.queue

            return self

        async def on_response(self, message: aiormq.abc.DeliveredMessage):
            future = self.futures.pop(message.header.properties.correlation_id)
            future.set_result(message.body)

        async def call(self, n):
            correlation_id = str(uuid.uuid4())
            future = loop.create_future()

            self.futures[correlation_id] = future

            await self.channel.basic_publish(
                str(n).encode(), routing_key='rpc_queue',
                properties=aiormq.spec.Basic.Properties(
                    content_type='text/plain',
                    correlation_id=correlation_id,
                    reply_to=self.callback_queue,
                )
            )

            return int(await future)


    async def main():
        fibonacci_rpc = await FibonacciRpcClient().connect()
        print(" [x] Requesting fib(30)")
        response = await fibonacci_rpc.call(30)
        print(r" [.] Got {response!r}")


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/mosquito/aiormq",
    "name": "aiormq",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8,<4.0",
    "maintainer_email": "",
    "keywords": "rabbitmq,asyncio,amqp,amqp 0.9.1,driver,pamqp",
    "author": "Dmitry Orlov",
    "author_email": "me@mosquito.su",
    "download_url": "https://files.pythonhosted.org/packages/fe/1f/2d84569999665b1dfc7d81f9910b83bc75fe124265118803093ef3cba930/aiormq-6.8.0.tar.gz",
    "platform": null,
    "description": "======\nAIORMQ\n======\n\n.. image:: https://coveralls.io/repos/github/mosquito/aiormq/badge.svg?branch=master\n   :target: https://coveralls.io/github/mosquito/aiormq?branch=master\n   :alt: Coveralls\n\n.. image:: https://img.shields.io/pypi/status/aiormq.svg\n   :target: https://github.com/mosquito/aiormq\n   :alt: Status\n\n.. image:: https://github.com/mosquito/aiormq/workflows/tests/badge.svg\n   :target: https://github.com/mosquito/aiormq/actions?query=workflow%3Atests\n   :alt: Build status\n\n.. image:: https://img.shields.io/pypi/v/aiormq.svg\n   :target: https://pypi.python.org/pypi/aiormq/\n   :alt: Latest Version\n\n.. image:: https://img.shields.io/pypi/wheel/aiormq.svg\n   :target: https://pypi.python.org/pypi/aiormq/\n\n.. image:: https://img.shields.io/pypi/pyversions/aiormq.svg\n   :target: https://pypi.python.org/pypi/aiormq/\n\n.. image:: https://img.shields.io/pypi/l/aiormq.svg\n   :target: https://github.com/mosquito/aiormq/blob/master/LICENSE.md\n\n\naiormq is a pure python AMQP client library.\n\n.. contents:: Table of contents\n\nStatus\n======\n\n* 3.x.x branch - Production/Stable\n* 4.x.x branch - Unstable (Experimental)\n* 5.x.x and greater is only Production/Stable releases.\n\nFeatures\n========\n\n* Connecting by URL\n\n * amqp example: **amqp://user:password@server.host/vhost**\n * secure amqp example: **amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0**\n\n* Buffered queue for received frames\n* Only `PLAIN`_ auth mechanism support\n* `Publisher confirms`_ support\n* `Transactions`_ support\n* Channel based asynchronous locks\n\n  .. note::\n      AMQP 0.9.1 requires serialize sending for some frame types\n      on the channel. e.g. Content body must be following after\n      content header. But frames might be sent asynchronously\n      on another channels.\n\n* Tracking unroutable messages\n  (Use **connection.channel(on_return_raises=False)** for disabling)\n* Full SSL/TLS support, using your choice of:\n    * ``amqps://`` url query parameters:\n        * ``cafile=`` - string contains path to ca certificate file\n        * ``capath=`` - string contains path to ca certificates\n        * ``cadata=`` - base64 encoded ca certificate data\n        * ``keyfile=`` - string contains path to key file\n        * ``certfile=`` - string contains path to certificate file\n        * ``no_verify_ssl`` - boolean disables certificates validation\n    * ``context=`` `SSLContext`_ keyword argument to ``connect()``.\n* Python `type hints`_\n* Uses `pamqp`_ as an AMQP 0.9.1 frame encoder/decoder\n\n\n.. _Publisher confirms: https://www.rabbitmq.com/confirms.html\n.. _Transactions: https://www.rabbitmq.com/semantics.html\n.. _PLAIN: https://www.rabbitmq.com/authentication.html\n.. _type hints: https://docs.python.org/3/library/typing.html\n.. _pamqp: https://pypi.org/project/pamqp/\n.. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext\n\nTutorial\n========\n\nIntroduction\n------------\n\nSimple consumer\n***************\n\n.. code-block:: python\n\n    import asyncio\n    import aiormq\n\n    async def on_message(message):\n        \"\"\"\n        on_message doesn't necessarily have to be defined as async.\n        Here it is to show that it's possible.\n        \"\"\"\n        print(f\" [x] Received message {message!r}\")\n        print(f\"Message body is: {message.body!r}\")\n        print(\"Before sleep!\")\n        await asyncio.sleep(5)   # Represents async I/O operations\n        print(\"After sleep!\")\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n        # Creating a channel\n        channel = await connection.channel()\n\n        # Declaring queue\n        declare_ok = await channel.queue_declare('helo')\n        consume_ok = await channel.basic_consume(\n            declare_ok.queue, on_message, no_ack=True\n        )\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n    loop.run_forever()\n\n\nSimple publisher\n****************\n\n.. code-block:: python\n    :name: test_simple_publisher\n\n    import asyncio\n    from typing import Optional\n\n    import aiormq\n    from aiormq.abc import DeliveredMessage\n\n\n    MESSAGE: Optional[DeliveredMessage] = None\n\n\n    async def main():\n        global MESSAGE\n\n        body = b'Hello World!'\n\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost//\")\n\n        # Creating a channel\n        channel = await connection.channel()\n\n        declare_ok = await channel.queue_declare(\"hello\", auto_delete=True)\n\n        # Sending the message\n        await channel.basic_publish(body, routing_key='hello')\n        print(f\" [x] Sent {body}\")\n\n        MESSAGE = await channel.basic_get(declare_ok.queue)\n        print(f\" [x] Received message from {declare_ok.queue!r}\")\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n\n    assert MESSAGE is not None\n    assert MESSAGE.routing_key == \"hello\"\n    assert MESSAGE.body == b'Hello World!'\n\n\nWork Queues\n-----------\n\nCreate new task\n***************\n\n.. code-block:: python\n\n    import sys\n    import asyncio\n    import aiormq\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n        # Creating a channel\n        channel = await connection.channel()\n\n        body = b' '.join(sys.argv[1:]) or b\"Hello World!\"\n\n        # Sending the message\n        await channel.basic_publish(\n            body,\n            routing_key='task_queue',\n            properties=aiormq.spec.Basic.Properties(\n                delivery_mode=1,\n            )\n        )\n\n        print(f\" [x] Sent {body!r}\")\n\n        await connection.close()\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n\n\nSimple worker\n*************\n\n.. code-block:: python\n\n    import asyncio\n    import aiormq\n    import aiormq.abc\n\n\n    async def on_message(message: aiormq.abc.DeliveredMessage):\n        print(f\" [x] Received message {message!r}\")\n        print(f\"     Message body is: {message.body!r}\")\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n\n        # Creating a channel\n        channel = await connection.channel()\n        await channel.basic_qos(prefetch_count=1)\n\n        # Declaring queue\n        declare_ok = await channel.queue_declare('task_queue', durable=True)\n\n        # Start listening the queue with name 'task_queue'\n        await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n\n    # we enter a never-ending loop that waits for data and runs\n    # callbacks whenever necessary.\n    print(\" [*] Waiting for messages. To exit press CTRL+C\")\n    loop.run_forever()\n\n\nPublish Subscribe\n-----------------\n\nPublisher\n*********\n\n.. code-block:: python\n\n    import sys\n    import asyncio\n    import aiormq\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n        # Creating a channel\n        channel = await connection.channel()\n\n        await channel.exchange_declare(\n            exchange='logs', exchange_type='fanout'\n        )\n\n        body = b' '.join(sys.argv[1:]) or b\"Hello World!\"\n\n        # Sending the message\n        await channel.basic_publish(\n            body, routing_key='info', exchange='logs'\n        )\n\n        print(f\" [x] Sent {body!r}\")\n\n        await connection.close()\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n\n\nSubscriber\n**********\n\n.. code-block:: python\n\n    import asyncio\n    import aiormq\n    import aiormq.abc\n\n\n    async def on_message(message: aiormq.abc.DeliveredMessage):\n        print(f\"[x] {message.body!r}\")\n\n        await message.channel.basic_ack(\n            message.delivery.delivery_tag\n        )\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n        # Creating a channel\n        channel = await connection.channel()\n        await channel.basic_qos(prefetch_count=1)\n\n        await channel.exchange_declare(\n            exchange='logs', exchange_type='fanout'\n        )\n\n        # Declaring queue\n        declare_ok = await channel.queue_declare(exclusive=True)\n\n        # Binding the queue to the exchange\n        await channel.queue_bind(declare_ok.queue, 'logs')\n\n        # Start listening the queue with name 'task_queue'\n        await channel.basic_consume(declare_ok.queue, on_message)\n\n\n    loop = asyncio.get_event_loop()\n    loop.create_task(main())\n\n    # we enter a never-ending loop that waits for data\n    # and runs callbacks whenever necessary.\n    print(' [*] Waiting for logs. To exit press CTRL+C')\n    loop.run_forever()\n\n\nRouting\n-------\n\nDirect consumer\n***************\n\n.. code-block:: python\n\n    import sys\n    import asyncio\n    import aiormq\n    import aiormq.abc\n\n\n    async def on_message(message: aiormq.abc.DeliveredMessage):\n        print(f\" [x] {message.delivery.routing_key!r}:{message.body!r}\"\n        await message.channel.basic_ack(\n            message.delivery.delivery_tag\n        )\n\n\n    async def main():\n        # Perform connection\n        connection = aiormq.Connection(\"amqp://guest:guest@localhost/\")\n        await connection.connect()\n\n        # Creating a channel\n        channel = await connection.channel()\n        await channel.basic_qos(prefetch_count=1)\n\n        severities = sys.argv[1:]\n\n        if not severities:\n            sys.stderr.write(f\"Usage: {sys.argv[0]} [info] [warning] [error]\\n\")\n            sys.exit(1)\n\n        # Declare an exchange\n        await channel.exchange_declare(\n            exchange='logs', exchange_type='direct'\n        )\n\n        # Declaring random queue\n        declare_ok = await channel.queue_declare(durable=True, auto_delete=True)\n\n        for severity in severities:\n            await channel.queue_bind(\n                declare_ok.queue, 'logs', routing_key=severity\n            )\n\n        # Start listening the random queue\n        await channel.basic_consume(declare_ok.queue, on_message)\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n\n    # we enter a never-ending loop that waits for data\n    # and runs callbacks whenever necessary.\n    print(\" [*] Waiting for messages. To exit press CTRL+C\")\n    loop.run_forever()\n\n\nEmitter\n*******\n\n.. code-block:: python\n\n    import sys\n    import asyncio\n    import aiormq\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n        # Creating a channel\n        channel = await connection.channel()\n\n        await channel.exchange_declare(\n            exchange='logs', exchange_type='direct'\n        )\n\n        body = (\n            b' '.join(arg.encode() for arg in sys.argv[2:])\n            or\n            b\"Hello World!\"\n        )\n\n        # Sending the message\n        routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'\n\n        await channel.basic_publish(\n            body, exchange='logs', routing_key=routing_key,\n            properties=aiormq.spec.Basic.Properties(\n                delivery_mode=1\n            )\n        )\n\n        print(f\" [x] Sent {body!r}\")\n\n        await connection.close()\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n\nTopics\n------\n\nPublisher\n*********\n\n.. code-block:: python\n\n    import sys\n    import asyncio\n    import aiormq\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n        # Creating a channel\n        channel = await connection.channel()\n\n        await channel.exchange_declare('topic_logs', exchange_type='topic')\n\n        routing_key = (\n            sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'\n        )\n\n        body = (\n            b' '.join(arg.encode() for arg in sys.argv[2:])\n            or\n            b\"Hello World!\"\n        )\n\n        # Sending the message\n        await channel.basic_publish(\n            body, exchange='topic_logs', routing_key=routing_key,\n            properties=aiormq.spec.Basic.Properties(\n                delivery_mode=1\n            )\n        )\n\n        print(f\" [x] Sent {body!r}\")\n\n        await connection.close()\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n\nConsumer\n********\n\n.. code-block:: python\n\n    import asyncio\n    import sys\n    import aiormq\n    import aiormq.abc\n\n\n    async def on_message(message: aiormq.abc.DeliveredMessage):\n        print(f\" [x] {message.delivery.routing_key!r}:{message.body!r}\")\n        await message.channel.basic_ack(\n            message.delivery.delivery_tag\n        )\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\n            \"amqp://guest:guest@localhost/\", loop=loop\n        )\n\n        # Creating a channel\n        channel = await connection.channel()\n        await channel.basic_qos(prefetch_count=1)\n\n        # Declare an exchange\n        await channel.exchange_declare('topic_logs', exchange_type='topic')\n\n        # Declaring queue\n        declare_ok = await channel.queue_declare('task_queue', durable=True)\n\n        binding_keys = sys.argv[1:]\n\n        if not binding_keys:\n            sys.stderr.write(\n                f\"Usage: {sys.argv[0]} [binding_key]...\\n\"\n            )\n            sys.exit(1)\n\n        for binding_key in binding_keys:\n            await channel.queue_bind(\n                declare_ok.queue, 'topic_logs', routing_key=binding_key\n            )\n\n        # Start listening the queue with name 'task_queue'\n        await channel.basic_consume(declare_ok.queue, on_message)\n\n\n    loop = asyncio.get_event_loop()\n    loop.create_task(main())\n\n    # we enter a never-ending loop that waits for\n    # data and runs callbacks whenever necessary.\n    print(\" [*] Waiting for messages. To exit press CTRL+C\")\n    loop.run_forever()\n\nRemote procedure call (RPC)\n---------------------------\n\nRPC server\n**********\n\n.. code-block:: python\n\n    import asyncio\n    import aiormq\n    import aiormq.abc\n\n\n    def fib(n):\n        if n == 0:\n            return 0\n        elif n == 1:\n            return 1\n        else:\n            return fib(n-1) + fib(n-2)\n\n\n    async def on_message(message:aiormq.abc.DeliveredMessage):\n        n = int(message.body.decode())\n\n        print(f\" [.] fib({n})\")\n        response = str(fib(n)).encode()\n\n        await message.channel.basic_publish(\n            response, routing_key=message.header.properties.reply_to,\n            properties=aiormq.spec.Basic.Properties(\n                correlation_id=message.header.properties.correlation_id\n            ),\n\n        )\n\n        await message.channel.basic_ack(message.delivery.delivery_tag)\n        print('Request complete')\n\n\n    async def main():\n        # Perform connection\n        connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n        # Creating a channel\n        channel = await connection.channel()\n\n        # Declaring queue\n        declare_ok = await channel.queue_declare('rpc_queue')\n\n        # Start listening the queue with name 'hello'\n        await channel.basic_consume(declare_ok.queue, on_message)\n\n\n    loop = asyncio.get_event_loop()\n    loop.create_task(main())\n\n    # we enter a never-ending loop that waits for data\n    # and runs callbacks whenever necessary.\n    print(\" [x] Awaiting RPC requests\")\n    loop.run_forever()\n\n\nRPC client\n**********\n\n.. code-block:: python\n\n    import asyncio\n    import uuid\n    import aiormq\n    import aiormq.abc\n\n\n    class FibonacciRpcClient:\n        def __init__(self):\n            self.connection = None      # type: aiormq.Connection\n            self.channel = None         # type: aiormq.Channel\n            self.callback_queue = ''\n            self.futures = {}\n            self.loop = loop\n\n        async def connect(self):\n            self.connection = await aiormq.connect(\"amqp://guest:guest@localhost/\")\n\n            self.channel = await self.connection.channel()\n            declare_ok = await self.channel.queue_declare(\n                exclusive=True, auto_delete=True\n            )\n\n            await self.channel.basic_consume(declare_ok.queue, self.on_response)\n\n            self.callback_queue = declare_ok.queue\n\n            return self\n\n        async def on_response(self, message: aiormq.abc.DeliveredMessage):\n            future = self.futures.pop(message.header.properties.correlation_id)\n            future.set_result(message.body)\n\n        async def call(self, n):\n            correlation_id = str(uuid.uuid4())\n            future = loop.create_future()\n\n            self.futures[correlation_id] = future\n\n            await self.channel.basic_publish(\n                str(n).encode(), routing_key='rpc_queue',\n                properties=aiormq.spec.Basic.Properties(\n                    content_type='text/plain',\n                    correlation_id=correlation_id,\n                    reply_to=self.callback_queue,\n                )\n            )\n\n            return int(await future)\n\n\n    async def main():\n        fibonacci_rpc = await FibonacciRpcClient().connect()\n        print(\" [x] Requesting fib(30)\")\n        response = await fibonacci_rpc.call(30)\n        print(r\" [.] Got {response!r}\")\n\n\n    loop = asyncio.get_event_loop()\n    loop.run_until_complete(main())\n",
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "Pure python AMQP asynchronous client library",
    "version": "6.8.0",
    "project_urls": {
        "Documentation": "https://github.com/mosquito/aiormq/blob/master/README.rst",
        "Homepage": "https://github.com/mosquito/aiormq",
        "Source": "https://github.com/mosquito/aiormq",
        "Tracker": "https://github.com/mosquito/aiormq/issues"
    },
    "split_keywords": [
        "rabbitmq",
        "asyncio",
        "amqp",
        "amqp 0.9.1",
        "driver",
        "pamqp"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "a004f275fadbc6af276afeb567418ccecca43b88d49224daab41d188e1558b23",
                "md5": "22997203bf3af32faeca0d4bc85590af",
                "sha256": "9a16174dcae4078c957a773d2f02d3dfd6c2fcf12c909dc244333a458f2aeab0"
            },
            "downloads": -1,
            "filename": "aiormq-6.8.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "22997203bf3af32faeca0d4bc85590af",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8,<4.0",
            "size": 31114,
            "upload_time": "2024-01-15T11:27:26",
            "upload_time_iso_8601": "2024-01-15T11:27:26.418644Z",
            "url": "https://files.pythonhosted.org/packages/a0/04/f275fadbc6af276afeb567418ccecca43b88d49224daab41d188e1558b23/aiormq-6.8.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "fe1f2d84569999665b1dfc7d81f9910b83bc75fe124265118803093ef3cba930",
                "md5": "c76098e3fca2940f4cf73b3a4029c1b8",
                "sha256": "198f9c7430feb7bc491016099a06266dc45880b6b1de3925d410fde6541a66fb"
            },
            "downloads": -1,
            "filename": "aiormq-6.8.0.tar.gz",
            "has_sig": false,
            "md5_digest": "c76098e3fca2940f4cf73b3a4029c1b8",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8,<4.0",
            "size": 26582,
            "upload_time": "2024-01-15T11:27:29",
            "upload_time_iso_8601": "2024-01-15T11:27:29.334858Z",
            "url": "https://files.pythonhosted.org/packages/fe/1f/2d84569999665b1dfc7d81f9910b83bc75fe124265118803093ef3cba930/aiormq-6.8.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-01-15 11:27:29",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "mosquito",
    "github_project": "aiormq",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": true,
    "lcname": "aiormq"
}
        
Elapsed time: 0.17632s