httpx-sse


* Name: httpx-sse
* Version: 0.4.0
* Summary: Consume Server-Sent Event (SSE) messages with HTTPX.
* Upload time: 2023-12-22 08:01:21
* Requires Python: >=3.8
* License: MIT
# httpx-sse

[![Build Status](https://dev.azure.com/florimondmanca/public/_apis/build/status/florimondmanca.httpx-sse?branchName=master)](https://dev.azure.com/florimondmanca/public/_build?definitionId=19)
[![Coverage](https://codecov.io/gh/florimondmanca/httpx-sse/branch/master/graph/badge.svg)](https://codecov.io/gh/florimondmanca/httpx-sse)
[![Package version](https://badge.fury.io/py/httpx-sse.svg)](https://pypi.org/project/httpx-sse)

Consume [Server-Sent Event (SSE)](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events) messages with [HTTPX](https://www.python-httpx.org).

**Table of contents**

- [Installation](#installation)
- [Quickstart](#quickstart)
- [How-To](#how-to)
- [API Reference](#api-reference)

## Installation

**NOTE**: This is beta software. Please be sure to pin your dependencies.

```bash
pip install httpx-sse=="0.4.*"
```

## Quickstart

`httpx-sse` provides the [`connect_sse`](#connect_sse) and [`aconnect_sse`](#aconnect_sse) helpers for connecting to an SSE endpoint. The resulting [`EventSource`](#eventsource) object exposes the [`.iter_sse()`](#iter_sse) and [`.aiter_sse()`](#aiter_sse) methods to iterate over the server-sent events.

Example usage:

```python
import httpx
from httpx_sse import connect_sse

with httpx.Client() as client:
    with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
        for sse in event_source.iter_sse():
            print(sse.event, sse.data, sse.id, sse.retry)
```

You can try this against this example Starlette server ([credit](https://sysid.github.io/sse/)):

```python
# Requirements: pip install uvicorn starlette sse-starlette
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse

async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield {"data": i}

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [
    Route("/sse", endpoint=sse)
]

app = Starlette(routes=routes)

if __name__ == "__main__":
    uvicorn.run(app)
```

## How-To

### Calling into Python web apps

You can [call into Python web apps](https://www.python-httpx.org/async/#calling-into-python-web-apps) with HTTPX and `httpx-sse` to test SSE endpoints directly.

Here's an example of calling into a Starlette ASGI app...

```python
import asyncio

import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def auth_events(request):
    async def events():
        yield {
            "event": "login",
            "data": '{"user_id": "4135"}',
        }

    return EventSourceResponse(events())

app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])

async def main():
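    # Route requests to the ASGI app in-process through an explicit transport
    # (the `app=` shortcut is deprecated in newer HTTPX releases).
    async with httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) as client: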
        async with aconnect_sse(
            client, "GET", "http://localhost:8000/sse/auth/"
        ) as event_source:
            events = [sse async for sse in event_source.aiter_sse()]
            (sse,) = events
            assert sse.event == "login"
            assert sse.json() == {"user_id": "4135"}

asyncio.run(main())
```

### Handling reconnections

_(Advanced)_

`connect_sse()` and `aconnect_sse()` don't have reconnection built in, because how to perform retries generally depends on your use case. As a result, if the connection breaks while reading from the server, `iter_sse()` (or `aiter_sse()`) raises an `httpx.ReadError`.

However, `httpx-sse` does allow implementing reconnection by using the `Last-Event-ID` and reconnection time (in milliseconds), exposed as `sse.id` and `sse.retry` respectively.

Here's how you might achieve this using [`stamina`](https://github.com/hynek/stamina)...

```python
import time
from typing import Iterator

import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry

def iter_sse_retrying(client, method, url) -> Iterator[ServerSentEvent]:
    last_event_id = ""
    reconnection_delay = 0.0

    # `stamina` will apply jitter and exponential backoff on top of
    # the `retry` reconnection delay sent by the server.
    @retry(on=httpx.ReadError)
    def _iter_sse():
        nonlocal last_event_id, reconnection_delay

        time.sleep(reconnection_delay)

        headers = {"Accept": "text/event-stream"}

        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        with connect_sse(client, method, url, headers=headers) as event_source:
            for sse in event_source.iter_sse():
                last_event_id = sse.id

                if sse.retry is not None:
                    reconnection_delay = sse.retry / 1000

                yield sse

    return _iter_sse()
```

Usage:

```python
with httpx.Client() as client:
    for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
        print(sse.event, sse.data)
```

## API Reference

### `connect_sse`

```python
def connect_sse(
    client: httpx.Client,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> ContextManager[EventSource]
```

Connect to an SSE endpoint and return an [`EventSource`](#eventsource) context manager.

This sets `Cache-Control: no-store` on the request, as per the SSE spec, as well as `Accept: text/event-stream`.

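If the response `Content-Type` does not contain `text/event-stream`, an [`SSEError`](#sseerror) is raised when you iterate with [`.iter_sse()`](#iter_sse), not when connecting. This lets you inspect the response, for example to check for an error status, before reading events.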

### `aconnect_sse`

```python
async def aconnect_sse(
    client: httpx.AsyncClient,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> AsyncContextManager[EventSource]
```

An async equivalent to [`connect_sse`](#connect_sse).

### `EventSource`

```python
def __init__(response: httpx.Response)
```

Helper for working with an SSE response.

#### `response`

The underlying [`httpx.Response`](https://www.python-httpx.org/api/#response).

#### `iter_sse`

```python
def iter_sse() -> Iterator[ServerSentEvent]
```

Decode the response content and yield corresponding [`ServerSentEvent`](#serversentevent).

Example usage:

```python
for sse in event_source.iter_sse():
    ...
```

#### `aiter_sse`

```python
async def aiter_sse() -> AsyncIterator[ServerSentEvent]
```

An async equivalent to `iter_sse`.

### `ServerSentEvent`

Represents a server-sent event.

* `event: str` - Defaults to `"message"`.
* `data: str` - Defaults to `""`.
* `id: str` - Defaults to `""`.
* `retry: int | None` - Reconnection time in milliseconds. Defaults to `None`.

Methods:

* `json() -> Any` - Returns `sse.data` decoded as JSON.

### `SSEError`

An error that occurred while making a request to an SSE endpoint.

Parents:

* `httpx.TransportError`

## License

MIT

# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## 0.4.0 - 2023-12-22

### Removed

* Dropped Python 3.7 support, as it has reached EOL. (Pull #21)

### Added

* Add official support for Python 3.12. (Pull #21)

### Fixed

* Allow `Content-Type` values that contain, but are not strictly equal to, `text/event-stream`. (Pull #22 by @dbuades)
* Improve error message when `Content-Type` is missing. (Pull #20 by @jamesbraza)

## 0.3.1 - 2023-06-01

### Added

* Add `__repr__()` for `ServerSentEvent` model, which may help with debugging and other tasks. (Pull #16)

## 0.3.0 - 2023-04-27

### Changed

* Raising an `SSEError` if the response content type is not `text/event-stream` is now performed as part of `iter_sse()` / `aiter_sse()`, instead of `connect_sse()` / `aconnect_sse()`. This allows inspecting the response before iterating on server-sent events, such as checking for error responses. (Pull #12)

## 0.2.0 - 2023-03-27

### Changed

* `connect_sse()` and `aconnect_sse()` now require a `method` argument: `connect_sse(client, "GET", "https://example.org")`. This provides support for SSE requests with HTTP verbs other than `GET`. (Pull #7)

## 0.1.0 - 2023-02-05

_Initial release_

### Added

* Add `connect_sse()`, `aconnect_sse()`, `ServerSentEvent` and `SSEError`.

            

Raw data

{
    "_id": null,
    "home_page": "",
    "name": "httpx-sse",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": "",
    "keywords": "",
    "author": "",
    "author_email": "Florimond Manca <florimond.manca@protonmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/4c/60/8f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e/httpx-sse-0.4.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Consume Server-Sent Event (SSE) messages with HTTPX.",
    "version": "0.4.0",
    "project_urls": {
        "Homepage": "https://github.com/florimondmanca/httpx-sse"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "e19ba181f281f65d776426002f330c31849b86b31fc9d848db62e16f03ff739f",
                "md5": "40ba2ec6c1889e47a55182e55805369e",
                "sha256": "f329af6eae57eaa2bdfd962b42524764af68075ea87370a2de920af5341e318f"
            },
            "downloads": -1,
            "filename": "httpx_sse-0.4.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "40ba2ec6c1889e47a55182e55805369e",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 7819,
            "upload_time": "2023-12-22T08:01:19",
            "upload_time_iso_8601": "2023-12-22T08:01:19.890100Z",
            "url": "https://files.pythonhosted.org/packages/e1/9b/a181f281f65d776426002f330c31849b86b31fc9d848db62e16f03ff739f/httpx_sse-0.4.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "4c608f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e",
                "md5": "609d7a2dedb0b6592f4977d0a593268a",
                "sha256": "1e81a3a3070ce322add1d3529ed42eb5f70817f45ed6ec915ab753f961139721"
            },
            "downloads": -1,
            "filename": "httpx-sse-0.4.0.tar.gz",
            "has_sig": false,
            "md5_digest": "609d7a2dedb0b6592f4977d0a593268a",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 12624,
            "upload_time": "2023-12-22T08:01:21",
            "upload_time_iso_8601": "2023-12-22T08:01:21.083556Z",
            "url": "https://files.pythonhosted.org/packages/4c/60/8f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e/httpx-sse-0.4.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-12-22 08:01:21",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "florimondmanca",
    "github_project": "httpx-sse",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "requirements": [],
    "lcname": "httpx-sse"
}