Name | sse-starlette JSON |
Version |
3.0.2
JSON |
| download |
home_page | None |
Summary | SSE plugin for Starlette |
upload_time | 2025-07-27 09:07:44 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.9 |
license | None |
keywords |
|
VCS |
 |
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Server-Sent Events for [Starlette](https://github.com/encode/starlette) and [FastAPI](https://fastapi.tiangolo.com/)
[](https://pepy.tech/project/sse-starlette)
[![PyPI Version][pypi-image]][pypi-url]
[![Build Status][build-image]][build-url]
> Background: https://sysid.github.io/server-sent-events/
Production ready Server-Sent Events implementation for Starlette and FastAPI following the [W3C SSE specification](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
## Installation
```bash
pip install sse-starlette
uv add sse-starlette
# To run the examples and demonstrations
uv add sse-starlette[examples]
# Recommended ASGI server
uv add sse-starlette[uvicorn,granian,daphne]
```
## Quick Start
```python
import asyncio
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse
async def generate_events():
for i in range(10):
yield {"data": f"Event {i}"}
await asyncio.sleep(1)
async def sse_endpoint(request):
return EventSourceResponse(generate_events())
app = Starlette(routes=[Route("/events", sse_endpoint)])
```
## Core Features
- **Standards Compliant**: Full SSE specification implementation
- **Framework Integration**: Native Starlette and FastAPI support
- **Async/Await**: Built on modern Python async patterns
- **Connection Management**: Automatic client disconnect detection
- **Graceful Shutdown**: Proper cleanup on server termination
- **Thread Safety**: Context-local event management for multi-threaded applications
- **Multi-Loop Support**: Works correctly with multiple asyncio event loops
## Key Components
### EventSourceResponse
The main response class that handles SSE streaming:
```python
from sse_starlette import EventSourceResponse
# Basic usage
async def stream_data():
for item in data:
yield {"data": item, "event": "update", "id": str(item.id)}
return EventSourceResponse(stream_data())
```
### ServerSentEvent
For structured event creation:
```python
from sse_starlette import ServerSentEvent
event = ServerSentEvent(
data="Custom message",
event="notification",
id="msg-123",
retry=5000
)
```
### JSONServerSentEvent
For an easy way to send json data as SSE events:
```python
from sse_starlette import JSONServerSentEvent
event = JSONServerSentEvent(
data={"field":"value"}, # Anything serializable with json.dumps
)
```
## Advanced Usage
### Custom Ping Configuration
```python
from sse_starlette import ServerSentEvent
def custom_ping():
return ServerSentEvent(comment="Custom ping message")
return EventSourceResponse(
generate_events(),
ping=10, # Ping every 10 seconds
ping_message_factory=custom_ping
)
```
### Multi-Threaded Usage
sse-starlette now supports usage in multi-threaded applications and with multiple asyncio event loops:
```python
import threading
import asyncio
from sse_starlette import EventSourceResponse
def run_sse_in_thread():
"""SSE streaming works correctly in separate threads"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def thread_events():
for i in range(5):
yield {"data": f"Thread event {i}"}
await asyncio.sleep(1)
# This works without "Event bound to different loop" errors
response = EventSourceResponse(thread_events())
loop.close()
# Start SSE in multiple threads
for i in range(3):
thread = threading.Thread(target=run_sse_in_thread)
thread.start()
```
### Database Streaming (Thread-Safe)
```python
async def stream_database_results(request):
# CORRECT: Create session within generator context
async with AsyncSession() as session:
results = await session.execute(select(User))
for row in results:
if await request.is_disconnected():
break
yield {"data": row.name, "id": str(row.id)}
return EventSourceResponse(stream_database_results(request))
```
### Error Handling and Timeouts
```python
async def robust_stream(request):
try:
for i in range(100):
if await request.is_disconnected():
break
yield {"data": f"Item {i}"}
await asyncio.sleep(0.5)
except asyncio.CancelledError:
# Client disconnected - perform cleanup
raise
return EventSourceResponse(
robust_stream(request),
send_timeout=30, # Timeout hanging sends
headers={"Cache-Control": "no-cache"}
)
```
### Memory Channels Alternative
For complex data flows, use memory channels instead of generators:
```python
import anyio
from functools import partial
async def data_producer(send_channel):
async with send_channel:
for i in range(10):
await send_channel.send({"data": f"Item {i}"})
await anyio.sleep(1)
async def channel_endpoint(request):
send_channel, receive_channel = anyio.create_memory_object_stream(10)
return EventSourceResponse(
receive_channel,
data_sender_callable=partial(data_producer, send_channel)
)
```
## Configuration Options
### EventSourceResponse Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `content` | `ContentStream` | Required | Async generator or iterable |
| `ping` | `int` | 15 | Ping interval in seconds |
| `sep` | `str` | `"\r\n"` | Line separator (`\r\n`, `\r`, `\n`) |
| `send_timeout` | `float` | `None` | Send operation timeout |
| `headers` | `dict` | `None` | Additional HTTP headers |
| `ping_message_factory` | `Callable` | `None` | Custom ping message creator |
### Client Disconnection
```python
async def monitored_stream(request):
events_sent = 0
try:
while events_sent < 100:
if await request.is_disconnected():
print(f"Client disconnected after {events_sent} events")
break
yield {"data": f"Event {events_sent}"}
events_sent += 1
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Stream cancelled")
raise
```
## Testing
sse-starlette includes now comprehensive test isolation without manual setup. The library automatically handles event loop contexts, eliminating the need for manual state resets:
```python
# this is deprecated and not needed since version 3.0.0
import pytest
from sse_starlette import EventSourceResponse
@pytest.fixture
def reset_sse_app_status():
AppStatus.should_exit_event = None
yield
AppStatus.should_exit_event = None
```
## Production Considerations
### Performance Limits
- **Memory**: Each connection maintains a buffer. Monitor memory usage.
- **Connections**: Limited by system file descriptors and application design.
- **Network**: High-frequency events can saturate bandwidth.
### Error Recovery
Implement client-side reconnection logic:
```javascript
function createEventSource(url) {
const eventSource = new EventSource(url);
eventSource.onerror = function() {
setTimeout(() => {
createEventSource(url); // Reconnect after delay
}, 5000);
};
return eventSource;
}
```
## Learning Resources
### Examples Directory
The `examples/` directory contains production-ready patterns:
- **`01_basic_sse.py`**: Fundamental SSE concepts
- **`02_message_broadcasting.py`**: Multi-client message distribution
- **`03_database_streaming.py`**: Thread-safe database integration
- **`04_advanced_features.py`**: Custom protocols and error handling
### Demonstrations Directory
The `examples/demonstrations/` directory provides educational scenarios:
**Basic Patterns** (`basic_patterns/`):
- Client disconnect detection and cleanup
- Graceful server shutdown behavior
**Production Scenarios** (`production_scenarios/`):
- Load testing with concurrent clients
- Network interruption handling
**Advanced Patterns** (`advanced_patterns/`):
- Memory channels vs generators
- Error recovery and circuit breakers
- Custom protocol development
Run any demonstration:
```bash
python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py
```
## Troubleshooting
### Common Issues
**Database session errors with async generators**
- Create database sessions inside generators, not as dependencies
**Hanging connections after client disconnect**
- Always check `await request.is_disconnected()` in loops
- Use `send_timeout` parameter to detect dead connections
If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826
### Performance Optimization
```python
# Connection limits
class ConnectionLimiter:
def __init__(self, max_connections=100):
self.semaphore = asyncio.Semaphore(max_connections)
async def limited_endpoint(self, request):
async with self.semaphore:
return EventSourceResponse(generate_events())
```
## Contributing
See examples and demonstrations for implementation patterns. Run tests with:
```bash
make test-unit # Unit tests only
make test # All tests including integration
```
<!-- Badges -->
[pypi-image]: https://badge.fury.io/py/sse-starlette.svg
[pypi-url]: https://pypi.org/project/sse-starlette/
[build-image]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml/badge.svg
[build-url]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml
[coverage-image]: https://codecov.io/gh/sysid/sse-starlette/branch/master/graph/badge.svg
<!--[coverage-url]: https://codecov.io/gh/sysid/sse-starlette-->
Raw data
{
"_id": null,
"home_page": null,
"name": "sse-starlette",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": null,
"author": null,
"author_email": "sysid <sysid@gmx.de>",
"download_url": "https://files.pythonhosted.org/packages/42/6f/22ed6e33f8a9e76ca0a412405f31abb844b779d52c5f96660766edcd737c/sse_starlette-3.0.2.tar.gz",
"platform": null,
"description": "# Server-Sent Events for [Starlette](https://github.com/encode/starlette) and [FastAPI](https://fastapi.tiangolo.com/)\n\n[](https://pepy.tech/project/sse-starlette)\n[![PyPI Version][pypi-image]][pypi-url]\n[![Build Status][build-image]][build-url]\n\n> Background: https://sysid.github.io/server-sent-events/\n\nProduction ready Server-Sent Events implementation for Starlette and FastAPI following the [W3C SSE specification](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).\n\n## Installation\n\n```bash\npip install sse-starlette\nuv add sse-starlette\n\n# To run the examples and demonstrations\nuv add sse-starlette[examples]\n\n# Recommended ASGI server\nuv add sse-starlette[uvicorn,granian,daphne]\n```\n\n## Quick Start\n\n```python\nimport asyncio\nfrom starlette.applications import Starlette\nfrom starlette.routing import Route\nfrom sse_starlette import EventSourceResponse\n\nasync def generate_events():\n for i in range(10):\n yield {\"data\": f\"Event {i}\"}\n await asyncio.sleep(1)\n\nasync def sse_endpoint(request):\n return EventSourceResponse(generate_events())\n\napp = Starlette(routes=[Route(\"/events\", sse_endpoint)])\n```\n\n## Core Features\n\n- **Standards Compliant**: Full SSE specification implementation\n- **Framework Integration**: Native Starlette and FastAPI support\n- **Async/Await**: Built on modern Python async patterns\n- **Connection Management**: Automatic client disconnect detection\n- **Graceful Shutdown**: Proper cleanup on server termination\n- **Thread Safety**: Context-local event management for multi-threaded applications\n- **Multi-Loop Support**: Works correctly with multiple asyncio event loops\n\n## Key Components\n\n### EventSourceResponse\n\nThe main response class that handles SSE streaming:\n\n```python\nfrom sse_starlette import EventSourceResponse\n\n# Basic usage\nasync def stream_data():\n for item in data:\n yield {\"data\": item, \"event\": \"update\", \"id\": str(item.id)}\n\nreturn EventSourceResponse(stream_data())\n```\n\n### ServerSentEvent\n\nFor structured event creation:\n\n```python\nfrom sse_starlette import ServerSentEvent\n\nevent = ServerSentEvent(\n data=\"Custom message\",\n event=\"notification\", \n id=\"msg-123\",\n retry=5000\n)\n```\n\n### JSONServerSentEvent\n\nFor an easy way to send json data as SSE events:\n\n```python\nfrom sse_starlette import JSONServerSentEvent\n\nevent = JSONServerSentEvent(\n data={\"field\":\"value\"}, # Anything serializable with json.dumps\n)\n```\n\n## Advanced Usage\n\n### Custom Ping Configuration\n\n```python\nfrom sse_starlette import ServerSentEvent\n\ndef custom_ping():\n return ServerSentEvent(comment=\"Custom ping message\")\n\nreturn EventSourceResponse(\n generate_events(),\n ping=10, # Ping every 10 seconds\n ping_message_factory=custom_ping\n)\n```\n\n### Multi-Threaded Usage\n\nsse-starlette now supports usage in multi-threaded applications and with multiple asyncio event loops:\n\n```python\nimport threading\nimport asyncio\nfrom sse_starlette import EventSourceResponse\n\ndef run_sse_in_thread():\n \"\"\"SSE streaming works correctly in separate threads\"\"\"\n loop = asyncio.new_event_loop()\n asyncio.set_event_loop(loop)\n \n async def thread_events():\n for i in range(5):\n yield {\"data\": f\"Thread event {i}\"}\n await asyncio.sleep(1)\n \n # This works without \"Event bound to different loop\" errors\n response = EventSourceResponse(thread_events())\n loop.close()\n\n# Start SSE in multiple threads\nfor i in range(3):\n thread = threading.Thread(target=run_sse_in_thread)\n thread.start()\n```\n\n### Database Streaming (Thread-Safe)\n\n```python\nasync def stream_database_results(request):\n # CORRECT: Create session within generator context\n async with AsyncSession() as session:\n results = await session.execute(select(User))\n for row in results:\n if await request.is_disconnected():\n break\n yield {\"data\": row.name, \"id\": str(row.id)}\n\nreturn EventSourceResponse(stream_database_results(request))\n```\n\n### Error Handling and Timeouts\n\n```python\nasync def robust_stream(request):\n try:\n for i in range(100):\n if await request.is_disconnected():\n break\n yield {\"data\": f\"Item {i}\"}\n await asyncio.sleep(0.5)\n except asyncio.CancelledError:\n # Client disconnected - perform cleanup\n raise\n\nreturn EventSourceResponse(\n robust_stream(request),\n send_timeout=30, # Timeout hanging sends\n headers={\"Cache-Control\": \"no-cache\"}\n)\n```\n\n### Memory Channels Alternative\n\nFor complex data flows, use memory channels instead of generators:\n\n```python\nimport anyio\nfrom functools import partial\n\nasync def data_producer(send_channel):\n async with send_channel:\n for i in range(10):\n await send_channel.send({\"data\": f\"Item {i}\"})\n await anyio.sleep(1)\n\nasync def channel_endpoint(request):\n send_channel, receive_channel = anyio.create_memory_object_stream(10)\n \n return EventSourceResponse(\n receive_channel,\n data_sender_callable=partial(data_producer, send_channel)\n )\n```\n\n## Configuration Options\n\n### EventSourceResponse Parameters\n\n| Parameter | Type | Default | Description |\n|-----------|------|---------|-------------|\n| `content` | `ContentStream` | Required | Async generator or iterable |\n| `ping` | `int` | 15 | Ping interval in seconds |\n| `sep` | `str` | `\"\\r\\n\"` | Line separator (`\\r\\n`, `\\r`, `\\n`) |\n| `send_timeout` | `float` | `None` | Send operation timeout |\n| `headers` | `dict` | `None` | Additional HTTP headers |\n| `ping_message_factory` | `Callable` | `None` | Custom ping message creator |\n\n### Client Disconnection\n\n```python\nasync def monitored_stream(request):\n events_sent = 0\n try:\n while events_sent < 100:\n if await request.is_disconnected():\n print(f\"Client disconnected after {events_sent} events\")\n break\n \n yield {\"data\": f\"Event {events_sent}\"}\n events_sent += 1\n await asyncio.sleep(1)\n \n except asyncio.CancelledError:\n print(\"Stream cancelled\")\n raise\n```\n\n## Testing\n\nsse-starlette includes now comprehensive test isolation without manual setup. The library automatically handles event loop contexts, eliminating the need for manual state resets:\n\n```python\n# this is deprecated and not needed since version 3.0.0\nimport pytest\nfrom sse_starlette import EventSourceResponse\n\n@pytest.fixture\ndef reset_sse_app_status():\n AppStatus.should_exit_event = None\n yield\n AppStatus.should_exit_event = None\n```\n\n## Production Considerations\n\n### Performance Limits\n\n- **Memory**: Each connection maintains a buffer. Monitor memory usage.\n- **Connections**: Limited by system file descriptors and application design.\n- **Network**: High-frequency events can saturate bandwidth.\n\n### Error Recovery\n\nImplement client-side reconnection logic:\n\n```javascript\nfunction createEventSource(url) {\n const eventSource = new EventSource(url);\n \n eventSource.onerror = function() {\n setTimeout(() => {\n createEventSource(url); // Reconnect after delay\n }, 5000);\n };\n \n return eventSource;\n}\n```\n\n## Learning Resources\n\n### Examples Directory\n\nThe `examples/` directory contains production-ready patterns:\n\n- **`01_basic_sse.py`**: Fundamental SSE concepts\n- **`02_message_broadcasting.py`**: Multi-client message distribution \n- **`03_database_streaming.py`**: Thread-safe database integration\n- **`04_advanced_features.py`**: Custom protocols and error handling\n\n### Demonstrations Directory \n\nThe `examples/demonstrations/` directory provides educational scenarios:\n\n**Basic Patterns** (`basic_patterns/`):\n- Client disconnect detection and cleanup\n- Graceful server shutdown behavior\n\n**Production Scenarios** (`production_scenarios/`):\n- Load testing with concurrent clients\n- Network interruption handling\n\n**Advanced Patterns** (`advanced_patterns/`):\n- Memory channels vs generators\n- Error recovery and circuit breakers\n- Custom protocol development\n\nRun any demonstration:\n```bash\npython examples/demonstrations/basic_patterns/client_disconnect.py\npython examples/demonstrations/production_scenarios/load_simulations.py\npython examples/demonstrations/advanced_patterns/error_recovery.py\n```\n\n## Troubleshooting\n\n### Common Issues\n\n**Database session errors with async generators**\n- Create database sessions inside generators, not as dependencies\n\n**Hanging connections after client disconnect** \n- Always check `await request.is_disconnected()` in loops\n- Use `send_timeout` parameter to detect dead connections\n\nIf you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826\n\n### Performance Optimization\n\n```python\n# Connection limits\nclass ConnectionLimiter:\n def __init__(self, max_connections=100):\n self.semaphore = asyncio.Semaphore(max_connections)\n \n async def limited_endpoint(self, request):\n async with self.semaphore:\n return EventSourceResponse(generate_events())\n```\n\n## Contributing\n\nSee examples and demonstrations for implementation patterns. Run tests with:\n\n```bash\nmake test-unit # Unit tests only\nmake test # All tests including integration\n```\n\n<!-- Badges -->\n[pypi-image]: https://badge.fury.io/py/sse-starlette.svg\n[pypi-url]: https://pypi.org/project/sse-starlette/\n[build-image]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml/badge.svg\n[build-url]: https://github.com/sysid/sse-starlette/actions/workflows/build.yml\n[coverage-image]: https://codecov.io/gh/sysid/sse-starlette/branch/master/graph/badge.svg\n<!--[coverage-url]: https://codecov.io/gh/sysid/sse-starlette-->\n\n",
"bugtrack_url": null,
"license": null,
"summary": "SSE plugin for Starlette",
"version": "3.0.2",
"project_urls": {
"Source": "https://github.com/sysid/sse-starlette"
},
"split_keywords": [],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "ef10c78f463b4ef22eef8491f218f692be838282cd65480f6e423d7730dfd1fb",
"md5": "91cc7132614c0ff449f0186dfd28bed7",
"sha256": "16b7cbfddbcd4eaca11f7b586f3b8a080f1afe952c15813455b162edea619e5a"
},
"downloads": -1,
"filename": "sse_starlette-3.0.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "91cc7132614c0ff449f0186dfd28bed7",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 11297,
"upload_time": "2025-07-27T09:07:43",
"upload_time_iso_8601": "2025-07-27T09:07:43.268468Z",
"url": "https://files.pythonhosted.org/packages/ef/10/c78f463b4ef22eef8491f218f692be838282cd65480f6e423d7730dfd1fb/sse_starlette-3.0.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "426f22ed6e33f8a9e76ca0a412405f31abb844b779d52c5f96660766edcd737c",
"md5": "8b30fb9cd04490f49df810f49f40fa09",
"sha256": "ccd60b5765ebb3584d0de2d7a6e4f745672581de4f5005ab31c3a25d10b52b3a"
},
"downloads": -1,
"filename": "sse_starlette-3.0.2.tar.gz",
"has_sig": false,
"md5_digest": "8b30fb9cd04490f49df810f49f40fa09",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 20985,
"upload_time": "2025-07-27T09:07:44",
"upload_time_iso_8601": "2025-07-27T09:07:44.565427Z",
"url": "https://files.pythonhosted.org/packages/42/6f/22ed6e33f8a9e76ca0a412405f31abb844b779d52c5f96660766edcd737c/sse_starlette-3.0.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-27 09:07:44",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "sysid",
"github_project": "sse-starlette",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "sse-starlette"
}