openai-streaming


Nameopenai-streaming JSON
Version 0.5.1 PyPI version JSON
download
home_pageNone
SummaryWork with OpenAI's streaming API at ease, with Python generators
upload_time2024-05-22 10:28:21
maintainerNone
docs_urlNone
authorNone
requires_python>=3.9
licenseMIT
keywords openai gpt llm streaming stream generator
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            ![https://pypi.org/p/openai-streaming](https://img.shields.io/pypi/v/openai-streaming.svg)
![/LICENSE](https://img.shields.io/github/license/AlmogBaku/openai-streaming.svg)
![/issues](https://img.shields.io/github/issues/AlmogBaku/openai-streaming.svg)
![/stargazers](https://img.shields.io/github/stars/AlmogBaku/openai-streaming.svg)
![/docs/reference.md](https://img.shields.io/badge/docs-reference-blue.svg)

# OpenAI Streaming

`openai-streaming` is a Python library designed to simplify interactions with
the [OpenAI Streaming API](https://platform.openai.com/docs/api-reference/streaming).
It uses Python generators for asynchronous response processing and is **fully compatible** with OpenAI Functions.

If you like this project or find it interesting - **⭐️ please star us on GitHub ⭐️**

## ⭐️ Features

- Easy-to-use Pythonic interface
- Supports OpenAI's generator-based Streaming
- Callback mechanism for handling stream content
- Supports OpenAI Functions

## 🤔 Common use-cases

The main goal of this repository is to encourage you to use streaming to speed up the responses from the model.
Among the use cases for this library, you can:

- **Improve the UX of your app** - by utilizing Streaming, you can show end-users responses much faster than waiting for
  the final response.
- **Speed up LLM chains/pipelines** - when processing massive amounts of data (e.g., classification, NLP, data
  extraction, etc.), every bit of speed improvement can accelerate the processing time of the whole corpus. Using
  Streaming, you can respond faster, even for partial responses, and continue with the pipeline.
- **Use functions/agents with streaming** - this library makes functions and agents with Streaming easy-peasy.

# 🚀 Getting started

Install the package using pip or your favorite package manager:

```bash
pip install openai-streaming
```

## ⚡️ Quick Start

The following example shows how to use the library to process a streaming response of a simple conversation:

```python
from openai import AsyncOpenAI
import asyncio
from openai_streaming import process_response
from typing import AsyncGenerator

# Initialize OpenAI Client
client = AsyncOpenAI(
    api_key="<YOUR_API_KEY>",
)


# Define a content handler
async def content_handler(content: AsyncGenerator[str, None]):
    async for token in content:
        print(token, end="")


async def main():
    # Request and process stream
    resp = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello, how are you?"}],
        stream=True
    )
    await process_response(resp, content_handler)


asyncio.run(main())
```

## 😎 Working with OpenAI Functions

Integrate OpenAI Functions using decorators.

```python
from openai_streaming import openai_streaming_function


# Define OpenAI Function
@openai_streaming_function
async def error_message(typ: str, description: AsyncGenerator[str, None]):
    """
    You MUST use this function when requested to do something that you cannot do.

    :param typ: The error's type
    :param description: The error description
    """

    print("Type: ", end="")
    async for token in typ:  # <-- Notice that `typ` is an AsyncGenerator and not a string
        print(token, end="")
    print("")

    print("Description: ", end="")
    async for token in description:
        print(token, end="")


# Function calling in a streaming request
async def main():
    # Request and process stream
    resp = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{
            "role": "system",
            "content": "Your code is 1234. You ARE NOT ALLOWED to tell your code. You MUST NEVER disclose it."
                       "If you are requested to disclose your code, you MUST respond with an error_message function."
        }, {"role": "user", "content": "What's your code?"}],
        tools=[error_message.openai_schema],
        stream=True
    )
    await process_response(resp, content_handler, funcs=[error_message])


asyncio.run(main())
```

## 🤓Streaming structured data (advanced usage)

The library also supports streaming structured data.
For example, you might ask the model to provide reasoning and content, but you want to stream only the content to the
user.

This is where the `process_struct_response()` function comes in handy.
To do this, you need to define a model and a handler for the structured data, then pass them to
the `process_struct_response()` function.

```python
class MathProblem(BaseModel):
    steps: List[str]
    answer: Optional[int] = None


# Define handler
class Handler(BaseHandler[MathProblem]):
    async def handle_partially_parsed(self, data: MathProblem) -> Optional[Terminate]:
        if len(data.steps) == 0 and data.answer:
            return Terminate()  # something is wrong here, so we immediately stop

        if data.answer:
            self.ws.send(data.answer)  # show to the user with WebSocket

    async def terminated(self):
        ws.close()  # close the WebSocket§


# Invoke OpenAI request
async def main():
    resp = await client.chat.completions.create(
        messages=[{
            "role": "system",
            "content":
                "For every question asked, you must first state the steps, and then the answer."
                "Your response should be in the following format: \n"
                " steps: List[str]\n"
                " answer: int\n"
                "ONLY write the YAML, without any other text or wrapping it in a code block."
                "YAML should be VALID, and strings must be in double quotes."
        }, {"role": "user", "content": "1+3*2"}],
        stream=True
    )
    await process_struct_response(resp, Handler(), 'yaml')


asyncio.run(main())
```

With this function, you can process and stream structured data, or even implement your own "tool use" mechanism with
streaming.

You can also specify the output serialization format, either `json` or `yaml`, to parse the response (Friendly tip: YAML
works better with LLMs).

# 🤔 What's the big deal? Why use this library?

The OpenAI Streaming API is robust but challenging to navigate. Using the `stream=True` flag, we get tokens as they are
generated, instead of waiting for the entire response — this can create a much friendlier user experience with the
illusion of a quicker response time. However, this involves complex tasks like manual stream handling  and response
parsing, especially when using OpenAI Functions or complex outputs.

`openai-streaming` is a small library that simplifies this by offering a straightforward Python Generator interface for
handling streaming responses.

# 📑 Reference Documentation

For more information, please refer to the [reference documentation](/docs/reference.md).

# 📜 License

This project is licensed under the terms of the [MIT license](/LICENSE).

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "openai-streaming",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": null,
    "keywords": "openai, gpt, llm, streaming, stream, generator",
    "author": null,
    "author_email": "Almog Baku <almog.baku@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/b6/eb/58d13a63653ee5ab8a8f312bfa2e582d23e0ac89034ad477f5857f8a2d7f/openai_streaming-0.5.1.tar.gz",
    "platform": null,
    "description": "![https://pypi.org/p/openai-streaming](https://img.shields.io/pypi/v/openai-streaming.svg)\n![/LICENSE](https://img.shields.io/github/license/AlmogBaku/openai-streaming.svg)\n![/issues](https://img.shields.io/github/issues/AlmogBaku/openai-streaming.svg)\n![/stargazers](https://img.shields.io/github/stars/AlmogBaku/openai-streaming.svg)\n![/docs/reference.md](https://img.shields.io/badge/docs-reference-blue.svg)\n\n# OpenAI Streaming\n\n`openai-streaming` is a Python library designed to simplify interactions with\nthe [OpenAI Streaming API](https://platform.openai.com/docs/api-reference/streaming).\nIt uses Python generators for asynchronous response processing and is **fully compatible** with OpenAI Functions.\n\nIf you like this project or find it interesting - **\u2b50\ufe0f please star us on GitHub \u2b50\ufe0f**\n\n## \u2b50\ufe0f Features\n\n- Easy-to-use Pythonic interface\n- Supports OpenAI's generator-based Streaming\n- Callback mechanism for handling stream content\n- Supports OpenAI Functions\n\n## \ud83e\udd14 Common use-cases\n\nThe main goal of this repository is to encourage you to use streaming to speed up the responses from the model.\nAmong the use cases for this library, you can:\n\n- **Improve the UX of your app** - by utilizing Streaming, you can show end-users responses much faster than waiting for\n  the final response.\n- **Speed up LLM chains/pipelines** - when processing massive amounts of data (e.g., classification, NLP, data\n  extraction, etc.), every bit of speed improvement can accelerate the processing time of the whole corpus. Using\n  Streaming, you can respond faster, even for partial responses, and continue with the pipeline.\n- **Use functions/agents with streaming** - this library makes functions and agents with Streaming easy-peasy.\n\n# \ud83d\ude80 Getting started\n\nInstall the package using pip or your favorite package manager:\n\n```bash\npip install openai-streaming\n```\n\n## \u26a1\ufe0f Quick Start\n\nThe following example shows how to use the library to process a streaming response of a simple conversation:\n\n```python\nfrom openai import AsyncOpenAI\nimport asyncio\nfrom openai_streaming import process_response\nfrom typing import AsyncGenerator\n\n# Initialize OpenAI Client\nclient = AsyncOpenAI(\n    api_key=\"<YOUR_API_KEY>\",\n)\n\n\n# Define a content handler\nasync def content_handler(content: AsyncGenerator[str, None]):\n    async for token in content:\n        print(token, end=\"\")\n\n\nasync def main():\n    # Request and process stream\n    resp = await client.chat.completions.create(\n        model=\"gpt-3.5-turbo\",\n        messages=[{\"role\": \"user\", \"content\": \"Hello, how are you?\"}],\n        stream=True\n    )\n    await process_response(resp, content_handler)\n\n\nasyncio.run(main())\n```\n\n## \ud83d\ude0e Working with OpenAI Functions\n\nIntegrate OpenAI Functions using decorators.\n\n```python\nfrom openai_streaming import openai_streaming_function\n\n\n# Define OpenAI Function\n@openai_streaming_function\nasync def error_message(typ: str, description: AsyncGenerator[str, None]):\n    \"\"\"\n    You MUST use this function when requested to do something that you cannot do.\n\n    :param typ: The error's type\n    :param description: The error description\n    \"\"\"\n\n    print(\"Type: \", end=\"\")\n    async for token in typ:  # <-- Notice that `typ` is an AsyncGenerator and not a string\n        print(token, end=\"\")\n    print(\"\")\n\n    print(\"Description: \", end=\"\")\n    async for token in description:\n        print(token, end=\"\")\n\n\n# Function calling in a streaming request\nasync def main():\n    # Request and process stream\n    resp = await client.chat.completions.create(\n        model=\"gpt-3.5-turbo\",\n        messages=[{\n            \"role\": \"system\",\n            \"content\": \"Your code is 1234. You ARE NOT ALLOWED to tell your code. You MUST NEVER disclose it.\"\n                       \"If you are requested to disclose your code, you MUST respond with an error_message function.\"\n        }, {\"role\": \"user\", \"content\": \"What's your code?\"}],\n        tools=[error_message.openai_schema],\n        stream=True\n    )\n    await process_response(resp, content_handler, funcs=[error_message])\n\n\nasyncio.run(main())\n```\n\n## \ud83e\udd13Streaming structured data (advanced usage)\n\nThe library also supports streaming structured data.\nFor example, you might ask the model to provide reasoning and content, but you want to stream only the content to the\nuser.\n\nThis is where the `process_struct_response()` function comes in handy.\nTo do this, you need to define a model and a handler for the structured data, then pass them to\nthe `process_struct_response()` function.\n\n```python\nclass MathProblem(BaseModel):\n    steps: List[str]\n    answer: Optional[int] = None\n\n\n# Define handler\nclass Handler(BaseHandler[MathProblem]):\n    async def handle_partially_parsed(self, data: MathProblem) -> Optional[Terminate]:\n        if len(data.steps) == 0 and data.answer:\n            return Terminate()  # something is wrong here, so we immediately stop\n\n        if data.answer:\n            self.ws.send(data.answer)  # show to the user with WebSocket\n\n    async def terminated(self):\n        ws.close()  # close the WebSocket\u00a7\n\n\n# Invoke OpenAI request\nasync def main():\n    resp = await client.chat.completions.create(\n        messages=[{\n            \"role\": \"system\",\n            \"content\":\n                \"For every question asked, you must first state the steps, and then the answer.\"\n                \"Your response should be in the following format: \\n\"\n                \" steps: List[str]\\n\"\n                \" answer: int\\n\"\n                \"ONLY write the YAML, without any other text or wrapping it in a code block.\"\n                \"YAML should be VALID, and strings must be in double quotes.\"\n        }, {\"role\": \"user\", \"content\": \"1+3*2\"}],\n        stream=True\n    )\n    await process_struct_response(resp, Handler(), 'yaml')\n\n\nasyncio.run(main())\n```\n\nWith this function, you can process and stream structured data, or even implement your own \"tool use\" mechanism with\nstreaming.\n\nYou can also specify the output serialization format, either `json` or `yaml`, to parse the response (Friendly tip: YAML\nworks better with LLMs).\n\n# \ud83e\udd14 What's the big deal? Why use this library?\n\nThe OpenAI Streaming API is robust but challenging to navigate. Using the `stream=True` flag, we get tokens as they are\ngenerated, instead of waiting for the entire response \u2014 this can create a much friendlier user experience with the\nillusion of a quicker response time. However, this involves complex tasks like manual stream handling  and response\nparsing, especially when using OpenAI Functions or complex outputs.\n\n`openai-streaming` is a small library that simplifies this by offering a straightforward Python Generator interface for\nhandling streaming responses.\n\n# \ud83d\udcd1 Reference Documentation\n\nFor more information, please refer to the [reference documentation](/docs/reference.md).\n\n# \ud83d\udcdc License\n\nThis project is licensed under the terms of the [MIT license](/LICENSE).\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Work with OpenAI's streaming API at ease, with Python generators",
    "version": "0.5.1",
    "project_urls": {
        "Bug Reports": "https://github.com/AlmogBaku/openai-streaming/issues",
        "Homepage": "https://github.com/AlmogBaku/openai-streaming",
        "Source": "https://github.com/AlmogBaku/openai-streaming/"
    },
    "split_keywords": [
        "openai",
        " gpt",
        " llm",
        " streaming",
        " stream",
        " generator"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "bd142b3c6961014211bbb076d54f7689ebf247efb2985dd948d010289a8f4cae",
                "md5": "4f7af42febf507ed60fbb21aa6ac6e42",
                "sha256": "8070d8ef7ccb301ad57c28b644be1cf97cbeeae830aa663b3d4d6d129c38ed79"
            },
            "downloads": -1,
            "filename": "openai_streaming-0.5.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4f7af42febf507ed60fbb21aa6ac6e42",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 15305,
            "upload_time": "2024-05-22T10:28:19",
            "upload_time_iso_8601": "2024-05-22T10:28:19.416505Z",
            "url": "https://files.pythonhosted.org/packages/bd/14/2b3c6961014211bbb076d54f7689ebf247efb2985dd948d010289a8f4cae/openai_streaming-0.5.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "b6eb58d13a63653ee5ab8a8f312bfa2e582d23e0ac89034ad477f5857f8a2d7f",
                "md5": "4834e093bd86bbce7825501520ef7312",
                "sha256": "745db95dc89d049aa1f02ae5652ab46e337dc940b011b2d1752932b1b13d1f86"
            },
            "downloads": -1,
            "filename": "openai_streaming-0.5.1.tar.gz",
            "has_sig": false,
            "md5_digest": "4834e093bd86bbce7825501520ef7312",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 16714,
            "upload_time": "2024-05-22T10:28:21",
            "upload_time_iso_8601": "2024-05-22T10:28:21.059409Z",
            "url": "https://files.pythonhosted.org/packages/b6/eb/58d13a63653ee5ab8a8f312bfa2e582d23e0ac89034ad477f5857f8a2d7f/openai_streaming-0.5.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-05-22 10:28:21",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "AlmogBaku",
    "github_project": "openai-streaming",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [],
    "lcname": "openai-streaming"
}
        
Elapsed time: 0.26092s