# Django Streaming Coordinator
A Django-based system for managing long-running tasks with Server-Sent Events (SSE) streaming. Tasks continue running even if clients disconnect, and multiple clients can connect to the same task simultaneously.
## Features
- **Persistent Tasks**: Tasks continue running in the background even when clients disconnect
- **Multiple Clients**: Multiple clients can connect to the same task and receive real-time updates
- **Server-Sent Events (SSE)**: Uses SSE for efficient real-time streaming
- **Easy Task Creation**: Simple API for creating custom streaming tasks
- **Unix Socket Support**: Can bind to Unix sockets or TCP ports
- **Async/Await**: Built on modern Python asyncio for efficient concurrent operations
## Installation
```bash
# Install dependencies
poetry install
# Run migrations
poetry run python manage.py migrate
```
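If you are integrating this into an existing project, the app also needs to be registered before the migrations will run. A minimal sketch, assuming the app is installed under the `streaming` label used by the imports throughout this README (adjust if your installation differs):

```python
# settings.py (assumption: the app label is "streaming", matching the imports
# used in the Quick Start below; change it if your project uses another label)
INSTALLED_APPS = [
    # ...
    "streaming",
]
```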
## Quick Start
### 1. Create a Custom Task
Subclass `StreamTask` and implement the `async process()` method:
```python
from django.db import models
from streaming.models import StreamTask
import asyncio

class MyCustomTask(StreamTask):
    # Add your custom fields
    title = models.CharField(max_length=255)

    async def process(self):
        # Send start event
        await self.send_event('start', {
            'message': f'Starting task: {self.title}'
        })

        # Do your work and send events
        for i in range(10):
            await asyncio.sleep(1)
            await self.send_event('progress', {
                'step': i + 1,
                'total': 10,
                'message': f'Processing step {i + 1}'
            })

        # Send completion event
        await self.send_event('complete', {
            'message': 'Task completed successfully'
        })
```
### 2. Run the Streaming Server
```bash
# Using Unix socket (default)
poetry run python manage.py runserver_stream
# Using TCP port
poetry run python manage.py runserver_stream --host 127.0.0.1 --port 8888
# Custom Unix socket path
poetry run python manage.py runserver_stream --socket /tmp/my-stream.sock
```
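When the server binds to a Unix socket, HTTP clients need a transport that talks to that socket. A sketch using httpx's `uds` option; the socket path below simply reuses the custom-path example above, so substitute whatever path your server actually binds to:

```python
import httpx

# Route all requests over the Unix socket; the host in base_url is only used
# for the Host header, not for name resolution.
transport = httpx.AsyncHTTPTransport(uds="/tmp/my-stream.sock")
client = httpx.AsyncClient(transport=transport, base_url="http://localhost")
# e.g. `await client.get("/health")`, or stream /stream/ExampleTask/1 as in §4
```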
### 3. Create and Connect to a Task
```python
# Create a task instance
from streaming.models import ExampleTask
from streaming.coordinator import coordinator
task = ExampleTask.objects.create(message="Hello, World!")
# Start the task (will run in background)
await coordinator.start_task(task, 'ExampleTask')
# Connect via HTTP SSE
# GET /stream/ExampleTask/{task_id}
```
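In a real project you would typically trigger this from application code. Below is a hypothetical async Django view; the view name and URL layout are illustrative, not part of this package:

```python
from django.http import JsonResponse

from streaming.coordinator import coordinator
from streaming.models import ExampleTask

async def start_example_task(request):
    # acreate() is the async counterpart of objects.create() (Django 4.1+)
    task = await ExampleTask.objects.acreate(message="Hello, World!")
    await coordinator.start_task(task, 'ExampleTask')
    return JsonResponse({
        'task_id': task.pk,
        'stream_url': f'/stream/ExampleTask/{task.pk}',
    })
```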
### 4. Connect from Client
Using JavaScript:
```javascript
const eventSource = new EventSource('/stream/ExampleTask/1');

eventSource.addEventListener('start', (event) => {
  const data = JSON.parse(event.data);
  console.log('Task started:', data);
});

eventSource.addEventListener('progress', (event) => {
  const data = JSON.parse(event.data);
  console.log('Progress:', data);
});

eventSource.addEventListener('complete', (event) => {
  const data = JSON.parse(event.data);
  console.log('Task completed:', data);
  eventSource.close();
});

eventSource.addEventListener('error', (event) => {
  // Native connection errors also fire 'error' without a data payload,
  // so only parse when the server actually sent an error event body.
  if (event.data) {
    console.error('Error:', JSON.parse(event.data));
  }
  eventSource.close();
});
```
Using Python (httpx + httpx-sse):
```python
import asyncio
import json

import httpx
from httpx_sse import aconnect_sse

async def main():
    async with httpx.AsyncClient() as client:
        async with aconnect_sse(
            client, 'GET', 'http://127.0.0.1:8888/stream/ExampleTask/1'
        ) as event_source:
            async for event in event_source.aiter_sse():
                data = json.loads(event.data)
                print(f"{event.event}: {data}")

                if event.event == 'complete':
                    break

asyncio.run(main())
```
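For scripts that don't need asyncio, httpx-sse also ships a synchronous API. A minimal sketch; `timeout=None` is a precaution so httpx's default read timeout doesn't cut off a slow stream:

```python
import json

import httpx
from httpx_sse import connect_sse

with httpx.Client(timeout=None) as client:
    with connect_sse(
        client, 'GET', 'http://127.0.0.1:8888/stream/ExampleTask/1'
    ) as event_source:
        for event in event_source.iter_sse():
            data = json.loads(event.data)
            print(f"{event.event}: {data}")
            if event.event == 'complete':
                break
```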
## API
### StreamTask Model
Base abstract model for all streaming tasks.
**Fields:**
- `created_at`: Timestamp when task was created
- `updated_at`: Timestamp when task was last updated
- `completed_at`: Timestamp when task completed (null if not completed)
**Methods:**
#### `async send_event(event_type: str, data: dict)`
Send an event to all connected clients.
**Parameters:**
- `event_type`: Type of event (e.g., 'start', 'progress', 'complete', 'error')
- `data`: Dictionary of data to send to clients
**Example:**
```python
await self.send_event('progress', {
    'step': 5,
    'total': 10,
    'message': 'Halfway there!'
})
```
#### `async process()`
Override this method to implement your task logic. This is where your task's work happens.
#### `async mark_completed()`
Mark the task as completed. Called automatically by the coordinator when `process()` finishes.
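The package does not prescribe an error convention. One pattern, shown as a sketch rather than documented behavior, is to catch exceptions inside `process()` and emit an `'error'` event that matches the `'error'` listener in the JavaScript example above; `do_work()` here is a hypothetical helper on your task model:

```python
# Sketch of one way to surface failures to connected clients; the coordinator
# may already handle exceptions in process(), so treat this as optional.
async def process(self):
    try:
        await self.send_event('start', {'message': 'working'})
        await self.do_work()  # hypothetical helper, not part of the package
    except Exception as exc:
        await self.send_event('error', {'message': str(exc)})
        raise
    else:
        await self.send_event('complete', {'message': 'done'})
```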
### TaskCoordinator
Singleton that manages running tasks.
**Methods:**
#### `async start_task(task_instance: StreamTask, model_name: str)`
Start a task in the background.
#### `async get_task_instance(model_name: str, task_id: int) -> StreamTask`
Get a running task instance, or load it from the database if it is not currently in memory.
#### `is_task_running(model_name: str, task_id: int) -> bool`
Check if a task is currently running.
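A sketch of how these methods might be combined, for example when deciding whether a task needs to be (re)started; it assumes only the signatures documented above:

```python
from streaming.coordinator import coordinator

async def describe_task(task_id: int) -> dict:
    # Running tasks are served from memory; finished or unknown ones are
    # loaded from the database by the coordinator.
    running = coordinator.is_task_running('ExampleTask', task_id)
    task = await coordinator.get_task_instance('ExampleTask', task_id)
    return {
        'task_id': task_id,
        'running': running,
        'completed_at': task.completed_at,
    }
```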
## HTTP Endpoints
### `GET /stream/{model_name}/{task_id}`
Connect to a task's SSE stream.
**Response Format:**
```
event: start
data: {"message": "...", "_task_id": 1, "_model": "ExampleTask", "_timestamp": "..."}

event: progress
data: {"step": 1, "total": 3, "_task_id": 1, "_model": "ExampleTask", "_timestamp": "..."}

event: complete
data: {"message": "...", "_task_id": 1, "_model": "ExampleTask", "_timestamp": "..."}
```
### `GET /health`
Health check endpoint. Returns `200 OK`.
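A quick liveness check from Python, using the host and port from the TCP example in the Quick Start:

```python
import httpx

# Expects the streaming server to be running on 127.0.0.1:8888
response = httpx.get("http://127.0.0.1:8888/health")
assert response.status_code == 200
```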
## Testing
Run tests with Django's test runner:
```bash
# Run all tests
poetry run python manage.py test
# Run specific test class
poetry run python manage.py test streaming.tests.StreamingSystemTests
# Run with verbose output
poetry run python manage.py test --verbosity=2
```
## Architecture
1. **StreamTask**: Abstract Django model that defines the interface for streaming tasks
2. **TaskCoordinator**: Singleton that manages task lifecycle and keeps tasks running
3. **SSE Server**: asgineer-based ASGI server that handles HTTP SSE connections
4. **Management Command**: `runserver_stream` to start the server on Unix socket or TCP port
## Example: ExampleTask
The project includes a simple example task:
```python
class ExampleTask(StreamTask):
    message = models.CharField(max_length=255, default="Hello from ExampleTask")

    async def process(self):
        await self.send_event('start', {
            'message': self.message,
            'total_steps': 3
        })

        for i in range(1, 4):
            await asyncio.sleep(2)
            await self.send_event('progress', {
                'step': i,
                'total_steps': 3,
                'message': f"Step {i} of 3"
            })

        await self.send_event('complete', {
            'message': 'Task completed successfully'
        })
```
Test it:
```bash
# Start server
poetry run python manage.py runserver_stream --port 8888
# In another terminal, create a task
poetry run python manage.py shell
>>> from streaming.models import ExampleTask
>>> from streaming.coordinator import coordinator
>>> import asyncio
>>> task = ExampleTask.objects.create(message="Test")
>>> asyncio.run(coordinator.start_task(task, 'ExampleTask'))
# Connect with curl or browser
curl http://127.0.0.1:8888/stream/ExampleTask/1
```
## Requirements
- Python 3.11+
- Django 5.2+
- asgineer
- uvicorn
- httpx (for testing)
- httpx-sse (for testing)
## License
MIT