django-streaming-coordinator


- **Name**: django-streaming-coordinator
- **Version**: 0.1.0
- **Summary**: A Django-based system for managing long-running tasks with Server-Sent Events (SSE) streaming
- **Upload time**: 2025-10-15 14:50:33
- **Requires Python**: >=3.11
- **Keywords**: django, sse, server-sent-events, streaming, async, tasks, background-tasks

# Django Streaming Coordinator

A Django-based system for managing long-running tasks with Server-Sent Events (SSE) streaming. Tasks continue running even if clients disconnect, and multiple clients can connect to the same task simultaneously.

## Features

- **Persistent Tasks**: Tasks continue running in the background even when clients disconnect
- **Multiple Clients**: Multiple clients can connect to the same task and receive real-time updates
- **Server-Sent Events (SSE)**: Uses SSE for efficient real-time streaming
- **Easy Task Creation**: Simple API for creating custom streaming tasks
- **Unix Socket Support**: Can bind to Unix sockets or TCP ports
- **Async/Await**: Built on modern Python asyncio for efficient concurrent operations
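
The first two features (tasks that outlive their clients, and several clients per task) can be pictured as a fan-out over per-client queues. The sketch below is only an illustration of that idea in plain asyncio; the `Broadcaster` class and its methods are hypothetical names and are not taken from this package.

```python
import asyncio


class Broadcaster:
    """Hypothetical fan-out helper: one producer, one queue per connected client."""

    def __init__(self) -> None:
        self._subscribers: set[asyncio.Queue] = set()

    def subscribe(self) -> asyncio.Queue:
        # Each client gets its own queue so a slow reader cannot block the others.
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        # A disconnecting client removes only its own queue; the producer keeps going.
        self._subscribers.discard(queue)

    async def publish(self, event: dict) -> None:
        for queue in list(self._subscribers):
            await queue.put(event)


def drain(queue: asyncio.Queue) -> list[dict]:
    """Collect everything currently buffered in a queue."""
    items = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items


async def demo() -> list[list[dict]]:
    broadcaster = Broadcaster()
    first, second = broadcaster.subscribe(), broadcaster.subscribe()

    await broadcaster.publish({"event": "start"})
    broadcaster.unsubscribe(first)  # the first client disconnects
    await broadcaster.publish({"event": "complete"})  # work continues regardless

    return [drain(first), drain(second)]
```

In this sketch the producer never waits on a particular client, which is why a disconnect does not interrupt the work.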

## Installation

```bash
# Install dependencies
poetry install

# Run migrations
poetry run python manage.py migrate
```

## Quick Start

### 1. Create a Custom Task

Subclass `StreamTask` and implement the `async process()` method:

```python
import asyncio

from django.db import models

from streaming.models import StreamTask

class MyCustomTask(StreamTask):
    # Add your custom fields
    title = models.CharField(max_length=255)

    async def process(self):
        # Send start event
        await self.send_event('start', {
            'message': f'Starting task: {self.title}'
        })

        # Do your work and send events
        for i in range(10):
            await asyncio.sleep(1)
            await self.send_event('progress', {
                'step': i + 1,
                'total': 10,
                'message': f'Processing step {i + 1}'
            })

        # Send completion event
        await self.send_event('complete', {
            'message': 'Task completed successfully'
        })
```

### 2. Run the Streaming Server

```bash
# Using Unix socket (default)
poetry run python manage.py runserver_stream

# Using TCP port
poetry run python manage.py runserver_stream --host 127.0.0.1 --port 8888

# Custom Unix socket path
poetry run python manage.py runserver_stream --socket /tmp/my-stream.sock
```

### 3. Create and Connect to a Task

```python
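from streaming.models import ExampleTask
from streaming.coordinator import coordinator


async def create_and_start():
    # Create a task instance; acreate() is the async ORM variant,
    # since the synchronous create() cannot be called from async code
    task = await ExampleTask.objects.acreate(message="Hello, World!")

    # Start the task (it will run in the background)
    await coordinator.start_task(task, 'ExampleTask')
    return task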

# Connect via HTTP SSE
# GET /stream/ExampleTask/{task_id}
```

### 4. Connect from Client

Using JavaScript:

```javascript
const eventSource = new EventSource('/stream/ExampleTask/1');

eventSource.addEventListener('start', (event) => {
    const data = JSON.parse(event.data);
    console.log('Task started:', data);
});

eventSource.addEventListener('progress', (event) => {
    const data = JSON.parse(event.data);
    console.log('Progress:', data);
});

eventSource.addEventListener('complete', (event) => {
    const data = JSON.parse(event.data);
    console.log('Task completed:', data);
    eventSource.close();
});

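eventSource.addEventListener('error', (event) => {
    // Native connection errors carry no data; only server-sent 'error' events do
    const data = event.data ? JSON.parse(event.data) : null;
    console.error('Error:', data ?? 'connection error');
    eventSource.close();
});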
```

Using Python (httpx + httpx-sse):

```python
import asyncio
import json

import httpx
from httpx_sse import aconnect_sse


async def main():
    # timeout=None keeps the stream open during long gaps between events
    async with httpx.AsyncClient(timeout=None) as client:
        async with aconnect_sse(
            client, 'GET', 'http://127.0.0.1:8888/stream/ExampleTask/1'
        ) as event_source:
            async for event in event_source.aiter_sse():
                data = json.loads(event.data)
                print(f"{event.event}: {data}")

                if event.event == 'complete':
                    break


asyncio.run(main())
```

## API

### StreamTask Model

Base abstract model for all streaming tasks.

**Fields:**
- `created_at`: Timestamp when task was created
- `updated_at`: Timestamp when task was last updated
- `completed_at`: Timestamp when task completed (null if not completed)

**Methods:**

#### `async send_event(event_type: str, data: dict)`
Send an event to all connected clients.

**Parameters:**
- `event_type`: Type of event (e.g., 'start', 'progress', 'complete', 'error')
- `data`: Dictionary of data to send to clients

**Example:**
```python
await self.send_event('progress', {
    'step': 5,
    'total': 10,
    'message': 'Halfway there!'
})
```
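
To make the relationship between a `send_event` call and what clients receive more concrete, here is a minimal sketch of how a payload could be turned into an SSE frame carrying the `_task_id`, `_model`, and `_timestamp` keys shown under Response Format. The function `format_sse` and its keyword arguments are assumptions for illustration, not the package's actual code.

```python
import json
from datetime import datetime, timezone


def format_sse(event_type: str, data: dict, *, task_id: int, model_name: str) -> str:
    """Hypothetical helper: serialize one event as an SSE frame."""
    payload = {
        **data,
        # Metadata keys as they appear in the documented response format
        "_task_id": task_id,
        "_model": model_name,
        "_timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # An SSE frame is "event:" and "data:" lines terminated by a blank line
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


frame = format_sse(
    "progress",
    {"step": 5, "total": 10, "message": "Halfway there!"},
    task_id=1,
    model_name="ExampleTask",
)
```

Keeping the payload on a single `data:` line works here because `json.dumps` does not emit raw newlines by default.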

#### `async process()`
Override this method to implement your task logic. This is where your task's work happens.

#### `async mark_completed()`
Mark the task as completed. Called automatically by the coordinator when `process()` finishes.

### TaskCoordinator

Singleton that manages running tasks.

**Methods:**

#### `async start_task(task_instance: StreamTask, model_name: str)`
Start a task in the background.

#### `async get_task_instance(model_name: str, task_id: int) -> StreamTask`
Return the running task instance, or load it from the database if the task is not currently running.

#### `is_task_running(model_name: str, task_id: int) -> bool`
Check if a task is currently running.
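
One way to picture the coordinator's bookkeeping is a registry of asyncio tasks keyed by `(model_name, task_id)`. The sketch below is an assumption about the general shape only; `TaskRegistry`, `start`, and `is_running` are hypothetical names, and the real `TaskCoordinator` may work differently.

```python
import asyncio
from typing import Any, Callable, Coroutine


class TaskRegistry:
    """Hypothetical registry of running work, keyed by (model_name, task_id)."""

    def __init__(self) -> None:
        self._running: dict[tuple[str, int], asyncio.Task] = {}

    def start(
        self,
        model_name: str,
        task_id: int,
        work: Callable[[], Coroutine[Any, Any, None]],
    ) -> asyncio.Task:
        key = (model_name, task_id)
        if key in self._running:
            # Starting twice returns the task that is already running
            return self._running[key]
        task = asyncio.create_task(work())
        self._running[key] = task
        # Forget the entry once the work finishes, fails, or is cancelled
        task.add_done_callback(lambda _finished: self._running.pop(key, None))
        return task

    def is_running(self, model_name: str, task_id: int) -> bool:
        return (model_name, task_id) in self._running


async def demo() -> tuple[bool, bool]:
    registry = TaskRegistry()

    async def work() -> None:
        await asyncio.sleep(0.01)

    task = registry.start("ExampleTask", 1, work)
    running_during = registry.is_running("ExampleTask", 1)
    await task
    await asyncio.sleep(0)  # give the done callback a chance to run
    running_after = registry.is_running("ExampleTask", 1)
    return running_during, running_after
```

Holding a reference to each `asyncio.Task` in the dictionary also keeps it from being garbage-collected while it runs.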

## HTTP Endpoints

### `GET /stream/{model_name}/{task_id}`
Connect to a task's SSE stream.

**Response Format:**
```
event: start
data: {"message": "...", "_task_id": 1, "_model": "ExampleTask", "_timestamp": "..."}

event: progress
data: {"step": 1, "total_steps": 3, "message": "Step 1 of 3", "_task_id": 1, "_model": "ExampleTask", "_timestamp": "..."}

event: complete
data: {"message": "...", "_task_id": 1, "_model": "ExampleTask", "_timestamp": "..."}
```
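
For clients that read the raw stream without an SSE library, the frames above can be split on blank lines and decoded. This is a simplified sketch that assumes `\n` line endings and JSON payloads; `parse_sse` is a hypothetical helper, and it skips parts of the SSE format such as comments, `id:`, and `retry:` fields.

```python
import json


def parse_sse(text: str) -> list[tuple[str, dict]]:
    """Parse a block of SSE text into (event_type, data) pairs."""
    events = []
    for block in text.strip().split("\n\n"):
        event_type = "message"  # default event type when no "event:" line is present
        data_lines = []
        for line in block.splitlines():
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        if data_lines:
            # Multiple data lines belong to one event and are joined with newlines
            events.append((event_type, json.loads("\n".join(data_lines))))
    return events


sample = (
    'event: start\n'
    'data: {"message": "hi", "_task_id": 1}\n'
    '\n'
    'event: complete\n'
    'data: {"message": "done", "_task_id": 1}\n'
    '\n'
)
events = parse_sse(sample)
```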

### `GET /health`
Health check endpoint. Returns `200 OK`.

## Testing

Run tests with Django's test runner:

```bash
# Run all tests
poetry run python manage.py test

# Run specific test class
poetry run python manage.py test streaming.tests.StreamingSystemTests

# Run with verbose output
poetry run python manage.py test --verbosity=2
```

## Architecture

1. **StreamTask**: Abstract Django model that defines the interface for streaming tasks
2. **TaskCoordinator**: Singleton that manages task lifecycle and keeps tasks running
3. **SSE Server**: asgineer-based ASGI server that handles HTTP SSE connections
4. **Management Command**: `runserver_stream` to start the server on Unix socket or TCP port

## Example: ExampleTask

The project includes a simple example task:

```python
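import asyncio

from django.db import models

# StreamTask is the abstract base importable from streaming.models


class ExampleTask(StreamTask):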
    message = models.CharField(max_length=255, default="Hello from ExampleTask")

    async def process(self):
        await self.send_event('start', {
            'message': self.message,
            'total_steps': 3
        })

        for i in range(1, 4):
            await asyncio.sleep(2)
            await self.send_event('progress', {
                'step': i,
                'total_steps': 3,
                'message': f"Step {i} of 3"
            })

        await self.send_event('complete', {
            'message': 'Task completed successfully'
        })
```

Test it:

```bash
# Start server
poetry run python manage.py runserver_stream --port 8888

# In another terminal, create a task
poetry run python manage.py shell
>>> from streaming.models import ExampleTask
>>> from streaming.coordinator import coordinator
>>> import asyncio
>>> task = ExampleTask.objects.create(message="Test")
>>> asyncio.run(coordinator.start_task(task, 'ExampleTask'))

# Connect with curl (-N disables output buffering) or a browser;
# replace 1 with the id of the task you created
curl -N http://127.0.0.1:8888/stream/ExampleTask/1
```

## Requirements

- Python 3.11+
- Django 5.2+
- asgineer
- uvicorn
- httpx (for testing)
- httpx-sse (for testing)

## License

MIT

            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "django-streaming-coordinator",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "django, sse, server-sent-events, streaming, async, tasks, background-tasks",
    "author": null,
    "author_email": "Justus H\u00e4m\u00e4l\u00e4inen <justushamalainen@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/a9/d7/3c327a21daf4ecfb96242ac427fe30e0adcd73cc454df865b2a9ecb957d5/django_streaming_coordinator-0.1.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "A Django-based system for managing long-running tasks with Server-Sent Events (SSE) streaming",
    "version": "0.1.0",
    "project_urls": {
        "Documentation": "https://github.com/Technology-Company/django-streaming-coordinator#readme",
        "Homepage": "https://github.com/Technology-Company/django-streaming-coordinator",
        "Issues": "https://github.com/Technology-Company/django-streaming-coordinator/issues",
        "Repository": "https://github.com/Technology-Company/django-streaming-coordinator"
    },
    "split_keywords": [
        "django",
        " sse",
        " server-sent-events",
        " streaming",
        " async",
        " tasks",
        " background-tasks"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "509f61fdf422144f5ee6025fa8c7a8ddfe9195ed2182d46bd25a7b3445be2da1",
                "md5": "294a833a5148c2e19a4ea24374cdaebc",
                "sha256": "b8e4328c8337555848a4ea2ddf39a3074590be32f21186121704e12063da844a"
            },
            "downloads": -1,
            "filename": "django_streaming_coordinator-0.1.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "294a833a5148c2e19a4ea24374cdaebc",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 11646,
            "upload_time": "2025-10-15T14:50:32",
            "upload_time_iso_8601": "2025-10-15T14:50:32.544940Z",
            "url": "https://files.pythonhosted.org/packages/50/9f/61fdf422144f5ee6025fa8c7a8ddfe9195ed2182d46bd25a7b3445be2da1/django_streaming_coordinator-0.1.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "a9d73c327a21daf4ecfb96242ac427fe30e0adcd73cc454df865b2a9ecb957d5",
                "md5": "21cbfc60c89cbac9ce4bb064c777e285",
                "sha256": "1b24f8fc7c3368e5cd63829fae5c8e1b44082756077cb09991796dd8ae468736"
            },
            "downloads": -1,
            "filename": "django_streaming_coordinator-0.1.0.tar.gz",
            "has_sig": false,
            "md5_digest": "21cbfc60c89cbac9ce4bb064c777e285",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 12106,
            "upload_time": "2025-10-15T14:50:33",
            "upload_time_iso_8601": "2025-10-15T14:50:33.979573Z",
            "url": "https://files.pythonhosted.org/packages/a9/d7/3c327a21daf4ecfb96242ac427fe30e0adcd73cc454df865b2a9ecb957d5/django_streaming_coordinator-0.1.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-15 14:50:33",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "Technology-Company",
    "github_project": "django-streaming-coordinator#readme",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "django-streaming-coordinator"
}
        