Name | ramqp JSON |
Version |
9.2.0
JSON |
| download |
home_page | https://magenta.dk/ |
Summary | Rammearkitektur AMQP library (aio_pika wrapper) |
upload_time | 2024-03-20 16:36:55 |
maintainer | None |
docs_url | None |
author | Magenta ApS |
requires_python | <4.0,>=3.10 |
license | MPL-2.0 |
keywords |
os2mo
amqp
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
<!--
SPDX-FileCopyrightText: 2021 Magenta ApS <https://magenta.dk>
SPDX-License-Identifier: MPL-2.0
-->
# Rammearkitektur AMQP
Rammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.
It is implemented as a thin wrapper around `aio_pika`, with a generic and a MO
specific AMQPSystem abstract, the MO abstraction being implementing using a thin
wrapper around the generic abstraction.
## Usage
### Generic
Receiving:
```python
import asyncio
from ramqp import AMQPSystem
from ramqp import Router
from ramqp.config import AMQPConnectionSettings
from ramqp.depends import RoutingKey
router = Router()
# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("my.routing.key")
@router.register("my.other.routing.key")
async def callback_function(routing_key: RoutingKey) -> None:
pass
async def main() -> None:
settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
async with AMQPSystem(settings=settings, router=router) as amqp_system:
await amqp_system.run_forever()
asyncio.run(main())
```
Sending:
```python
from ramqp import AMQPSystem
with AMQPSystem(...) as amqp_system:
await amqp_system.publish_message("my.routing.key", {"key": "value"})
```
### Dependency Injection
The callback handlers support
[FastAPI dependency injection](https://fastapi.tiangolo.com/tutorial/dependencies).
This allows handlers to request exactly the data that they need, as seen with
FastAPI dependencies or PyTest fixtures. A callback may look like:
```python
from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType
async def callback(mo_routing_key: MORoutingKey, payload: PayloadType):
...
```
Experienced FastAPI developers might wonder how this works without the `Depends`
function. Indeed, this less verbose pattern was
[introduced in FastAPI v0.95](https://fastapi.tiangolo.com/release-notes/#0950),
and works by defining the dependency directly on the type using the `Annotated`
mechanism from [PEP593](https://peps.python.org/pep-0593/). For example:
```python
MORoutingKey = Annotated[MORoutingKey, Depends(get_routing_key)]
PayloadType = Annotated[PayloadType, Depends(get_payload_as_type(PayloadType))]
```
whereby the previous example is equivalent to
```python
async def callback(
mo_routing_key: MORoutingKey = Depends(get_routing_key),
payload: PayloadType = Depends(get_payload_as_type(PayloadType))
):
...
```
.
Reference documentation should be made available for these types in the future,
but for now they can be found mainly in `ramqp/depends.py` and `ramqp/mo.py`.
### Context
```python
import asyncio
from typing import Annotated
import httpx
from fastapi import Depends
from ramqp import AMQPSystem
from ramqp import Router
from ramqp.depends import Context
from ramqp.depends import from_context
router = Router()
async def main() -> None:
async with httpx.AsyncClient() as client:
context = {
"client": client,
}
async with AMQPSystem(..., context=context) as amqp_system:
await amqp_system.run_forever()
HTTPXClient = Annotated[httpx.AsyncClient, Depends(from_context("client"))]
@router.register("my.routing.key")
async def callback_function(context: Context, client: HTTPXClient) -> None:
pass
asyncio.run(main())
```
### Settings
In most cases, `AMQPConnectionSettings` is probably initialised by being
included in the `BaseSettings` of the application using the library. The `url`
parameter of the `AMQPConnectionSettings` object can be given as a single URL
string or as individual structured fields. Consider the following:
```python
from pydantic import BaseSettings
from ramqp.config import AMQPConnectionSettings
# BaseSettings makes the entire model initialisable using environment variables
class Settings(BaseSettings):
amqp: AMQPConnectionSettings
class Config:
env_nested_delimiter = "__" # allows setting e.g. AMQP__URL__HOST=foo
settings = Settings()
```
The above would work with either multiple structured environment variables
```
AMQP__URL__SCHEME=amqp
AMQP__URL__USER=guest
AMQP__URL__PASSWORD=guest
AMQP__URL__HOST=msg_broker
AMQP__URL__PORT=5672
AMQP__URL__VHOST=os2mo
```
or a single URL definition
```
AMQP__URL=amqp://guest:guest@msg_broker:5672/os2mo
```
### MO AMQP
Receiving:
```python
import asyncio
from ramqp.config import AMQPConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter
from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType
router = MORouter()
# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("employee.address.edit")
@router.register("employee.it.create")
async def callback_function(
mo_routing_key: MORoutingKey, payload: PayloadType
) -> None:
pass
async def main() -> None:
settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
async with MOAMQPSystem(settings=settings, router=router) as amqp_system:
await amqp_system.run_forever()
asyncio.run(main())
```
Sending:
```python
from datetime import datetime
from uuid import uuid4
from ramqp.mo import MOAMQPSystem
from ramqp.mo import PayloadType
payload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())
async with MOAMQPSystem(...) as amqp_system:
await amqp_system.publish_message("employee.address.edit", payload)
```
### Metrics
RAMQP exports a myriad of prometheus metrics via `prometheus/client_python`.
These can be exported using:
```
from prometheus_client import start_http_server
start_http_server(8000)
```
Or similar, see the promethues client library for details.
## Development
### Prerequisites
- [Poetry](https://github.com/python-poetry/poetry)
### Getting Started
1. Clone the repository:
```
git clone git@git.magenta.dk:rammearkitektur/ramqp.git
```
2. Install all dependencies:
```
poetry install
```
3. Set up pre-commit:
```
poetry run pre-commit install
```
### Running the tests
You use `poetry` and `pytest` to run the tests:
`poetry run pytest`
You can also run specific files
`poetry run pytest tests/<test_folder>/<test_file.py>`
and even use filtering with `-k`
`poetry run pytest -k "Manager"`
You can use the flags `-vx` where `v` prints the test & `x` makes the test stop if any tests fails (Verbose, X-fail)
#### Running the integration tests
To run the integration tests, an AMQP instance must be available.
If an instance is already available, it can be used by configuring the `AMQP_URL`
environmental variable. Alternatively a RabbitMQ can be started in docker, using:
```
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
```
## Versioning
This project uses [Semantic Versioning](https://semver.org/) with the following strategy:
- MAJOR: Incompatible changes to existing data models
- MINOR: Backwards compatible updates to existing data models OR new models added
- PATCH: Backwards compatible bug fixes
## Authors
Magenta ApS <https://magenta.dk>
## License
This project uses: [MPL-2.0](MPL-2.0.txt)
This project uses [REUSE](https://reuse.software) for licensing.
All licenses can be found in the [LICENSES folder](LICENSES/) of the project.
Raw data
{
"_id": null,
"home_page": "https://magenta.dk/",
"name": "ramqp",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.10",
"maintainer_email": null,
"keywords": "os2mo, amqp",
"author": "Magenta ApS",
"author_email": "info@magenta.dk",
"download_url": "https://files.pythonhosted.org/packages/84/29/be14da6f8160b05682a833c44c6d3c72b23ff8e7139c54fda8c44f920e8a/ramqp-9.2.0.tar.gz",
"platform": null,
"description": "<!--\nSPDX-FileCopyrightText: 2021 Magenta ApS <https://magenta.dk>\nSPDX-License-Identifier: MPL-2.0\n-->\n\n# Rammearkitektur AMQP\nRammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.\n\nIt is implemented as a thin wrapper around `aio_pika`, with a generic and a MO\nspecific AMQPSystem abstract, the MO abstraction being implementing using a thin\nwrapper around the generic abstraction.\n\n## Usage\n\n### Generic\nReceiving:\n\n```python\nimport asyncio\n\nfrom ramqp import AMQPSystem\nfrom ramqp import Router\nfrom ramqp.config import AMQPConnectionSettings\nfrom ramqp.depends import RoutingKey\n\nrouter = Router()\n\n\n# Configure the callback function to receive messages for the two routing keys.\n# If an exception is thrown from the function, the message is not acknowledged.\n# Thus, it will be retried immediately.\n@router.register(\"my.routing.key\")\n@router.register(\"my.other.routing.key\")\nasync def callback_function(routing_key: RoutingKey) -> None:\n pass\n\n\nasync def main() -> None:\n settings = AMQPConnectionSettings(url=..., queue_prefix=\"my-program\")\n async with AMQPSystem(settings=settings, router=router) as amqp_system:\n await amqp_system.run_forever()\n\n\nasyncio.run(main())\n```\n\n\nSending:\n```python\nfrom ramqp import AMQPSystem\n\nwith AMQPSystem(...) as amqp_system:\n await amqp_system.publish_message(\"my.routing.key\", {\"key\": \"value\"})\n```\n\n### Dependency Injection\nThe callback handlers support\n[FastAPI dependency injection](https://fastapi.tiangolo.com/tutorial/dependencies).\nThis allows handlers to request exactly the data that they need, as seen with\nFastAPI dependencies or PyTest fixtures. A callback may look like:\n```python\nfrom ramqp.mo import MORoutingKey\nfrom ramqp.mo import PayloadType\n\nasync def callback(mo_routing_key: MORoutingKey, payload: PayloadType):\n ...\n```\n\nExperienced FastAPI developers might wonder how this works without the `Depends`\nfunction. Indeed, this less verbose pattern was\n[introduced in FastAPI v0.95](https://fastapi.tiangolo.com/release-notes/#0950),\nand works by defining the dependency directly on the type using the `Annotated`\nmechanism from [PEP593](https://peps.python.org/pep-0593/). For example:\n```python\nMORoutingKey = Annotated[MORoutingKey, Depends(get_routing_key)]\nPayloadType = Annotated[PayloadType, Depends(get_payload_as_type(PayloadType))]\n```\nwhereby the previous example is equivalent to\n```python\nasync def callback(\n mo_routing_key: MORoutingKey = Depends(get_routing_key),\n payload: PayloadType = Depends(get_payload_as_type(PayloadType))\n):\n ...\n```\n.\n\nReference documentation should be made available for these types in the future,\nbut for now they can be found mainly in `ramqp/depends.py` and `ramqp/mo.py`.\n\n\n### Context\n```python\nimport asyncio\nfrom typing import Annotated\n\nimport httpx\nfrom fastapi import Depends\n\nfrom ramqp import AMQPSystem\nfrom ramqp import Router\nfrom ramqp.depends import Context\nfrom ramqp.depends import from_context\n\nrouter = Router()\n\nasync def main() -> None:\n async with httpx.AsyncClient() as client:\n context = {\n \"client\": client,\n }\n async with AMQPSystem(..., context=context) as amqp_system:\n await amqp_system.run_forever()\n\n\nHTTPXClient = Annotated[httpx.AsyncClient, Depends(from_context(\"client\"))]\n\n@router.register(\"my.routing.key\")\nasync def callback_function(context: Context, client: HTTPXClient) -> None:\n pass\n\nasyncio.run(main())\n```\n\n\n### Settings\nIn most cases, `AMQPConnectionSettings` is probably initialised by being\nincluded in the `BaseSettings` of the application using the library. The `url`\nparameter of the `AMQPConnectionSettings` object can be given as a single URL\nstring or as individual structured fields. Consider the following:\n```python\nfrom pydantic import BaseSettings\n\nfrom ramqp.config import AMQPConnectionSettings\n\n# BaseSettings makes the entire model initialisable using environment variables\nclass Settings(BaseSettings):\n amqp: AMQPConnectionSettings\n\n class Config:\n env_nested_delimiter = \"__\" # allows setting e.g. AMQP__URL__HOST=foo\n\nsettings = Settings()\n```\nThe above would work with either multiple structured environment variables\n```\nAMQP__URL__SCHEME=amqp\nAMQP__URL__USER=guest\nAMQP__URL__PASSWORD=guest\nAMQP__URL__HOST=msg_broker\nAMQP__URL__PORT=5672\nAMQP__URL__VHOST=os2mo\n```\nor a single URL definition\n```\nAMQP__URL=amqp://guest:guest@msg_broker:5672/os2mo\n```\n\n### MO AMQP\nReceiving:\n\n```python\nimport asyncio\n\nfrom ramqp.config import AMQPConnectionSettings\nfrom ramqp.mo import MOAMQPSystem\nfrom ramqp.mo import MORouter\nfrom ramqp.mo import MORoutingKey\nfrom ramqp.mo import PayloadType\n\nrouter = MORouter()\n\n\n# Configure the callback function to receive messages for the two routing keys.\n# If an exception is thrown from the function, the message is not acknowledged.\n# Thus, it will be retried immediately.\n@router.register(\"employee.address.edit\")\n@router.register(\"employee.it.create\")\nasync def callback_function(\n mo_routing_key: MORoutingKey, payload: PayloadType\n) -> None:\n pass\n\n\nasync def main() -> None:\n settings = AMQPConnectionSettings(url=..., queue_prefix=\"my-program\")\n async with MOAMQPSystem(settings=settings, router=router) as amqp_system:\n await amqp_system.run_forever()\n\n\nasyncio.run(main())\n```\n\nSending:\n\n```python\nfrom datetime import datetime\nfrom uuid import uuid4\n\nfrom ramqp.mo import MOAMQPSystem\nfrom ramqp.mo import PayloadType\n\npayload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())\n\nasync with MOAMQPSystem(...) as amqp_system:\n await amqp_system.publish_message(\"employee.address.edit\", payload)\n```\n\n\n### Metrics\nRAMQP exports a myriad of prometheus metrics via `prometheus/client_python`.\n\nThese can be exported using:\n```\nfrom prometheus_client import start_http_server\n\nstart_http_server(8000)\n```\nOr similar, see the promethues client library for details.\n\n\n## Development\n\n### Prerequisites\n- [Poetry](https://github.com/python-poetry/poetry)\n\n### Getting Started\n1. Clone the repository:\n```\ngit clone git@git.magenta.dk:rammearkitektur/ramqp.git\n```\n\n2. Install all dependencies:\n```\npoetry install\n```\n\n3. Set up pre-commit:\n```\npoetry run pre-commit install\n```\n\n### Running the tests\nYou use `poetry` and `pytest` to run the tests:\n\n`poetry run pytest`\n\nYou can also run specific files\n\n`poetry run pytest tests/<test_folder>/<test_file.py>`\n\nand even use filtering with `-k`\n\n`poetry run pytest -k \"Manager\"`\n\nYou can use the flags `-vx` where `v` prints the test & `x` makes the test stop if any tests fails (Verbose, X-fail)\n\n#### Running the integration tests\nTo run the integration tests, an AMQP instance must be available.\n\nIf an instance is already available, it can be used by configuring the `AMQP_URL`\nenvironmental variable. Alternatively a RabbitMQ can be started in docker, using:\n```\ndocker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management\n```\n\n## Versioning\nThis project uses [Semantic Versioning](https://semver.org/) with the following strategy:\n- MAJOR: Incompatible changes to existing data models\n- MINOR: Backwards compatible updates to existing data models OR new models added\n- PATCH: Backwards compatible bug fixes\n\n\n## Authors\nMagenta ApS <https://magenta.dk>\n\n\n## License\nThis project uses: [MPL-2.0](MPL-2.0.txt)\n\nThis project uses [REUSE](https://reuse.software) for licensing.\nAll licenses can be found in the [LICENSES folder](LICENSES/) of the project.\n",
"bugtrack_url": null,
"license": "MPL-2.0",
"summary": "Rammearkitektur AMQP library (aio_pika wrapper)",
"version": "9.2.0",
"project_urls": {
"Homepage": "https://magenta.dk/",
"Repository": "https://git.magenta.dk/rammearkitektur/ramqp"
},
"split_keywords": [
"os2mo",
" amqp"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "4b74866a15653c9948aa83e61bdc8a74780881c64f0e8a44b8c0f0b65a0acb0f",
"md5": "a841109e82c8cfa7c58ef6e7de466fdd",
"sha256": "aa37f68c9cdf87377c07b591818f48425533d65828c9dc6b384028a54bbcf618"
},
"downloads": -1,
"filename": "ramqp-9.2.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a841109e82c8cfa7c58ef6e7de466fdd",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.10",
"size": 28683,
"upload_time": "2024-03-20T16:36:53",
"upload_time_iso_8601": "2024-03-20T16:36:53.170792Z",
"url": "https://files.pythonhosted.org/packages/4b/74/866a15653c9948aa83e61bdc8a74780881c64f0e8a44b8c0f0b65a0acb0f/ramqp-9.2.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "8429be14da6f8160b05682a833c44c6d3c72b23ff8e7139c54fda8c44f920e8a",
"md5": "07db21abeafab6fe595b5b356e764ae1",
"sha256": "944db755f0683367a7846998fe87ee5429a74d79c5a78838969fc1233b0c6aaf"
},
"downloads": -1,
"filename": "ramqp-9.2.0.tar.gz",
"has_sig": false,
"md5_digest": "07db21abeafab6fe595b5b356e764ae1",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.10",
"size": 22686,
"upload_time": "2024-03-20T16:36:55",
"upload_time_iso_8601": "2024-03-20T16:36:55.609514Z",
"url": "https://files.pythonhosted.org/packages/84/29/be14da6f8160b05682a833c44c6d3c72b23ff8e7139c54fda8c44f920e8a/ramqp-9.2.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-03-20 16:36:55",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "ramqp"
}