Name | fhir-tbs JSON |
Version |
1.0.0a1
JSON |
| download |
home_page | None |
Summary | Topic-Based Subscription context for python aiohttp web applications |
upload_time | 2024-11-28 23:02:54 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.11 |
license | None |
keywords |
fhir
tbs
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# fhir-tbs-py
Topic-based subscription extension for python aiohttp web applications.
**Features**:
- Automatically created webhook aiohttp handlers based on definitions
- Unified R4B/R5 API for automatic registration (managed subscriptions)
- Optional managed subscriptions
- Optional authentication using `X-Api-Key`
- `id-only`/`full-resource` support
## Install
Install `fhir-tbs[r4b]` or `fhir-tbs[r5]` using poetry/pipenv/pip.
## Usage
1. Instantiate R4BTBS/R5TBS class with optionally passing predefined subscriptions using `subscriptions` arg and `subscription_defaults` with default subscription parameters (e.g. `payload_content`, `timeout` or `heartbeat_period`):
```python
tbs = R4BTBS(subscription_defaults={"payload_content": "full-resource"})
```
2. Define subscriptions using decorator `tbs.define`:
```python
@tbs.define(
topic="https://example.com/SubscriptionTopic/new-appointment-event",
filter_by=[
{
"resource_type": "Appointment",
"filter_parameter": "status",
"value": "booked"
}
],
webhook_id="new-appointment"
)
async def new_appointment_handler(
app: web.Application,
reference: str,
_included_resources: list[r4b.AnyResource],
_timestamp: str | None,
) -> None:
logging.info("New appointment %s", reference)
```
3. Invoke `setup_tbs` on app initialization passing needed parameters (see specification below):
```python
setup_tbs(app, tbs, webhook_path_prefix="webhook")
```
### Specification
**fhir_tbs.r4b.R4BTBS**/**fhir_tbs.r5.R5TBS**
- subscriptions (*list[fhir_tbs.SubscriptionDefinitionWithHandler]*, optional) - predefined list of subscriptions.
- subscription_defaults (optional) - default parameters for all subscription definitions.
- payload_content (*str*, optional): `id-only`/`full-resource` (default is `id-only`)
- timeout (*int*, optional): default is `60`
- heartbeat_period (*int*, optional): default is `20`
**tbs_instance.define**
- topic (*str*): URL of SubscriptionTopic to subscribe.
- webhook_id (optional): Optional webhook id that will be part of webhook URL.
- filter_by (*list[FilterBy]*, optional): Optional list of filters applied to topic.
- payload_content (*str*, optional): `id-only`/`full-resource` (default is `id-only`)
- timeout (*int*, optional): default is `60`
- heartbeat_period (*int*, optional): default is `20`
**setup_tbs**
- app (*web.Application*): aiohttp application.
- tbs (*R4BTBS*/*R5TBS*): TBS class instance.
- webhook_path_prefix (*str*): Prefix for the generated aiohttp routes.
- webhook_token (*str*, optional): The authorization token that is checked in X-Api-Token header.
- manage_subscriptions (*bool*, optional): The flag that indicates whether subscription registration/population should be enabled.
- handle_delivery_errors (*bool*, optional): WIP The flag that indicated whether subscription delivery errors (e.g. broken connection or missing events) should be handled.
- app_url (*str*, optional): Application url that is used when `manage_subscriptions`/`handle_delivery_errors` are set.
- get_fhir_client (*Callable[[web.Application], AsyncFHIRClient]*, optional): Getter for web.Application that returns AsyncFHIRClient that further used when `manage_subscriptions`/`handle_delivery_errors` are set.
### Examples
#### General example
Create `subscriptions.py` with the following content:
```python
import logging
from fhirpy import AsyncFHIRClient
import fhirpy_types_r4b as r4b
from fhir_tbs import SubscriptionDefinitionWithHandler
from fhir_tbs.r4b import R4BTBS
from aiohttp import web
# Make sure that app has fhir_client_key
fhir_client_key = web.AppKey("fhir_client_key", AsyncFHIRClient)
async def new_appointment_sub(
app: web.Application,
appointment_ref: str,
included_resources: list[r4b.AnyResource],
_timestamp: str
) -> None:
fhir_client = app[fhir_client_key]
# For id-only use fhir_client to fetch the resource
appointment = r4b.Appointment(**(await fhir_client.get(appointment_ref)))
# For full-resource find in in included resources by reference (straightforward example)
appointment = [
resource for resource in included_resources
if appointment_ref == f"{resource.resourceType}/{resource.id}"
][0]
logging.info("New appointment %s", appointment.model_dump())
tbs = R4BTBS()
@tbs.define(
topic="https://example.com/SubscriptionTopic/new-appointment-event",
filter_by=[
{
"resource_type": "Appointment",
"filter_parameter": "status",
"value": "booked"
}
],
webhook_id="new-appointment"
)
async def new_appointment_handler(
app: web.Application,
reference: str,
_included_resources: list[r4b.AnyResource],
_timestamp: str | None,
) -> None:
logging.info("New appointment %s", reference)
def create_app() -> web.Application:
app = web.Application()
app[fhir_client_key] = AsyncFHIRClient(...)
setup_tbs(
app,
tbs,
webhook_path_prefix="webhook",
app_url="http://app:8080",
get_fhir_client=lambda app: app[fhir_client_key],
manage_subscriptions=True,
handle_delivery_errors=True
)
```
#### Using in aidbox-python-sdk for external subscriptions
```python
external_webhook_path_prefix_parts = ["external-webhook"]
external_tbs = R4BTBS(subscriptions=subscriptions)
def setup_external_tbs(app: web.Application) -> None:
setup_tbs(
app,
external_tbs,
webhook_prefix_path="/".join(external_webhook_path_prefix_parts),
app_url="http://aidbox.example.com",
get_fhir_client=lambda app: app[fhir_client_key],
manage_subscriptions=True,
handle_delivery_errors=True
)
@sdk.operation(
methods=["POST"],
path=[*external_webhook_path_prefix_parts, {"name": "webhook-name"}],
public=True,
)
async def external_webhook_proxy_op(
_operation: SDKOperation, request: SDKOperationRequest
) -> web.Response:
session = request["app"][ak.session]
app_url = str(request["app"][ak.settings].APP_URL).rstrip("/")
webhook_name = request["route-params"]["webhook-name"]
path = "/".join([*external_webhook_path_prefix_parts, webhook_name])
token = request["headers"].get("x-api-key")
async with session.post(
f"{app_url}/{path}",
headers={"X-Api-Key": token} if token else {},
json=request["resource"],
) as response:
return web.Response(
body=await response.text(),
status=response.status,
content_type=response.content_type,
)
def create_app() -> web.Application:
app = web.Application()
app[fhir_client_key] = AsyncFHIRClient(...)
setup_external_tbs(app)
```
Raw data
{
"_id": null,
"home_page": null,
"name": "fhir-tbs",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": null,
"keywords": "fhir, tbs",
"author": null,
"author_email": "\"beda.software\" <fhir-tbs-py@beda.software>",
"download_url": "https://files.pythonhosted.org/packages/8a/ae/a6b0819ead7fbb9613cb2f615e71efef492bc63f6c6ad088266499e9d493/fhir_tbs-1.0.0a1.tar.gz",
"platform": null,
"description": "# fhir-tbs-py\n\nTopic-based subscription extension for python aiohttp web applications.\n\n**Features**:\n- Automatically created webhook aiohttp handlers based on definitions\n- Unified R4B/R5 API for automatic registration (managed subscriptions)\n- Optional managed subscriptions\n- Optional authentication using `X-Api-Key`\n- `id-only`/`full-resource` support\n\n## Install\n\nInstall `fhir-tbs[r4b]` or `fhir-tbs[r5]` using poetry/pipenv/pip.\n\n## Usage\n\n1. Instantiate R4BTBS/R5TBS class with optionally passing predefined subscriptions using `subscriptions` arg and `subscription_defaults` with default subscription parameters (e.g. `payload_content`, `timeout` or `heartbeat_period`):\n ```python\n tbs = R4BTBS(subscription_defaults={\"payload_content\": \"full-resource\"})\n ```\n2. Define subscriptions using decorator `tbs.define`:\n ```python\n @tbs.define(\n topic=\"https://example.com/SubscriptionTopic/new-appointment-event\",\n filter_by=[\n {\n \"resource_type\": \"Appointment\",\n \"filter_parameter\": \"status\",\n \"value\": \"booked\"\n }\n ],\n webhook_id=\"new-appointment\"\n )\n async def new_appointment_handler(\n app: web.Application,\n reference: str,\n _included_resources: list[r4b.AnyResource],\n _timestamp: str | None,\n ) -> None:\n logging.info(\"New appointment %s\", reference)\n ```\n3. Invoke `setup_tbs` on app initialization passing needed parameters (see specification below):\n ```python\n setup_tbs(app, tbs, webhook_path_prefix=\"webhook\")\n ```\n\n### Specification\n\n**fhir_tbs.r4b.R4BTBS**/**fhir_tbs.r5.R5TBS**\n- subscriptions (*list[fhir_tbs.SubscriptionDefinitionWithHandler]*, optional) - predefined list of subscriptions.\n- subscription_defaults (optional) - default parameters for all subscription definitions.\n - payload_content (*str*, optional): `id-only`/`full-resource` (default is `id-only`)\n - timeout (*int*, optional): default is `60`\n - heartbeat_period (*int*, optional): default is `20`\n\n\n**tbs_instance.define**\n- topic (*str*): URL of SubscriptionTopic to subscribe.\n- webhook_id (optional): Optional webhook id that will be part of webhook URL.\n- filter_by (*list[FilterBy]*, optional): Optional list of filters applied to topic.\n- payload_content (*str*, optional): `id-only`/`full-resource` (default is `id-only`)\n- timeout (*int*, optional): default is `60`\n- heartbeat_period (*int*, optional): default is `20`\n\n**setup_tbs**\n- app (*web.Application*): aiohttp application.\n- tbs (*R4BTBS*/*R5TBS*): TBS class instance.\n- webhook_path_prefix (*str*): Prefix for the generated aiohttp routes.\n- webhook_token (*str*, optional): The authorization token that is checked in X-Api-Token header.\n- manage_subscriptions (*bool*, optional): The flag that indicates whether subscription registration/population should be enabled.\n- handle_delivery_errors (*bool*, optional): WIP The flag that indicated whether subscription delivery errors (e.g. broken connection or missing events) should be handled.\n- app_url (*str*, optional): Application url that is used when `manage_subscriptions`/`handle_delivery_errors` are set.\n- get_fhir_client (*Callable[[web.Application], AsyncFHIRClient]*, optional): Getter for web.Application that returns AsyncFHIRClient that further used when `manage_subscriptions`/`handle_delivery_errors` are set.\n\n### Examples\n\n#### General example\n\nCreate `subscriptions.py` with the following content:\n\n```python\nimport logging\n\nfrom fhirpy import AsyncFHIRClient\nimport fhirpy_types_r4b as r4b\nfrom fhir_tbs import SubscriptionDefinitionWithHandler\nfrom fhir_tbs.r4b import R4BTBS\nfrom aiohttp import web\n\n# Make sure that app has fhir_client_key\nfhir_client_key = web.AppKey(\"fhir_client_key\", AsyncFHIRClient)\n\n\nasync def new_appointment_sub(\n app: web.Application,\n appointment_ref: str,\n included_resources: list[r4b.AnyResource],\n _timestamp: str\n) -> None:\n fhir_client = app[fhir_client_key]\n # For id-only use fhir_client to fetch the resource\n appointment = r4b.Appointment(**(await fhir_client.get(appointment_ref)))\n # For full-resource find in in included resources by reference (straightforward example) \n appointment = [\n resource for resource in included_resources \n if appointment_ref == f\"{resource.resourceType}/{resource.id}\"\n ][0]\n\n logging.info(\"New appointment %s\", appointment.model_dump())\n\n\ntbs = R4BTBS()\n\n@tbs.define(\n topic=\"https://example.com/SubscriptionTopic/new-appointment-event\",\n filter_by=[\n {\n \"resource_type\": \"Appointment\",\n \"filter_parameter\": \"status\",\n \"value\": \"booked\"\n }\n ],\n webhook_id=\"new-appointment\"\n)\nasync def new_appointment_handler(\n app: web.Application,\n reference: str,\n _included_resources: list[r4b.AnyResource],\n _timestamp: str | None,\n) -> None:\n logging.info(\"New appointment %s\", reference)\n\n\ndef create_app() -> web.Application:\n app = web.Application()\n app[fhir_client_key] = AsyncFHIRClient(...)\n setup_tbs(\n app, \n tbs,\n webhook_path_prefix=\"webhook\",\n app_url=\"http://app:8080\",\n get_fhir_client=lambda app: app[fhir_client_key],\n manage_subscriptions=True,\n handle_delivery_errors=True\n )\n\n```\n\n\n#### Using in aidbox-python-sdk for external subscriptions\n\n\n```python\nexternal_webhook_path_prefix_parts = [\"external-webhook\"]\n\nexternal_tbs = R4BTBS(subscriptions=subscriptions)\n\ndef setup_external_tbs(app: web.Application) -> None:\n setup_tbs(\n app,\n external_tbs,\n webhook_prefix_path=\"/\".join(external_webhook_path_prefix_parts),\n app_url=\"http://aidbox.example.com\",\n get_fhir_client=lambda app: app[fhir_client_key],\n manage_subscriptions=True,\n handle_delivery_errors=True\n )\n\n\n@sdk.operation(\n methods=[\"POST\"],\n path=[*external_webhook_path_prefix_parts, {\"name\": \"webhook-name\"}],\n public=True,\n)\nasync def external_webhook_proxy_op(\n _operation: SDKOperation, request: SDKOperationRequest\n) -> web.Response:\n session = request[\"app\"][ak.session]\n app_url = str(request[\"app\"][ak.settings].APP_URL).rstrip(\"/\")\n webhook_name = request[\"route-params\"][\"webhook-name\"]\n path = \"/\".join([*external_webhook_path_prefix_parts, webhook_name])\n token = request[\"headers\"].get(\"x-api-key\")\n\n async with session.post(\n f\"{app_url}/{path}\",\n headers={\"X-Api-Key\": token} if token else {},\n json=request[\"resource\"],\n ) as response:\n return web.Response(\n body=await response.text(),\n status=response.status,\n content_type=response.content_type,\n )\n\n\ndef create_app() -> web.Application:\n app = web.Application()\n app[fhir_client_key] = AsyncFHIRClient(...)\n\n setup_external_tbs(app)\n```\n\n\n",
"bugtrack_url": null,
"license": null,
"summary": "Topic-Based Subscription context for python aiohttp web applications",
"version": "1.0.0a1",
"project_urls": {
"documentation": "https://github.com/beda-software/fhir-tbs-py#readme",
"homepage": "https://github.com/beda-software/fhir-tbs-py",
"repository": "https://github.com/beda-software/fhir-tbs-py.git"
},
"split_keywords": [
"fhir",
" tbs"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "8ff2f6e6b63baf7877413037e90b6522e26cd40fd32125481bf5252cf78e2fae",
"md5": "322f169c4de0d511109c4db17f7abaf4",
"sha256": "fb5c7f2e43d8626063151be3a26e963a17443ec6c617490a376cdfa81fdd4d17"
},
"downloads": -1,
"filename": "fhir_tbs-1.0.0a1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "322f169c4de0d511109c4db17f7abaf4",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 12064,
"upload_time": "2024-11-28T23:02:52",
"upload_time_iso_8601": "2024-11-28T23:02:52.303061Z",
"url": "https://files.pythonhosted.org/packages/8f/f2/f6e6b63baf7877413037e90b6522e26cd40fd32125481bf5252cf78e2fae/fhir_tbs-1.0.0a1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "8aaea6b0819ead7fbb9613cb2f615e71efef492bc63f6c6ad088266499e9d493",
"md5": "2424de4c01b92b84a3740804b55ab637",
"sha256": "894ae6e64cc46ca96532132ed048cd0c5e1a406dfb937092752448629dad935f"
},
"downloads": -1,
"filename": "fhir_tbs-1.0.0a1.tar.gz",
"has_sig": false,
"md5_digest": "2424de4c01b92b84a3740804b55ab637",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 11343,
"upload_time": "2024-11-28T23:02:54",
"upload_time_iso_8601": "2024-11-28T23:02:54.179205Z",
"url": "https://files.pythonhosted.org/packages/8a/ae/a6b0819ead7fbb9613cb2f615e71efef492bc63f6c6ad088266499e9d493/fhir_tbs-1.0.0a1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-11-28 23:02:54",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "beda-software",
"github_project": "fhir-tbs-py#readme",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "fhir-tbs"
}