Name | aidbox-python-sdk-tbs JSON |
Version |
0.0.1a5
JSON |
| download |
home_page | None |
Summary | Topic-Based Subscription extension for Aidbox SDK for python |
upload_time | 2024-09-30 17:29:34 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.11 |
license | None |
keywords |
aidbox
fhir
tbs
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# aidbox-python-sdk-tbs
Topic-based subscription extension for Aidbox SDK for python
## Install
Install `aidbox-python-sdk-tbs[r4b]` or `aidbox-python-sdk-tbs[r5]` using poetry/pipenv/pip.
## Usage
Create `app/app_keys.py` with the following context:
```python
from aidbox_python_sdk.app_keys import client, sdk, settings
from aiohttp import web
from fhirpy import AsyncFHIRClient
fhir_client: web.AppKey[AsyncFHIRClient] = web.AppKey("fhir_client", AsyncFHIRClient)
__all__ = ["fhir_client", "client", "sdk", "settings"]
```
Create `app/subscriptions.py` with the following content:
```python
import logging
from collections.abc import AsyncGenerator
from aidbox_python_sdk.types import SDKOperation, SDKOperationRequest
from aidbox_python_sdk_tbs import SubscriptionDefinition
from aidbox_python_sdk_tbs.r4b impotr r4b_tbs_ctx_factory
from aiohttp import web
from app import app_keys as ak
from app import config
from app.sdk import sdk
async def new_appointment_sub(app: web.Application, appointment: r4b.Appointment) -> None:
fhir_client = app[ak.fhir_client]
logging.error("New appointment %s", appointment.model_dump())
subscriptions: list[SubscriptionDefinition] = [
{
"topic": "https://example.com/SubscriptionTopic/new-appointment-event",
"handler": new_appointment_sub,
"filterBy": [
{"resourceType": "Appointment", "filterParameter": "status", "value": "booked"}
],
},
]
def aidbox_tbs_ctx(app: web.Application) -> AsyncGenerator[None, None]:
return r4b_tbs_ctx_factory(
app[ak.sdk],
app[ak.settings].APP_INIT_URL,
app[ak.fhir_client],
subscriptions,
)
```
New we need to build `app` composing different contexts.
Pay attention, that app_ctx should be the last one, because `aidbox-python-sdk-tbs` adds endpoints to sdk.
Change `main.py` to look like the following example:
```python
from collections.abc import AsyncGenerator
from aidbox_python_sdk.main import init_client, register_app, setup_routes
from aidbox_python_sdk.settings import Settings
from aiohttp import BasicAuth, web
from fhirpy import AsyncFHIRClient
from app import app_keys as ak
from app.sdk import sdk
from app.subscriptions import aidbox_tbs_ctx
async def init_fhir_client(settings: Settings, prefix: str = "") -> AsyncFHIRClient:
basic_auth = BasicAuth(
login=settings.APP_INIT_CLIENT_ID,
password=settings.APP_INIT_CLIENT_SECRET,
)
return AsyncFHIRClient(
f"{settings.APP_INIT_URL}{prefix}",
authorization=basic_auth.encode(),
dump_resource=lambda x: x.model_dump(),
)
async def fhir_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:
app[ak.fhir_client] = await init_fhir_client(app[ak.settings], "/fhir")
yield
async def aidbox_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:
app[ak.client] = await init_client(app[ak.settings])
yield
async def app_ctx(app: web.Application) -> AsyncGenerator[None, None]:
await register_app(app[ak.sdk], app[ak.client])
yield
def create_app() -> web.Application:
app = web.Application()
app[ak.sdk] = sdk
app[ak.settings] = sdk.settings
app.cleanup_ctx.append(aidbox_client_ctx)
app.cleanup_ctx.append(fhir_client_ctx)
app.cleanup_ctx.append(aidbox_tbs_ctx)
# NOTE: Pay attention, app_ctx should be after aidbox_tbs_ctx !!!
app.cleanup_ctx.append(app_ctx)
setup_routes(app)
return app
async def create_gunicorn_app() -> web.Application:
return create_app()
```
Raw data
{
"_id": null,
"home_page": null,
"name": "aidbox-python-sdk-tbs",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": null,
"keywords": "aidbox, fhir, tbs",
"author": null,
"author_email": "\"beda.software\" <aidbox-python-sdk@beda.software>",
"download_url": "https://files.pythonhosted.org/packages/98/c7/609e237fe6004e8934885d0510729ccfbdcd05d6649769230f294c74406b/aidbox_python_sdk_tbs-0.0.1a5.tar.gz",
"platform": null,
"description": "# aidbox-python-sdk-tbs\n\nTopic-based subscription extension for Aidbox SDK for python\n\n## Install\n\nInstall `aidbox-python-sdk-tbs[r4b]` or `aidbox-python-sdk-tbs[r5]` using poetry/pipenv/pip.\n\n## Usage\n\nCreate `app/app_keys.py` with the following context:\n\n```python\nfrom aidbox_python_sdk.app_keys import client, sdk, settings\nfrom aiohttp import web\nfrom fhirpy import AsyncFHIRClient\n\nfhir_client: web.AppKey[AsyncFHIRClient] = web.AppKey(\"fhir_client\", AsyncFHIRClient)\n\n__all__ = [\"fhir_client\", \"client\", \"sdk\", \"settings\"]\n```\n\nCreate `app/subscriptions.py` with the following content:\n\n```python\nimport logging\nfrom collections.abc import AsyncGenerator\n\nfrom aidbox_python_sdk.types import SDKOperation, SDKOperationRequest\nfrom aidbox_python_sdk_tbs import SubscriptionDefinition\nfrom aidbox_python_sdk_tbs.r4b impotr r4b_tbs_ctx_factory\nfrom aiohttp import web\n\nfrom app import app_keys as ak\nfrom app import config\nfrom app.sdk import sdk\n\nasync def new_appointment_sub(app: web.Application, appointment: r4b.Appointment) -> None:\n fhir_client = app[ak.fhir_client]\n\n logging.error(\"New appointment %s\", appointment.model_dump())\n\n\nsubscriptions: list[SubscriptionDefinition] = [\n {\n \"topic\": \"https://example.com/SubscriptionTopic/new-appointment-event\",\n \"handler\": new_appointment_sub,\n \"filterBy\": [\n {\"resourceType\": \"Appointment\", \"filterParameter\": \"status\", \"value\": \"booked\"}\n ],\n },\n]\n\n\ndef aidbox_tbs_ctx(app: web.Application) -> AsyncGenerator[None, None]:\n return r4b_tbs_ctx_factory(\n app[ak.sdk],\n app[ak.settings].APP_INIT_URL,\n app[ak.fhir_client],\n subscriptions,\n )\n```\n\nNew we need to build `app` composing different contexts. \nPay attention, that app_ctx should be the last one, because `aidbox-python-sdk-tbs` adds endpoints to sdk.\n\nChange `main.py` to look like the following example:\n\n```python\nfrom collections.abc import AsyncGenerator\n\nfrom aidbox_python_sdk.main import init_client, register_app, setup_routes\nfrom aidbox_python_sdk.settings import Settings\nfrom aiohttp import BasicAuth, web\nfrom fhirpy import AsyncFHIRClient\n\nfrom app import app_keys as ak\nfrom app.sdk import sdk\nfrom app.subscriptions import aidbox_tbs_ctx\n\n\nasync def init_fhir_client(settings: Settings, prefix: str = \"\") -> AsyncFHIRClient:\n basic_auth = BasicAuth(\n login=settings.APP_INIT_CLIENT_ID,\n password=settings.APP_INIT_CLIENT_SECRET,\n )\n\n return AsyncFHIRClient(\n f\"{settings.APP_INIT_URL}{prefix}\",\n authorization=basic_auth.encode(),\n dump_resource=lambda x: x.model_dump(),\n )\n\n\nasync def fhir_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:\n app[ak.fhir_client] = await init_fhir_client(app[ak.settings], \"/fhir\")\n yield\n\n\nasync def aidbox_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:\n app[ak.client] = await init_client(app[ak.settings])\n yield\n\n\nasync def app_ctx(app: web.Application) -> AsyncGenerator[None, None]:\n await register_app(app[ak.sdk], app[ak.client])\n yield\n\n\ndef create_app() -> web.Application:\n app = web.Application()\n app[ak.sdk] = sdk\n app[ak.settings] = sdk.settings\n app.cleanup_ctx.append(aidbox_client_ctx)\n app.cleanup_ctx.append(fhir_client_ctx)\n app.cleanup_ctx.append(aidbox_tbs_ctx)\n # NOTE: Pay attention, app_ctx should be after aidbox_tbs_ctx !!!\n app.cleanup_ctx.append(app_ctx)\n\n setup_routes(app)\n\n return app\n\n\nasync def create_gunicorn_app() -> web.Application:\n return create_app()\n```\n\n\n\n",
"bugtrack_url": null,
"license": null,
"summary": "Topic-Based Subscription extension for Aidbox SDK for python",
"version": "0.0.1a5",
"project_urls": {
"documentation": "https://github.com/beda-software/aidbox-python-sdk-tbs#readme",
"homepage": "https://github.com/beda-software/aidbox-python-sdk-tbs",
"repository": "https://github.com/beda-software/aidbox-python-sdk-tbs.git"
},
"split_keywords": [
"aidbox",
" fhir",
" tbs"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "3a0bbc73d9974230d0dbff22207602637098d0d0567f44af55f8b22e2b26cff8",
"md5": "6a891a92ffcb4c87b80787fcf5988249",
"sha256": "07a69645318078e6a8e28dbd31c0a3bd6fa5b52bd38770e57eb72cf4f4d35dec"
},
"downloads": -1,
"filename": "aidbox_python_sdk_tbs-0.0.1a5-py3-none-any.whl",
"has_sig": false,
"md5_digest": "6a891a92ffcb4c87b80787fcf5988249",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 7691,
"upload_time": "2024-09-30T17:29:33",
"upload_time_iso_8601": "2024-09-30T17:29:33.217257Z",
"url": "https://files.pythonhosted.org/packages/3a/0b/bc73d9974230d0dbff22207602637098d0d0567f44af55f8b22e2b26cff8/aidbox_python_sdk_tbs-0.0.1a5-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "98c7609e237fe6004e8934885d0510729ccfbdcd05d6649769230f294c74406b",
"md5": "0f6f3920185d7f6068cd43ede181bc72",
"sha256": "f6c5dd454249fa30139c7ab7b7f709b442f3dc46434e432684cf00e32835c7d0"
},
"downloads": -1,
"filename": "aidbox_python_sdk_tbs-0.0.1a5.tar.gz",
"has_sig": false,
"md5_digest": "0f6f3920185d7f6068cd43ede181bc72",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 5414,
"upload_time": "2024-09-30T17:29:34",
"upload_time_iso_8601": "2024-09-30T17:29:34.590321Z",
"url": "https://files.pythonhosted.org/packages/98/c7/609e237fe6004e8934885d0510729ccfbdcd05d6649769230f294c74406b/aidbox_python_sdk_tbs-0.0.1a5.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-09-30 17:29:34",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "beda-software",
"github_project": "aidbox-python-sdk-tbs#readme",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "aidbox-python-sdk-tbs"
}