# JAAI Hub
A Python package of utilities and components for building streaming AI applications with OpenAI-compatible APIs. It provides streaming message handling, tools for building custom APIs, and seamless integration with FastAPI applications.
## Installation
```bash
pip install jaai-hub
```
## Core Features
### 🔄 Streaming Message System
- **StreamingMessage**: Main class for handling streaming data with both sync and async generators
- **Event-based streaming**: Server-Sent Events (SSE) support with start, data, done, error events
- **Multi-modal support**: Text, images, attachments, sources, and status updates
- **Real-time processing**: Stream and process data as it's generated
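
The SSE event names above (start, data, done, error) suggest framing along the following lines. This is a minimal sketch of generic SSE framing, not jaai-hub's actual serialization, which may differ:

```python
import json

def sse_event(event: str, payload: dict) -> str:
    """Frame a payload as a Server-Sent Events message.

    Illustrative only: the event names (start/data/done/error) come from
    the feature list above; jaai-hub's actual wire format may differ.
    """
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"

frame = sse_event("data", {"text": "Hello"})
```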
### 🛠️ Custom API Framework
- **OpenAI-compatible APIs**: Build APIs that work with OpenAI client libraries
- **FastAPI integration**: Seamless integration with FastAPI applications
- **Health checks**: Standard health check endpoints
- **Error handling**: Robust error handling and status reporting
### 📊 Rich Data Types
- **Status**: Progress updates and completion states
- **Source**: Source information and references
- **Attachment**: File attachments with metadata
- **GeneratedImage**: AI-generated images with metadata
- **Plan/Step**: Multi-step planning and task execution
- **HiddenContext**: Internal context that doesn't appear in UI
## Quick Start
### Basic Streaming Message
```python
from jaai_hub.streaming_message import StreamingMessage, Status, Source

# Create a streaming message
def my_generator():
    yield "Hello"
    yield Status(text="Processing...")
    yield Source(title="Example Source", url="https://example.com")
    yield "World"

stream = StreamingMessage(my_generator())
for chunk in stream:
    print(chunk)
```
### Building a Custom API
```python
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from jaai_hub.custom_api import (
    ChatCompletionRequest,
    health_check_endpoint,
    create_chat_completions_endpoint,
)
from jaai_hub.streaming_message import StreamingMessage, Status

# Create your API router
router = APIRouter(tags=["your-service"])

@router.get("/health")
async def health():
    return health_check_endpoint("your-service-name")

# Define your streaming function
async def your_stream_function(request: ChatCompletionRequest):
    yield Status(type="basic", text="Processing request...")

    # Your custom logic here
    last_message = request.messages[-1].content if request.messages else ""
    response = f"Echo: {last_message}"

    yield response
    yield Status(type="complete", text="Done!")

# Create the chat completions endpoint
chat_completion = create_chat_completions_endpoint(your_stream_function)
router.post("/chat/completions")(chat_completion)
```
## Advanced Examples
### Image Generation API
```python
from jaai_hub.streaming_message import GeneratedImage, Status

async def image_generation_stream(request: ChatCompletionRequest):
    yield Status(type="basic", text="🖼️ Creating image...")

    # Your image generation logic
    prompt = request.messages[-1].content
    image_b64 = await create_image(prompt)

    yield GeneratedImage(
        url=f"data:image/png;base64,{image_b64}",
        prompt=prompt,
        width=1024,
        height=1024,
    )

    yield Status(type="complete", text="✅ Image created!")
```
### Research API with Sources
```python
from jaai_hub.streaming_message import Source, Status

async def research_stream(request: ChatCompletionRequest):
    yield Status(type="basic", text="🔍 Starting research...")

    query = request.messages[-1].content
    sources = await perform_research(query)

    for source_data in sources:
        yield Source(
            title=source_data["title"],
            url=source_data["url"],
            raw_content=source_data["content"],
        )

    yield "Based on the research findings..."
    yield Status(type="complete", text="🎉 Research complete!")
```
### Multi-step Planning
```python
from jaai_hub.streaming_message import Plan, Step, Status

async def planning_stream(request: ChatCompletionRequest):
    yield Status(type="basic", text="📋 Creating plan...")

    task = request.messages[-1].content
    steps = await create_plan(task)

    plan = Plan(steps=[
        Step(title=step["title"], task=step["task"], fulfilled=False)
        for step in steps
    ])

    yield plan
    yield Status(type="complete", text="✅ Plan ready!")
```
## API Reference
### StreamingMessage Class
```python
StreamingMessage(source_gen: Union[Generator, AsyncGenerator])
```
- **source_gen**: Generator or async generator yielding StreamableType objects
- **Methods**:
- `get_message()`: Get current accumulated message
- `is_done()`: Check if streaming is complete
- `__iter__()` / `__aiter__()`: Iterate over streaming chunks
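
Conceptually, `StreamingMessage` drains its source generator while accumulating the text it yields. The following is a minimal pure-Python stand-in (not the actual implementation) that illustrates the `get_message()` / `is_done()` semantics for plain-text chunks:

```python
from typing import Generator

class TextAccumulator:
    """Illustrative stand-in for StreamingMessage's accumulation behavior.

    Only handles plain-text chunks; the real class also accepts rich
    StreamableType objects (Status, Source, etc.).
    """

    def __init__(self, source_gen: Generator):
        self._gen = source_gen
        self._message = ""
        self._done = False

    def __iter__(self):
        for chunk in self._gen:
            if isinstance(chunk, str):
                self._message += chunk  # accumulate text as it streams
            yield chunk
        self._done = True

    def get_message(self) -> str:
        return self._message

    def is_done(self) -> bool:
        return self._done

stream = TextAccumulator(iter(["Hello, ", "World"]))
chunks = list(stream)
```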
### Data Models
#### Status
```python
Status(
    type: Literal["basic", "complete", "error"] = "basic",
    text: str,
    replace: bool = False
)
```
#### Source
```python
Source(
    title: str,
    url: Optional[str] = None,
    raw_content: Optional[str] = None,
    image_urls: Optional[List[ImageUrl]] = None
)
```
#### Attachment
```python
Attachment(
    type: str,
    name: str,
    url: Optional[str] = None,
    size: Optional[int] = None,
    mimeType: Optional[str] = None,
    base64: Optional[str] = None,
    extractedText: Optional[str] = None
)
```
#### GeneratedImage
```python
GeneratedImage(
    url: str,
    prompt: Optional[str] = None,
    width: Optional[int] = None,
    height: Optional[int] = None
)
```
### Custom API Utilities
#### ChatCompletionRequest
```python
ChatCompletionRequest(
    model: str,
    messages: List[Message],
    temperature: Optional[float] = 0.7,
    max_tokens: Optional[int] = None,
    stream: Optional[bool] = False
)
```
#### Utility Functions
- `health_check_endpoint(service_name)`: Standard health check response
- `create_chat_completions_endpoint(stream_func)`: Create OpenAI-compatible endpoint
- `create_data_chunk(content, model, content_type)`: Create SSE data chunks
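
For reference, OpenAI-style streaming chunks are JSON objects framed as `data:` lines. The sketch below shows what a helper like `create_data_chunk` plausibly produces; the field names follow the public OpenAI streaming schema, and the exact output of jaai-hub's helper may differ:

```python
import json
import time
import uuid

def make_data_chunk(content: str, model: str) -> str:
    """Build one OpenAI-style chat.completion.chunk SSE line.

    Hypothetical sketch: field names follow the public OpenAI streaming
    schema, not necessarily jaai-hub's create_data_chunk output.
    """
    chunk = {
        "id": f"chatcmpl-{uuid.uuid4().hex[:12]}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {"index": 0, "delta": {"content": content}, "finish_reason": None}
        ],
    }
    return f"data: {json.dumps(chunk)}\n\n"

line = make_data_chunk("Hello", "your-model")
```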
## Integration Features
### OpenAI Compatibility
Works seamlessly with OpenAI client libraries:
```python
import openai

client = openai.OpenAI(base_url="http://your-api-url", api_key="dummy")
response = client.chat.completions.create(
    model="your-model",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
)
```
### FastAPI Integration
Automatic integration with FastAPI features:
- Request/response validation
- OpenAPI documentation
- Error handling
- Middleware support
## Best Practices
1. **Always implement health checks** using `health_check_endpoint()`
2. **Use streaming responses** for better user experience
3. **Yield status updates** to keep users informed of progress
4. **Handle errors gracefully** with appropriate status messages
5. **Follow OpenAI API conventions** for maximum compatibility
6. **Use appropriate data types** (Source, Attachment, etc.) for rich content
7. **Implement proper error handling** with error status types
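
Practices 4 and 7 can be sketched as a decorator that converts exceptions into error statuses. A minimal stand-in `Status` dataclass is used here so the snippet runs standalone; in real code you would import `Status` from `jaai_hub.streaming_message`:

```python
from dataclasses import dataclass

@dataclass
class Status:  # stand-in for jaai_hub.streaming_message.Status
    text: str
    type: str = "basic"
    replace: bool = False

def with_error_status(stream_func):
    """Wrap a sync generator so failures surface as an error Status."""
    def wrapper(*args, **kwargs):
        try:
            yield from stream_func(*args, **kwargs)
        except Exception as exc:
            # Surface the failure to the client instead of dropping the stream
            yield Status(type="error", text=f"Something went wrong: {exc}")
    return wrapper

@with_error_status
def flaky_stream():
    yield "partial output"
    raise RuntimeError("backend unavailable")

chunks = list(flaky_stream())
```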
## Requirements
- Python ≥ 3.11
- pydantic ≥ 2.4.0
- loguru ≥ 0.7.0
- requests ≥ 2.25.0
- langchain-community ≥ 0.0.10
- langchain-core ≥ 1.0
- langchain-openai ≥ 0.0.1
- markdownify ≥ 0.11.0
## License
Proprietary - JAAI Team