======
AIORMQ
======

.. image:: https://coveralls.io/repos/github/mosquito/aiormq/badge.svg?branch=master
   :target: https://coveralls.io/github/mosquito/aiormq?branch=master
   :alt: Coveralls

.. image:: https://img.shields.io/pypi/status/aiormq.svg
   :target: https://github.com/mosquito/aiormq
   :alt: Status

.. image:: https://github.com/mosquito/aiormq/workflows/tests/badge.svg
   :target: https://github.com/mosquito/aiormq/actions?query=workflow%3Atests
   :alt: Build status

.. image:: https://img.shields.io/pypi/v/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/
   :alt: Latest Version

.. image:: https://img.shields.io/pypi/wheel/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/

.. image:: https://img.shields.io/pypi/pyversions/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/

.. image:: https://img.shields.io/pypi/l/aiormq.svg
   :target: https://github.com/mosquito/aiormq/blob/master/LICENSE.md

aiormq is a pure Python AMQP client library.
.. contents:: Table of contents
Status
======
* 3.x.x branch - Production/Stable
* 4.x.x branch - Unstable (Experimental)
* 5.x.x and later - Production/Stable releases only
Features
========
* Connecting by URL

  * amqp example: **amqp://user:password@server.host/vhost**
  * secure amqp example: **amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0**

* Buffered queue for received frames
* Only the `PLAIN`_ auth mechanism is supported
* `Publisher confirms`_ support
* `Transactions`_ support
* Channel-based asynchronous locks

  .. note::
      AMQP 0.9.1 requires some frame types to be sent serially on a
      channel; for example, a content body must immediately follow its
      content header. Frames on other channels may still be sent
      asynchronously.

* Tracking unroutable messages
  (use **connection.channel(on_return_raises=False)** to disable)
* Full SSL/TLS support, using your choice of:

  * ``amqps://`` URL query parameters:

    * ``cafile=`` - path to a CA certificate file
    * ``capath=`` - path to a directory of CA certificates
    * ``cadata=`` - base64-encoded CA certificate data
    * ``keyfile=`` - path to the key file
    * ``certfile=`` - path to the certificate file
    * ``no_verify_ssl`` - boolean that disables certificate validation

  * ``context=`` `SSLContext`_ keyword argument to ``connect()``.

* Python `type hints`_
* Uses `pamqp`_ as an AMQP 0.9.1 frame encoder/decoder

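
The channel-based locking mentioned in the note can be illustrated with plain ``asyncio`` primitives. This is a minimal sketch and not aiormq's actual implementation: ``FakeChannel``, its ``publish`` method and ``frames_are_paired`` are hypothetical names, and "frames" are just tuples appended to a shared list that stands in for the socket.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a channel; not part of aiormq."""

    def __init__(self, number, wire):
        self.number = number
        self.wire = wire              # shared list standing in for the socket
        self.lock = asyncio.Lock()    # one lock per channel

    async def publish(self, body):
        # Hold the channel lock so the header and body stay adjacent
        # on this channel even when other tasks publish concurrently.
        async with self.lock:
            self.wire.append((self.number, "header"))
            await asyncio.sleep(0)    # yield, letting other tasks run
            self.wire.append((self.number, "body", body))


async def demo():
    wire = []
    ch1, ch2 = FakeChannel(1, wire), FakeChannel(2, wire)
    await asyncio.gather(
        ch1.publish(b"a"), ch1.publish(b"b"),
        ch2.publish(b"c"), ch2.publish(b"d"),
    )
    return wire


def frames_are_paired(wire, number):
    """Check that each header on a channel is directly followed by its body."""
    frames = [f for f in wire if f[0] == number]
    return all(
        frames[i][1] == "header" and frames[i + 1][1] == "body"
        for i in range(0, len(frames), 2)
    )


wire = asyncio.run(demo())
print(frames_are_paired(wire, 1), frames_are_paired(wire, 2))
```

Frames from the two channels may interleave on the shared list, but within each channel a header is never separated from its body.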
.. _Publisher confirms: https://www.rabbitmq.com/confirms.html
.. _Transactions: https://www.rabbitmq.com/semantics.html
.. _PLAIN: https://www.rabbitmq.com/authentication.html
.. _type hints: https://docs.python.org/3/library/typing.html
.. _pamqp: https://pypi.org/project/pamqp/
.. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext
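
As an illustration of the URL format listed under Features, the sketch below composes an ``amqps://`` URL with TLS query parameters using only the standard library. ``build_amqps_url`` is a hypothetical helper, not part of aiormq; percent-encoding of credentials is ordinary URL practice, and how a given vhost is decoded should be checked against the library itself.

```python
from urllib.parse import quote, urlencode


def build_amqps_url(user, password, host, vhost, **tls_options):
    """Hypothetical helper: compose an amqps:// URL with TLS query parameters."""
    # Percent-encode credentials and vhost so special characters survive.
    base = (
        f"amqps://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}/{quote(vhost, safe='')}"
    )
    query = urlencode(tls_options)
    return f"{base}?{query}" if query else base


url = build_amqps_url(
    "user", "password", "server.host", "vhost",
    cafile="ca.pem", keyfile="key.pem", certfile="cert.pem", no_verify_ssl=0,
)
print(url)
```

The resulting string can be passed wherever the examples below use a literal connection URL.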
Tutorial
========
Introduction
------------
Simple consumer
***************

.. code-block:: python

    import asyncio
    import aiormq

    async def on_message(message):
        """
        on_message doesn't necessarily have to be defined as async.
        Here it is to show that it's possible.
        """
        print(f" [x] Received message {message!r}")
        print(f"Message body is: {message.body!r}")
        print("Before sleep!")
        await asyncio.sleep(5)   # Represents async I/O operations
        print("After sleep!")


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        declare_ok = await channel.queue_declare('hello')
        consume_ok = await channel.basic_consume(
            declare_ok.queue, on_message, no_ack=True
        )


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()

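
The tutorial scripts drive the event loop with ``get_event_loop()`` and ``run_forever()``. On recent Python versions ``asyncio.run()`` is the more common entry point; the sketch below shows one way to keep a consumer alive until it is told to stop. It is an assumption-laden illustration: ``serve`` and ``fake_setup`` are hypothetical names, and the broker setup is replaced by a placeholder coroutine so the snippet runs without RabbitMQ.

```python
import asyncio


async def serve(setup, stop):
    """Run ``setup()`` once, then stay alive until ``stop`` is set."""
    await setup()          # e.g. connect, declare queues, start consuming
    await stop.wait()      # plays the role of loop.run_forever()
    return "stopped"


async def demo():
    started = []

    async def fake_setup():
        # Placeholder for the aiormq connection and basic_consume calls.
        started.append(True)

    stop = asyncio.Event()
    task = asyncio.create_task(serve(fake_setup, stop))
    await asyncio.sleep(0)     # let serve() run its setup
    stop.set()                 # in a real program: set from a signal handler
    return started, await task


started, result = asyncio.run(demo())
print(started, result)
```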
Simple publisher
****************

.. code-block:: python
    :name: test_simple_publisher

    import asyncio
    from typing import Optional

    import aiormq
    from aiormq.abc import DeliveredMessage


    MESSAGE: Optional[DeliveredMessage] = None


    async def main():
        global MESSAGE

        body = b'Hello World!'

        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost//")

        # Creating a channel
        channel = await connection.channel()

        declare_ok = await channel.queue_declare("hello", auto_delete=True)

        # Sending the message
        await channel.basic_publish(body, routing_key='hello')
        print(f" [x] Sent {body}")

        MESSAGE = await channel.basic_get(declare_ok.queue)
        print(f" [x] Received message from {declare_ok.queue!r}")


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    assert MESSAGE is not None
    assert MESSAGE.routing_key == "hello"
    assert MESSAGE.body == b'Hello World!'

Work Queues
-----------
Create new task
***************

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        # sys.argv holds str values, so join first and then encode to bytes
        body = ' '.join(sys.argv[1:]).encode() or b"Hello World!"

        # Sending the message
        await channel.basic_publish(
            body,
            routing_key='task_queue',
            properties=aiormq.spec.Basic.Properties(
                delivery_mode=1,
            )
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Simple worker
*************

.. code-block:: python

    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f" [x] Received message {message!r}")
        print(f"     Message body is: {message.body!r}")


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        # Declaring queue
        declare_ok = await channel.queue_declare('task_queue', durable=True)

        # Start listening the queue with name 'task_queue'
        await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # we enter a never-ending loop that waits for data and runs
    # callbacks whenever necessary.
    print(" [*] Waiting for messages. To exit press CTRL+C")
    loop.run_forever()

Publish Subscribe
-----------------
Publisher
*********

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        await channel.exchange_declare(
            exchange='logs', exchange_type='fanout'
        )

        # sys.argv holds str values, so join first and then encode to bytes
        body = ' '.join(sys.argv[1:]).encode() or b"Hello World!"

        # Sending the message
        await channel.basic_publish(
            body, routing_key='info', exchange='logs'
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Subscriber
**********

.. code-block:: python

    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f"[x] {message.body!r}")

        await message.channel.basic_ack(
            message.delivery.delivery_tag
        )


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        await channel.exchange_declare(
            exchange='logs', exchange_type='fanout'
        )

        # Declaring an exclusive queue with a server-generated name
        declare_ok = await channel.queue_declare(exclusive=True)

        # Binding the queue to the exchange
        await channel.queue_bind(declare_ok.queue, 'logs')

        # Start listening to the exclusive queue declared above
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.create_task(main())

    # we enter a never-ending loop that waits for data
    # and runs callbacks whenever necessary.
    print(' [*] Waiting for logs. To exit press CTRL+C')
    loop.run_forever()

Routing
-------
Direct consumer
***************

.. code-block:: python

    import sys
    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f" [x] {message.delivery.routing_key!r}:{message.body!r}")
        await message.channel.basic_ack(
            message.delivery.delivery_tag
        )


    async def main():
        # Perform connection
        connection = aiormq.Connection("amqp://guest:guest@localhost/")
        await connection.connect()

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        severities = sys.argv[1:]

        if not severities:
            sys.stderr.write(f"Usage: {sys.argv[0]} [info] [warning] [error]\n")
            sys.exit(1)

        # Declare an exchange
        await channel.exchange_declare(
            exchange='logs', exchange_type='direct'
        )

        # Declaring random queue
        declare_ok = await channel.queue_declare(durable=True, auto_delete=True)

        for severity in severities:
            await channel.queue_bind(
                declare_ok.queue, 'logs', routing_key=severity
            )

        # Start listening the random queue
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # we enter a never-ending loop that waits for data
    # and runs callbacks whenever necessary.
    print(" [*] Waiting for messages. To exit press CTRL+C")
    loop.run_forever()

Emitter
*******

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        await channel.exchange_declare(
            exchange='logs', exchange_type='direct'
        )

        body = (
            b' '.join(arg.encode() for arg in sys.argv[2:])
            or
            b"Hello World!"
        )

        # Sending the message
        routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'

        await channel.basic_publish(
            body, exchange='logs', routing_key=routing_key,
            properties=aiormq.spec.Basic.Properties(
                delivery_mode=1
            )
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Topics
------
Publisher
*********

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        await channel.exchange_declare('topic_logs', exchange_type='topic')

        routing_key = (
            sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
        )

        body = (
            b' '.join(arg.encode() for arg in sys.argv[2:])
            or
            b"Hello World!"
        )

        # Sending the message
        await channel.basic_publish(
            body, exchange='topic_logs', routing_key=routing_key,
            properties=aiormq.spec.Basic.Properties(
                delivery_mode=1
            )
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Consumer
********

.. code-block:: python

    import asyncio
    import sys
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f" [x] {message.delivery.routing_key!r}:{message.body!r}")
        await message.channel.basic_ack(
            message.delivery.delivery_tag
        )


    async def main():
        # Perform connection
        connection = await aiormq.connect(
            "amqp://guest:guest@localhost/", loop=loop
        )

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        # Declare an exchange
        await channel.exchange_declare('topic_logs', exchange_type='topic')

        # Declaring queue
        declare_ok = await channel.queue_declare('task_queue', durable=True)

        binding_keys = sys.argv[1:]

        if not binding_keys:
            sys.stderr.write(
                f"Usage: {sys.argv[0]} [binding_key]...\n"
            )
            sys.exit(1)

        for binding_key in binding_keys:
            await channel.queue_bind(
                declare_ok.queue, 'topic_logs', routing_key=binding_key
            )

        # Start listening the queue with name 'task_queue'
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.create_task(main())

    # we enter a never-ending loop that waits for
    # data and runs callbacks whenever necessary.
    print(" [*] Waiting for messages. To exit press CTRL+C")
    loop.run_forever()

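
With a topic exchange the broker compares each binding key against the message's routing key word by word: in AMQP 0.9.1, ``*`` stands for exactly one word and ``#`` for zero or more words. The matching happens inside the broker, not in aiormq; the sketch below only illustrates the rule so that binding keys can be reasoned about locally. ``topic_matches`` is a hypothetical helper.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches the AMQP topic binding_key."""

    def match(pattern, words):
        if not pattern:
            # Pattern exhausted: match only if no words are left over.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may consume zero or more words.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            # '*' consumes exactly one word; a literal must be equal.
            return match(rest, words[1:])
        return False

    return match(binding_key.split("."), routing_key.split("."))


for key in ("*.info", "anonymous.#", "kern.*"):
    print(key, topic_matches(key, "anonymous.info"))
```

For the publisher's default routing key ``anonymous.info``, a consumer started with ``"*.info"`` or ``"anonymous.#"`` would receive the message, while one bound with ``"kern.*"`` would not.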
Remote procedure call (RPC)
---------------------------
RPC server
**********

.. code-block:: python

    import asyncio
    import aiormq
    import aiormq.abc


    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)


    async def on_message(message: aiormq.abc.DeliveredMessage):
        n = int(message.body.decode())

        print(f" [.] fib({n})")
        response = str(fib(n)).encode()

        # Reply to the queue named by the request, echoing its correlation id
        await message.channel.basic_publish(
            response, routing_key=message.header.properties.reply_to,
            properties=aiormq.spec.Basic.Properties(
                correlation_id=message.header.properties.correlation_id
            ),
        )

        await message.channel.basic_ack(message.delivery.delivery_tag)
        print('Request complete')


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        declare_ok = await channel.queue_declare('rpc_queue')

        # Start listening the queue with name 'rpc_queue'
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.create_task(main())

    # we enter a never-ending loop that waits for data
    # and runs callbacks whenever necessary.
    print(" [x] Awaiting RPC requests")
    loop.run_forever()

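
The recursive ``fib`` in the server is CPU-bound, so calling it directly inside ``on_message`` keeps the event loop busy until it returns. One possible variation, shown here as a sketch rather than as part of the original tutorial, is to hand the call to the default executor with ``loop.run_in_executor``. ``compute_fib`` and the ``ticker`` task are hypothetical names used only to show that the loop stays responsive.

```python
import asyncio


def fib(n):
    if n < 2:
        return n
    return fib(n - 1) + fib(n - 2)


async def compute_fib(n):
    """Run the CPU-bound fib() in the default thread pool executor."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, fib, n)


async def demo():
    ticks = 0

    async def ticker():
        # Can only advance while the event loop is not blocked.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.001)

    task = asyncio.create_task(ticker())
    result = await compute_fib(20)
    task.cancel()
    return result, ticks


result, ticks = asyncio.run(demo())
print(result, ticks >= 1)
```

A thread keeps the loop responsive but does not make pure-Python code run in parallel; a process pool executor is another option when throughput matters.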
RPC client
**********

.. code-block:: python

    import asyncio
    import uuid
    import aiormq
    import aiormq.abc


    class FibonacciRpcClient:
        def __init__(self):
            self.connection = None      # type: aiormq.Connection
            self.channel = None         # type: aiormq.Channel
            self.callback_queue = ''
            self.futures = {}
            self.loop = loop

        async def connect(self):
            self.connection = await aiormq.connect("amqp://guest:guest@localhost/")

            self.channel = await self.connection.channel()
            declare_ok = await self.channel.queue_declare(
                exclusive=True, auto_delete=True
            )

            await self.channel.basic_consume(declare_ok.queue, self.on_response)

            self.callback_queue = declare_ok.queue

            return self

        async def on_response(self, message: aiormq.abc.DeliveredMessage):
            # Resolve the future registered under the reply's correlation id
            future = self.futures.pop(message.header.properties.correlation_id)
            future.set_result(message.body)

        async def call(self, n):
            correlation_id = str(uuid.uuid4())
            future = self.loop.create_future()

            self.futures[correlation_id] = future

            await self.channel.basic_publish(
                str(n).encode(), routing_key='rpc_queue',
                properties=aiormq.spec.Basic.Properties(
                    content_type='text/plain',
                    correlation_id=correlation_id,
                    reply_to=self.callback_queue,
                )
            )

            return int(await future)


    async def main():
        fibonacci_rpc = await FibonacciRpcClient().connect()
        print(" [x] Requesting fib(30)")
        response = await fibonacci_rpc.call(30)
        print(f" [.] Got {response!r}")


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

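
The client pairs each request with its reply through a dictionary of futures keyed by correlation id. That bookkeeping can be tried without a broker; the sketch below isolates it using only ``asyncio``. ``PendingReplies`` and its methods are hypothetical names, and, unlike ``on_response`` in the client, this version silently ignores replies with an unknown correlation id instead of raising ``KeyError``.

```python
import asyncio
import uuid


class PendingReplies:
    """Hypothetical helper that maps correlation ids to futures."""

    def __init__(self):
        self.futures = {}

    def expect(self):
        # Register a future under a fresh correlation id.
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.futures[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id, body):
        # Ignore replies nobody is waiting for (e.g. late duplicates).
        future = self.futures.pop(correlation_id, None)
        if future is not None and not future.done():
            future.set_result(body)


async def demo():
    pending = PendingReplies()
    id_a, fut_a = pending.expect()
    id_b, fut_b = pending.expect()
    # Replies arrive out of order; each still reaches the right caller.
    pending.resolve(id_b, b"second")
    pending.resolve(id_a, b"first")
    pending.resolve("unknown-id", b"ignored")
    return await fut_a, await fut_b, len(pending.futures)


outcome = asyncio.run(demo())
print(outcome)
```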
Raw data
{
"_id": null,
"home_page": "https://github.com/mosquito/aiormq",
"name": "aiormq",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.8",
"maintainer_email": null,
"keywords": "rabbitmq, asyncio, amqp, amqp 0.9.1, driver, pamqp",
"author": "Dmitry Orlov",
"author_email": "me@mosquito.su",
"download_url": "https://files.pythonhosted.org/packages/a4/79/5397756a8782bf3d0dce392b48260c3ec81010f16bef8441ff03505dccb4/aiormq-6.8.1.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "Pure python AMQP asynchronous client library",
"version": "6.8.1",
"project_urls": {
"Documentation": "https://github.com/mosquito/aiormq/blob/master/README.rst",
"Homepage": "https://github.com/mosquito/aiormq",
"Source": "https://github.com/mosquito/aiormq",
"Tracker": "https://github.com/mosquito/aiormq/issues"
},
"split_keywords": [
"rabbitmq",
" asyncio",
" amqp",
" amqp 0.9.1",
" driver",
" pamqp"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "2ebe1a613ae1564426f86650ff58c351902895aa969f7e537e74bfd568f5c8bf",
"md5": "e693c0c56c8de34de1c0566479b44838",
"sha256": "5da896c8624193708f9409ffad0b20395010e2747f22aa4150593837f40aa017"
},
"downloads": -1,
"filename": "aiormq-6.8.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "e693c0c56c8de34de1c0566479b44838",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.8",
"size": 31174,
"upload_time": "2024-09-04T11:16:37",
"upload_time_iso_8601": "2024-09-04T11:16:37.238466Z",
"url": "https://files.pythonhosted.org/packages/2e/be/1a613ae1564426f86650ff58c351902895aa969f7e537e74bfd568f5c8bf/aiormq-6.8.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "a4795397756a8782bf3d0dce392b48260c3ec81010f16bef8441ff03505dccb4",
"md5": "894abdfed42a33ddcb128afa988a35f5",
"sha256": "a964ab09634be1da1f9298ce225b310859763d5cf83ef3a7eae1a6dc6bd1da1a"
},
"downloads": -1,
"filename": "aiormq-6.8.1.tar.gz",
"has_sig": false,
"md5_digest": "894abdfed42a33ddcb128afa988a35f5",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.8",
"size": 30528,
"upload_time": "2024-09-04T11:16:38",
"upload_time_iso_8601": "2024-09-04T11:16:38.655813Z",
"url": "https://files.pythonhosted.org/packages/a4/79/5397756a8782bf3d0dce392b48260c3ec81010f16bef8441ff03505dccb4/aiormq-6.8.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-09-04 11:16:38",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "mosquito",
"github_project": "aiormq",
"travis_ci": false,
"coveralls": true,
"github_actions": true,
"lcname": "aiormq"
}