ramqp


Nameramqp JSON
Version 9.2.0 PyPI version JSON
download
home_pagehttps://magenta.dk/
SummaryRammearkitektur AMQP library (aio_pika wrapper)
upload_time2024-03-20 16:36:55
maintainerNone
docs_urlNone
authorMagenta ApS
requires_python<4.0,>=3.10
licenseMPL-2.0
keywords os2mo amqp
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            <!--
SPDX-FileCopyrightText: 2021 Magenta ApS <https://magenta.dk>
SPDX-License-Identifier: MPL-2.0
-->

# Rammearkitektur AMQP
Rammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.

It is implemented as a thin wrapper around `aio_pika`, with a generic and a MO
specific AMQPSystem abstract, the MO abstraction being implementing using a thin
wrapper around the generic abstraction.

## Usage

### Generic
Receiving:

```python
import asyncio

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.config import AMQPConnectionSettings
from ramqp.depends import RoutingKey

router = Router()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("my.routing.key")
@router.register("my.other.routing.key")
async def callback_function(routing_key: RoutingKey) -> None:
    pass


async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with AMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())
```


Sending:
```python
from ramqp import AMQPSystem

with AMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("my.routing.key", {"key": "value"})
```

### Dependency Injection
The callback handlers support
[FastAPI dependency injection](https://fastapi.tiangolo.com/tutorial/dependencies).
This allows handlers to request exactly the data that they need, as seen with
FastAPI dependencies or PyTest fixtures. A callback may look like:
```python
from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType

async def callback(mo_routing_key: MORoutingKey, payload: PayloadType):
    ...
```

Experienced FastAPI developers might wonder how this works without the `Depends`
function. Indeed, this less verbose pattern was
[introduced in FastAPI v0.95](https://fastapi.tiangolo.com/release-notes/#0950),
and works by defining the dependency directly on the type using the `Annotated`
mechanism from [PEP593](https://peps.python.org/pep-0593/). For example:
```python
MORoutingKey = Annotated[MORoutingKey, Depends(get_routing_key)]
PayloadType = Annotated[PayloadType, Depends(get_payload_as_type(PayloadType))]
```
whereby the previous example is equivalent to
```python
async def callback(
    mo_routing_key: MORoutingKey = Depends(get_routing_key),
    payload: PayloadType = Depends(get_payload_as_type(PayloadType))
):
    ...
```
.

Reference documentation should be made available for these types in the future,
but for now they can be found mainly in `ramqp/depends.py` and `ramqp/mo.py`.


### Context
```python
import asyncio
from typing import Annotated

import httpx
from fastapi import Depends

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.depends import Context
from ramqp.depends import from_context

router = Router()

async def main() -> None:
    async with httpx.AsyncClient() as client:
        context = {
            "client": client,
        }
        async with AMQPSystem(..., context=context) as amqp_system:
            await amqp_system.run_forever()


HTTPXClient = Annotated[httpx.AsyncClient, Depends(from_context("client"))]

@router.register("my.routing.key")
async def callback_function(context: Context, client: HTTPXClient) -> None:
    pass

asyncio.run(main())
```


### Settings
In most cases, `AMQPConnectionSettings` is probably initialised by being
included in the `BaseSettings` of the application using the library. The `url`
parameter of the `AMQPConnectionSettings` object can be given as a single URL
string or as individual structured fields. Consider the following:
```python
from pydantic import BaseSettings

from ramqp.config import AMQPConnectionSettings

# BaseSettings makes the entire model initialisable using environment variables
class Settings(BaseSettings):
    amqp: AMQPConnectionSettings

    class Config:
        env_nested_delimiter = "__"  # allows setting e.g. AMQP__URL__HOST=foo

settings = Settings()
```
The above would work with either multiple structured environment variables
```
AMQP__URL__SCHEME=amqp
AMQP__URL__USER=guest
AMQP__URL__PASSWORD=guest
AMQP__URL__HOST=msg_broker
AMQP__URL__PORT=5672
AMQP__URL__VHOST=os2mo
```
or a single URL definition
```
AMQP__URL=amqp://guest:guest@msg_broker:5672/os2mo
```

### MO AMQP
Receiving:

```python
import asyncio

from ramqp.config import AMQPConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter
from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType

router = MORouter()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("employee.address.edit")
@router.register("employee.it.create")
async def callback_function(
    mo_routing_key: MORoutingKey, payload: PayloadType
) -> None:
    pass


async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with MOAMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())
```

Sending:

```python
from datetime import datetime
from uuid import uuid4

from ramqp.mo import MOAMQPSystem
from ramqp.mo import PayloadType

payload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())

async with MOAMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("employee.address.edit", payload)
```


### Metrics
RAMQP exports a myriad of prometheus metrics via `prometheus/client_python`.

These can be exported using:
```
from prometheus_client import start_http_server

start_http_server(8000)
```
Or similar, see the promethues client library for details.


## Development

### Prerequisites
- [Poetry](https://github.com/python-poetry/poetry)

### Getting Started
1. Clone the repository:
```
git clone git@git.magenta.dk:rammearkitektur/ramqp.git
```

2. Install all dependencies:
```
poetry install
```

3. Set up pre-commit:
```
poetry run pre-commit install
```

### Running the tests
You use `poetry` and `pytest` to run the tests:

`poetry run pytest`

You can also run specific files

`poetry run pytest tests/<test_folder>/<test_file.py>`

and even use filtering with `-k`

`poetry run pytest -k "Manager"`

You can use the flags `-vx` where `v` prints the test & `x` makes the test stop if any tests fails (Verbose, X-fail)

#### Running the integration tests
To run the integration tests, an AMQP instance must be available.

If an instance is already available, it can be used by configuring the `AMQP_URL`
environmental variable. Alternatively a RabbitMQ can be started in docker, using:
```
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
```

## Versioning
This project uses [Semantic Versioning](https://semver.org/) with the following strategy:
- MAJOR: Incompatible changes to existing data models
- MINOR: Backwards compatible updates to existing data models OR new models added
- PATCH: Backwards compatible bug fixes


## Authors
Magenta ApS <https://magenta.dk>


## License
This project uses: [MPL-2.0](MPL-2.0.txt)

This project uses [REUSE](https://reuse.software) for licensing.
All licenses can be found in the [LICENSES folder](LICENSES/) of the project.

            

Raw data

            {
    "_id": null,
    "home_page": "https://magenta.dk/",
    "name": "ramqp",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.10",
    "maintainer_email": null,
    "keywords": "os2mo, amqp",
    "author": "Magenta ApS",
    "author_email": "info@magenta.dk",
    "download_url": "https://files.pythonhosted.org/packages/84/29/be14da6f8160b05682a833c44c6d3c72b23ff8e7139c54fda8c44f920e8a/ramqp-9.2.0.tar.gz",
    "platform": null,
    "description": "<!--\nSPDX-FileCopyrightText: 2021 Magenta ApS <https://magenta.dk>\nSPDX-License-Identifier: MPL-2.0\n-->\n\n# Rammearkitektur AMQP\nRammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.\n\nIt is implemented as a thin wrapper around `aio_pika`, with a generic and a MO\nspecific AMQPSystem abstract, the MO abstraction being implementing using a thin\nwrapper around the generic abstraction.\n\n## Usage\n\n### Generic\nReceiving:\n\n```python\nimport asyncio\n\nfrom ramqp import AMQPSystem\nfrom ramqp import Router\nfrom ramqp.config import AMQPConnectionSettings\nfrom ramqp.depends import RoutingKey\n\nrouter = Router()\n\n\n# Configure the callback function to receive messages for the two routing keys.\n# If an exception is thrown from the function, the message is not acknowledged.\n# Thus, it will be retried immediately.\n@router.register(\"my.routing.key\")\n@router.register(\"my.other.routing.key\")\nasync def callback_function(routing_key: RoutingKey) -> None:\n    pass\n\n\nasync def main() -> None:\n    settings = AMQPConnectionSettings(url=..., queue_prefix=\"my-program\")\n    async with AMQPSystem(settings=settings, router=router) as amqp_system:\n        await amqp_system.run_forever()\n\n\nasyncio.run(main())\n```\n\n\nSending:\n```python\nfrom ramqp import AMQPSystem\n\nwith AMQPSystem(...) as amqp_system:\n    await amqp_system.publish_message(\"my.routing.key\", {\"key\": \"value\"})\n```\n\n### Dependency Injection\nThe callback handlers support\n[FastAPI dependency injection](https://fastapi.tiangolo.com/tutorial/dependencies).\nThis allows handlers to request exactly the data that they need, as seen with\nFastAPI dependencies or PyTest fixtures. A callback may look like:\n```python\nfrom ramqp.mo import MORoutingKey\nfrom ramqp.mo import PayloadType\n\nasync def callback(mo_routing_key: MORoutingKey, payload: PayloadType):\n    ...\n```\n\nExperienced FastAPI developers might wonder how this works without the `Depends`\nfunction. Indeed, this less verbose pattern was\n[introduced in FastAPI v0.95](https://fastapi.tiangolo.com/release-notes/#0950),\nand works by defining the dependency directly on the type using the `Annotated`\nmechanism from [PEP593](https://peps.python.org/pep-0593/). For example:\n```python\nMORoutingKey = Annotated[MORoutingKey, Depends(get_routing_key)]\nPayloadType = Annotated[PayloadType, Depends(get_payload_as_type(PayloadType))]\n```\nwhereby the previous example is equivalent to\n```python\nasync def callback(\n    mo_routing_key: MORoutingKey = Depends(get_routing_key),\n    payload: PayloadType = Depends(get_payload_as_type(PayloadType))\n):\n    ...\n```\n.\n\nReference documentation should be made available for these types in the future,\nbut for now they can be found mainly in `ramqp/depends.py` and `ramqp/mo.py`.\n\n\n### Context\n```python\nimport asyncio\nfrom typing import Annotated\n\nimport httpx\nfrom fastapi import Depends\n\nfrom ramqp import AMQPSystem\nfrom ramqp import Router\nfrom ramqp.depends import Context\nfrom ramqp.depends import from_context\n\nrouter = Router()\n\nasync def main() -> None:\n    async with httpx.AsyncClient() as client:\n        context = {\n            \"client\": client,\n        }\n        async with AMQPSystem(..., context=context) as amqp_system:\n            await amqp_system.run_forever()\n\n\nHTTPXClient = Annotated[httpx.AsyncClient, Depends(from_context(\"client\"))]\n\n@router.register(\"my.routing.key\")\nasync def callback_function(context: Context, client: HTTPXClient) -> None:\n    pass\n\nasyncio.run(main())\n```\n\n\n### Settings\nIn most cases, `AMQPConnectionSettings` is probably initialised by being\nincluded in the `BaseSettings` of the application using the library. The `url`\nparameter of the `AMQPConnectionSettings` object can be given as a single URL\nstring or as individual structured fields. Consider the following:\n```python\nfrom pydantic import BaseSettings\n\nfrom ramqp.config import AMQPConnectionSettings\n\n# BaseSettings makes the entire model initialisable using environment variables\nclass Settings(BaseSettings):\n    amqp: AMQPConnectionSettings\n\n    class Config:\n        env_nested_delimiter = \"__\"  # allows setting e.g. AMQP__URL__HOST=foo\n\nsettings = Settings()\n```\nThe above would work with either multiple structured environment variables\n```\nAMQP__URL__SCHEME=amqp\nAMQP__URL__USER=guest\nAMQP__URL__PASSWORD=guest\nAMQP__URL__HOST=msg_broker\nAMQP__URL__PORT=5672\nAMQP__URL__VHOST=os2mo\n```\nor a single URL definition\n```\nAMQP__URL=amqp://guest:guest@msg_broker:5672/os2mo\n```\n\n### MO AMQP\nReceiving:\n\n```python\nimport asyncio\n\nfrom ramqp.config import AMQPConnectionSettings\nfrom ramqp.mo import MOAMQPSystem\nfrom ramqp.mo import MORouter\nfrom ramqp.mo import MORoutingKey\nfrom ramqp.mo import PayloadType\n\nrouter = MORouter()\n\n\n# Configure the callback function to receive messages for the two routing keys.\n# If an exception is thrown from the function, the message is not acknowledged.\n# Thus, it will be retried immediately.\n@router.register(\"employee.address.edit\")\n@router.register(\"employee.it.create\")\nasync def callback_function(\n    mo_routing_key: MORoutingKey, payload: PayloadType\n) -> None:\n    pass\n\n\nasync def main() -> None:\n    settings = AMQPConnectionSettings(url=..., queue_prefix=\"my-program\")\n    async with MOAMQPSystem(settings=settings, router=router) as amqp_system:\n        await amqp_system.run_forever()\n\n\nasyncio.run(main())\n```\n\nSending:\n\n```python\nfrom datetime import datetime\nfrom uuid import uuid4\n\nfrom ramqp.mo import MOAMQPSystem\nfrom ramqp.mo import PayloadType\n\npayload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())\n\nasync with MOAMQPSystem(...) as amqp_system:\n    await amqp_system.publish_message(\"employee.address.edit\", payload)\n```\n\n\n### Metrics\nRAMQP exports a myriad of prometheus metrics via `prometheus/client_python`.\n\nThese can be exported using:\n```\nfrom prometheus_client import start_http_server\n\nstart_http_server(8000)\n```\nOr similar, see the promethues client library for details.\n\n\n## Development\n\n### Prerequisites\n- [Poetry](https://github.com/python-poetry/poetry)\n\n### Getting Started\n1. Clone the repository:\n```\ngit clone git@git.magenta.dk:rammearkitektur/ramqp.git\n```\n\n2. Install all dependencies:\n```\npoetry install\n```\n\n3. Set up pre-commit:\n```\npoetry run pre-commit install\n```\n\n### Running the tests\nYou use `poetry` and `pytest` to run the tests:\n\n`poetry run pytest`\n\nYou can also run specific files\n\n`poetry run pytest tests/<test_folder>/<test_file.py>`\n\nand even use filtering with `-k`\n\n`poetry run pytest -k \"Manager\"`\n\nYou can use the flags `-vx` where `v` prints the test & `x` makes the test stop if any tests fails (Verbose, X-fail)\n\n#### Running the integration tests\nTo run the integration tests, an AMQP instance must be available.\n\nIf an instance is already available, it can be used by configuring the `AMQP_URL`\nenvironmental variable. Alternatively a RabbitMQ can be started in docker, using:\n```\ndocker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management\n```\n\n## Versioning\nThis project uses [Semantic Versioning](https://semver.org/) with the following strategy:\n- MAJOR: Incompatible changes to existing data models\n- MINOR: Backwards compatible updates to existing data models OR new models added\n- PATCH: Backwards compatible bug fixes\n\n\n## Authors\nMagenta ApS <https://magenta.dk>\n\n\n## License\nThis project uses: [MPL-2.0](MPL-2.0.txt)\n\nThis project uses [REUSE](https://reuse.software) for licensing.\nAll licenses can be found in the [LICENSES folder](LICENSES/) of the project.\n",
    "bugtrack_url": null,
    "license": "MPL-2.0",
    "summary": "Rammearkitektur AMQP library (aio_pika wrapper)",
    "version": "9.2.0",
    "project_urls": {
        "Homepage": "https://magenta.dk/",
        "Repository": "https://git.magenta.dk/rammearkitektur/ramqp"
    },
    "split_keywords": [
        "os2mo",
        " amqp"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "4b74866a15653c9948aa83e61bdc8a74780881c64f0e8a44b8c0f0b65a0acb0f",
                "md5": "a841109e82c8cfa7c58ef6e7de466fdd",
                "sha256": "aa37f68c9cdf87377c07b591818f48425533d65828c9dc6b384028a54bbcf618"
            },
            "downloads": -1,
            "filename": "ramqp-9.2.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a841109e82c8cfa7c58ef6e7de466fdd",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.10",
            "size": 28683,
            "upload_time": "2024-03-20T16:36:53",
            "upload_time_iso_8601": "2024-03-20T16:36:53.170792Z",
            "url": "https://files.pythonhosted.org/packages/4b/74/866a15653c9948aa83e61bdc8a74780881c64f0e8a44b8c0f0b65a0acb0f/ramqp-9.2.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "8429be14da6f8160b05682a833c44c6d3c72b23ff8e7139c54fda8c44f920e8a",
                "md5": "07db21abeafab6fe595b5b356e764ae1",
                "sha256": "944db755f0683367a7846998fe87ee5429a74d79c5a78838969fc1233b0c6aaf"
            },
            "downloads": -1,
            "filename": "ramqp-9.2.0.tar.gz",
            "has_sig": false,
            "md5_digest": "07db21abeafab6fe595b5b356e764ae1",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.10",
            "size": 22686,
            "upload_time": "2024-03-20T16:36:55",
            "upload_time_iso_8601": "2024-03-20T16:36:55.609514Z",
            "url": "https://files.pythonhosted.org/packages/84/29/be14da6f8160b05682a833c44c6d3c72b23ff8e7139c54fda8c44f920e8a/ramqp-9.2.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-03-20 16:36:55",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "ramqp"
}
        
Elapsed time: 0.19809s