![Sample image](https://gitlab.com/uploads/-/system/project/avatar/42327849/a5e01db694b47cd07018813ce821a4e1.png?width=64)
aiosox: <a href="https://gitlab.com/arieutils/aiosox">repo link </a>
=======================================
Quick example
-----------
can be installed using pip/poetry:
poetry shell
poetry run uvicorn example:app --port=8001
```python
from typing import List
from fastapi import FastAPI
from pydantic import BaseModel
from aiosox import SioAuth, SioNamespace, SocketIoServer
def get_app():
applitcation = FastAPI(title="tester")
return applitcation
app = get_app()
sio_server: SocketIoServer = SocketIoServer(app=app, kafka_url="localhost:29092")
user_namespapce: SioNamespace = SioNamespace("/user", socket_io_server=sio_server)
sio_server._sio.register_namespace(user_namespapce)
@app.on_event("startup")
async def on_start():
await sio_server.start()
@app.on_event("shutdown")
async def on_shutdown():
await sio_server.shutdown()
class UserY(BaseModel):
name: str
class UserT(BaseModel):
name: str
what: List[UserY]
class OfferT(BaseModel):
title: str
on_failed_emmiter = user_namespapce.create_emitter("failed", model=OfferT | UserT)
@user_namespapce.on(
"submit",
description="when user submits a form",
payload_model=UserT | UserY,
response_model=List[UserT],
auth=SioAuth.jwt,
)
async def on_submit(sid, data):
print(
sid,
data,
)
```
Raw data
{
"_id": null,
"home_page": "https://gitlab.com/arieutils/aiosox",
"name": "aiosox",
"maintainer": "",
"docs_url": null,
"requires_python": "==3.11.1",
"maintainer_email": "",
"keywords": "fastapi,asyncapi,kafka,aiokafka,socketio,pubsub,websockets",
"author": "Arie",
"author_email": "ariesorkin3@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/53/66/2fdbc5fafa73d399d80e2df4357831ff5534781b42e2c340d56baa0835c3/aiosox-0.3.0.tar.gz",
"platform": null,
"description": "![Sample image](https://gitlab.com/uploads/-/system/project/avatar/42327849/a5e01db694b47cd07018813ce821a4e1.png?width=64)\n\n\naiosox: <a href=\"https://gitlab.com/arieutils/aiosox\">repo link </a>\n=======================================\nQuick example\n-----------\n\ncan be installed using pip/poetry:\n\n poetry shell\n \n poetry run uvicorn example:app --port=8001\n \n```python\nfrom typing import List\n\nfrom fastapi import FastAPI\nfrom pydantic import BaseModel\n\nfrom aiosox import SioAuth, SioNamespace, SocketIoServer\n\n\ndef get_app():\n applitcation = FastAPI(title=\"tester\")\n return applitcation\n\n\napp = get_app()\n\nsio_server: SocketIoServer = SocketIoServer(app=app, kafka_url=\"localhost:29092\")\nuser_namespapce: SioNamespace = SioNamespace(\"/user\", socket_io_server=sio_server)\nsio_server._sio.register_namespace(user_namespapce)\n\n\n@app.on_event(\"startup\")\nasync def on_start():\n await sio_server.start()\n\n\n@app.on_event(\"shutdown\")\nasync def on_shutdown():\n await sio_server.shutdown()\n\n\nclass UserY(BaseModel):\n name: str\n\n\nclass UserT(BaseModel):\n name: str\n what: List[UserY]\n\n\nclass OfferT(BaseModel):\n title: str\n\n\non_failed_emmiter = user_namespapce.create_emitter(\"failed\", model=OfferT | UserT)\n\n\n@user_namespapce.on(\n \"submit\",\n description=\"when user submits a form\",\n payload_model=UserT | UserY,\n response_model=List[UserT],\n auth=SioAuth.jwt,\n)\nasync def on_submit(sid, data):\n print(\n sid,\n data,\n )\n\n```",
"bugtrack_url": null,
"license": "",
"summary": "\u26d3\ufe0f Combination of asyncapi(documentation) & socketio pub/sub using aiokafka as the client manager multinode backend services",
"version": "0.3.0",
"split_keywords": [
"fastapi",
"asyncapi",
"kafka",
"aiokafka",
"socketio",
"pubsub",
"websockets"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "766ab58d18b1fb8f8f796aef9fc2cc93757ccc068fa9d62aace2b3275dbc115f",
"md5": "a925fc064d176779de52a8a5c079d7f1",
"sha256": "8ed8afe9d20dcf7796f2d55f29a3be0d686a78ede239a25dc324f2f322378211"
},
"downloads": -1,
"filename": "aiosox-0.3.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a925fc064d176779de52a8a5c079d7f1",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "==3.11.1",
"size": 27173,
"upload_time": "2023-01-10T11:22:57",
"upload_time_iso_8601": "2023-01-10T11:22:57.458133Z",
"url": "https://files.pythonhosted.org/packages/76/6a/b58d18b1fb8f8f796aef9fc2cc93757ccc068fa9d62aace2b3275dbc115f/aiosox-0.3.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "53662fdbc5fafa73d399d80e2df4357831ff5534781b42e2c340d56baa0835c3",
"md5": "65fae746b325cf9ea0212bb7e2f8c683",
"sha256": "3ec444bd9e09ca392b9fafb3af5b4f3caeb4e58cef1ef297e710f5a84fab5a83"
},
"downloads": -1,
"filename": "aiosox-0.3.0.tar.gz",
"has_sig": false,
"md5_digest": "65fae746b325cf9ea0212bb7e2f8c683",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "==3.11.1",
"size": 19098,
"upload_time": "2023-01-10T11:22:59",
"upload_time_iso_8601": "2023-01-10T11:22:59.074049Z",
"url": "https://files.pythonhosted.org/packages/53/66/2fdbc5fafa73d399d80e2df4357831ff5534781b42e2c340d56baa0835c3/aiosox-0.3.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-01-10 11:22:59",
"github": false,
"gitlab": true,
"bitbucket": false,
"gitlab_user": "arieutils",
"gitlab_project": "aiosox",
"lcname": "aiosox"
}