django-pgwatch


Name: django-pgwatch
Version: 1.0.0
Summary: Django app for PostgreSQL LISTEN/NOTIFY with persistence and playback
Author: Ed Menendez <ed@edmenendez.com>
Home page: https://github.com/edmenendez/django-pgwatch
Upload time: 2025-08-13 20:12:52
Requires Python: >=3.9
License: MIT
Keywords: django, postgresql, notify, listen, database, triggers
# django-pgwatch

A Django app that provides PostgreSQL LISTEN/NOTIFY functionality with persistence and playback capabilities. It solves the problem of notifications missed while a consumer is disconnected: every notification is stored in a database table, and consumers automatically replay what they missed when they reconnect.

## Table of Contents

- [Quick Start](#quick-start)
- [Features](#features)
- [Installation](#installation)
- [Basic Usage](#basic-usage)
  - [Sending Notifications](#1-sending-notifications)
  - [Creating a Consumer](#2-creating-a-consumer)
  - [Running Consumers](#3-running-consumers)
- [Database Change Notifications](#database-change-notifications)
- [Management Commands](#management-commands)
- [Common Patterns](#common-patterns)
- [Advanced Usage](#advanced-usage)
- [Admin Interface](#admin-interface)
- [Architecture & Performance](#architecture--performance)
- [Testing](#testing)

## Quick Start

**Get up and running in 3 steps:**

1. **Install and migrate:**
   ```bash
   pip install django-pgwatch
   # add 'django_pgwatch' to INSTALLED_APPS, then:
   python manage.py migrate django_pgwatch
   ```

2. **Send a notification:**
   ```python
   from django_pgwatch.utils import smart_notify
   smart_notify('my_channel', {'event': 'test', 'user_id': 123})
   ```

3. **Create and run a consumer:**
   ```python
   # myapp/consumers.py
   from django_pgwatch.consumer import BaseConsumer, NotificationHandler
   
   class MyConsumer(BaseConsumer):
       consumer_id = 'my_consumer'
       channels = ['my_channel']
       
       def handle_notification(self, handler: NotificationHandler):
           print(f"Received: {handler.data}")
   ```
   
   ```bash
   python manage.py pgwatch_listen
   ```

**That's it!** Your consumer will process all notifications and stay running for new ones.

## Features

- **Guaranteed Delivery**: All notifications are persisted to a database table
- **Playback Capability**: Consumers can catch up on missed notifications after reconnecting
- **Multiple Consumers**: Track which consumers have processed each notification
- **Large Payload Support**: Automatically handles payloads larger than PostgreSQL's 8KB limit
- **Gap Detection**: Automatically detects and fills missed notifications during operation
- **Consumer Management**: Track consumer progress and handle consumer-specific processing
- **Django Integration**: Native Django models, admin interface, and management commands

## Installation

1. Add `django_pgwatch` to your `INSTALLED_APPS`:
   ```python
   INSTALLED_APPS = [
       # ... other apps
       'django_pgwatch',
   ]
   ```

2. Run migrations (automatically installs PostgreSQL functions):
   ```bash
   python manage.py migrate django_pgwatch
   ```

## Basic Usage

### 1. Sending Notifications

```python
from django_pgwatch.utils import smart_notify

# Send a simple notification
notification_log_id = smart_notify('my_channel', {
    'event_type': 'user_login',
    'user_id': 123,
    'timestamp': '2024-01-01T10:00:00Z'
})

# The notification is automatically persisted and sent via PostgreSQL NOTIFY
```
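`smart_notify` handles oversized payloads for you, so the 8KB limit mentioned under Features is transparent to callers. For intuition, the kind of size check involved can be sketched in plain Python (the constant and function here are illustrative, not part of the library; the actual mechanism is internal):

```python
import json

# PostgreSQL caps a NOTIFY payload at just under 8KB (8000 bytes by default).
PG_NOTIFY_LIMIT = 8000

def payload_fits_notify(payload: dict) -> bool:
    """Return True if the JSON-encoded payload fits in a single NOTIFY message."""
    return len(json.dumps(payload).encode("utf-8")) < PG_NOTIFY_LIMIT

payload_fits_notify({"event_type": "user_login", "user_id": 123})  # small: fits
payload_fits_notify({"rows": list(range(5000))})                   # large: does not fit
```

When a payload is too large, a common approach (and presumably what the library does) is to send only a reference over NOTIFY and let consumers fetch the full payload from the persistence table.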

### 2. Creating a Consumer

Create a `consumers.py` file in your Django app:

```python
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class MyConsumer(BaseConsumer):
    # Class attributes for auto-discovery
    consumer_id = 'my_consumer'
    channels = ['my_channel', 'data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        print(f"Received: {handler.data}")
        
        # Access notification details
        print(f"Notification Log ID: {handler.notification_log_id}")
        print(f"Channel: {handler.channel}")
        print(f"Is replay: {handler.is_replay}")
        
        # For database change notifications
        if handler.get_table() == 'users':
            if handler.is_insert():
                print(f"New user: {handler.get_new_data()}")
            elif handler.is_update():
                print(f"User updated: {handler.get_old_data()} -> {handler.get_new_data()}")
            elif handler.is_delete():
                print(f"User deleted: {handler.get_old_data()}")
```

### 3. Running Consumers

The management command automatically discovers all consumers from your `INSTALLED_APPS`:

```bash
# Run all discovered consumers
python manage.py pgwatch_listen

# List all discoverable consumers
python manage.py pgwatch_listen --list-consumers

# Run consumers from specific apps only
python manage.py pgwatch_listen --apps myapp otherapp

# Run specific consumers by ID
python manage.py pgwatch_listen --consumers my_consumer webhook_sender

# Exclude specific consumers
python manage.py pgwatch_listen --exclude-consumers heavy_processor
```

## Database Change Notifications

### Setting up Triggers

The app provides a trigger function that automatically sends notifications for database changes.

#### Recommended Approach: Django Migrations

The best way to create database triggers is with a Django migration using `RunSQL`:

```python
# In your app's migration file (e.g., migrations/0002_create_triggers.py)
from django.db import migrations

class Migration(migrations.Migration):
    dependencies = [
        ('your_app', '0001_initial'),
        # Ensure notify_data_change() function exists
        ('django_pgwatch', '0002_install_pg_functions'),
    ]

    operations = [
        migrations.RunSQL(
            # Forward migration - create trigger
            sql="""
            CREATE TRIGGER notify_users_changes
                AFTER INSERT OR UPDATE OR DELETE ON users
                FOR EACH ROW 
                EXECUTE FUNCTION notify_data_change();
            """,
            # Reverse migration - drop trigger
            reverse_sql="""
            DROP TRIGGER IF EXISTS notify_users_changes ON users;
            """
        ),
    ]
```

**Benefits of using migrations:**
- ✅ Automatic deployment with `python manage.py migrate`
- ✅ Version controlled and reversible
- ✅ Consistent across environments
- ✅ No manual intervention required

#### Alternative Approaches

**Direct SQL:**
```sql
-- Create a trigger on any table
CREATE TRIGGER notify_users_changes
    AFTER INSERT OR UPDATE OR DELETE ON users
    FOR EACH ROW EXECUTE FUNCTION notify_data_change();
```

**Python helper function:**
```python
from django_pgwatch.examples import create_trigger_for_table

# Create trigger for the users table
create_trigger_for_table('users')
create_trigger_for_table('orders', 'order_events')  # Custom channel
```

**Note:** The Python helper and direct SQL approaches require manual execution and are not version controlled. Use migrations for production applications.
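If you generate trigger migrations for many tables, the `CREATE TRIGGER` statement can be templated rather than hand-written each time. A small illustrative helper (not part of the package) that produces SQL in the same shape as the migration above:

```python
def trigger_sql(table, trigger_name=None):
    """Build a CREATE TRIGGER statement for notify_data_change() on a table."""
    name = trigger_name or f"notify_{table}_changes"
    return (
        f"CREATE TRIGGER {name}\n"
        f"    AFTER INSERT OR UPDATE OR DELETE ON {table}\n"
        f"    FOR EACH ROW EXECUTE FUNCTION notify_data_change();"
    )

trigger_sql("orders")  # SQL for a notify_orders_changes trigger
```

The returned string (and a matching `DROP TRIGGER IF EXISTS ...` for `reverse_sql`) can then be passed straight to `migrations.RunSQL`.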

### Consuming Database Changes

```python
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class DatabaseChangeConsumer(BaseConsumer):
    consumer_id = 'database_changes'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        table = handler.get_table()
        action = handler.get_action()  # INSERT, UPDATE, DELETE
        
        if table == 'users' and action == 'INSERT':
            user_data = handler.get_new_data()
            self.send_welcome_email(user_data)
        elif table == 'orders' and action == 'UPDATE':
            old_data = handler.get_old_data()
            new_data = handler.get_new_data()
            
            if old_data['status'] != new_data['status']:
                self.send_status_update(new_data)
```
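The accessors above (`get_table()`, `get_action()`, `get_old_data()`, `get_new_data()`) presumably read a payload that carries the table, the action, and before/after row images. The shape and field names below are assumptions for illustration, not the library's documented wire format:

```python
# Illustrative database-change payload; field names are assumptions.
payload = {
    "table": "orders",
    "action": "UPDATE",          # INSERT, UPDATE, or DELETE
    "old_data": {"id": 7, "status": "pending"},   # row before the change
    "new_data": {"id": 7, "status": "shipped"},   # row after the change
}

# The status-change check from the consumer above, on the raw payload:
status_changed = payload["old_data"]["status"] != payload["new_data"]["status"]
```

For `INSERT` there is no old image and for `DELETE` no new image, which is why the consumer branches on the action before touching either side.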

## Common Patterns

### Cache Invalidation

```python
from django.core.cache import cache

from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class CacheInvalidationConsumer(BaseConsumer):
    consumer_id = 'cache_invalidator'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        if handler.is_database_change():
            cache_key = f"{handler.get_table()}:{handler.get_record_id()}"
            cache.delete(cache_key)
```

### Webhook Integration

```python
import requests

from django.conf import settings
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class WebhookConsumer(BaseConsumer):
    consumer_id = 'webhook_sender'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        requests.post(settings.WEBHOOK_URL, json=handler.data)
```

### Analytics Tracking

```python
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

# `analytics` is a placeholder for whatever tracking client you use
class AnalyticsConsumer(BaseConsumer):
    consumer_id = 'analytics_processor'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        analytics.track(handler.get_record_id(), handler.get_action(), handler.data)
```

## Management Commands

### pgwatch_listen

**Basic usage:**
```bash
# Run all discovered consumers
python manage.py pgwatch_listen

# List available consumers
python manage.py pgwatch_listen --list-consumers
```

**Common filtering options:**
```bash
# Run specific consumers
python manage.py pgwatch_listen --consumers webhook_sender cache_invalidator

# Run consumers from specific apps
python manage.py pgwatch_listen --apps myapp otherapp

# Exclude heavy processors
python manage.py pgwatch_listen --exclude-consumers analytics_processor
```

**Advanced options:**
- `--timeout=30`: Listening timeout in seconds
- `--max-batch-size=100`: Playback batch size
- `--skip-playback`: Skip missed notifications, only process new ones
- `--reconnect-delay=5`: Delay before reconnecting after error

### pgwatch_cleanup

```bash
# Clean up old notifications (keep 7 days)
python manage.py pgwatch_cleanup --days=7

# Preview what will be deleted
python manage.py pgwatch_cleanup --days=7 --dry-run
```
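The retention arithmetic behind `--days` is straightforward; a sketch of the cutoff computation (the command's actual implementation may differ):

```python
from datetime import datetime, timedelta, timezone

def retention_cutoff(days, now=None):
    """Notifications created before this timestamp are eligible for deletion."""
    now = now or datetime.now(timezone.utc)
    return now - timedelta(days=days)

# With --days=7 on Jan 10, everything created before Jan 3 is deleted.
retention_cutoff(7, datetime(2024, 1, 10, tzinfo=timezone.utc))
```

Pick a retention window longer than the longest outage you expect a consumer to survive, since playback can only replay rows that have not been cleaned up.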

### Deployment Patterns

**Single process (default):**
```bash
python manage.py pgwatch_listen  # All consumers in one process
```

**Parallel processes:**
```bash
# Split heavy consumers into separate processes
python manage.py pgwatch_listen --consumers cache_invalidator &
python manage.py pgwatch_listen --consumers webhook_sender &
```

**Load balancing:**
```python
# Create multiple worker consumers
from django_pgwatch.consumer import BaseConsumer

class Worker1Consumer(BaseConsumer):
    consumer_id = 'worker_1'
    channels = ['work_queue']
    
class Worker2Consumer(BaseConsumer):
    consumer_id = 'worker_2'
    channels = ['work_queue']
```
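Note that both workers above subscribe to the same channel, so each receives every notification; they still have to decide which records each one owns. One common way to split the work (illustrative only, not a library feature) is to partition deterministically by record id:

```python
def is_mine(record_id: int, worker_index: int, num_workers: int) -> bool:
    """Assign each record to exactly one worker by id modulo worker count."""
    return record_id % num_workers == worker_index

# Worker 0 of 2 handles even ids, worker 1 handles odd ids; a worker
# simply returns early from handle_notification for records it doesn't own.
```

This keeps assignment stable across restarts without any coordination between workers.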

## Advanced Usage

### Error Handling

```python
import logging

from django_pgwatch.consumer import BaseConsumer, NotificationHandler

logger = logging.getLogger(__name__)

class RobustConsumer(BaseConsumer):
    consumer_id = 'robust_consumer'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        try:
            self.process_notification(handler)
        except RetryableError as e:
            logger.error(f"Retryable error: {e}")
            raise  # Will retry
        except PermanentError as e:
            logger.error(f"Permanent error: {e}")
            # Don't re-raise - marks as processed
```
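`RetryableError` and `PermanentError` are not shipped by django_pgwatch; they stand in for whatever application-defined exceptions you use to distinguish transient failures from unrecoverable ones, e.g.:

```python
class RetryableError(Exception):
    """Transient failure (network timeout, lock contention): re-raise it so
    the notification stays unprocessed and is retried."""

class PermanentError(Exception):
    """Unrecoverable failure (malformed payload, deleted record): swallow it
    so the notification is marked processed and never retried."""
```

Wrapping lower-level exceptions into these two categories at the edge of `handle_notification` keeps the retry decision in one place.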

### Filtering Notifications

```python
from django_pgwatch.consumer import BaseConsumer, NotificationHandler

class FilteredConsumer(BaseConsumer):
    consumer_id = 'filtered_consumer'
    channels = ['data_change']
    
    def handle_notification(self, handler: NotificationHandler):
        # Filter by table
        if handler.get_table() not in ['users', 'orders']:
            return
        
        # Skip replayed notifications
        if handler.is_replay:
            return
            
        self.process_notification(handler)
```

### Custom Notifications

```python
from django_pgwatch.examples import send_custom_notification

send_custom_notification('user_events', 'password_reset', {
    'user_id': 123,
    'ip_address': '192.168.1.1',
    'requested_at': '2024-01-01T10:00:00Z'
})
```

## Admin Interface

The app provides a Django admin interface for monitoring notifications:

- View all notification logs
- See which consumers have processed each notification
- Clean up old notifications
- Reprocess notifications (clear consumer tracking)
- View summary statistics

Access at `/admin/django_pgwatch/notificationlog/`

## Database Functions

**smart_notify(channel_name, payload_data)** - Persists and sends notifications:
```sql
SELECT smart_notify('my_channel', '{"event": "test"}'::jsonb);
```

**notify_data_change()** - Trigger function for database changes:
```sql
CREATE TRIGGER my_table_changes
    AFTER INSERT OR UPDATE OR DELETE ON my_table
    FOR EACH ROW EXECUTE FUNCTION notify_data_change();
```

## Architecture & Performance

### How It Works

1. **Notification Storage**: Every notification is stored in the `notification_log` table
2. **Consumer Tracking**: Each consumer records which notifications it has processed
3. **Playback**: On startup, consumers replay notifications they missed while offline
4. **Real-time**: New notifications arrive via PostgreSQL LISTEN/NOTIFY
5. **Gap Detection**: Missed notifications are detected and filled automatically during operation
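Because every notification gets a row in the log table, gap detection reduces to comparing the ids a consumer has seen against the ids that should exist. A plain-Python sketch of the idea (the library's actual detection is internal; names here are illustrative):

```python
def find_gaps(last_processed_id, received_ids):
    """Return ids after the consumer's cursor that never arrived and need replay."""
    if not received_ids:
        return []
    expected = set(range(last_processed_id + 1, max(received_ids) + 1))
    return sorted(expected - set(received_ids))

# Cursor at 7, then 8, 10, 11 arrive live: 9 was missed and must be replayed.
find_gaps(7, {8, 10, 11})  # -> [9]
```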

### Performance Features

- **Batch Processing**: Configurable batch sizes for playback
- **Parallel Processing**: Multiple consumers process simultaneously
- **Database Optimization**: Indexes on channel and timestamps
- **Large Payloads**: Automatic handling of payloads >8KB
- **Cleanup**: Configurable retention periods
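Playback batching (`--max-batch-size`) amounts to chunking the backlog so one huge catch-up doesn't hold a transaction or the process for too long; the generic pattern is:

```python
def batches(ids, size):
    """Yield successive fixed-size chunks of a backlog for playback."""
    for i in range(0, len(ids), size):
        yield ids[i:i + size]

list(batches([1, 2, 3, 4, 5], 2))  # -> [[1, 2], [3, 4], [5]]
```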

### Monitoring

- Django admin interface for notification tracking
- Consumer progress and error monitoring
- Database table size monitoring
- Alerting for unprocessed notifications

## Testing

### TODO: Test Coverage Needed

The following areas need comprehensive test coverage:

**Auto-Discovery Tests:**
- [ ] Test consumer discovery from multiple apps
- [ ] Test filtering by app names (`--apps`)
- [ ] Test filtering by consumer IDs (`--consumers`) 
- [ ] Test excluding consumers (`--exclude-consumers`)
- [ ] Test error handling for non-existent apps/consumers
- [ ] Test `--list-consumers` output formatting

**Consumer Base Class Tests:**
- [ ] Test BaseConsumer class attribute inheritance
- [ ] Test constructor parameter override of class attributes
- [ ] Test validation error when no consumer_id provided
- [ ] Test channel configuration precedence

**Database Integration Tests:**
- [ ] Test trigger creation and notification sending
- [ ] Test consumer playback of missed notifications
- [ ] Test real-time notification processing
- [ ] Test consumer restart and gap detection
- [ ] Test large payload handling

**Error Handling Tests:**
- [ ] Test consumer exception handling (retryable vs permanent)
- [ ] Test database connection failures and reconnection
- [ ] Test malformed notification payloads
- [ ] Test consumer shutdown and cleanup

**Management Command Tests:**
- [ ] Test command argument parsing and validation
- [ ] Test signal handling for graceful shutdown
- [ ] Test consumer process lifecycle management
- [ ] Test output formatting and logging

**Multi-Consumer Tests:**
- [ ] Test multiple consumers processing same notifications
- [ ] Test consumer isolation (one failure doesn't affect others)
- [ ] Test consumer-specific progress tracking
- [ ] Test parallel vs sequential execution modes

## Contributing

This is an internal Avela Education tool. For issues or feature requests, contact the development team.

## License

Internal use only - Avela Education

            
