# FastWS
<p align="center">
<a href="https://github.com/endrekrohn/fastws">
<img src="https://raw.githubusercontent.com/endrekrohn/fastws/assets/assets/fastws.png" alt="FastWS"/>
</a>
</p>
**Source Code**: <a href="https://github.com/endrekrohn/fastws" target="_blank">https://github.com/endrekrohn/fastws</a>
---
FastWS is a wrapper around FastAPI to create better WebSocket applications with auto-documentation using <a href="https://www.asyncapi.com/" target="_blank">AsyncAPI</a>, in a similar fashion as FastAPIs existing use of OpenAPI.
The current supported AsyncAPI verison is `2.4.0`. Once version `3.0.0` is released the plan is to upgrade to this standard.
---
## Example project
If you are familiar with FastAPI and want to look at an example project using FastWS <a href="https://github.com/endrekrohn/fastws-example" target="_blank">look here</a>👨💻
---
## Requirements
Python 3.11+
`FastWS` uses Pydantic v2 and FastAPI.
## Installation
```console
$ pip install fastws
```
You will also need an ASGI server, for production such as <a href="https://www.uvicorn.org" class="external-link" target="_blank">Uvicorn</a> or <a href="https://github.com/pgjones/hypercorn" class="external-link" target="_blank">Hypercorn</a>.
<div class="termy">
```console
$ pip install "uvicorn[standard]"
```
</div>
## Example
### Create it
* Create a file `main.py` with:
```Python
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import Depends, FastAPI
from fastws import Client, FastWS
service = FastWS()
@service.send("ping", reply="ping")
async def send_event_a():
return
@asynccontextmanager
async def lifespan(app: FastAPI):
service.setup(app)
yield
app = FastAPI(lifespan=lifespan)
@app.websocket("/")
async def fastws_stream(client: Annotated[Client, Depends(service.manage)]):
await service.serve(client)
```
We can look at the generated documentation at `http://localhost:<port>/asyncapi`.
<p align="center">
<a href="https://github.com/endrekrohn/fastws">
<img src="https://raw.githubusercontent.com/endrekrohn/fastws/assets/assets/asyncapi_example.png" alt="AsyncAPI Docs"/>
</a>
</p>
---
### Example breakdown
First we import and initialize the service.
```Python
from fastws import Client, FastWS
service = FastWS()
```
#### Define event
Next up we connect an operation (a WebSocket message) to the service, using the decorator `@service.send(...)`. We need to define the operation using a string similar to how we define an HTTP-endpoint using a path.
The operation-identificator is in this case `"ping"`, meaning we will use this string to identify what type of message we are receiving.
```Python
@service.send("ping", reply="ping")
async def send_event_a():
return
```
If we want to define an `payload` for the operation we can extend the example:
```Python
from pydantic import BaseModel
class PingPayload(BaseModel):
foo: str
@service.send("ping", reply="ping")
async def send_event_a(payload: PingPayload):
return
```
An incoming message should now have the following format. (We will later view this in the generated AsyncAPI-documentation).
```json
{
"type": "ping",
"payload": {
"foo": "bar"
}
}
```
#### Connect service
Next up we connect the service to our running FastAPI application.
```Python
@asynccontextmanager
async def lifespan(app: FastAPI):
service.setup(app)
yield
app = FastAPI(lifespan=lifespan)
@app.websocket("/")
async def fastws_stream(client: Annotated[Client, Depends(service.manage)]):
await service.serve(client)
```
The function `service.setup(app)` inside FastAPIs lifespan registers two endpoints
- `/asyncapi.json`, to retrieve our API definition
- `/asyncapi`, to view the AsyncAPI documentation UI.
You can override both of these URLs when initializing the service, or set them to `None` to avoid registering the endpoints at all.
## Routing
To spread out our service we can use the `OperationRouter`-class.
```Python
# feature_1.py
from fastws import Client, OperationRouter
from pydantic import BaseModel
router = OperationRouter(prefix="user.")
class SubscribePayload(BaseModel):
topic: str
class SubscribeResponse(BaseModel):
detail: str
topics: set[str]
@router.send("subscribe", reply="subscribe.response")
async def subscribe_to_topic(
payload: SubscribePayload,
client: Client,
) -> SubscribeResponse:
client.subscribe(payload.topic)
return SubscribeResponse(
detail=f"Subscribed to {payload.topic}",
topics=client.topics,
)
```
We can then include the router in our main service.
```Python
# main.py
from fastws import Client, FastWS
from feature_1 import router
service = FastWS()
service.include_router(router)
```
## Operations, `send` and `recv`
The service enables two types of operations. Let us define these operations clearly:
- `send`: An operation where API user sends a message to the API server.
**Note**: Up to AsyncAPI version `2.6.0` this refers to a `publish`-operation, but is changing to `send` in version `3.0.0`.
- `recv`: An operation where API server sends a message to the API user.
**Note**: Up to AsyncAPI version `2.6.0` this refers to a `subscribe`-operation, but is changing to `receive` in version `3.0.0`.
### The `send`-operation
The above examples have only displayed the use of `send`-operations.
When using the functions `FastWS.client_send(message, client)` or `FastWS.serve(client)`, we implicitly send some arguments. These keyword-arguments have the following keywords and types:
- `client` with type `fastws.application.Client`
- `app` with type `fastws.application.FastWS`
- `payload`, optional with type defined in the function processing the message.
A `send`-operation can therefore access the following arguments:
```Python
from fastws import Client, FastWS
from pydantic import BaseModel
class Something(BaseModel):
foo: str
class Thing(BaseModel):
bar: str
@router.send("foo", reply="bar")
async def some_function(
payload: Something,
client: Client,
app: FastWS,
) -> Thing:
print(f"{app.connections=}")
print(f"{client.uid=}")
return Thing(bar=client.uid)
```
### The `recv`-operation
When using the function `FastWS.server_send(message, topic)`, we implicitly send some arguments. These keyword-arguments have the keywords and types:
- `app` with type `fastws.application.FastWS`
- Optional `payload` with type defined in the function processing the message.
A `recv`-operation can therefore access the following arguments:
```Python
from fastws import FastWS
from pydantic import BaseModel
class AlertPayload(BaseModel):
message: str
@router.recv("alert")
async def recv_client(payload: AlertPayload, app: FastWS) -> str:
return "hey there!"
```
If we want create a message on the server side we can do the following:
```Python
from fastapi import FastAPI
from fastws import FastWS
service = FastWS()
app = FastAPI()
@app.post("/")
async def alert_on_topic_foobar(message: str):
await service.server_send(
Message(type="alert", payload={"message": message}),
topic="foobar",
)
return "ok"
```
In the example above all connections subscribed to the topic `foobar` will recieve a message the the payload `"hey there!"`.
In this way you can on the server-side choose to publish messages from anywhere to any topic. This is especially useful if you have a persistent connection to Redis or similar that reads messages from some channel and want to propagate these to your users.
## Authentication
There are to ways to tackle authentication using `FastWS`.
### By defining `auth_handler`
One way is to provide a custom `auth_handler` when initializing the service. Below is an example where the API user must provide a secret message within a timeout to authenticate.
```Python
import asyncio
import logging
from fastapi import WebSocket
from fastws import FastWS
def custom_auth(to_wait: float = 5):
async def handler(ws: WebSocket) -> bool:
await ws.accept()
try:
initial_msg = await asyncio.wait_for(
ws.receive_text(),
timeout=to_wait,
)
return initial_msg == "SECRET_HUSH_HUSH"
except asyncio.exceptions.TimeoutError:
logging.info("Took to long to provide authentication")
return False
return handler
service = FastWS(auth_handler=custom_auth())
```
### By using FastAPI dependency
If you want to use your own FastAPI dependency to handle authentication before it enters the FastWS service you will have to set `auto_ws_accept` to `False`.
```Python
import asyncio
from typing import Annotated
from fastapi import Depends, FastAPI, WebSocket, WebSocketException, status
from fastws import Client, FastWS
service = FastWS(auto_ws_accept=False)
app = FastAPI()
async def custom_dep(ws: WebSocket):
await ws.accept()
initial_msg = await asyncio.wait_for(
ws.receive_text(),
timeout=5,
)
if initial_msg == "SECRET_HUSH_HUSH":
return
raise WebSocketException(
code=status.WS_1008_POLICY_VIOLATION,
reason="Not authenticated",
)
@app.websocket("/")
async def fastws_stream(
client: Annotated[Client, Depends(service.manage)],
_=Depends(custom_dep),
):
await service.serve(client)
```
## Heartbeats and connection lifespan
To handle a WebSocket's lifespan at an application level, FastWS tries to help you by using `asyncio.timeout()`-context managers in its `serve(client)`-function.
You can set the both:
- `heartbeat_interval`: Meaning a client needs to send a message within this time.
- `max_connection_lifespan`: Meaning all connections will disconnect when exceeding this time.
These must set during initialization:
```Python
from fastws import FastWS
service = FastWS(
heartbeat_interval=10,
max_connection_lifespan=300,
)
```
Both `heartbeat_interval` and `max_connection_lifespan` can be set to None to disable any restrictions. Note this is the default.
Please note that you can also set restrictions in your ASGI-server. These restrictions apply at a protocol/server-level and are different from the restrictions set by your application. Applicable settings for [uvicorn](https://www.uvicorn.org/#command-line-options):
- `--ws-ping-interval` INTEGER
- `--ws-ping-timeout` INTEGER
- `--ws-max-size` INTEGER
Raw data
{
"_id": null,
"home_page": "https://github.com/endrekrohn/fastws",
"name": "fastws",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.11,<4.0",
"maintainer_email": "",
"keywords": "fastapi,pydantic,starlette,websockets,asyncapi",
"author": "Endre Krohn",
"author_email": "endre@skript.no",
"download_url": "https://files.pythonhosted.org/packages/72/6d/3bfee5cd321ce19919bf4699be72600b25e0d9147f7f87f987f28f033b38/fastws-0.1.7.tar.gz",
"platform": null,
"description": "# FastWS\n\n<p align=\"center\">\n <a href=\"https://github.com/endrekrohn/fastws\">\n <img src=\"https://raw.githubusercontent.com/endrekrohn/fastws/assets/assets/fastws.png\" alt=\"FastWS\"/>\n</a>\n</p>\n\n**Source Code**: <a href=\"https://github.com/endrekrohn/fastws\" target=\"_blank\">https://github.com/endrekrohn/fastws</a>\n\n---\n\nFastWS is a wrapper around FastAPI to create better WebSocket applications with auto-documentation using <a href=\"https://www.asyncapi.com/\" target=\"_blank\">AsyncAPI</a>, in a similar fashion as FastAPIs existing use of OpenAPI.\n\nThe current supported AsyncAPI verison is `2.4.0`. Once version `3.0.0` is released the plan is to upgrade to this standard.\n\n---\n## Example project\n\nIf you are familiar with FastAPI and want to look at an example project using FastWS <a href=\"https://github.com/endrekrohn/fastws-example\" target=\"_blank\">look here</a>\ud83d\udc68\u200d\ud83d\udcbb\n\n---\n\n## Requirements\n\nPython 3.11+\n\n`FastWS` uses Pydantic v2 and FastAPI.\n\n## Installation\n\n\n```console\n$ pip install fastws\n```\n\n\nYou will also need an ASGI server, for production such as <a href=\"https://www.uvicorn.org\" class=\"external-link\" target=\"_blank\">Uvicorn</a> or <a href=\"https://github.com/pgjones/hypercorn\" class=\"external-link\" target=\"_blank\">Hypercorn</a>.\n\n<div class=\"termy\">\n\n```console\n$ pip install \"uvicorn[standard]\"\n```\n\n</div>\n\n## Example\n\n### Create it\n\n* Create a file `main.py` with:\n\n```Python\nfrom contextlib import asynccontextmanager\nfrom typing import Annotated\n\nfrom fastapi import Depends, FastAPI\nfrom fastws import Client, FastWS\n\nservice = FastWS()\n\n\n@service.send(\"ping\", reply=\"ping\")\nasync def send_event_a():\n return\n\n\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n service.setup(app)\n yield\n\n\napp = FastAPI(lifespan=lifespan)\n\n\n@app.websocket(\"/\")\nasync def fastws_stream(client: Annotated[Client, Depends(service.manage)]):\n await service.serve(client)\n```\n\nWe can look at the generated documentation at `http://localhost:<port>/asyncapi`.\n\n<p align=\"center\">\n <a href=\"https://github.com/endrekrohn/fastws\">\n <img src=\"https://raw.githubusercontent.com/endrekrohn/fastws/assets/assets/asyncapi_example.png\" alt=\"AsyncAPI Docs\"/>\n</a>\n</p>\n\n---\n\n### Example breakdown\n\nFirst we import and initialize the service.\n\n\n```Python\nfrom fastws import Client, FastWS\n\nservice = FastWS()\n```\n\n#### Define event\n\nNext up we connect an operation (a WebSocket message) to the service, using the decorator `@service.send(...)`. We need to define the operation using a string similar to how we define an HTTP-endpoint using a path.\n\nThe operation-identificator is in this case `\"ping\"`, meaning we will use this string to identify what type of message we are receiving.\n\n```Python\n@service.send(\"ping\", reply=\"ping\")\nasync def send_event_a():\n return\n```\n\nIf we want to define an `payload` for the operation we can extend the example:\n\n```Python\nfrom pydantic import BaseModel\n\nclass PingPayload(BaseModel):\n foo: str\n\n@service.send(\"ping\", reply=\"ping\")\nasync def send_event_a(payload: PingPayload):\n return\n```\n\nAn incoming message should now have the following format. (We will later view this in the generated AsyncAPI-documentation).\n\n```json\n{\n \"type\": \"ping\",\n \"payload\": {\n \"foo\": \"bar\"\n }\n}\n```\n#### Connect service\n\nNext up we connect the service to our running FastAPI application.\n\n```Python\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n service.setup(app)\n yield\n\n\napp = FastAPI(lifespan=lifespan)\n\n\n@app.websocket(\"/\")\nasync def fastws_stream(client: Annotated[Client, Depends(service.manage)]):\n await service.serve(client)\n```\n\nThe function `service.setup(app)` inside FastAPIs lifespan registers two endpoints\n- `/asyncapi.json`, to retrieve our API definition \n- `/asyncapi`, to view the AsyncAPI documentation UI.\n\nYou can override both of these URLs when initializing the service, or set them to `None` to avoid registering the endpoints at all.\n\n## Routing\n\nTo spread out our service we can use the `OperationRouter`-class.\n\n```Python\n# feature_1.py\nfrom fastws import Client, OperationRouter\nfrom pydantic import BaseModel\n\nrouter = OperationRouter(prefix=\"user.\")\n\n\nclass SubscribePayload(BaseModel):\n topic: str\n\n\nclass SubscribeResponse(BaseModel):\n detail: str\n topics: set[str]\n\n\n@router.send(\"subscribe\", reply=\"subscribe.response\")\nasync def subscribe_to_topic(\n payload: SubscribePayload,\n client: Client,\n) -> SubscribeResponse:\n client.subscribe(payload.topic)\n return SubscribeResponse(\n detail=f\"Subscribed to {payload.topic}\",\n topics=client.topics,\n )\n```\n\nWe can then include the router in our main service.\n\n```Python\n# main.py\nfrom fastws import Client, FastWS\n\nfrom feature_1 import router\n\nservice = FastWS()\nservice.include_router(router)\n```\n\n## Operations, `send` and `recv`\n\nThe service enables two types of operations. Let us define these operations clearly:\n\n- `send`: An operation where API user sends a message to the API server.\n \n **Note**: Up to AsyncAPI version `2.6.0` this refers to a `publish`-operation, but is changing to `send` in version `3.0.0`.\n\n- `recv`: An operation where API server sends a message to the API user.\n \n **Note**: Up to AsyncAPI version `2.6.0` this refers to a `subscribe`-operation, but is changing to `receive` in version `3.0.0`.\n\n\n### The `send`-operation\n\nThe above examples have only displayed the use of `send`-operations.\n\nWhen using the functions `FastWS.client_send(message, client)` or `FastWS.serve(client)`, we implicitly send some arguments. These keyword-arguments have the following keywords and types:\n\n- `client` with type `fastws.application.Client` \n- `app` with type `fastws.application.FastWS`\n- `payload`, optional with type defined in the function processing the message.\n\nA `send`-operation can therefore access the following arguments:\n\n```Python\nfrom fastws import Client, FastWS\nfrom pydantic import BaseModel\n\nclass Something(BaseModel):\n foo: str\n\n\nclass Thing(BaseModel):\n bar: str\n\n\n@router.send(\"foo\", reply=\"bar\")\nasync def some_function(\n payload: Something,\n client: Client,\n app: FastWS,\n) -> Thing:\n print(f\"{app.connections=}\")\n print(f\"{client.uid=}\")\n\n return Thing(bar=client.uid)\n```\n\n### The `recv`-operation\n\nWhen using the function `FastWS.server_send(message, topic)`, we implicitly send some arguments. These keyword-arguments have the keywords and types:\n\n- `app` with type `fastws.application.FastWS`\n- Optional `payload` with type defined in the function processing the message.\n\nA `recv`-operation can therefore access the following arguments:\n\n```Python\nfrom fastws import FastWS\nfrom pydantic import BaseModel\n\nclass AlertPayload(BaseModel):\n message: str\n\n\n@router.recv(\"alert\")\nasync def recv_client(payload: AlertPayload, app: FastWS) -> str:\n return \"hey there!\"\n```\n\nIf we want create a message on the server side we can do the following:\n\n```Python\nfrom fastapi import FastAPI\nfrom fastws import FastWS\n\nservice = FastWS()\napp = FastAPI()\n\n@app.post(\"/\")\nasync def alert_on_topic_foobar(message: str):\n await service.server_send(\n Message(type=\"alert\", payload={\"message\": message}),\n topic=\"foobar\",\n )\n return \"ok\"\n```\n\nIn the example above all connections subscribed to the topic `foobar` will recieve a message the the payload `\"hey there!\"`.\n\nIn this way you can on the server-side choose to publish messages from anywhere to any topic. This is especially useful if you have a persistent connection to Redis or similar that reads messages from some channel and want to propagate these to your users.\n\n## Authentication\n\nThere are to ways to tackle authentication using `FastWS`.\n\n### By defining `auth_handler`\n\nOne way is to provide a custom `auth_handler` when initializing the service. Below is an example where the API user must provide a secret message within a timeout to authenticate.\n\n```Python\nimport asyncio\nimport logging\nfrom fastapi import WebSocket\nfrom fastws import FastWS\n\n\ndef custom_auth(to_wait: float = 5):\n async def handler(ws: WebSocket) -> bool:\n await ws.accept()\n try:\n initial_msg = await asyncio.wait_for(\n ws.receive_text(),\n timeout=to_wait,\n )\n return initial_msg == \"SECRET_HUSH_HUSH\"\n except asyncio.exceptions.TimeoutError:\n logging.info(\"Took to long to provide authentication\")\n\n return False\n\n return handler\n\n\nservice = FastWS(auth_handler=custom_auth())\n```\n\n### By using FastAPI dependency\n\nIf you want to use your own FastAPI dependency to handle authentication before it enters the FastWS service you will have to set `auto_ws_accept` to `False`.\n\n```Python\nimport asyncio\nfrom typing import Annotated\n\nfrom fastapi import Depends, FastAPI, WebSocket, WebSocketException, status\nfrom fastws import Client, FastWS\n\nservice = FastWS(auto_ws_accept=False)\n\napp = FastAPI()\n\n\nasync def custom_dep(ws: WebSocket):\n await ws.accept()\n initial_msg = await asyncio.wait_for(\n ws.receive_text(),\n timeout=5,\n )\n if initial_msg == \"SECRET_HUSH_HUSH\":\n return\n raise WebSocketException(\n code=status.WS_1008_POLICY_VIOLATION,\n reason=\"Not authenticated\",\n )\n\n\n@app.websocket(\"/\")\nasync def fastws_stream(\n client: Annotated[Client, Depends(service.manage)],\n _=Depends(custom_dep),\n):\n await service.serve(client)\n```\n\n## Heartbeats and connection lifespan\n\nTo handle a WebSocket's lifespan at an application level, FastWS tries to help you by using `asyncio.timeout()`-context managers in its `serve(client)`-function.\n\nYou can set the both:\n- `heartbeat_interval`: Meaning a client needs to send a message within this time.\n- `max_connection_lifespan`: Meaning all connections will disconnect when exceeding this time.\n\nThese must set during initialization:\n\n```Python\nfrom fastws import FastWS\n\nservice = FastWS(\n heartbeat_interval=10,\n max_connection_lifespan=300,\n)\n```\n\nBoth `heartbeat_interval` and `max_connection_lifespan` can be set to None to disable any restrictions. Note this is the default.\n\nPlease note that you can also set restrictions in your ASGI-server. These restrictions apply at a protocol/server-level and are different from the restrictions set by your application. Applicable settings for [uvicorn](https://www.uvicorn.org/#command-line-options):\n- `--ws-ping-interval` INTEGER\n- `--ws-ping-timeout` INTEGER\n- `--ws-max-size` INTEGER",
"bugtrack_url": null,
"license": "",
"summary": "FastWS framework. A WebSocket wrapper around FastAPI with auto-documentation using AsyncAPI.",
"version": "0.1.7",
"project_urls": {
"Documentation": "https://github.com/endrekrohn/fastws",
"Homepage": "https://github.com/endrekrohn/fastws",
"Repository": "https://github.com/endrekrohn/fastws"
},
"split_keywords": [
"fastapi",
"pydantic",
"starlette",
"websockets",
"asyncapi"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "efeb8649ba1ccdd4c7882d7c5ecbb0aef1f364b46e5e029ef7efc6170e2da6ee",
"md5": "15aaa4ec2e03f602d2d917bca224fdfb",
"sha256": "835fee169a02007ab6e2d59c15cc3e0180f5cfbe1786e9e9ec95016f2fe81531"
},
"downloads": -1,
"filename": "fastws-0.1.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "15aaa4ec2e03f602d2d917bca224fdfb",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11,<4.0",
"size": 13668,
"upload_time": "2023-08-07T15:13:22",
"upload_time_iso_8601": "2023-08-07T15:13:22.749213Z",
"url": "https://files.pythonhosted.org/packages/ef/eb/8649ba1ccdd4c7882d7c5ecbb0aef1f364b46e5e029ef7efc6170e2da6ee/fastws-0.1.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "726d3bfee5cd321ce19919bf4699be72600b25e0d9147f7f87f987f28f033b38",
"md5": "678d07417a63c26688a8cfb6ce2a1d86",
"sha256": "9529b7a7bbdfa8ea85c90dcd154b0c9795f728588b064b5d2d5689997e880dc8"
},
"downloads": -1,
"filename": "fastws-0.1.7.tar.gz",
"has_sig": false,
"md5_digest": "678d07417a63c26688a8cfb6ce2a1d86",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11,<4.0",
"size": 14734,
"upload_time": "2023-08-07T15:13:24",
"upload_time_iso_8601": "2023-08-07T15:13:24.873697Z",
"url": "https://files.pythonhosted.org/packages/72/6d/3bfee5cd321ce19919bf4699be72600b25e0d9147f7f87f987f28f033b38/fastws-0.1.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-08-07 15:13:24",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "endrekrohn",
"github_project": "fastws",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "fastws"
}