<p align="center">
<img src="https://i.ibb.co/NV6wmy8/pubsub.png" width="55%" alt="pubsub" border="0">
</p>
# ⚡🗞️ FastAPI Websocket Pub/Sub
<a href="https://github.com/permitio/fastapi_websocket_pubsub/actions?query=workflow%3ATests" target="_blank">
<img src="https://github.com/permitio/fastapi_websocket_pubsub/workflows/Tests/badge.svg?branch=master" alt="Tests">
</a>
<a href="https://pypi.org/project/fastapi-websocket-pubsub/" target="_blank">
<img src="https://img.shields.io/pypi/v/fastapi-websocket-pubsub?color=%2331C654&label=PyPi%20package" alt="Package">
</a>
<a href="https://pepy.tech/project/fastapi-websocket-pubsub" target="_blank">
<img src="https://static.pepy.tech/personalized-badge/fastapi-websocket-pubsub?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads" alt="Downloads">
</a>
A fast and durable Pub/Sub channel over Websockets.
The easiest way to create a live publish / subscribe multi-cast over the web.
Supported and tested on Python >= 3.8.
As seen at <a href="https://www.youtube.com/watch?v=KP7tPeKhT3o" target="_blank">PyCon IL 2021</a> and <a href="https://www.youtube.com/watch?v=IuMZVWEUvGs" target="_blank">EuroPython 2021</a>
## Installation 🛠️
```
pip install fastapi_websocket_pubsub_plus
```
## Intro
The classic pub/sub pattern, made easily accessible and scalable over the web and across your cloud in realtime, while enjoying the benefits of FastAPI (e.g. dependency injection).
FastAPI + WebSockets + PubSub == ⚡💪 ❤️
- Subscribe
    - Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
    ```python
    # Callback to be called upon event being published on server
    async def on_event(data):
        print("We got an event! with data- ", data)
    # Subscribe for the event
    client.subscribe("my event", on_event)
    ```
- Publish
    - Directly from server code to connected clients.
    ```python
    app = FastAPI()
    endpoint = PubSubEndpoint()
    endpoint.register_route(app, path="/pubsub")
    # publish() is a coroutine; await it from async code such as a route handler
    await endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    ```
    - From client to client (through the servers)
    ```python
    async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
        # Publish through the client connection, not the server endpoint
        await client.publish(["my_event_topic"], data=["my", "data", 1])
    ```
    - Across server instances (using [broadcaster](https://pypi.org/project/broadcaster/) and a backend medium (e.g. Redis, Kafka, ...))
        - No matter which server a client connects to, it receives the messages for the topics it subscribes to
    ```python
    app = FastAPI()
    endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/")

    @app.websocket("/pubsub")
    async def websocket_rpc_endpoint(websocket: WebSocket):
        await endpoint.main_loop(websocket)
    ```
    See [examples/pubsub_broadcaster_server_example.py](examples/pubsub_broadcaster_server_example.py) for a full usage example.
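
To make the topic routing described above concrete, here is a minimal, single-process sketch of the idea: topics map to lists of async callbacks, and publishing fans the data out to every callback on the given topics. `InMemoryPubSub` and `demo` are hypothetical names invented for this illustration; this is not the library's implementation, and it involves no websockets or broadcaster backend.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Callback = Callable[[Any], Awaitable[None]]


class InMemoryPubSub:
    """Toy stand-in (hypothetical) for the topic -> callbacks routing idea."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> None:
        # Remember which callback wants events for this topic
        self._subscribers[topic].append(callback)

    async def publish(self, topics: List[str], data: Any = None) -> int:
        """Call every callback subscribed to any of the topics; return the call count."""
        callbacks = [cb for topic in topics for cb in self._subscribers.get(topic, [])]
        await asyncio.gather(*(cb(data) for cb in callbacks))
        return len(callbacks)


async def demo() -> List[Any]:
    received: List[Any] = []

    async def on_event(data: Any) -> None:
        received.append(data)

    bus = InMemoryPubSub()
    bus.subscribe("my_event_topic", on_event)
    await bus.publish(["my_event_topic"], data=["my", "data", 1])
    await bus.publish(["other_topic"], data="ignored")  # no subscribers, nothing delivered
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the real package the same subscribe/publish shape is carried over a websocket connection, so subscribers can live in other processes or machines.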
## Usage example (server publishing following HTTP trigger):
In the code below, a client connects to the server and subscribes to a topic named "triggered".
Aside from the PubSub websocket, the server also exposes a regular HTTP route, which triggers publication of the event.
### Server:
```python
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter

from fastapi_websocket_pubsub import PubSubEndpoint

app = FastAPI()
# Init endpoint
endpoint = PubSubEndpoint()
# Register the endpoint on the app
endpoint.register_route(app, "/pubsub")

# Register a regular HTTP route
@app.get("/trigger")
async def trigger_events():
    # Upon request trigger an event (publish is a coroutine, so await it)
    await endpoint.publish(["triggered"])
```
### Client:
```python
import asyncio

from fastapi_websocket_pubsub_plus import PubSubClient

# Callback to be called upon event being published on server
async def on_trigger(data):
    print("Trigger URL was accessed")

async def main():
    async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
        # Subscribe for the event
        client.subscribe("triggered", on_trigger)
        # Keep this block alive for as long as events should be received

asyncio.run(main())
```
## More Examples
- See the [examples](/examples) and [tests](/tests) folders for more server and client examples.
- See the [fastapi-websocket-rpc depends example](https://github.com/permitio/fastapi_websocket_rpc/blob/master/tests/fast_api_depends_test.py) to see how to combine this with FastAPI dependency injection.
## What can I do with this?
The combination of Websockets and bi-directional Pub/Sub is ideal for building realtime data propagation solutions at scale over the web, for example:
- Update mechanism
- Remote control mechanism
- Data processing
- Distributed computing
- Realtime communications over the web
## Foundations:
- Based on [fastapi-websocket-rpc](https://github.com/permitio/fastapi_websocket_rpc) for a robust realtime bidirectional channel
- Based on [broadcaster](https://pypi.org/project/broadcaster/) for syncing server instances
- Server Endpoint:
    - Based on [FastAPI](https://github.com/tiangolo/fastapi): enjoy all the benefits of a full ASGI platform, including asyncio and dependency injection (for example, to authenticate connections)
    - Based on [Pydantic](https://pydantic-docs.helpmanual.io/): easily serialize structured data as part of RPC requests and responses. Simply pass Pydantic data models as PubSub published data to have them available as part of an event.
- Client:
    - Based on [Tenacity](https://tenacity.readthedocs.io/en/latest/index.html): allows configurable retries to keep the connection alive
        - See the `retry_config` parameter of `WebSocketRpcClient.__init__`
    - Based on the Python [websockets](https://websockets.readthedocs.io/en/stable/intro.html) library, a more comprehensive client than the one offered by FastAPI
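
As a rough illustration of what "configurable retries to keep the connection alive" means, the sketch below retries a failing connection attempt with exponential backoff using only the standard library. It does not use Tenacity and is not the client's actual retry logic; `retry_with_backoff`, `flaky_connect`, and the delay values are assumptions made up for this example.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    connect: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
) -> T:
    """Await connect() until it succeeds, sleeping longer after each failure."""
    for attempt in range(attempts):
        try:
            return await connect()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff, capped at max_delay
            await asyncio.sleep(min(max_delay, base_delay * 2 ** attempt))
    raise ValueError("attempts must be at least 1")


async def demo() -> str:
    calls = {"count": 0}

    async def flaky_connect() -> str:
        # Hypothetical connection routine that fails twice before succeeding
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("server not ready")
        return f"connected after {calls['count']} attempts"

    return await retry_with_backoff(flaky_connect, base_delay=0.01)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With Tenacity the same policy is expressed declaratively (stop and wait strategies) rather than as a hand-written loop, which is what makes the client's retry behaviour configurable.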
## Logging
fastapi-websocket-pubsub uses fastapi-websocket-rpc for its logging configuration.
It provides a helper logging module to control how logs are produced.
See [fastapi_websocket_rpc/logger.py](fastapi_websocket_rpc/logger.py).

Use `logging_config.set_mode` or the `WS_RPC_LOGGING` environment variable to choose the logging mode you prefer.
Alternatively, override it completely with the standard logging configuration (e.g. `logging.config.dictConfig`); all logger names start with `fastapi.ws_rpc.pubsub`.

Example:
```python
# set RPC to log like UVICORN
from fastapi_websocket_rpc.logger import logging_config, LoggingModes
logging_config.set_mode(LoggingModes.UVICORN)
```
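
For the full-override route, a standard-library configuration along the following lines should work. This is a minimal sketch: the formatter, handler, and level are arbitrary choices for illustration, and the child logger name at the end is hypothetical. Only the `fastapi.ws_rpc.pubsub` prefix comes from the text above.

```python
import logging
import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "simple": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "simple"},
    },
    "loggers": {
        # Prefix taken from the text above; child loggers inherit this config
        "fastapi.ws_rpc.pubsub": {
            "handlers": ["console"],
            "level": "WARNING",
            "propagate": False,
        },
    },
}

logging.config.dictConfig(LOGGING_CONFIG)

# Hypothetical child logger name, used only to show that the level is inherited
child = logging.getLogger("fastapi.ws_rpc.pubsub.example")
child.warning("this message is emitted")
child.info("this message is filtered out")
```

Because logger names are hierarchical, configuring the prefix once is enough to cover every logger underneath it.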
## Pull requests - welcome!
- Please include tests for new features
Raw data
{
"_id": null,
"home_page": "https://github.com/vishal-singh-baraiya/fastapi_websocket_pubsub_plus",
"name": "fastapi-websocket-pubsub-plus",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": null,
"author": "Vishal Singh Baraiya",
"author_email": "realvixhal@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/16/f1/32f4798ffe762e245a65b31d1a6b0be5cd522e219704c0fd3262105bbf90/fastapi_websocket_pubsub_plus-0.1.1.tar.gz",
"platform": null,
"description": "<p align=\"center\">\n<img src=\"https://i.ibb.co/NV6wmy8/pubsub.png\" width=\"55%\" alt=\"pubsub\" border=\"0\">\n</p>\n\n#\n\n# \u26a1\ud83d\uddde\ufe0f FastAPI Websocket Pub/Sub \n\n<a href=\"https://github.com/permitio/fastapi_websocket_pubsub/actions?query=workflow%3ATests\" target=\"_blank\">\n <img src=\"https://github.com/permitio/fastapi_websocket_pubsub/workflows/Tests/badge.svg?branch=master\" alt=\"Tests\">\n</a>\n\n<a href=\"https://pypi.org/project/fastapi-websocket-pubsub/\" target=\"_blank\">\n <img src=\"https://img.shields.io/pypi/v/fastapi-websocket-pubsub?color=%2331C654&label=PyPi%20package\" alt=\"Package\">\n</a>\n<a href=\"https://pepy.tech/project/fastapi-websocket-pubsub\" target=\"_blank\">\n <img src=\"https://static.pepy.tech/personalized-badge/fastapi-websocket-pubsub?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads\" alt=\"Downloads\">\n</a>\n\n\nA fast and durable Pub/Sub channel over Websockets.\nThe easiest way to create a live publish / subscribe multi-cast over the web.\n\nSupports and tested on Python >= 3.7 \n\nAs seen at <a href=\"https://www.youtube.com/watch?v=KP7tPeKhT3o\" target=\"_blank\">PyCon IL 2021</a> and <a href=\"https://www.youtube.com/watch?v=IuMZVWEUvGs\" target=\"_blank\">EuroPython 2021</a>\n\n\n## Installation \ud83d\udee0\ufe0f\n```\npip install fastapi_websocket_pubsub_plus\n```\n\n\n## Intro\nThe classic pub/sub pattern made easily accessible and scalable over the web and across your cloud in realtime; while enjoying the benefits of FastAPI (e.g. dependency injection).\n\nFastAPI + WebSockets + PubSub == \u26a1\ud83d\udcaa \u2764\ufe0f\n\n\n- Subscribe\n - Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).\n ```python\n # Callback to be called upon event being published on server\n async def on_event(data):\n print(\"We got an event! 
with data- \", data)\n # Subscribe for the event \n client.subscribe(\"my event\", on_event)\n ```\n\n- Publish \n - Directly from server code to connected clients. \n ```python\n app = FastAPI() \n endpoint = PubSubEndpoint()\n endpoint.register_route(app, path=\"/pubsub\")\n endpoint.publish([\"my_event_topic\"], data=[\"my\", \"data\", 1])\n ```\n - From client to client (through the servers)\n ```python \n async with PubSubClient(server_uri=\"ws://localhost/pubsub\") as client:\n endpoint.publish([\"my_event_topic\"], data=[\"my\", \"data\", 1])\n ``` \n - Across server instances (using [broadcaster](https://pypi.org/project/broadcaster/) and a backend medium (e.g. Redis, Kafka, ...))\n - No matter which server a client connects to - it will get the messages it subscribes to\n ```python\n app = FastAPI() \n endpoint = PubSubEndpoint(broadcaster=\"postgres://localhost:5432/\")\n \n @app.websocket(\"/pubsub\")\n async def websocket_rpc_endpoint(websocket: WebSocket):\n await endpoint.main_loop(websocket)\n ```\n see [examples/pubsub_broadcaster_server_example.py](examples/pubsub_broadcaster_server_example.py) for full usage example \n\n\n\n## Usage example (server publishing following HTTP trigger):\nIn the code below, a client connects to the server and subscribes to a topic named \"triggered\".\nAside from PubSub websocket, the server also exposes a regular http route, which triggers publication of the event. 
\n\n### Server:\n```python\nimport asyncio\nimport uvicorn\nfrom fastapi import FastAPI\nfrom fastapi.routing import APIRouter\n\nfrom fastapi_websocket_pubsub import PubSubEndpoint\napp = FastAPI()\n# Init endpoint\nendpoint = PubSubEndpoint()\n# register the endpoint on the app\nendpoint.register_route(app, \"/pubsub\")\n# Register a regular HTTP route\n@app.get(\"/trigger\")\nasync def trigger_events():\n # Upon request trigger an event\n endpoint.publish([\"triggered\"])\n```\n### Client:\n```python\nfrom fastapi_websocket_pubsub_plus import PubSubClient\n# Callback to be called upon event being published on server\nasync def on_trigger(data):\n print(\"Trigger URL was accessed\")\n\nasync with PubSubClient(server_uri=\"ws://localhost/pubsub\") as client:\n # Subscribe for the event \n client.subscribe(\"triggered\", on_trigger)\n\n```\n\n## More Examples\n- See the [examples](/examples) and [tests](/tests) folders for more server and client examples.\n- See [fastapi-websocket-rpc depends example](https://github.com/permitio/fastapi_websocket_rpc/blob/master/tests/fast_api_depends_test.py) to see how to combine with FASTAPI dependency injections\n\n## What can I do with this?\nThe combination of Websockets, and bi-directional Pub/Sub is ideal to create realtime data propagation solution at scale over the web. 
\n - Update mechanism\n - Remote control mechanism\n - Data processing\n - Distributed computing\n - Realtime communications over the web \n\n\n## Foundations:\n\n- Based on [fastapi-websocket-rpc](https://github.com/permitio/fastapi_websocket_rpc) for a robust realtime bidirectional channel\n\n- Based on [broadcaster](https://pypi.org/project/broadcaster/) for syncing server instances\n\n- Server Endpoint:\n\n - Based on [FastAPI](https://github.com/tiangolo/fastapi): enjoy all the benefits of a full ASGI platform, including Async-io and dependency injections (for example to authenticate connections)\n\n - Based on [Pydantic](https://pydantic-docs.helpmanual.io/): easily serialize structured data as part of RPC requests and responses. Simply Pass Pydantic data models as PubSub published data to have it available as part of an event. \n\n- Client :\n - Based on [Tenacity](https://tenacity.readthedocs.io/en/latest/index.html): allowing configurable retries to keep to connection alive\n - see WebSocketRpcClient.__init__'s retry_config \n\n - Based on python [websockets](https://websockets.readthedocs.io/en/stable/intro.html) - a more comprehensive client than the one offered by FastAPI\n\n## Logging \nfastapi-websocket-pubsub uses fastapi-websocket-rpc for logging config.\nIt provides a helper logging module to control how it produces logs for you.\nSee [fastapi_websocket_rpc/logger.py](fastapi_websocket_rpc/logger.py).\nUse ```logging_config.set_mode``` or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer.\nOr override completely via default logging config (e.g. 'logging.config.dictConfig'), all logger name start with: 'fastapi.ws_rpc.pubsub'\n\nexample:\n```python\n# set RPC to log like UVICORN\nfrom fastapi_websocket_rpc.logger import logging_config, LoggingModes\nlogging_config.set_mode(LoggingModes.UVICORN)\n```\n\n## Pull requests - welcome!\n- Please include tests for new features \n\n\n",
"bugtrack_url": null,
"license": null,
"summary": "A fast and durable PubSub channel over Websockets (using fastapi-websockets-rpc).",
"version": "0.1.1",
"project_urls": {
"Homepage": "https://github.com/vishal-singh-baraiya/fastapi_websocket_pubsub_plus"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "568516c830ef43c66ef7bbb8393f05aa1c6a4670b1576230abe7d1e62900184c",
"md5": "297b2c069fb85609c9b49b3661c703b0",
"sha256": "9e6957ec3371a048574557b07a4c7f69d0e3dd73a602ecd979854f849ce71845"
},
"downloads": -1,
"filename": "fastapi_websocket_pubsub_plus-0.1.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "297b2c069fb85609c9b49b3661c703b0",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 21964,
"upload_time": "2024-12-26T15:23:51",
"upload_time_iso_8601": "2024-12-26T15:23:51.318969Z",
"url": "https://files.pythonhosted.org/packages/56/85/16c830ef43c66ef7bbb8393f05aa1c6a4670b1576230abe7d1e62900184c/fastapi_websocket_pubsub_plus-0.1.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "16f132f4798ffe762e245a65b31d1a6b0be5cd522e219704c0fd3262105bbf90",
"md5": "508993e5a40a94d485d7d2da807f7784",
"sha256": "776ae2ee551a319004b3db8bb461483843cd01b57be96c108c380bdf8db3bcc6"
},
"downloads": -1,
"filename": "fastapi_websocket_pubsub_plus-0.1.1.tar.gz",
"has_sig": false,
"md5_digest": "508993e5a40a94d485d7d2da807f7784",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 20654,
"upload_time": "2024-12-26T15:23:53",
"upload_time_iso_8601": "2024-12-26T15:23:53.768915Z",
"url": "https://files.pythonhosted.org/packages/16/f1/32f4798ffe762e245a65b31d1a6b0be5cd522e219704c0fd3262105bbf90/fastapi_websocket_pubsub_plus-0.1.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-12-26 15:23:53",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "vishal-singh-baraiya",
"github_project": "fastapi_websocket_pubsub_plus",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"requirements": [
{
"name": "fastapi-websocket-rpc",
"specs": [
[
"<",
"1"
],
[
">=",
"0.1.25"
]
]
},
{
"name": "packaging",
"specs": [
[
">=",
"20.4"
]
]
},
{
"name": "permit-broadcaster",
"specs": [
[
"<",
"3"
],
[
">=",
"0.2.5"
]
]
},
{
"name": "pydantic",
"specs": [
[
">=",
"1.9.1"
]
]
},
{
"name": "websockets",
"specs": [
[
">=",
"10.3"
]
]
}
],
"lcname": "fastapi-websocket-pubsub-plus"
}