Name | sse-starlette JSON |
Version |
2.2.0
JSON |
| download |
home_page | None |
Summary | SSE plugin for Starlette |
upload_time | 2024-12-21 17:41:11 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.9 |
license | BSD-3-Clause |
keywords |
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Server Sent Events for [Starlette](https://github.com/encode/starlette) and [FastAPI](https://fastapi.tiangolo.com/)
[![Downloads](https://static.pepy.tech/badge/sse-starlette/week)](https://pepy.tech/project/sse-starlette)
[![PyPI Version][pypi-image]][pypi-url]
[![Build Status][build-image]][build-url]
[![Code Coverage][coverage-image]][coverage-url]
> Implements the [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) specification.
Background: https://sysid.github.io/server-sent-events/
Installation:
```shell
pip install sse-starlette
```
Usage:
```python
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse
async def numbers(minimum, maximum):
for i in range(minimum, maximum + 1):
await asyncio.sleep(0.9)
yield dict(data=i)
async def sse(request):
generator = numbers(1, 5)
return EventSourceResponse(generator)
routes = [
Route("/", endpoint=sse)
]
app = Starlette(debug=True, routes=routes)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level='info')
```
Output:
![output](output.png)
**Caveat:** SSE streaming does not work in combination with [GZipMiddleware](https://github.com/encode/starlette/issues/20#issuecomment-704106436).
Be aware that for proper server shutdown your application must stop all
running tasks (generators). Otherwise you might experience the following warnings
at shutdown: `Waiting for background tasks to complete. (CTRL+C to force quit)`.
Client disconnects need to be handled in your Request handler (see example.py):
```python
async def endless(req: Request):
async def event_publisher():
i = 0
try:
while True:
i += 1
yield dict(data=i)
await asyncio.sleep(0.2)
except asyncio.CancelledError as e:
_log.info(f"Disconnected from client (via refresh/close) {req.client}")
# Do any other cleanup, if any
raise e
return EventSourceResponse(event_publisher())
```
# Thread Safety with SQLAlchemy Sessions and Similar Objects
The streaming portion of this package is accomplished via anyio TaskGroups. Care
needs to be taken to avoid passing objects that are not thread-safe to generators
you use to yield streaming data.
For example, if you are using SQLAlchemy, you should not use/pass an `AsyncSession`
object to your generator:
```python
# ❌ This can result in "The garbage collector is trying to clean up non-checked-in connection..." errors
async def bad_route():
async with AsyncSession() as session:
async def generator():
async for row in session.execute(select(User)):
yield dict(data=row)
return EventSourceResponse(generator)
```
Instead, ensure you create sessions within the generator itself
```python
# ✅ This is safe
async def good_route():
async def generator():
async with AsyncSession() as session:
async for row in session.execute(select(User)):
yield dict(data=row)
return EventSourceResponse(generator)
```
## Special use cases
### Customize Ping
By default, the server sends a ping every 15 seconds. You can customize this by:
1. setting the `ping` parameter
2. by changing the `ping` event to a comment event so that it is not visible to the client
```python
@router.get("")
async def handle():
generator = numbers(1, 100)
return EventSourceResponse(
generator,
headers={"Server": "nini"},
ping=5,
ping_message_factory=lambda: ServerSentEvent(**{"comment": "You can't see\r\nthis ping"}),
)
```
### SSE Send Timeout
To avoid 'hanging' connections in case HTTP connection from a certain client was kept open, but the client
stopped reading from the connection you can specifiy a send timeout (see
[#89](https://github.com/sysid/sse-starlette/issues/89)).
```python
EventSourceResponse(..., send_timeout=5) # terminate hanging send call after 5s
```
### Fan out Proxies
Fan out proxies usually rely on response being cacheable. To support that, you can set the value of `Cache-Control`.
For example:
```python
return EventSourceResponse(
generator(), headers={"Cache-Control": "public, max-age=29"}
)
```
### Error Handling
See example: `examples/error_handling.py`
### Sending Responses without Async Generators
Async generators can expose tricky error and cleanup behavior especially when they are interrupted.
[Background: Cleanup in async generators](https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#cleanup-in-generators-and-async-generators).
Example [`no_async_generators.py`](https://github.com/sysid/sse-starlette/pull/56#issue-1704495339) shows an alternative implementation
that does not rely on async generators but instead uses memory channels (`examples/no_async_generators.py`).
### Using pytest to test SSE Endpoints
When testing more than a single SSE endpoint via pytest, one may encounter the following error: `RuntimeError: <asyncio.locks.Event object at 0xxxx [unset]> is bound to a different event loop`.
A workaround to fix this error is to use the following fixture on all tests that use SSE:
```python
@pytest.fixture
def reset_sse_starlette_appstatus_event():
"""
Fixture that resets the appstatus event in the sse_starlette app.
Should be used on any test that uses sse_starlette to stream events.
"""
# See https://github.com/sysid/sse-starlette/issues/59
from sse_starlette.sse import AppStatus
AppStatus.should_exit_event = None
```
For details, see [Issue#59](https://github.com/sysid/sse-starlette/issues/59#issuecomment-1961678665).
## Development, Contributing
1. install pdm: `pip install pdm`
2. install dependencies using pipenv: `pdm install -d`.
3. To run tests:
### Makefile
- make sure your virtualenv is active
- check `Makefile` for available commands and development support, e.g. run the unit tests:
```shell
make test
make tox
```
For integration testing you can use the provided examples in `tests` and `examples`.
If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826
<!-- Badges -->
[pypi-image]: https://badge.fury.io/py/sse-starlette.svg
[pypi-url]: https://pypi.org/project/sse-starlette/
[build-image]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml/badge.svg
[build-url]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml
[coverage-image]: https://codecov.io/gh/sysid/sse-starlette/branch/master/graph/badge.svg
[coverage-url]: https://codecov.io/gh/sysid/sse-starlette
Raw data
{
"_id": null,
"home_page": null,
"name": "sse-starlette",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": null,
"author": null,
"author_email": "sysid <sysid@gmx.de>",
"download_url": "https://files.pythonhosted.org/packages/81/71/7532e5d872a19a2d43c47999b5bb602ddf43d0d5bafa2fb7c0ed35cdf5bc/sse_starlette-2.2.0.tar.gz",
"platform": null,
"description": "# Server Sent Events for [Starlette](https://github.com/encode/starlette) and [FastAPI](https://fastapi.tiangolo.com/)\n\n[![Downloads](https://static.pepy.tech/badge/sse-starlette/week)](https://pepy.tech/project/sse-starlette)\n[![PyPI Version][pypi-image]][pypi-url]\n[![Build Status][build-image]][build-url]\n[![Code Coverage][coverage-image]][coverage-url]\n\n> Implements the [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) specification.\n\nBackground: https://sysid.github.io/server-sent-events/\n\nInstallation:\n\n```shell\npip install sse-starlette\n```\n\nUsage:\n\n```python\nimport asyncio\nimport uvicorn\nfrom starlette.applications import Starlette\nfrom starlette.routing import Route\nfrom sse_starlette.sse import EventSourceResponse\n\nasync def numbers(minimum, maximum):\n for i in range(minimum, maximum + 1):\n await asyncio.sleep(0.9)\n yield dict(data=i)\n\nasync def sse(request):\n generator = numbers(1, 5)\n return EventSourceResponse(generator)\n\nroutes = [\n Route(\"/\", endpoint=sse)\n]\n\napp = Starlette(debug=True, routes=routes)\n\nif __name__ == \"__main__\":\n uvicorn.run(app, host=\"0.0.0.0\", port=8000, log_level='info')\n```\n\nOutput:\n![output](output.png)\n\n**Caveat:** SSE streaming does not work in combination with [GZipMiddleware](https://github.com/encode/starlette/issues/20#issuecomment-704106436).\n\nBe aware that for proper server shutdown your application must stop all\nrunning tasks (generators). Otherwise you might experience the following warnings\nat shutdown: `Waiting for background tasks to complete. (CTRL+C to force quit)`.\n\nClient disconnects need to be handled in your Request handler (see example.py):\n```python\nasync def endless(req: Request):\n async def event_publisher():\n i = 0\n try:\n while True:\n i += 1\n yield dict(data=i)\n await asyncio.sleep(0.2)\n except asyncio.CancelledError as e:\n _log.info(f\"Disconnected from client (via refresh/close) {req.client}\")\n # Do any other cleanup, if any\n raise e\n return EventSourceResponse(event_publisher())\n```\n\n# Thread Safety with SQLAlchemy Sessions and Similar Objects\n\nThe streaming portion of this package is accomplished via anyio TaskGroups. Care\nneeds to be taken to avoid passing objects that are not thread-safe to generators\nyou use to yield streaming data.\n\nFor example, if you are using SQLAlchemy, you should not use/pass an `AsyncSession`\nobject to your generator:\n\n```python\n# \u274c This can result in \"The garbage collector is trying to clean up non-checked-in connection...\" errors\nasync def bad_route():\n async with AsyncSession() as session:\n async def generator():\n async for row in session.execute(select(User)):\n yield dict(data=row)\n\n return EventSourceResponse(generator)\n```\n\nInstead, ensure you create sessions within the generator itself\n\n```python\n# \u2705 This is safe\nasync def good_route():\n async def generator():\n async with AsyncSession() as session:\n async for row in session.execute(select(User)):\n yield dict(data=row)\n\n return EventSourceResponse(generator)\n```\n\n## Special use cases\n### Customize Ping\nBy default, the server sends a ping every 15 seconds. You can customize this by:\n1. setting the `ping` parameter\n2. by changing the `ping` event to a comment event so that it is not visible to the client\n```python\n@router.get(\"\")\nasync def handle():\n generator = numbers(1, 100)\n return EventSourceResponse(\n generator,\n headers={\"Server\": \"nini\"},\n ping=5,\n ping_message_factory=lambda: ServerSentEvent(**{\"comment\": \"You can't see\\r\\nthis ping\"}),\n )\n```\n### SSE Send Timeout\nTo avoid 'hanging' connections in case HTTP connection from a certain client was kept open, but the client\nstopped reading from the connection you can specifiy a send timeout (see\n[#89](https://github.com/sysid/sse-starlette/issues/89)).\n```python\nEventSourceResponse(..., send_timeout=5) # terminate hanging send call after 5s\n```\n\n### Fan out Proxies\nFan out proxies usually rely on response being cacheable. To support that, you can set the value of `Cache-Control`.\nFor example:\n```python\nreturn EventSourceResponse(\n generator(), headers={\"Cache-Control\": \"public, max-age=29\"}\n )\n```\n### Error Handling\nSee example: `examples/error_handling.py`\n\n### Sending Responses without Async Generators\nAsync generators can expose tricky error and cleanup behavior especially when they are interrupted.\n\n[Background: Cleanup in async generators](https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#cleanup-in-generators-and-async-generators).\n\nExample [`no_async_generators.py`](https://github.com/sysid/sse-starlette/pull/56#issue-1704495339) shows an alternative implementation\nthat does not rely on async generators but instead uses memory channels (`examples/no_async_generators.py`).\n\n### Using pytest to test SSE Endpoints\nWhen testing more than a single SSE endpoint via pytest, one may encounter the following error: `RuntimeError: <asyncio.locks.Event object at 0xxxx [unset]> is bound to a different event loop`.\n\nA workaround to fix this error is to use the following fixture on all tests that use SSE:\n\n```python\n@pytest.fixture\ndef reset_sse_starlette_appstatus_event():\n \"\"\"\n Fixture that resets the appstatus event in the sse_starlette app.\n\n Should be used on any test that uses sse_starlette to stream events.\n \"\"\"\n # See https://github.com/sysid/sse-starlette/issues/59\n from sse_starlette.sse import AppStatus\n\n AppStatus.should_exit_event = None\n```\n\nFor details, see [Issue#59](https://github.com/sysid/sse-starlette/issues/59#issuecomment-1961678665).\n\n## Development, Contributing\n1. install pdm: `pip install pdm`\n2. install dependencies using pipenv: `pdm install -d`.\n3. To run tests:\n\n### Makefile\n- make sure your virtualenv is active\n- check `Makefile` for available commands and development support, e.g. run the unit tests:\n```shell\nmake test\nmake tox\n```\n\nFor integration testing you can use the provided examples in `tests` and `examples`.\n\nIf you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826\n\n\n<!-- Badges -->\n\n[pypi-image]: https://badge.fury.io/py/sse-starlette.svg\n[pypi-url]: https://pypi.org/project/sse-starlette/\n[build-image]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml/badge.svg\n[build-url]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml\n[coverage-image]: https://codecov.io/gh/sysid/sse-starlette/branch/master/graph/badge.svg\n[coverage-url]: https://codecov.io/gh/sysid/sse-starlette\n",
"bugtrack_url": null,
"license": "BSD-3-Clause",
"summary": "SSE plugin for Starlette",
"version": "2.2.0",
"project_urls": {
"Source": "https://github.com/sysid/sse-starlette"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "d4ba2a9b98ee28333c32172ccd29c71134b5bee523c7cd55427ed73da403155c",
"md5": "59a376a2f939bf3102665072c4301f40",
"sha256": "b923c0417f96061c4cbf67ebb9f0ae178264abfb6a07c28e1ca8192e18e0361c"
},
"downloads": -1,
"filename": "sse_starlette-2.2.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "59a376a2f939bf3102665072c4301f40",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 10098,
"upload_time": "2024-12-21T17:41:09",
"upload_time_iso_8601": "2024-12-21T17:41:09.566104Z",
"url": "https://files.pythonhosted.org/packages/d4/ba/2a9b98ee28333c32172ccd29c71134b5bee523c7cd55427ed73da403155c/sse_starlette-2.2.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "81717532e5d872a19a2d43c47999b5bb602ddf43d0d5bafa2fb7c0ed35cdf5bc",
"md5": "35f4edc58ac8da09285624540c2e318e",
"sha256": "2be010484d29a0a1ac6b5ae72610ada3bb00f1be0ef0048a8b87f9dba1e99fcb"
},
"downloads": -1,
"filename": "sse_starlette-2.2.0.tar.gz",
"has_sig": false,
"md5_digest": "35f4edc58ac8da09285624540c2e318e",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 20584,
"upload_time": "2024-12-21T17:41:11",
"upload_time_iso_8601": "2024-12-21T17:41:11.988814Z",
"url": "https://files.pythonhosted.org/packages/81/71/7532e5d872a19a2d43c47999b5bb602ddf43d0d5bafa2fb7c0ed35cdf5bc/sse_starlette-2.2.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-12-21 17:41:11",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "sysid",
"github_project": "sse-starlette",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"tox": true,
"lcname": "sse-starlette"
}