aidbox-python-sdk-tbs


Nameaidbox-python-sdk-tbs JSON
Version 0.0.1a5 PyPI version JSON
download
home_pageNone
SummaryTopic-Based Subscription extension for Aidbox SDK for python
upload_time2024-09-30 17:29:34
maintainerNone
docs_urlNone
authorNone
requires_python>=3.11
licenseNone
keywords aidbox fhir tbs
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # aidbox-python-sdk-tbs

Topic-based subscription extension for Aidbox SDK for python

## Install

Install `aidbox-python-sdk-tbs[r4b]` or `aidbox-python-sdk-tbs[r5]` using poetry/pipenv/pip.

## Usage

Create `app/app_keys.py` with the following context:

```python
from aidbox_python_sdk.app_keys import client, sdk, settings
from aiohttp import web
from fhirpy import AsyncFHIRClient

fhir_client: web.AppKey[AsyncFHIRClient] = web.AppKey("fhir_client", AsyncFHIRClient)

__all__ = ["fhir_client", "client", "sdk", "settings"]
```

Create `app/subscriptions.py` with the following content:

```python
import logging
from collections.abc import AsyncGenerator

from aidbox_python_sdk.types import SDKOperation, SDKOperationRequest
from aidbox_python_sdk_tbs import SubscriptionDefinition
from aidbox_python_sdk_tbs.r4b impotr r4b_tbs_ctx_factory
from aiohttp import web

from app import app_keys as ak
from app import config
from app.sdk import sdk

async def new_appointment_sub(app: web.Application, appointment: r4b.Appointment) -> None:
    fhir_client = app[ak.fhir_client]

    logging.error("New appointment %s", appointment.model_dump())


subscriptions: list[SubscriptionDefinition] = [
    {
        "topic": "https://example.com/SubscriptionTopic/new-appointment-event",
        "handler": new_appointment_sub,
        "filterBy": [
            {"resourceType": "Appointment", "filterParameter": "status", "value": "booked"}
        ],
    },
]


def aidbox_tbs_ctx(app: web.Application) -> AsyncGenerator[None, None]:
    return r4b_tbs_ctx_factory(
        app[ak.sdk],
        app[ak.settings].APP_INIT_URL,
        app[ak.fhir_client],
        subscriptions,
    )
```

New we need to build `app` composing different contexts. 
Pay attention, that app_ctx should be the last one, because `aidbox-python-sdk-tbs` adds endpoints to sdk.

Change `main.py` to look like the following example:

```python
from collections.abc import AsyncGenerator

from aidbox_python_sdk.main import init_client, register_app, setup_routes
from aidbox_python_sdk.settings import Settings
from aiohttp import BasicAuth, web
from fhirpy import AsyncFHIRClient

from app import app_keys as ak
from app.sdk import sdk
from app.subscriptions import aidbox_tbs_ctx


async def init_fhir_client(settings: Settings, prefix: str = "") -> AsyncFHIRClient:
    basic_auth = BasicAuth(
        login=settings.APP_INIT_CLIENT_ID,
        password=settings.APP_INIT_CLIENT_SECRET,
    )

    return AsyncFHIRClient(
        f"{settings.APP_INIT_URL}{prefix}",
        authorization=basic_auth.encode(),
        dump_resource=lambda x: x.model_dump(),
    )


async def fhir_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:
    app[ak.fhir_client] = await init_fhir_client(app[ak.settings], "/fhir")
    yield


async def aidbox_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:
    app[ak.client] = await init_client(app[ak.settings])
    yield


async def app_ctx(app: web.Application) -> AsyncGenerator[None, None]:
    await register_app(app[ak.sdk], app[ak.client])
    yield


def create_app() -> web.Application:
    app = web.Application()
    app[ak.sdk] = sdk
    app[ak.settings] = sdk.settings
    app.cleanup_ctx.append(aidbox_client_ctx)
    app.cleanup_ctx.append(fhir_client_ctx)
    app.cleanup_ctx.append(aidbox_tbs_ctx)
    # NOTE: Pay attention, app_ctx should be after aidbox_tbs_ctx !!!
    app.cleanup_ctx.append(app_ctx)

    setup_routes(app)

    return app


async def create_gunicorn_app() -> web.Application:
    return create_app()
```




            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "aidbox-python-sdk-tbs",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "aidbox, fhir, tbs",
    "author": null,
    "author_email": "\"beda.software\" <aidbox-python-sdk@beda.software>",
    "download_url": "https://files.pythonhosted.org/packages/98/c7/609e237fe6004e8934885d0510729ccfbdcd05d6649769230f294c74406b/aidbox_python_sdk_tbs-0.0.1a5.tar.gz",
    "platform": null,
    "description": "# aidbox-python-sdk-tbs\n\nTopic-based subscription extension for Aidbox SDK for python\n\n## Install\n\nInstall `aidbox-python-sdk-tbs[r4b]` or `aidbox-python-sdk-tbs[r5]` using poetry/pipenv/pip.\n\n## Usage\n\nCreate `app/app_keys.py` with the following context:\n\n```python\nfrom aidbox_python_sdk.app_keys import client, sdk, settings\nfrom aiohttp import web\nfrom fhirpy import AsyncFHIRClient\n\nfhir_client: web.AppKey[AsyncFHIRClient] = web.AppKey(\"fhir_client\", AsyncFHIRClient)\n\n__all__ = [\"fhir_client\", \"client\", \"sdk\", \"settings\"]\n```\n\nCreate `app/subscriptions.py` with the following content:\n\n```python\nimport logging\nfrom collections.abc import AsyncGenerator\n\nfrom aidbox_python_sdk.types import SDKOperation, SDKOperationRequest\nfrom aidbox_python_sdk_tbs import SubscriptionDefinition\nfrom aidbox_python_sdk_tbs.r4b impotr r4b_tbs_ctx_factory\nfrom aiohttp import web\n\nfrom app import app_keys as ak\nfrom app import config\nfrom app.sdk import sdk\n\nasync def new_appointment_sub(app: web.Application, appointment: r4b.Appointment) -> None:\n    fhir_client = app[ak.fhir_client]\n\n    logging.error(\"New appointment %s\", appointment.model_dump())\n\n\nsubscriptions: list[SubscriptionDefinition] = [\n    {\n        \"topic\": \"https://example.com/SubscriptionTopic/new-appointment-event\",\n        \"handler\": new_appointment_sub,\n        \"filterBy\": [\n            {\"resourceType\": \"Appointment\", \"filterParameter\": \"status\", \"value\": \"booked\"}\n        ],\n    },\n]\n\n\ndef aidbox_tbs_ctx(app: web.Application) -> AsyncGenerator[None, None]:\n    return r4b_tbs_ctx_factory(\n        app[ak.sdk],\n        app[ak.settings].APP_INIT_URL,\n        app[ak.fhir_client],\n        subscriptions,\n    )\n```\n\nNew we need to build `app` composing different contexts. \nPay attention, that app_ctx should be the last one, because `aidbox-python-sdk-tbs` adds endpoints to sdk.\n\nChange `main.py` to look like the following example:\n\n```python\nfrom collections.abc import AsyncGenerator\n\nfrom aidbox_python_sdk.main import init_client, register_app, setup_routes\nfrom aidbox_python_sdk.settings import Settings\nfrom aiohttp import BasicAuth, web\nfrom fhirpy import AsyncFHIRClient\n\nfrom app import app_keys as ak\nfrom app.sdk import sdk\nfrom app.subscriptions import aidbox_tbs_ctx\n\n\nasync def init_fhir_client(settings: Settings, prefix: str = \"\") -> AsyncFHIRClient:\n    basic_auth = BasicAuth(\n        login=settings.APP_INIT_CLIENT_ID,\n        password=settings.APP_INIT_CLIENT_SECRET,\n    )\n\n    return AsyncFHIRClient(\n        f\"{settings.APP_INIT_URL}{prefix}\",\n        authorization=basic_auth.encode(),\n        dump_resource=lambda x: x.model_dump(),\n    )\n\n\nasync def fhir_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:\n    app[ak.fhir_client] = await init_fhir_client(app[ak.settings], \"/fhir\")\n    yield\n\n\nasync def aidbox_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:\n    app[ak.client] = await init_client(app[ak.settings])\n    yield\n\n\nasync def app_ctx(app: web.Application) -> AsyncGenerator[None, None]:\n    await register_app(app[ak.sdk], app[ak.client])\n    yield\n\n\ndef create_app() -> web.Application:\n    app = web.Application()\n    app[ak.sdk] = sdk\n    app[ak.settings] = sdk.settings\n    app.cleanup_ctx.append(aidbox_client_ctx)\n    app.cleanup_ctx.append(fhir_client_ctx)\n    app.cleanup_ctx.append(aidbox_tbs_ctx)\n    # NOTE: Pay attention, app_ctx should be after aidbox_tbs_ctx !!!\n    app.cleanup_ctx.append(app_ctx)\n\n    setup_routes(app)\n\n    return app\n\n\nasync def create_gunicorn_app() -> web.Application:\n    return create_app()\n```\n\n\n\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Topic-Based Subscription extension for Aidbox SDK for python",
    "version": "0.0.1a5",
    "project_urls": {
        "documentation": "https://github.com/beda-software/aidbox-python-sdk-tbs#readme",
        "homepage": "https://github.com/beda-software/aidbox-python-sdk-tbs",
        "repository": "https://github.com/beda-software/aidbox-python-sdk-tbs.git"
    },
    "split_keywords": [
        "aidbox",
        " fhir",
        " tbs"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "3a0bbc73d9974230d0dbff22207602637098d0d0567f44af55f8b22e2b26cff8",
                "md5": "6a891a92ffcb4c87b80787fcf5988249",
                "sha256": "07a69645318078e6a8e28dbd31c0a3bd6fa5b52bd38770e57eb72cf4f4d35dec"
            },
            "downloads": -1,
            "filename": "aidbox_python_sdk_tbs-0.0.1a5-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "6a891a92ffcb4c87b80787fcf5988249",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 7691,
            "upload_time": "2024-09-30T17:29:33",
            "upload_time_iso_8601": "2024-09-30T17:29:33.217257Z",
            "url": "https://files.pythonhosted.org/packages/3a/0b/bc73d9974230d0dbff22207602637098d0d0567f44af55f8b22e2b26cff8/aidbox_python_sdk_tbs-0.0.1a5-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "98c7609e237fe6004e8934885d0510729ccfbdcd05d6649769230f294c74406b",
                "md5": "0f6f3920185d7f6068cd43ede181bc72",
                "sha256": "f6c5dd454249fa30139c7ab7b7f709b442f3dc46434e432684cf00e32835c7d0"
            },
            "downloads": -1,
            "filename": "aidbox_python_sdk_tbs-0.0.1a5.tar.gz",
            "has_sig": false,
            "md5_digest": "0f6f3920185d7f6068cd43ede181bc72",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 5414,
            "upload_time": "2024-09-30T17:29:34",
            "upload_time_iso_8601": "2024-09-30T17:29:34.590321Z",
            "url": "https://files.pythonhosted.org/packages/98/c7/609e237fe6004e8934885d0510729ccfbdcd05d6649769230f294c74406b/aidbox_python_sdk_tbs-0.0.1a5.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-30 17:29:34",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "beda-software",
    "github_project": "aidbox-python-sdk-tbs#readme",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "aidbox-python-sdk-tbs"
}
        
Elapsed time: 2.86435s