| Field | Value |
| --- | --- |
| Name | httpx-sse |
| Version | 0.4.0 |
| Summary | Consume Server-Sent Event (SSE) messages with HTTPX. |
| upload_time | 2023-12-22 08:01:21 |
| docs_url | None |
| requires_python | >=3.8 |
| license | MIT |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |

# httpx-sse
[![Build Status](https://dev.azure.com/florimondmanca/public/_apis/build/status/florimondmanca.httpx-sse?branchName=master)](https://dev.azure.com/florimondmanca/public/_build?definitionId=19)
[![Coverage](https://codecov.io/gh/florimondmanca/httpx-sse/branch/master/graph/badge.svg)](https://codecov.io/gh/florimondmanca/httpx-sse)
[![Package version](https://badge.fury.io/py/httpx-sse.svg)](https://pypi.org/project/httpx-sse)
Consume [Server-Sent Event (SSE)](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events) messages with [HTTPX](https://www.python-httpx.org).
**Table of contents**
- [Installation](#installation)
- [Quickstart](#quickstart)
- [How-To](#how-to)
- [API Reference](#api-reference)
## Installation
**NOTE**: This is beta software. Please be sure to pin your dependencies.
```bash
pip install httpx-sse=="0.4.*"
```
## Quickstart
`httpx-sse` provides the [`connect_sse`](#connect_sse) and [`aconnect_sse`](#aconnect_sse) helpers for connecting to an SSE endpoint. The resulting [`EventSource`](#eventsource) object exposes the [`.iter_sse()`](#iter_sse) and [`.aiter_sse()`](#aiter_sse) methods to iterate over the server-sent events.
Example usage:
```python
import httpx
from httpx_sse import connect_sse

with httpx.Client() as client:
    with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
        for sse in event_source.iter_sse():
            print(sse.event, sse.data, sse.id, sse.retry)
```
You can try this against this example Starlette server ([credit](https://sysid.github.io/sse/)):
```python
# Requirements: pip install uvicorn starlette sse-starlette
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse

async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield {"data": i}

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [
    Route("/sse", endpoint=sse)
]

app = Starlette(routes=routes)

if __name__ == "__main__":
    uvicorn.run(app)
```
## How-To
### Calling into Python web apps
You can [call into Python web apps](https://www.python-httpx.org/async/#calling-into-python-web-apps) with HTTPX and `httpx-sse` to test SSE endpoints directly.
Here's an example of calling into a Starlette ASGI app...
```python
import asyncio

import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def auth_events(request):
    async def events():
        yield {
            "event": "login",
            "data": '{"user_id": "4135"}',
        }

    return EventSourceResponse(events())

app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])

async def main():
    # Route requests straight to the ASGI app instead of the network.
    # (Recent HTTPX releases no longer accept the `app=` shortcut.)
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport) as client:
        async with aconnect_sse(
            client, "GET", "http://localhost:8000/sse/auth/"
        ) as event_source:
            events = [sse async for sse in event_source.aiter_sse()]
            (sse,) = events
            assert sse.event == "login"
            assert sse.json() == {"user_id": "4135"}

asyncio.run(main())
```
### Handling reconnections
_(Advanced)_
`httpx-sse` does not have reconnection built in, because how to perform retries generally depends on your use case. As a result, if the connection breaks while reading from the server, `iter_sse()` (or `aiter_sse()`) raises an `httpx.ReadError`.
However, `httpx-sse` does allow implementing reconnection by using the `Last-Event-ID` and reconnection time (in milliseconds), exposed as `sse.id` and `sse.retry` respectively.
Here's how you might achieve this using [`stamina`](https://github.com/hynek/stamina)...
```python
import time
from typing import Iterator

import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry

def iter_sse_retrying(client, method, url) -> Iterator[ServerSentEvent]:
    last_event_id = ""
    reconnection_delay = 0.0

    # `stamina` will apply jitter and exponential backoff on top of
    # the `retry` reconnection delay sent by the server.
    @retry(on=httpx.ReadError)
    def _iter_sse():
        nonlocal last_event_id, reconnection_delay

        time.sleep(reconnection_delay)

        headers = {"Accept": "text/event-stream"}

        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        with connect_sse(client, method, url, headers=headers) as event_source:
            for sse in event_source.iter_sse():
                last_event_id = sse.id

                if sse.retry is not None:
                    reconnection_delay = sse.retry / 1000

                yield sse

    return _iter_sse()
```
Usage:
```python
with httpx.Client() as client:
    for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
        print(sse.event, sse.data)
```
## API Reference
### `connect_sse`
```python
def connect_sse(
    client: httpx.Client,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> ContextManager[EventSource]
```
Connect to an SSE endpoint and return an [`EventSource`](#eventsource) context manager.
This sets `Cache-Control: no-store` on the request, as per the SSE spec, as well as `Accept: text/event-stream`.
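If the response `Content-Type` does not contain `text/event-stream`, an [`SSEError`](#sseerror) is raised when you start iterating with `iter_sse()`, not when connecting. This lets you inspect the response first, for example to check for an error status.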
### `aconnect_sse`
```python
async def aconnect_sse(
    client: httpx.AsyncClient,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> AsyncContextManager[EventSource]
```
An async equivalent to [`connect_sse`](#connect_sse).
### `EventSource`
```python
def __init__(response: httpx.Response)
```
Helper for working with an SSE response.
#### `response`
The underlying [`httpx.Response`](https://www.python-httpx.org/api/#response).
#### `iter_sse`
```python
def iter_sse() -> Iterator[ServerSentEvent]
```
Decode the response content and yield corresponding [`ServerSentEvent`](#serversentevent).
Example usage:
```python
for sse in event_source.iter_sse():
    ...
```
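To give a feel for what decoding involves, here is a simplified, standard-library-only sketch of the field rules in the SSE spec. It is not `httpx-sse`'s actual decoder: the `Event` class and `decode_lines` function are hypothetical names, and edge cases (such as events that carry only an `id`) are handled loosely.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class Event:
    # Hypothetical stand-in for `ServerSentEvent`, for illustration only.
    event: str = "message"
    data: str = ""
    id: str = ""
    retry: Optional[int] = None


def decode_lines(lines: Iterable[str]) -> Iterator[Event]:
    event, data, last_id, retry = "", [], "", None
    for line in lines:
        if not line:
            # A blank line ends the current event.
            if event or data or retry is not None:
                yield Event(event or "message", "\n".join(data), last_id, retry)
            event, data, retry = "", [], None
            continue
        if line.startswith(":"):
            continue  # Comment line, often used as a keep-alive.
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # Only one leading space is stripped.
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)  # Multiple data lines are joined with "\n".
        elif field == "id" and "\0" not in value:
            last_id = value  # Persists across events until changed.
        elif field == "retry" and value.isascii() and value.isdigit():
            retry = int(value)  # Reconnection time in milliseconds.
```

Note how the last event ID carries over to later events, which is what makes `sse.id` usable as a `Last-Event-ID` header on reconnection.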
#### `aiter_sse`
```python
async def aiter_sse() -> AsyncIterator[ServerSentEvent]
```
An async equivalent to `iter_sse`.
### `ServerSentEvent`
Represents a server-sent event.
* `event: str` - Defaults to `"message"`.
* `data: str` - Defaults to `""`.
* `id: str` - Defaults to `""`.
* `retry: int | None` - Reconnection time in milliseconds. Defaults to `None`.
Methods:
* `json() -> Any` - Returns `sse.data` decoded as JSON.
### `SSEError`
An error that occurred while making a request to an SSE endpoint.
Parents:
* `httpx.TransportError`
## License
MIT
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## 0.4.0 - 2023-12-22
### Removed
* Dropped Python 3.7 support, as it has reached EOL. (Pull #21)
### Added
* Add official support for Python 3.12. (Pull #21)
### Fixed
* Allow `Content-Type` values that contain, but are not strictly equal to, `text/event-stream`. (Pull #22 by @dbuades)
* Improve error message when `Content-Type` is missing. (Pull #20 by @jamesbraza)
## 0.3.1 - 2023-06-01
### Added
* Add `__repr__()` for `ServerSentEvent` model, which may help with debugging and other tasks. (Pull #16)
## 0.3.0 - 2023-04-27
### Changed
* Raising an `SSEError` if the response content type is not `text/event-stream` is now performed as part of `iter_sse()` / `aiter_sse()`, instead of `connect_sse()` / `aconnect_sse()`. This allows inspecting the response before iterating on server-sent events, such as checking for error responses. (Pull #12)
## 0.2.0 - 2023-03-27
### Changed
* `connect_sse()` and `aconnect_sse()` now require a `method` argument: `connect_sse(client, "GET", "https://example.org")`. This provides support for SSE requests with HTTP verbs other than `GET`. (Pull #7)
## 0.1.0 - 2023-02-05
_Initial release_
### Added
* Add `connect_sse()`, `aconnect_sse()`, `ServerSentEvent` and `SSEError`.
Raw data
{
"_id": null,
"home_page": "",
"name": "httpx-sse",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": "",
"keywords": "",
"author": "",
"author_email": "Florimond Manca <florimond.manca@protonmail.com>",
"download_url": "https://files.pythonhosted.org/packages/4c/60/8f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e/httpx-sse-0.4.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "Consume Server-Sent Event (SSE) messages with HTTPX.",
"version": "0.4.0",
"project_urls": {
"Homepage": "https://github.com/florimondmanca/httpx-sse"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "e19ba181f281f65d776426002f330c31849b86b31fc9d848db62e16f03ff739f",
"md5": "40ba2ec6c1889e47a55182e55805369e",
"sha256": "f329af6eae57eaa2bdfd962b42524764af68075ea87370a2de920af5341e318f"
},
"downloads": -1,
"filename": "httpx_sse-0.4.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "40ba2ec6c1889e47a55182e55805369e",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 7819,
"upload_time": "2023-12-22T08:01:19",
"upload_time_iso_8601": "2023-12-22T08:01:19.890100Z",
"url": "https://files.pythonhosted.org/packages/e1/9b/a181f281f65d776426002f330c31849b86b31fc9d848db62e16f03ff739f/httpx_sse-0.4.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "4c608f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e",
"md5": "609d7a2dedb0b6592f4977d0a593268a",
"sha256": "1e81a3a3070ce322add1d3529ed42eb5f70817f45ed6ec915ab753f961139721"
},
"downloads": -1,
"filename": "httpx-sse-0.4.0.tar.gz",
"has_sig": false,
"md5_digest": "609d7a2dedb0b6592f4977d0a593268a",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 12624,
"upload_time": "2023-12-22T08:01:21",
"upload_time_iso_8601": "2023-12-22T08:01:21.083556Z",
"url": "https://files.pythonhosted.org/packages/4c/60/8f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e/httpx-sse-0.4.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-12-22 08:01:21",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "florimondmanca",
"github_project": "httpx-sse",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"requirements": [],
"lcname": "httpx-sse"
}