amqp-mock


Nameamqp-mock JSON
Version 0.6.2 PyPI version JSON
download
home_pagehttps://github.com/tsv1/amqp-mock
SummaryRemote AMQP mock
upload_time2024-05-12 17:00:28
maintainerNone
docs_urlNone
authorNikita Tsvetkov
requires_python>=3.8.0
licenseApache-2.0
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # AMQP Mock

[![Codecov](https://img.shields.io/codecov/c/github/tsv1/amqp-mock/master.svg?style=flat-square)](https://codecov.io/gh/tsv1/amqp-mock)
[![PyPI](https://img.shields.io/pypi/v/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/amqp-mock?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![Python Version](https://img.shields.io/pypi/pyversions/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)

* [Installation](#installation)
* [Overview](#overview)
  * [Test Publishing](#test-publishing)
  * [Test Consuming](#test-consuming)
* [Mock Server](#mock-server)
  * [Start server](#start-server)
  * [Publish message](#publish-message)
  * [Get queue message history](#get-queue-message-history)
  * [Get exchange messages](#get-exchange-messages)
  * [Delete exchange messages](#delete-exchange-messages)
  * [Reset](#reset)

## Installation

```sh
pip3 install amqp-mock
```

## Overview

### Test Publishing

```python
from amqp_mock import create_amqp_mock

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Publish message via "system under test"
    publish_message([1, 2, 3], "exchange")

    # 3. Test message has been published
    messages = await mock.client.get_exchange_messages("exchange")
    assert messages[0].value == [1, 2, 3]
```

Full code available here: [`./examples/publish_example.py`](https://github.com/tsv1/amqp-mock/blob/master/examples/publish_example.py)

### Test Consuming

```python
from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED
```

Full code available here: [`./examples/consume_example.py`](https://github.com/tsv1/amqp-mock/blob/master/examples/consume_example.py)

## Mock Server

### Start server

```python
import asyncio

from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock


async def run() -> None:
    storage = Storage()
    http_server = HttpServer(storage, port=8080)
    amqp_server = AmqpServer(storage, port=5672)
    async with create_amqp_mock(http_server, amqp_server):
        await asyncio.Future()

asyncio.run(run())
```

or via docker

```shell
docker run -p 8080:80 -p 5672:5672 tsv1/amqp-mock
```

### Publish message

`POST /queues/{queue}/messages`

```js
{
    "id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
    "value": [1, 2, 3],
    "exchange": "",
    "routing_key": "",
    "properties": null
}
```

<details><summary>HTTP</summary>
<p>

```sh
$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

</p>
</details>

<details><summary>Python</summary>
<p>

```python
from amqp_mock import AmqpMockClient, Message

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
```

</p>
</details>

### Get queue message history

`GET /queues/{queue}/messages/history`

<details><summary>HTTP</summary>
<p>

```sh
$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]
```

</p>
</details>

<details><summary>Python</summary>
<p>

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,
#                  queue='test_queue',
#                  status=MessageStatus.ACKED>
# ]
```

</p>
</details>

### Get exchange messages

`GET /exchanges/{exchange}/messages`

<details><summary>HTTP</summary>
<p>

```sh
$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]
```

</p>
</details>

<details><summary>Python</summary>
<p>

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>
# ]
```

</p>
</details>

### Delete exchange messages

`DELETE /exchanges/{exchange}/messages`

<details><summary>HTTP</summary>
<p>

```sh
$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

</p>
</details>

<details><summary>Python</summary>
<p>

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
```

</p>
</details>

### Reset

`DELETE /`

<details><summary>HTTP</summary>
<p>

```sh
$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

</p>
</details>

<details><summary>Python</summary>
<p>

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()
```

</p>
</details>

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/tsv1/amqp-mock",
    "name": "amqp-mock",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8.0",
    "maintainer_email": null,
    "keywords": null,
    "author": "Nikita Tsvetkov",
    "author_email": "tsv1@fastmail.com",
    "download_url": "https://files.pythonhosted.org/packages/05/8e/8eef013c56efeab660c289eae69d65918f5dfe1282425bf554514ee0955e/amqp_mock-0.6.2.tar.gz",
    "platform": null,
    "description": "# AMQP Mock\n\n[![Codecov](https://img.shields.io/codecov/c/github/tsv1/amqp-mock/master.svg?style=flat-square)](https://codecov.io/gh/tsv1/amqp-mock)\n[![PyPI](https://img.shields.io/pypi/v/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)\n[![PyPI - Downloads](https://img.shields.io/pypi/dm/amqp-mock?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)\n[![Python Version](https://img.shields.io/pypi/pyversions/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)\n\n* [Installation](#installation)\n* [Overview](#overview)\n  * [Test Publishing](#test-publishing)\n  * [Test Consuming](#test-consuming)\n* [Mock Server](#mock-server)\n  * [Start server](#start-server)\n  * [Publish message](#publish-message)\n  * [Get queue message history](#get-queue-message-history)\n  * [Get exchange messages](#get-exchange-messages)\n  * [Delete exchange messages](#delete-exchange-messages)\n  * [Reset](#reset)\n\n## Installation\n\n```sh\npip3 install amqp-mock\n```\n\n## Overview\n\n### Test Publishing\n\n```python\nfrom amqp_mock import create_amqp_mock\n\n# 1. Start AMQP mock server\nasync with create_amqp_mock() as mock:\n    # 2. Publish message via \"system under test\"\n    publish_message([1, 2, 3], \"exchange\")\n\n    # 3. Test message has been published\n    messages = await mock.client.get_exchange_messages(\"exchange\")\n    assert messages[0].value == [1, 2, 3]\n```\n\nFull code available here: [`./examples/publish_example.py`](https://github.com/tsv1/amqp-mock/blob/master/examples/publish_example.py)\n\n### Test Consuming\n\n```python\nfrom amqp_mock import create_amqp_mock, Message, MessageStatus\n\n# 1. Start AMQP mock server\nasync with create_amqp_mock() as mock:\n    # 2. Mock next message\n    await mock.client.publish_message(\"queue\", Message([1, 2, 3]))\n\n    # 3. Consume message via \"system under test\"\n    consume_message(\"queue\")\n\n    # 4. Test message has been consumed\n    history = await mock.client.get_queue_message_history(\"queue\")\n    assert history[0].status == MessageStatus.ACKED\n```\n\nFull code available here: [`./examples/consume_example.py`](https://github.com/tsv1/amqp-mock/blob/master/examples/consume_example.py)\n\n## Mock Server\n\n### Start server\n\n```python\nimport asyncio\n\nfrom amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock\n\n\nasync def run() -> None:\n    storage = Storage()\n    http_server = HttpServer(storage, port=8080)\n    amqp_server = AmqpServer(storage, port=5672)\n    async with create_amqp_mock(http_server, amqp_server):\n        await asyncio.Future()\n\nasyncio.run(run())\n```\n\nor via docker\n\n```shell\ndocker run -p 8080:80 -p 5672:5672 tsv1/amqp-mock\n```\n\n### Publish message\n\n`POST /queues/{queue}/messages`\n\n```js\n{\n    \"id\": \"9e342ac1-eef6-40b1-9eaf-053ee7887968\",\n    \"value\": [1, 2, 3],\n    \"exchange\": \"\",\n    \"routing_key\": \"\",\n    \"properties\": null\n}\n```\n\n<details><summary>HTTP</summary>\n<p>\n\n```sh\n$ http POST localhost/queues/test_queue/messages \\\n    value:='[1, 2, 3]' \\\n    exchange=test_exchange\n\nHTTP/1.1 200 OK\nContent-Length: 0\nContent-Type: application/json\n```\n\n</p>\n</details>\n\n<details><summary>Python</summary>\n<p>\n\n```python\nfrom amqp_mock import AmqpMockClient, Message\n\nmock_client = AmqpMockClient()\nmessage = Message([1, 2, 3], exchange=\"test_exchange\")\nawait mock_client.publish_message(\"test_queue\", message)\n```\n\n</p>\n</details>\n\n### Get queue message history\n\n`GET /queues/{queue}/messages/history`\n\n<details><summary>HTTP</summary>\n<p>\n\n```sh\n$ http GET localhost/queues/test_queue/messages/history\n\nHTTP/1.1 200 OK\nContent-Length: 190\nContent-Type: application/json; charset=utf-8\n\n[\n    {\n        \"message\": {\n            \"exchange\": \"test_exchange\",\n            \"id\": \"94459a41-9119-479a-98c9-80bc9dabb719\",\n            \"properties\": null,\n            \"routing_key\": \"\",\n            \"value\": [1, 2, 3]\n        },\n        \"queue\": \"test_queue\",\n        \"status\": \"ACKED\"\n    }\n]\n```\n\n</p>\n</details>\n\n<details><summary>Python</summary>\n<p>\n\n```python\nfrom amqp_mock import AmqpMockClient\n\nmock_client = AmqpMockClient()\nawait mock_client.get_queue_message_history(\"test_queue\")\n# [\n#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,\n#                  queue='test_queue',\n#                  status=MessageStatus.ACKED>\n# ]\n```\n\n</p>\n</details>\n\n### Get exchange messages\n\n`GET /exchanges/{exchange}/messages`\n\n<details><summary>HTTP</summary>\n<p>\n\n```sh\n$ http GET localhost/exchanges/test_exchange/messages\n\nHTTP/1.1 200 OK\nContent-Length: 423\nContent-Type: application/json; charset=utf-8\n\n[\n    {\n        \"exchange\": \"test_exchange\",\n        \"id\": \"63fd1646-bdc1-4baa-9780-e337a9ab109c\",\n        \"properties\": {\n            \"app_id\": \"\",\n            \"cluster_id\": \"\",\n            \"content_encoding\": \"\",\n            \"content_type\": \"\",\n            \"correlation_id\": \"\",\n            \"delivery_mode\": 1,\n            \"expiration\": \"\",\n            \"headers\": null,\n            \"message_id\": \"5ec9024c74eca2e419fd7e29f7be846c\",\n            \"message_type\": \"\",\n            \"priority\": null,\n            \"reply_to\": \"\",\n            \"timestamp\": null,\n            \"user_id\": \"\"\n        },\n        \"routing_key\": \"\",\n        \"value\": [1, 2, 3]\n    }\n]\n```\n\n</p>\n</details>\n\n<details><summary>Python</summary>\n<p>\n\n```python\nfrom amqp_mock import AmqpMockClient\n\nmock_client = AmqpMockClient()\nmessages = await mock_client.get_exchange_messages(\"test_exchange\")\n# [\n#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>\n# ]\n```\n\n</p>\n</details>\n\n### Delete exchange messages\n\n`DELETE /exchanges/{exchange}/messages`\n\n<details><summary>HTTP</summary>\n<p>\n\n```sh\n$ http DELETE localhost/exchanges/test_exchange/messages\n\nHTTP/1.1 200 OK\nContent-Length: 0\nContent-Type: application/json\n```\n\n</p>\n</details>\n\n<details><summary>Python</summary>\n<p>\n\n```python\nfrom amqp_mock import AmqpMockClient\n\nmock_client = AmqpMockClient()\nawait mock_client.delete_exchange_messages(\"test_exchange\")\n```\n\n</p>\n</details>\n\n### Reset\n\n`DELETE /`\n\n<details><summary>HTTP</summary>\n<p>\n\n```sh\n$ http DELETE localhost/\n\nHTTP/1.1 200 OK\nContent-Length: 0\nContent-Type: application/json\n```\n\n</p>\n</details>\n\n<details><summary>Python</summary>\n<p>\n\n```python\nfrom amqp_mock import AmqpMockClient\n\nmock_client = AmqpMockClient()\nawait mock_client.reset()\n```\n\n</p>\n</details>\n",
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "Remote AMQP mock",
    "version": "0.6.2",
    "project_urls": {
        "Homepage": "https://github.com/tsv1/amqp-mock"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "402db039637bb1a6b28d17e11739589e2d62b7cd959f1e81f7641552616fa13b",
                "md5": "de93b1aa7b03ece9bda1f841dab5da98",
                "sha256": "8d1f422777bdef0300da9fffba8faad02d55ada84ff09fde7423d52c5ff91a69"
            },
            "downloads": -1,
            "filename": "amqp_mock-0.6.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "de93b1aa7b03ece9bda1f841dab5da98",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8.0",
            "size": 18670,
            "upload_time": "2024-05-12T17:00:25",
            "upload_time_iso_8601": "2024-05-12T17:00:25.804534Z",
            "url": "https://files.pythonhosted.org/packages/40/2d/b039637bb1a6b28d17e11739589e2d62b7cd959f1e81f7641552616fa13b/amqp_mock-0.6.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "058e8eef013c56efeab660c289eae69d65918f5dfe1282425bf554514ee0955e",
                "md5": "f0a97354d3d7641ec5a1ec658524f493",
                "sha256": "d8496686a1d657f19a474dfe1fecbb12aeb071e8384c7234380c7f61cf7d85d7"
            },
            "downloads": -1,
            "filename": "amqp_mock-0.6.2.tar.gz",
            "has_sig": false,
            "md5_digest": "f0a97354d3d7641ec5a1ec658524f493",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8.0",
            "size": 19495,
            "upload_time": "2024-05-12T17:00:28",
            "upload_time_iso_8601": "2024-05-12T17:00:28.883455Z",
            "url": "https://files.pythonhosted.org/packages/05/8e/8eef013c56efeab660c289eae69d65918f5dfe1282425bf554514ee0955e/amqp_mock-0.6.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-05-12 17:00:28",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "tsv1",
    "github_project": "amqp-mock",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [],
    "lcname": "amqp-mock"
}
        
Elapsed time: 0.29636s