Name | FastRAMQPI JSON |
Version |
10.0.5
JSON |
| download |
home_page | https://magenta.dk/ |
Summary | Rammearkitektur integrations framework |
upload_time | 2024-09-25 13:42:53 |
maintainer | None |
docs_url | None |
author | Magenta ApS |
requires_python | <4.0,>=3.11 |
license | MPL-2.0 |
keywords |
os2mo
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
<!--
SPDX-FileCopyrightText: 2021 Magenta ApS <https://magenta.dk>
SPDX-License-Identifier: MPL-2.0
-->
# FastRAMQPI
FastRAMQPI is an opinionated library for OS2mo integrations.
## Usage
```python
from typing import Any
from fastapi import APIRouter
from fastapi import FastAPI
from pydantic import BaseSettings
from pydantic import Field
from fastramqpi import depends
from fastramqpi.config import Settings as FastRAMQPISettings
from fastramqpi.main import FastRAMQPI
from fastramqpi.ramqp.depends import Context
from fastramqpi.ramqp.depends import rate_limit
from fastramqpi.ramqp.mo import MORouter
from fastramqpi.ramqp.mo import PayloadUUID
class Settings(BaseSettings):
class Config:
frozen = True
env_nested_delimiter = "__"
fastramqpi: FastRAMQPISettings = Field(
default_factory=FastRAMQPISettings, description="FastRAMQPI settings"
)
fastapi_router = APIRouter()
amqp_router = MORouter()
@amqp_router.register("engagement", dependencies=[Depends(rate_limit(10))])
async def listen_to_engagements(
context: Context,
graphql_session: depends.LegacyGraphQLSession,
uuid: PayloadUUID,
) -> None:
print(uuid)
def create_fastramqpi(**kwargs: Any) -> FastRAMQPI:
settings = Settings(**kwargs)
fastramqpi = FastRAMQPI(
application_name="os2mo-example-integration",
settings=settings.fastramqpi,
graphql_version=20,
)
fastramqpi.add_context(settings=settings)
# Add our AMQP router(s)
amqpsystem = fastramqpi.get_amqpsystem()
amqpsystem.router.registry.update(amqp_router.registry)
# Add our FastAPI router(s)
app = fastramqpi.get_app()
app.include_router(fastapi_router)
return fastramqpi
def create_app(**kwargs: Any) -> FastAPI:
fastramqpi = create_fastramqpi(**kwargs)
return fastramqpi.get_app()
```
### Metrics
FastRAMQPI Metrics are exported via `prometheus/client_python` on the FastAPI's `/metrics`.
### Debugging
FastRAMQPI ships with support for debugging via [DAP](https://microsoft.github.io/debug-adapter-protocol/).
To enable it set the `DAP` environmental variable to true, and expose the debugging port (5678).
For instance in a `docker-compose.yaml` file, by merging in:
```yaml
version: "3"
services:
mo_ldap_import_export:
environment:
DAP: "true"
ports:
- "127.0.0.0:5678:5678"
```
For ease of use, this should be the default for projects using FastRAMQPI.
## Autogenerated GraphQL Client
FastRAMQPI exposes an
[authenticated httpx client](https://docs.authlib.org/en/latest/client/api.html#authlib.integrations.httpx_client.AsyncOAuth2Client)
through the dependency injection system. While it is possible to call the OS2mo
API directly through it, the recommended approach is to define a properly-typed
GraphQL client in the integration and configure it to make calls through the
authenticated client. Instead of manually implementing such client, we strongly
recommend to use the
[**Ariadne Code Generator**](https://github.com/mirumee/ariadne-codegen), which
generates an integration-specific client based on the general OS2mo GraphQL
schema and the exact queries and mutations the integration requires.
To integrate such client, first add and configure the codegen:
```toml
# pyproject.toml
[tool.poetry.group.dev.dependencies]
ariadne-codegen = "0.13.0"
[tool.ariadne-codegen]
# Ideally, the GraphQL client is generated as part of the build process and
# never committed to git. Unfortunately, most of our tools and CI analyses the
# project directly as it is in Git. In the future - when the CI templates
# operate on the built container image - only the definition of the schema and
# queries should be checked in.
#
# The default package name is `graphql_client`. Make it more obvious that the
# files are not to be modified manually.
target_package_name = "autogenerated_graphql_client"
target_package_path = "my_integration/"
client_name = "GraphQLClient"
schema_path = "schema.graphql" # curl -O http://localhost:5000/graphql/v8/schema.graphql
queries_path = "queries.graphql"
include_all_inputs = false
include_all_enums = false
plugins = [
# Return values directly when only a single top field is requested
"ariadne_codegen.contrib.shorter_results.ShorterResultsPlugin",
]
[tool.ariadne-codegen.scalars.DateTime]
type = "datetime.datetime"
parse = "fastramqpi.ariadne.parse_graphql_datetime"
[tool.ariadne-codegen.scalars.UUID]
type = "uuid.UUID"
```
Where you replace `"my_integration/"` with the path to your integration.
Grab OS2mo's GraphQL schema:
```bash
curl -O http://localhost:5000/graphql/v20/schema.graphql
```
Define your queries:
```gql
# queries.graphql
# SPDX-FileCopyrightText: Magenta ApS <https://magenta.dk>
# SPDX-License-Identifier: MPL-2.0
query Version {
version {
mo_version
mo_hash
}
}
```
Generate the client - you may have to activate some virtual environment:
```bash
ariadne-codegen
```
The client class is passed to FastRAMQPI on startup as shown below. This will
ensure it is automatically opened and closed and configured with
authentication. **NOTE: remember to update the `graphql_version` when you
generate against a new schema version**.
```python
# app.py
from autogenerated_graphql_client import GraphQLClient
def create_app(**kwargs: Any) -> FastAPI:
fastramqpi = FastRAMQPI(
...,
graphql_version=20,
graphql_client_cls=GraphQLClient,
)
```
The FastRAMQPI framework cannot define the annotated type for the GraphQL client
since its methods depend on the specific queries required by the integration.
Therefore, each implementing integration needs to define their own:
```python
# depends.py
from typing import Annotated
from fastapi import Depends
from fastramqpi.ramqp.depends import from_context
from my_integration.autogenerated_graphql_client import GraphQLClient as _GraphQLClient
GraphQLClient = Annotated[_GraphQLClient, Depends(from_context("graphql_client"))]
```
Finally, we can define our AMQP handler to use the GraphQL client:
```python
# events.py
from . import depends
@router.register("*")
async def handler(mo: depends.GraphQLClient) -> None:
version = await mo.version()
print(version)
```
To get REUSE working, you might consider adding the following to `.reuse/dep5`:
```text
Files: my_integration/autogenerated_graphql_client/*
Copyright: Magenta ApS <https://magenta.dk>
License: MPL-2.0
```
## Database
If your integration requires access to a database, set it up as so:
`compose.yaml`:
```yaml
services:
my_integration:
environment:
FASTRAMQPI__DATABASE__USER: "fastramqpi"
FASTRAMQPI__DATABASE__PASSWORD: "fastramqpi"
FASTRAMQPI__DATABASE__HOST: "db"
FASTRAMQPI__DATABASE__NAME: "fastramqpi"
db:
image: postgres:16
environment:
POSTGRES_USER: "fastramqpi"
POSTGRES_PASSWORD: "fastramqpi"
POSTGRES_DB: "fastramqpi"
```
`my_integration/database.py`:
```python
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
```
Due to budgetary constraints FastRAMQPI does not yet facilitate database
migrations through alembic. Instead, all defined tables are created on startup.
For technical reasons, this requires us to pass the `DeclarativeBase` from
above to the FastRAMQPI constructor in `app.py` as so:
```python
from my_integration.database import Base
fastramqpi = FastRAMQPI(
application_name="os2mo-example-integration",
settings=set``tings.fastramqpi,
...
database_metadata=Base.metadata,
)
```
The database can be used in a handler as follows:
```python
from fastramqpi import depends
from my_integration.database import User
@amqp_router.register("person")
async def add(session: depends.Session) -> None:
session.add(User(name="Alice"))
```
## Multilayer exchanges
FastRAMQPI supports a multi-layer exchange setup, where all integration queues
are bound to an intermediate exchange which itself is bound to an upstream
exchange for instance OS2mo, the below Graph shows this setup with two
integrations that each has their own intermediate exchange.
```
┌─────┐
┌──┤OS2mo├──┐
R1+R2+R3│ └─────┘ │R4+R5
│ │
┌──▼──┐ ┌──▼──┐
┌──┤LDAP ├──┐ │Omada├──┐
│ └──┬──┘ │ └──┬──┘ │
R1│ R2│ R3│ R4│ R5│
│ │ │ │ │
┌──▼──┬──▼──┬──▼──┐ ┌▼─────▼┐
│ Q1 │ Q2 │ Q3 │ │ Q4 │
└─────┴─────┴─────┘ └───────┘
```
Here `R1,...R5` are routing keys, and `Q1,...Q4` are queues. Notice that it is
still possible for a single queue to subscribe to multiple routing keys, and to
have multiple queues subscribed to a single intermediate exchange.
Compared the above model using intermediate exchanges with the model below,
that does not utilize intermediate exchanges.
```
┌─────┐
┌─────┬──┤OS2mo├──┬─────┐
│ │ └──┬──┘ │ │
R1│ R2│ R3│ R4│ R5│
│ │ │ │ │
┌──▼──┬──▼──┬──▼──┐ ┌▼─────▼┐
│ Q1 │ Q2 │ Q3 │ │ Q4 │
└─────┴─────┴─────┘ └───────┘
```
Here it is no longer obvious which queues and routing keys belong to which
integration, and as such it is no longer possible to use OS2mo's `refresh_*`
mutators to request a specific integration to run its event handlers.
Requesting a specific integration to run its event handlers, using multi-layer
exchanges with intermediate exchanges can be done using the `exchange`
parameter on the `refresh_*` mutators, by passing the name of the intermediate
exchange.
To start using intermediate exchanges in your FastRAMQPI integration, simply
set the `upstream_exchange` value on the `AMQPConnectionSettings` to the
upstream exchange you want to bind your intermediate exchange to, and set the
`exchange` value to the name you want to allocate to your intermediate exchange.
It is generally recommended to use the `upstream_exchange` name as a prefix for
the intermediate exchange set in the `exchange` value, such as:
```python
from fastramqpi.config import Settings as _FastRAMQPISettings
from fastramqpi.ramqp.config import AMQPConnectionSettings as _AMQPConnectionSettings
class AMQPConnectionSettings(_AMQPConnectionSettings):
upstream_exchange = "os2mo"
exchange = "os2mo_ldap"
queue_prefix = "os2mo_ldap"
class FastRAMQPISettings(_FastRAMQPISettings):
amqp: AMQPConnectionSettings
class Settings(BaseSettings):
fastramqpi: FastRAMQPISettings
```
## Integration Testing
The goal of integration testing in our context is to minimise the amount of
mocking and patching by testing the integration's behavior against a running
OS2mo instance in a way that resembles a production environment as much as
possible. To this end, the integration is configured for testing through its
public interface -- environment variables -- in `.gitlab-ci.yml` wherever
possible. The flow of each test follows the well-known "Arrange, Act and
Assert" pattern, but with a sleight modification to avoid hooking into either
OS2mo or the integration itself, and endure the nature of eventual consistency
that is unavoidable with an event-based system.
To get started, add OS2mo's [GitLab CI template](https://git.magenta.dk/rammearkitektur/os2mo/-/blob/master/gitlab-ci-templates/integration-test-meta.v1.yml)
to `.gitlab-ci.yml`:
```yaml
include:
- project: rammearkitektur/os2mo
file:
- gitlab-ci-templates/integration-test-meta.v1.yml
...
Test:
variables:
PYTEST_ADDOPTS: "-m 'not integration_test'"
Integration-test:
extends:
- .integration-test:mo
variables:
MY_INTEGRATION__FOO: "true"
```
The `Integration-test` CI job starts a full OS2mo stack, including all
necessary services, and then runs all tests marked with `integration_test`.
[Auto-used fixtures](fastramqpi/pytest_plugin.py) automatically ensure test
isolation in OS2mo's database, increase the AMQP messaging interval, and
configure `respx` appropriately. By default, the `Test` job runs all tests in
the project, including both unit- and integration-tests. We overwrite the
`Test` CI job to exclude integration tests -- the `Integration-test` job
already only runs integration tests by default.
Moving on, some boilerplate needs to be copied since a few types cannot be
known a priori:
```python
# tests/integration/conftest.py
from collections.abc import AsyncIterator
import pytest
from asgi_lifespan import LifespanManager
from asgi_lifespan._types import ASGIApp
from fastapi import FastAPI
from gql.client import AsyncClientSession
from httpx import AsyncClient
from httpx import ASGITransport
from pytest import MonkeyPatch
from my_integration.app import create_app
@pytest.fixture
async def _app(monkeypatch: MonkeyPatch) -> FastAPI:
monkeypatch.setenv("SOME_CONFIG", "http://example.org")
app = create_app()
return app
@pytest.fixture
async def asgiapp(_app: FastAPI) -> AsyncIterator[ASGIApp]:
"""ASGI app with lifespan run."""
async with LifespanManager(_app) as manager:
yield manager.app
@pytest.fixture
async def app(_app: FastAPI, asgiapp: ASGIApp) -> FastAPI:
"""FastAPI app with lifespan run."""
return _app
@pytest.fixture
async def test_client(asgiapp: ASGIApp) -> AsyncIterator[AsyncClient]:
"""Create test client with associated lifecycles."""
transport = ASGITransport(app=asgiapp, client=("1.2.3.4", 123)) # type: ignore
async with AsyncClient(
transport=transport, base_url="http://example.com"
) as client:
yield client
@pytest.fixture
async def graphql_client(app: FastAPI) -> AsyncClientSession:
"""Authenticated GraphQL codegen client for OS2mo."""
return app.state.context["graphql_client"]
```
The `test_client` fixture refers to a running instance of the integration, and
`graphql_client` to the integration's pre-authenticated instance of its
[autogenerated GraphQL client](#autogenerated-graphql-client).
To make async tests and fixtures work, and avoid having to
`@pytest.mark.asyncio` each test individually, the following must be added to
`pyproject.toml`:
```toml
[tool.pytest.ini_options]
asyncio_mode="auto"
```
A sample integration test could be as follows:
```python
# tests/integration/test_my_integration.py
import pytest
from fastapi.testclient import TestClient
from fastramqpi.pytest_util import retry
from more_itertools import one
from my_integration.autogenerated_graphql_client import GraphQLClient
@pytest.mark.integration_test
async def test_create_person(
test_client: TestClient,
graphql_client: GraphQLClient,
) -> None:
# Precondition: The person does not already exist.
# The auto-use fixtures should automatically ensure test isolation, but
# sometimes, especially during local development, we might be a little too
# fast on the ^C^C^C^C^C so pytest doesn't get a chance to clean up.
cpr_number = "1234567890"
employee = await graphql_client._testing__get_employee(cpr_number)
assert employee.objects == []
# The integration needs to be triggered to create the employee. How this is
# done depends on the integration. We assume a /trigger/ endpoint here:
test_client.post("/trigger")
@retry()
async def verify() -> None:
employees = await graphql_client._testing__get_employee(cpr_number)
employee_states = one(employees.objects)
employee = one(employee_states.objects)
assert employee.cpr_number == cpr_number
assert employee.given_name == "Alice"
await verify()
```
Through the use of the `test_client` fixture, the test begins by starting the
integration, including initialising any associated lifecyles such as AMQP
connections. We sanity-check, to ensure the test isn't trivially passing, and
trigger the integration. Due to the nature of AMQP, we don't know how long the
integration will need to reconcile the state of OS2mo or an external system.
Therefore, all asserts which depend on eventual consistent state, are wrapped
in a function with `@retry` from FastRAMQPI's `pytest_util`. This allows the
assertions to fail and be retried up to 30 seconds, before the test fails.
To keep tests clear and concise, all GraphQL queries, which are required to
arrange and assert in the test, are added to the [autogenerated GraphQL client](#autogenerated-graphql-client),
using a `_testing__` prefix by convention:
```graphql
# queries.graphql
query _Testing_GetEmployee($cpr_number: CPR!) {
employees(filter: {cpr_numbers: [$cpr_number]}) {
objects {
objects {
cpr_number
given_name
}
}
}
}
```
### Overriding OS2mo-init
It is possible to specific a custom [OS2mo-init](https://git.magenta.dk/rammearkitektur/os2mo-init)
configuration by setting the `OS2MO_INIT_CONFIG` variable at the top of the
project's `.gitlab-ci.yml` and declaring a `init.config.yml`:
```yaml
# .gitlab-ci.yml
variables:
OS2MO_INIT_CONFIG: "/builds/$CI_PROJECT_PATH/init.config.yml"
# init.config.yml
facets:
org_unit_address_type: {}
manager_address_type: {}
address_property: {}
engagement_job_function: {}
org_unit_type: {}
engagement_type:
qa_engineer:
title: "Software Integration Tester"
scope: "TEXT"
association_type: {}
role_type: {}
leave_type: {}
manager_type: {}
responsibility: {}
manager_level: {}
visibility: {}
time_planning: {}
org_unit_level: {}
primary_type: {}
org_unit_hierarchy: {}
kle_number: {}
kle_aspect: {}
it_systems:
FOOBAR: "The Foo Bar"
```
### Debugging
By default, only logs generated by the application are captured and output in
the GitLab interface. Service logs (OS2mo, OS2mo-init, Keycloak, etc.) can be
captured for debugging by adding the following to the project's
`.gitlab-ci.yml` file:
```yaml
variables:
CI_DEBUG_SERVICES: "true"
```
This is especially useful if a service fails to start (`Waiting for keycloak
realm builder...`). See the [GitLab
documentation](https://docs.gitlab.com/ee/ci/services/#capturing-service-container-logs)
for more information.
## Development
### Prerequisites
- [Poetry](https://github.com/python-poetry/poetry)
### Getting Started
1. Clone the repository:
```
git clone git@git.magenta.dk:rammearkitektur/FastRAMQPI.git
```
2. Install all dependencies:
```
poetry install
```
3. Set up pre-commit:
```
poetry run pre-commit install
```
### Running the tests
You use `poetry` and `pytest` to run the tests:
`poetry run pytest`
You can also run specific files
`poetry run pytest tests/<test_folder>/<test_file.py>`
and even use filtering with `-k`
`poetry run pytest -k "Manager"`
You can use the flags `-vx` where `v` prints the test & `x` makes the test stop if any tests fails (Verbose, X-fail)
## Authors
Magenta ApS <https://magenta.dk>
## License
This project uses: [MPL-2.0](LICENSES/MPL-2.0.txt)
This project uses [REUSE](https://reuse.software) for licensing.
All licenses can be found in the [LICENSES folder](LICENSES/) of the project.
Raw data
{
"_id": null,
"home_page": "https://magenta.dk/",
"name": "FastRAMQPI",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.11",
"maintainer_email": null,
"keywords": "os2mo",
"author": "Magenta ApS",
"author_email": "info@magenta.dk",
"download_url": "https://files.pythonhosted.org/packages/60/76/11db6f01f47428c72eb202b21902c813a846585dbddeb9903f0eca065f2b/fastramqpi-10.0.5.tar.gz",
"platform": null,
"description": "<!--\nSPDX-FileCopyrightText: 2021 Magenta ApS <https://magenta.dk>\nSPDX-License-Identifier: MPL-2.0\n-->\n\n# FastRAMQPI\n\nFastRAMQPI is an opinionated library for OS2mo integrations.\n\n\n## Usage\n\n```python\nfrom typing import Any\n\nfrom fastapi import APIRouter\nfrom fastapi import FastAPI\nfrom pydantic import BaseSettings\nfrom pydantic import Field\n\nfrom fastramqpi import depends\nfrom fastramqpi.config import Settings as FastRAMQPISettings\nfrom fastramqpi.main import FastRAMQPI\nfrom fastramqpi.ramqp.depends import Context\nfrom fastramqpi.ramqp.depends import rate_limit\nfrom fastramqpi.ramqp.mo import MORouter\nfrom fastramqpi.ramqp.mo import PayloadUUID\n\n\nclass Settings(BaseSettings):\n class Config:\n frozen = True\n env_nested_delimiter = \"__\"\n\n fastramqpi: FastRAMQPISettings = Field(\n default_factory=FastRAMQPISettings, description=\"FastRAMQPI settings\"\n )\n\n\nfastapi_router = APIRouter()\namqp_router = MORouter()\n\n\n@amqp_router.register(\"engagement\", dependencies=[Depends(rate_limit(10))])\nasync def listen_to_engagements(\n context: Context,\n graphql_session: depends.LegacyGraphQLSession,\n uuid: PayloadUUID,\n) -> None:\n print(uuid)\n\n\ndef create_fastramqpi(**kwargs: Any) -> FastRAMQPI:\n settings = Settings(**kwargs)\n fastramqpi = FastRAMQPI(\n application_name=\"os2mo-example-integration\",\n settings=settings.fastramqpi,\n graphql_version=20,\n )\n fastramqpi.add_context(settings=settings)\n\n # Add our AMQP router(s)\n amqpsystem = fastramqpi.get_amqpsystem()\n amqpsystem.router.registry.update(amqp_router.registry)\n\n # Add our FastAPI router(s)\n app = fastramqpi.get_app()\n app.include_router(fastapi_router)\n\n return fastramqpi\n\n\ndef create_app(**kwargs: Any) -> FastAPI:\n fastramqpi = create_fastramqpi(**kwargs)\n return fastramqpi.get_app()\n```\n\n### Metrics\nFastRAMQPI Metrics are exported via `prometheus/client_python` on the FastAPI's `/metrics`.\n\n\n### Debugging\nFastRAMQPI ships with support for debugging via [DAP](https://microsoft.github.io/debug-adapter-protocol/).\nTo enable it set the `DAP` environmental variable to true, and expose the debugging port (5678).\n\nFor instance in a `docker-compose.yaml` file, by merging in:\n```yaml\nversion: \"3\"\nservices:\n mo_ldap_import_export:\n environment:\n DAP: \"true\"\n ports:\n - \"127.0.0.0:5678:5678\"\n```\n\nFor ease of use, this should be the default for projects using FastRAMQPI.\n## Autogenerated GraphQL Client\nFastRAMQPI exposes an\n[authenticated httpx client](https://docs.authlib.org/en/latest/client/api.html#authlib.integrations.httpx_client.AsyncOAuth2Client)\nthrough the dependency injection system. While it is possible to call the OS2mo\nAPI directly through it, the recommended approach is to define a properly-typed\nGraphQL client in the integration and configure it to make calls through the\nauthenticated client. Instead of manually implementing such client, we strongly\nrecommend to use the\n[**Ariadne Code Generator**](https://github.com/mirumee/ariadne-codegen), which\ngenerates an integration-specific client based on the general OS2mo GraphQL\nschema and the exact queries and mutations the integration requires.\n\nTo integrate such client, first add and configure the codegen:\n```toml\n# pyproject.toml\n\n[tool.poetry.group.dev.dependencies]\nariadne-codegen = \"0.13.0\"\n\n[tool.ariadne-codegen]\n# Ideally, the GraphQL client is generated as part of the build process and\n# never committed to git. Unfortunately, most of our tools and CI analyses the\n# project directly as it is in Git. In the future - when the CI templates\n# operate on the built container image - only the definition of the schema and\n# queries should be checked in.\n#\n# The default package name is `graphql_client`. Make it more obvious that the\n# files are not to be modified manually.\ntarget_package_name = \"autogenerated_graphql_client\"\ntarget_package_path = \"my_integration/\"\nclient_name = \"GraphQLClient\"\nschema_path = \"schema.graphql\" # curl -O http://localhost:5000/graphql/v8/schema.graphql\nqueries_path = \"queries.graphql\"\ninclude_all_inputs = false\ninclude_all_enums = false\nplugins = [\n # Return values directly when only a single top field is requested\n \"ariadne_codegen.contrib.shorter_results.ShorterResultsPlugin\",\n]\n[tool.ariadne-codegen.scalars.DateTime]\ntype = \"datetime.datetime\"\nparse = \"fastramqpi.ariadne.parse_graphql_datetime\"\n[tool.ariadne-codegen.scalars.UUID]\ntype = \"uuid.UUID\"\n```\nWhere you replace `\"my_integration/\"` with the path to your integration.\n\nGrab OS2mo's GraphQL schema:\n```bash\ncurl -O http://localhost:5000/graphql/v20/schema.graphql\n```\nDefine your queries:\n```gql\n# queries.graphql\n\n# SPDX-FileCopyrightText: Magenta ApS <https://magenta.dk>\n# SPDX-License-Identifier: MPL-2.0\n\nquery Version {\n version {\n mo_version\n mo_hash\n }\n}\n```\nGenerate the client - you may have to activate some virtual environment:\n```bash\nariadne-codegen\n```\nThe client class is passed to FastRAMQPI on startup as shown below. This will\nensure it is automatically opened and closed and configured with\nauthentication. **NOTE: remember to update the `graphql_version` when you\ngenerate against a new schema version**.\n\n```python\n# app.py\nfrom autogenerated_graphql_client import GraphQLClient\n\n\ndef create_app(**kwargs: Any) -> FastAPI:\n fastramqpi = FastRAMQPI(\n ...,\n graphql_version=20,\n graphql_client_cls=GraphQLClient,\n )\n```\nThe FastRAMQPI framework cannot define the annotated type for the GraphQL client\nsince its methods depend on the specific queries required by the integration.\nTherefore, each implementing integration needs to define their own:\n```python\n# depends.py\nfrom typing import Annotated\n\nfrom fastapi import Depends\nfrom fastramqpi.ramqp.depends import from_context\n\nfrom my_integration.autogenerated_graphql_client import GraphQLClient as _GraphQLClient\n\nGraphQLClient = Annotated[_GraphQLClient, Depends(from_context(\"graphql_client\"))]\n```\nFinally, we can define our AMQP handler to use the GraphQL client:\n```python\n# events.py\nfrom . import depends\n\n\n@router.register(\"*\")\nasync def handler(mo: depends.GraphQLClient) -> None:\n version = await mo.version()\n print(version)\n```\n\nTo get REUSE working, you might consider adding the following to `.reuse/dep5`:\n```text\nFiles: my_integration/autogenerated_graphql_client/*\nCopyright: Magenta ApS <https://magenta.dk>\nLicense: MPL-2.0\n```\n\n\n## Database\nIf your integration requires access to a database, set it up as so:\n`compose.yaml`:\n```yaml\nservices:\n my_integration:\n environment:\n FASTRAMQPI__DATABASE__USER: \"fastramqpi\"\n FASTRAMQPI__DATABASE__PASSWORD: \"fastramqpi\"\n FASTRAMQPI__DATABASE__HOST: \"db\"\n FASTRAMQPI__DATABASE__NAME: \"fastramqpi\"\n db:\n image: postgres:16\n environment:\n POSTGRES_USER: \"fastramqpi\"\n POSTGRES_PASSWORD: \"fastramqpi\"\n POSTGRES_DB: \"fastramqpi\"\n```\n\n`my_integration/database.py`:\n```python\nfrom sqlalchemy.orm import DeclarativeBase\nfrom sqlalchemy.orm import Mapped\nfrom sqlalchemy.orm import mapped_column\n\n\nclass Base(DeclarativeBase):\n pass\n\n\nclass User(Base):\n __tablename__ = \"user\"\n\n id: Mapped[int] = mapped_column(primary_key=True)\n name: Mapped[str]\n```\n\nDue to budgetary constraints FastRAMQPI does not yet facilitate database\nmigrations through alembic. Instead, all defined tables are created on startup.\nFor technical reasons, this requires us to pass the `DeclarativeBase` from\nabove to the FastRAMQPI constructor in `app.py` as so:\n```python\nfrom my_integration.database import Base\n\n\nfastramqpi = FastRAMQPI(\n application_name=\"os2mo-example-integration\",\n settings=set``tings.fastramqpi,\n ...\n database_metadata=Base.metadata,\n)\n```\n\nThe database can be used in a handler as follows:\n```python\nfrom fastramqpi import depends\nfrom my_integration.database import User\n\n\n@amqp_router.register(\"person\")\nasync def add(session: depends.Session) -> None:\n session.add(User(name=\"Alice\"))\n```\n\n## Multilayer exchanges\n\nFastRAMQPI supports a multi-layer exchange setup, where all integration queues\nare bound to an intermediate exchange which itself is bound to an upstream\nexchange for instance OS2mo, the below Graph shows this setup with two\nintegrations that each has their own intermediate exchange.\n```\n \u250c\u2500\u2500\u2500\u2500\u2500\u2510\n \u250c\u2500\u2500\u2524OS2mo\u251c\u2500\u2500\u2510\n R1+R2+R3\u2502 \u2514\u2500\u2500\u2500\u2500\u2500\u2518 \u2502R4+R5\n \u2502 \u2502\n \u250c\u2500\u2500\u25bc\u2500\u2500\u2510 \u250c\u2500\u2500\u25bc\u2500\u2500\u2510\n \u250c\u2500\u2500\u2524LDAP \u251c\u2500\u2500\u2510 \u2502Omada\u251c\u2500\u2500\u2510\n \u2502 \u2514\u2500\u2500\u252c\u2500\u2500\u2518 \u2502 \u2514\u2500\u2500\u252c\u2500\u2500\u2518 \u2502\n R1\u2502 R2\u2502 R3\u2502 R4\u2502 R5\u2502\n \u2502 \u2502 \u2502 \u2502 \u2502\n\u250c\u2500\u2500\u25bc\u2500\u2500\u252c\u2500\u2500\u25bc\u2500\u2500\u252c\u2500\u2500\u25bc\u2500\u2500\u2510 \u250c\u25bc\u2500\u2500\u2500\u2500\u2500\u25bc\u2510\n\u2502 Q1 \u2502 Q2 \u2502 Q3 \u2502 \u2502 Q4 \u2502\n\u2514\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518\n```\nHere `R1,...R5` are routing keys, and `Q1,...Q4` are queues. Notice that it is\nstill possible for a single queue to subscribe to multiple routing keys, and to\nhave multiple queues subscribed to a single intermediate exchange.\n\nCompared the above model using intermediate exchanges with the model below,\nthat does not utilize intermediate exchanges.\n```\n \u250c\u2500\u2500\u2500\u2500\u2500\u2510\n \u250c\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2524OS2mo\u251c\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2510\n \u2502 \u2502 \u2514\u2500\u2500\u252c\u2500\u2500\u2518 \u2502 \u2502\n R1\u2502 R2\u2502 R3\u2502 R4\u2502 R5\u2502\n \u2502 \u2502 \u2502 \u2502 \u2502\n\u250c\u2500\u2500\u25bc\u2500\u2500\u252c\u2500\u2500\u25bc\u2500\u2500\u252c\u2500\u2500\u25bc\u2500\u2500\u2510 \u250c\u25bc\u2500\u2500\u2500\u2500\u2500\u25bc\u2510\n\u2502 Q1 \u2502 Q2 \u2502 Q3 \u2502 \u2502 Q4 \u2502\n\u2514\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518\n```\nHere it is no longer obvious which queues and routing keys belong to which\nintegration, and as such it is no longer possible to use OS2mo's `refresh_*`\nmutators to request a specific integration to run its event handlers.\n\nRequesting a specific integration to run its event handlers, using multi-layer\nexchanges with intermediate exchanges can be done using the `exchange`\nparameter on the `refresh_*` mutators, by passing the name of the intermediate\nexchange.\n\nTo start using intermediate exchanges in your FastRAMQPI integration, simply\nset the `upstream_exchange` value on the `AMQPConnectionSettings` to the\nupstream exchange you want to bind your intermediate exchange to, and set the\n`exchange` value to the name you want to allocate to your intermediate exchange.\n\nIt is generally recommended to use the `upstream_exchange` name as a prefix for\nthe intermediate exchange set in the `exchange` value, such as:\n```python\nfrom fastramqpi.config import Settings as _FastRAMQPISettings\nfrom fastramqpi.ramqp.config import AMQPConnectionSettings as _AMQPConnectionSettings\n\nclass AMQPConnectionSettings(_AMQPConnectionSettings):\n upstream_exchange = \"os2mo\"\n exchange = \"os2mo_ldap\"\n queue_prefix = \"os2mo_ldap\"\n\n\nclass FastRAMQPISettings(_FastRAMQPISettings):\n amqp: AMQPConnectionSettings\n\n\nclass Settings(BaseSettings):\n fastramqpi: FastRAMQPISettings\n```\n\n\n## Integration Testing\nThe goal of integration testing in our context is to minimise the amount of\nmocking and patching by testing the integration's behavior against a running\nOS2mo instance in a way that resembles a production environment as much as\npossible. To this end, the integration is configured for testing through its\npublic interface -- environment variables -- in `.gitlab-ci.yml` wherever\npossible. The flow of each test follows the well-known \"Arrange, Act and\nAssert\" pattern, but with a sleight modification to avoid hooking into either\nOS2mo or the integration itself, and endure the nature of eventual consistency\nthat is unavoidable with an event-based system.\n\nTo get started, add OS2mo's [GitLab CI template](https://git.magenta.dk/rammearkitektur/os2mo/-/blob/master/gitlab-ci-templates/integration-test-meta.v1.yml)\nto `.gitlab-ci.yml`:\n```yaml\ninclude:\n - project: rammearkitektur/os2mo\n file:\n - gitlab-ci-templates/integration-test-meta.v1.yml\n\n...\n\nTest:\n variables:\n PYTEST_ADDOPTS: \"-m 'not integration_test'\"\n\nIntegration-test:\n extends:\n - .integration-test:mo\n variables:\n MY_INTEGRATION__FOO: \"true\"\n```\n\nThe `Integration-test` CI job starts a full OS2mo stack, including all\nnecessary services, and then runs all tests marked with `integration_test`.\n[Auto-used fixtures](fastramqpi/pytest_plugin.py) automatically ensure test\nisolation in OS2mo's database, increase the AMQP messaging interval, and\nconfigure `respx` appropriately. By default, the `Test` job runs all tests in\nthe project, including both unit- and integration-tests. We overwrite the\n`Test` CI job to exclude integration tests -- the `Integration-test` job\nalready only runs integration tests by default.\n\nMoving on, some boilerplate needs to be copied since a few types cannot be\nknown a priori:\n```python\n# tests/integration/conftest.py\nfrom collections.abc import AsyncIterator\n\nimport pytest\nfrom asgi_lifespan import LifespanManager\nfrom asgi_lifespan._types import ASGIApp\nfrom fastapi import FastAPI\nfrom gql.client import AsyncClientSession\nfrom httpx import AsyncClient\nfrom httpx import ASGITransport\nfrom pytest import MonkeyPatch\n\nfrom my_integration.app import create_app\n\n\n@pytest.fixture\nasync def _app(monkeypatch: MonkeyPatch) -> FastAPI:\n monkeypatch.setenv(\"SOME_CONFIG\", \"http://example.org\")\n\n app = create_app()\n return app\n\n\n@pytest.fixture\nasync def asgiapp(_app: FastAPI) -> AsyncIterator[ASGIApp]:\n \"\"\"ASGI app with lifespan run.\"\"\"\n async with LifespanManager(_app) as manager:\n yield manager.app\n\n\n@pytest.fixture\nasync def app(_app: FastAPI, asgiapp: ASGIApp) -> FastAPI:\n \"\"\"FastAPI app with lifespan run.\"\"\"\n return _app\n\n\n@pytest.fixture\nasync def test_client(asgiapp: ASGIApp) -> AsyncIterator[AsyncClient]:\n \"\"\"Create test client with associated lifecycles.\"\"\"\n transport = ASGITransport(app=asgiapp, client=(\"1.2.3.4\", 123)) # type: ignore\n async with AsyncClient(\n transport=transport, base_url=\"http://example.com\"\n ) as client:\n yield client\n\n\n@pytest.fixture\nasync def graphql_client(app: FastAPI) -> AsyncClientSession:\n \"\"\"Authenticated GraphQL codegen client for OS2mo.\"\"\"\n return app.state.context[\"graphql_client\"]\n```\n\nThe `test_client` fixture refers to a running instance of the integration, and\n`graphql_client` to the integration's pre-authenticated instance of its\n[autogenerated GraphQL client](#autogenerated-graphql-client).\n\nTo make async tests and fixtures work, and avoid having to\n`@pytest.mark.asyncio` each test individually, the following must be added to\n`pyproject.toml`:\n```toml\n[tool.pytest.ini_options]\nasyncio_mode=\"auto\"\n```\n\nA sample integration test could be as follows:\n\n```python\n# tests/integration/test_my_integration.py\nimport pytest\nfrom fastapi.testclient import TestClient\nfrom fastramqpi.pytest_util import retry\nfrom more_itertools import one\n\nfrom my_integration.autogenerated_graphql_client import GraphQLClient\n\n\n@pytest.mark.integration_test\nasync def test_create_person(\n test_client: TestClient,\n graphql_client: GraphQLClient,\n) -> None:\n # Precondition: The person does not already exist.\n # The auto-use fixtures should automatically ensure test isolation, but\n # sometimes, especially during local development, we might be a little too\n # fast on the ^C^C^C^C^C so pytest doesn't get a chance to clean up.\n cpr_number = \"1234567890\"\n employee = await graphql_client._testing__get_employee(cpr_number)\n assert employee.objects == []\n\n # The integration needs to be triggered to create the employee. How this is\n # done depends on the integration. We assume a /trigger/ endpoint here:\n test_client.post(\"/trigger\")\n\n @retry()\n async def verify() -> None:\n employees = await graphql_client._testing__get_employee(cpr_number)\n employee_states = one(employees.objects)\n employee = one(employee_states.objects)\n assert employee.cpr_number == cpr_number\n assert employee.given_name == \"Alice\"\n \n await verify()\n```\nThrough the use of the `test_client` fixture, the test begins by starting the\nintegration, including initialising any associated lifecyles such as AMQP\nconnections. We sanity-check, to ensure the test isn't trivially passing, and\ntrigger the integration. Due to the nature of AMQP, we don't know how long the\nintegration will need to reconcile the state of OS2mo or an external system.\nTherefore, all asserts which depend on eventual consistent state, are wrapped\nin a function with `@retry` from FastRAMQPI's `pytest_util`. This allows the\nassertions to fail and be retried up to 30 seconds, before the test fails.\n\nTo keep tests clear and concise, all GraphQL queries, which are required to\narrange and assert in the test, are added to the [autogenerated GraphQL client](#autogenerated-graphql-client),\nusing a `_testing__` prefix by convention:\n```graphql\n# queries.graphql\nquery _Testing_GetEmployee($cpr_number: CPR!) {\n employees(filter: {cpr_numbers: [$cpr_number]}) {\n objects {\n objects {\n cpr_number\n given_name\n }\n }\n }\n}\n```\n\n### Overriding OS2mo-init\nIt is possible to specific a custom [OS2mo-init](https://git.magenta.dk/rammearkitektur/os2mo-init)\nconfiguration by setting the `OS2MO_INIT_CONFIG` variable at the top of the\nproject's `.gitlab-ci.yml` and declaring a `init.config.yml`:\n```yaml\n# .gitlab-ci.yml\nvariables:\n OS2MO_INIT_CONFIG: \"/builds/$CI_PROJECT_PATH/init.config.yml\"\n\n# init.config.yml\nfacets:\n org_unit_address_type: {}\n manager_address_type: {}\n address_property: {}\n engagement_job_function: {}\n org_unit_type: {}\n engagement_type:\n qa_engineer:\n title: \"Software Integration Tester\"\n scope: \"TEXT\"\n association_type: {}\n role_type: {}\n leave_type: {}\n manager_type: {}\n responsibility: {}\n manager_level: {}\n visibility: {}\n time_planning: {}\n org_unit_level: {}\n primary_type: {}\n org_unit_hierarchy: {}\n kle_number: {}\n kle_aspect: {}\n\nit_systems:\n FOOBAR: \"The Foo Bar\"\n```\n\n### Debugging\nBy default, only logs generated by the application are captured and output in\nthe GitLab interface. Service logs (OS2mo, OS2mo-init, Keycloak, etc.) can be\ncaptured for debugging by adding the following to the project's\n`.gitlab-ci.yml` file:\n```yaml\nvariables:\n CI_DEBUG_SERVICES: \"true\"\n```\nThis is especially useful if a service fails to start (`Waiting for keycloak\nrealm builder...`). See the [GitLab\ndocumentation](https://docs.gitlab.com/ee/ci/services/#capturing-service-container-logs)\nfor more information.\n\n\n## Development\n\n### Prerequisites\n\n- [Poetry](https://github.com/python-poetry/poetry)\n\n### Getting Started\n\n1. Clone the repository:\n```\ngit clone git@git.magenta.dk:rammearkitektur/FastRAMQPI.git\n```\n\n2. Install all dependencies:\n```\npoetry install\n```\n\n3. Set up pre-commit:\n```\npoetry run pre-commit install\n```\n\n### Running the tests\n\nYou use `poetry` and `pytest` to run the tests:\n\n`poetry run pytest`\n\nYou can also run specific files\n\n`poetry run pytest tests/<test_folder>/<test_file.py>`\n\nand even use filtering with `-k`\n\n`poetry run pytest -k \"Manager\"`\n\nYou can use the flags `-vx` where `v` prints the test & `x` makes the test stop if any tests fails (Verbose, X-fail)\n\n## Authors\n\nMagenta ApS <https://magenta.dk>\n\n## License\n\nThis project uses: [MPL-2.0](LICENSES/MPL-2.0.txt)\n\nThis project uses [REUSE](https://reuse.software) for licensing.\nAll licenses can be found in the [LICENSES folder](LICENSES/) of the project.\n",
"bugtrack_url": null,
"license": "MPL-2.0",
"summary": "Rammearkitektur integrations framework",
"version": "10.0.5",
"project_urls": {
"Homepage": "https://magenta.dk/",
"Repository": "https://git.magenta.dk/rammearkitektur/FastRAMQPI"
},
"split_keywords": [
"os2mo"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "2abf9967201f94bff644aeaa618b169cb1fcb0ddba3ab6076fd14763ed3ac3d6",
"md5": "41e8d206e107a01ccfe7e45b6a821a33",
"sha256": "b2ec8a95793c56b4025fafc24a1fe9cb4fcd864fd370771dda11e6917d75821e"
},
"downloads": -1,
"filename": "fastramqpi-10.0.5-py3-none-any.whl",
"has_sig": false,
"md5_digest": "41e8d206e107a01ccfe7e45b6a821a33",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.11",
"size": 93650,
"upload_time": "2024-09-25T13:42:51",
"upload_time_iso_8601": "2024-09-25T13:42:51.630077Z",
"url": "https://files.pythonhosted.org/packages/2a/bf/9967201f94bff644aeaa618b169cb1fcb0ddba3ab6076fd14763ed3ac3d6/fastramqpi-10.0.5-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "607611db6f01f47428c72eb202b21902c813a846585dbddeb9903f0eca065f2b",
"md5": "403a784f7456c166193f2c87544b4fc7",
"sha256": "899c41e8119a9251415a13153d830f92c1be7b4c121c4c0ef159e5305ce1df0c"
},
"downloads": -1,
"filename": "fastramqpi-10.0.5.tar.gz",
"has_sig": false,
"md5_digest": "403a784f7456c166193f2c87544b4fc7",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.11",
"size": 71089,
"upload_time": "2024-09-25T13:42:53",
"upload_time_iso_8601": "2024-09-25T13:42:53.473920Z",
"url": "https://files.pythonhosted.org/packages/60/76/11db6f01f47428c72eb202b21902c813a846585dbddeb9903f0eca065f2b/fastramqpi-10.0.5.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-09-25 13:42:53",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "fastramqpi"
}