# django-pgwatch
A Django app that provides PostgreSQL LISTEN/NOTIFY functionality with persistence and playback capabilities. This solves the problem of missed notifications when consumers are disconnected by storing all notifications in a database table and providing automatic playback functionality.
## Table of Contents
- [Quick Start](#quick-start)
- [Features](#features)
- [Installation](#installation)
- [Basic Usage](#basic-usage)
- [Sending Notifications](#1-sending-notifications)
- [Creating a Consumer](#2-creating-a-consumer)
- [Running Consumers](#3-running-consumers)
- [Database Change Notifications](#database-change-notifications)
- [Management Commands](#management-commands)
- [Common Patterns](#common-patterns)
- [Advanced Usage](#advanced-usage)
- [Admin Interface](#admin-interface)
- [Architecture & Performance](#architecture--performance)
- [Testing](#testing)
## Quick Start
**Get up and running in 3 steps:**
1. **Install and migrate:**
```bash
# Add 'django_pgwatch' to INSTALLED_APPS
python manage.py migrate django_pgwatch
```
2. **Send a notification:**
```python
from django_pgwatch.utils import smart_notify
smart_notify('my_channel', {'event': 'test', 'user_id': 123})
```
3. **Create and run a consumer:**
```python
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler
class MyConsumer(BaseConsumer):
consumer_id = 'my_consumer'
channels = ['my_channel']
def handle_notification(self, handler: NotificationHandler):
print(f"Received: {handler.data}")
```
```bash
python manage.py pgwatch_listen
```
**That's it!** Your consumer will process all notifications and stay running for new ones.
## Features
- **Guaranteed Delivery**: All notifications are persisted to a database table
- **Playback Capability**: Consumers can catch up on missed notifications after reconnecting
- **Multiple Consumers**: Track which consumers have processed each notification
- **Large Payload Support**: Automatically handles payloads larger than PostgreSQL's 8KB limit
- **Gap Detection**: Automatically detects and fills missed notifications during operation
- **Consumer Management**: Track consumer progress and handle consumer-specific processing
- **Django Integration**: Native Django models, admin interface, and management commands
## Installation
1. Add `django_pgwatch` to your `INSTALLED_APPS`:
```python
INSTALLED_APPS = [
# ... other apps
'django_pgwatch',
]
```
2. Run migrations (automatically installs PostgreSQL functions):
```bash
python manage.py migrate django_pgwatch
```
## Basic Usage
### 1. Sending Notifications
```python
from django_pgwatch.utils import smart_notify
# Send a simple notification
notification_log_id = smart_notify('my_channel', {
'event_type': 'user_login',
'user_id': 123,
'timestamp': '2024-01-01T10:00:00Z'
})
# The notification is automatically persisted and sent via PostgreSQL NOTIFY
```
### 2. Creating a Consumer
Create a `consumers.py` file in your Django app:
```python
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler
class MyConsumer(BaseConsumer):
# Class attributes for auto-discovery
consumer_id = 'my_consumer'
channels = ['my_channel', 'data_change']
def handle_notification(self, handler: NotificationHandler):
print(f"Received: {handler.data}")
# Access notification details
print(f"Notification Log ID: {handler.notification_log_id}")
print(f"Channel: {handler.channel}")
print(f"Is replay: {handler.is_replay}")
# For database change notifications
if handler.get_table() == 'users':
if handler.is_insert():
print(f"New user: {handler.get_new_data()}")
elif handler.is_update():
print(f"User updated: {handler.get_old_data()} -> {handler.get_new_data()}")
elif handler.is_delete():
print(f"User deleted: {handler.get_old_data()}")
```
### 3. Running Consumers
The management command automatically discovers all consumers from your `INSTALLED_APPS`:
```bash
# Run all discovered consumers
python manage.py pgwatch_listen
# List all discoverable consumers
python manage.py pgwatch_listen --list-consumers
# Run consumers from specific apps only
python manage.py pgwatch_listen --apps myapp otherapp
# Run specific consumers by ID
python manage.py pgwatch_listen --consumers my_consumer webhook_sender
# Exclude specific consumers
python manage.py pgwatch_listen --exclude-consumers heavy_processor
```
## Database Change Notifications
### Setting up Triggers
The app provides a trigger function that automatically sends notifications for database changes.
#### Recommended Approach: Django Migrations (Preferred)
The best way to create database triggers is using Django migrations with `RunSQL`:
```python
# In your app's migration file (e.g., migrations/0002_create_triggers.py)
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('your_app', '0001_initial'),
# Ensure notify_data_change() function exists
('django_pgwatch', '0002_install_pg_functions'),
]
operations = [
migrations.RunSQL(
# Forward migration - create trigger
sql="""
CREATE TRIGGER notify_users_changes
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW
EXECUTE FUNCTION notify_data_change();
""",
# Reverse migration - drop trigger
reverse_sql="""
DROP TRIGGER IF EXISTS notify_users_changes ON users;
"""
),
]
```
**Benefits of using migrations:**
- ✅ Automatic deployment with `python manage.py migrate`
- ✅ Version controlled and reversible
- ✅ Consistent across environments
- ✅ No manual intervention required
#### Alternative Approaches
**Direct SQL:**
```sql
-- Create a trigger on any table
CREATE TRIGGER notify_users_changes
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_data_change();
```
**Python helper function:**
```python
from django_pgwatch.examples import create_trigger_for_table
# Create trigger for the users table
create_trigger_for_table('users')
create_trigger_for_table('orders', 'order_events') # Custom channel
```
**Note:** The Python helper and direct SQL approaches require manual execution and are not version controlled. Use migrations for production applications.
### Consuming Database Changes
```python
# myapp/consumers.py
from django_pgwatch.consumer import BaseConsumer, NotificationHandler
class DatabaseChangeConsumer(BaseConsumer):
consumer_id = 'database_changes'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
table = handler.get_table()
action = handler.get_action() # INSERT, UPDATE, DELETE
if table == 'users' and action == 'INSERT':
user_data = handler.get_new_data()
self.send_welcome_email(user_data)
elif table == 'orders' and action == 'UPDATE':
old_data = handler.get_old_data()
new_data = handler.get_new_data()
if old_data['status'] != new_data['status']:
self.send_status_update(new_data)
```
## Common Patterns
### Cache Invalidation
```python
class CacheInvalidationConsumer(BaseConsumer):
consumer_id = 'cache_invalidator'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
if handler.is_database_change():
cache_key = f"{handler.get_table()}:{handler.get_record_id()}"
cache.delete(cache_key)
```
### Webhook Integration
```python
class WebhookConsumer(BaseConsumer):
consumer_id = 'webhook_sender'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
requests.post(settings.WEBHOOK_URL, json=handler.data)
```
### Analytics Tracking
```python
class AnalyticsConsumer(BaseConsumer):
consumer_id = 'analytics_processor'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
analytics.track(handler.get_record_id(), handler.get_action(), handler.data)
```
## Management Commands
### pgwatch_listen
**Basic usage:**
```bash
# Run all discovered consumers
python manage.py pgwatch_listen
# List available consumers
python manage.py pgwatch_listen --list-consumers
```
**Common filtering options:**
```bash
# Run specific consumers
python manage.py pgwatch_listen --consumers webhook_sender cache_invalidator
# Run consumers from specific apps
python manage.py pgwatch_listen --apps myapp otherapp
# Exclude heavy processors
python manage.py pgwatch_listen --exclude-consumers analytics_processor
```
**Advanced options:**
- `--timeout=30`: Listening timeout in seconds
- `--max-batch-size=100`: Playback batch size
- `--skip-playback`: Skip missed notifications, only process new ones
- `--reconnect-delay=5`: Delay before reconnecting after error
### pgwatch_cleanup
```bash
# Clean up old notifications (keep 7 days)
python manage.py pgwatch_cleanup --days=7
# Preview what will be deleted
python manage.py pgwatch_cleanup --days=7 --dry-run
```
### Deployment Patterns
**Single process (default):**
```bash
python manage.py pgwatch_listen # All consumers in one process
```
**Parallel processes:**
```bash
# Split heavy consumers into separate processes
python manage.py pgwatch_listen --consumers cache_invalidator &
python manage.py pgwatch_listen --consumers webhook_sender &
```
**Load balancing:**
```python
# Create multiple worker consumers
class Worker1Consumer(BaseConsumer):
consumer_id = 'worker_1'
channels = ['work_queue']
class Worker2Consumer(BaseConsumer):
consumer_id = 'worker_2'
channels = ['work_queue']
```
## Advanced Usage
### Error Handling
```python
class RobustConsumer(BaseConsumer):
consumer_id = 'robust_consumer'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
try:
self.process_notification(handler)
except RetryableError as e:
logger.error(f"Retryable error: {e}")
raise # Will retry
except PermanentError as e:
logger.error(f"Permanent error: {e}")
# Don't re-raise - marks as processed
```
### Filtering Notifications
```python
class FilteredConsumer(BaseConsumer):
consumer_id = 'filtered_consumer'
channels = ['data_change']
def handle_notification(self, handler: NotificationHandler):
# Filter by table
if handler.get_table() not in ['users', 'orders']:
return
# Skip replayed notifications
if handler.is_replay:
return
self.process_notification(handler)
```
### Custom Notifications
```python
from django_pgwatch.examples import send_custom_notification
send_custom_notification('user_events', 'password_reset', {
'user_id': 123,
'ip_address': '192.168.1.1',
'requested_at': '2024-01-01T10:00:00Z'
})
```
## Admin Interface
The app provides a Django admin interface for monitoring notifications:
- View all notification logs
- See which consumers have processed each notification
- Clean up old notifications
- Reprocess notifications (clear consumer tracking)
- View summary statistics
Access at `/admin/django_pgwatch/notificationlog/`
### Database Functions
**smart_notify(channel_name, payload_data)** - Persists and sends notifications:
```sql
SELECT smart_notify('my_channel', '{"event": "test"}'::jsonb);
```
**notify_data_change()** - Trigger function for database changes:
```sql
CREATE TRIGGER my_table_changes
AFTER INSERT OR UPDATE OR DELETE ON my_table
FOR EACH ROW EXECUTE FUNCTION notify_data_change();
```
## Architecture & Performance
### How It Works
1. **Notification Storage**: All notifications stored in `notification_log` table
2. **Consumer Tracking**: Each consumer tracks processed notifications
3. **Playback**: On startup, process missed notifications
4. **Real-time**: Listen for new notifications via PostgreSQL LISTEN/NOTIFY
5. **Gap Detection**: Automatic detection of missed notifications
### Performance Features
- **Batch Processing**: Configurable batch sizes for playback
- **Parallel Processing**: Multiple consumers process simultaneously
- **Database Optimization**: Indexes on channel and timestamps
- **Large Payloads**: Automatic handling of payloads >8KB
- **Cleanup**: Configurable retention periods
### Monitoring
- Django admin interface for notification tracking
- Consumer progress and error monitoring
- Database table size monitoring
- Alerting for unprocessed notifications
## Testing
### TODO: Test Coverage Needed
The following areas need comprehensive test coverage:
**Auto-Discovery Tests:**
- [ ] Test consumer discovery from multiple apps
- [ ] Test filtering by app names (`--apps`)
- [ ] Test filtering by consumer IDs (`--consumers`)
- [ ] Test excluding consumers (`--exclude-consumers`)
- [ ] Test error handling for non-existent apps/consumers
- [ ] Test `--list-consumers` output formatting
**Consumer Base Class Tests:**
- [ ] Test BaseConsumer class attribute inheritance
- [ ] Test constructor parameter override of class attributes
- [ ] Test validation error when no consumer_id provided
- [ ] Test channel configuration precedence
**Database Integration Tests:**
- [ ] Test trigger creation and notification sending
- [ ] Test consumer playback of missed notifications
- [ ] Test real-time notification processing
- [ ] Test consumer restart and gap detection
- [ ] Test large payload handling
**Error Handling Tests:**
- [ ] Test consumer exception handling (retryable vs permanent)
- [ ] Test database connection failures and reconnection
- [ ] Test malformed notification payloads
- [ ] Test consumer shutdown and cleanup
**Management Command Tests:**
- [ ] Test command argument parsing and validation
- [ ] Test signal handling for graceful shutdown
- [ ] Test consumer process lifecycle management
- [ ] Test output formatting and logging
**Multi-Consumer Tests:**
- [ ] Test multiple consumers processing same notifications
- [ ] Test consumer isolation (one failure doesn't affect others)
- [ ] Test consumer-specific progress tracking
- [ ] Test parallel vs sequential execution modes
## Contributing
This is an internal Avela Education tool. For issues or feature requests, contact the development team.
## License
Internal use only - Avela Education
Raw data
{
"_id": null,
"home_page": null,
"name": "django-pgwatch",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "django, postgresql, notify, listen, database, triggers",
"author": null,
"author_email": "Ed Menendez <ed@edmenendez.com>",
"download_url": "https://files.pythonhosted.org/packages/1b/7f/fe741c728db100985a85691447434c566ae38d2364b5ddb195e98b9a7340/django_pgwatch-1.0.0.tar.gz",
"platform": null,
"description": "# django-pgwatch\n\nA Django app that provides PostgreSQL LISTEN/NOTIFY functionality with persistence and playback capabilities. This solves the problem of missed notifications when consumers are disconnected by storing all notifications in a database table and providing automatic playback functionality.\n\n## Table of Contents\n\n- [Quick Start](#quick-start)\n- [Features](#features)\n- [Installation](#installation)\n- [Basic Usage](#basic-usage)\n - [Sending Notifications](#1-sending-notifications)\n - [Creating a Consumer](#2-creating-a-consumer)\n - [Running Consumers](#3-running-consumers)\n- [Database Change Notifications](#database-change-notifications)\n- [Management Commands](#management-commands)\n- [Common Patterns](#common-patterns)\n- [Advanced Usage](#advanced-usage)\n- [Admin Interface](#admin-interface)\n- [Architecture & Performance](#architecture--performance)\n- [Testing](#testing)\n\n## Quick Start\n\n**Get up and running in 3 steps:**\n\n1. **Install and migrate:**\n ```bash\n # Add 'django_pgwatch' to INSTALLED_APPS\n python manage.py migrate django_pgwatch\n ```\n\n2. **Send a notification:**\n ```python\n from django_pgwatch.utils import smart_notify\n smart_notify('my_channel', {'event': 'test', 'user_id': 123})\n ```\n\n3. **Create and run a consumer:**\n ```python\n # myapp/consumers.py\n from django_pgwatch.consumer import BaseConsumer, NotificationHandler\n \n class MyConsumer(BaseConsumer):\n consumer_id = 'my_consumer'\n channels = ['my_channel']\n \n def handle_notification(self, handler: NotificationHandler):\n print(f\"Received: {handler.data}\")\n ```\n \n ```bash\n python manage.py pgwatch_listen\n ```\n\n**That's it!** Your consumer will process all notifications and stay running for new ones.\n\n## Features\n\n- **Guaranteed Delivery**: All notifications are persisted to a database table\n- **Playback Capability**: Consumers can catch up on missed notifications after reconnecting\n- **Multiple Consumers**: Track which consumers have processed each notification\n- **Large Payload Support**: Automatically handles payloads larger than PostgreSQL's 8KB limit\n- **Gap Detection**: Automatically detects and fills missed notifications during operation\n- **Consumer Management**: Track consumer progress and handle consumer-specific processing\n- **Django Integration**: Native Django models, admin interface, and management commands\n\n## Installation\n\n1. Add `django_pgwatch` to your `INSTALLED_APPS`:\n ```python\n INSTALLED_APPS = [\n # ... other apps\n 'django_pgwatch',\n ]\n ```\n\n2. Run migrations (automatically installs PostgreSQL functions):\n ```bash\n python manage.py migrate django_pgwatch\n ```\n\n## Basic Usage\n\n### 1. Sending Notifications\n\n```python\nfrom django_pgwatch.utils import smart_notify\n\n# Send a simple notification\nnotification_log_id = smart_notify('my_channel', {\n 'event_type': 'user_login',\n 'user_id': 123,\n 'timestamp': '2024-01-01T10:00:00Z'\n})\n\n# The notification is automatically persisted and sent via PostgreSQL NOTIFY\n```\n\n### 2. Creating a Consumer\n\nCreate a `consumers.py` file in your Django app:\n\n```python\n# myapp/consumers.py\nfrom django_pgwatch.consumer import BaseConsumer, NotificationHandler\n\nclass MyConsumer(BaseConsumer):\n # Class attributes for auto-discovery\n consumer_id = 'my_consumer'\n channels = ['my_channel', 'data_change']\n \n def handle_notification(self, handler: NotificationHandler):\n print(f\"Received: {handler.data}\")\n \n # Access notification details\n print(f\"Notification Log ID: {handler.notification_log_id}\")\n print(f\"Channel: {handler.channel}\")\n print(f\"Is replay: {handler.is_replay}\")\n \n # For database change notifications\n if handler.get_table() == 'users':\n if handler.is_insert():\n print(f\"New user: {handler.get_new_data()}\")\n elif handler.is_update():\n print(f\"User updated: {handler.get_old_data()} -> {handler.get_new_data()}\")\n elif handler.is_delete():\n print(f\"User deleted: {handler.get_old_data()}\")\n```\n\n### 3. Running Consumers\n\nThe management command automatically discovers all consumers from your `INSTALLED_APPS`:\n\n```bash\n# Run all discovered consumers\npython manage.py pgwatch_listen\n\n# List all discoverable consumers\npython manage.py pgwatch_listen --list-consumers\n\n# Run consumers from specific apps only\npython manage.py pgwatch_listen --apps myapp otherapp\n\n# Run specific consumers by ID\npython manage.py pgwatch_listen --consumers my_consumer webhook_sender\n\n# Exclude specific consumers\npython manage.py pgwatch_listen --exclude-consumers heavy_processor\n```\n\n## Database Change Notifications\n\n### Setting up Triggers\n\nThe app provides a trigger function that automatically sends notifications for database changes.\n\n#### Recommended Approach: Django Migrations (Preferred)\n\nThe best way to create database triggers is using Django migrations with `RunSQL`:\n\n```python\n# In your app's migration file (e.g., migrations/0002_create_triggers.py)\nfrom django.db import migrations\n\nclass Migration(migrations.Migration):\n dependencies = [\n ('your_app', '0001_initial'),\n # Ensure notify_data_change() function exists\n ('django_pgwatch', '0002_install_pg_functions'),\n ]\n\n operations = [\n migrations.RunSQL(\n # Forward migration - create trigger\n sql=\"\"\"\n CREATE TRIGGER notify_users_changes\n AFTER INSERT OR UPDATE OR DELETE ON users\n FOR EACH ROW \n EXECUTE FUNCTION notify_data_change();\n \"\"\",\n # Reverse migration - drop trigger\n reverse_sql=\"\"\"\n DROP TRIGGER IF EXISTS notify_users_changes ON users;\n \"\"\"\n ),\n ]\n```\n\n**Benefits of using migrations:**\n- \u2705 Automatic deployment with `python manage.py migrate`\n- \u2705 Version controlled and reversible\n- \u2705 Consistent across environments\n- \u2705 No manual intervention required\n\n#### Alternative Approaches\n\n**Direct SQL:**\n```sql\n-- Create a trigger on any table\nCREATE TRIGGER notify_users_changes\n AFTER INSERT OR UPDATE OR DELETE ON users\n FOR EACH ROW EXECUTE FUNCTION notify_data_change();\n```\n\n**Python helper function:**\n```python\nfrom django_pgwatch.examples import create_trigger_for_table\n\n# Create trigger for the users table\ncreate_trigger_for_table('users')\ncreate_trigger_for_table('orders', 'order_events') # Custom channel\n```\n\n**Note:** The Python helper and direct SQL approaches require manual execution and are not version controlled. Use migrations for production applications.\n\n### Consuming Database Changes\n\n```python\n# myapp/consumers.py\nfrom django_pgwatch.consumer import BaseConsumer, NotificationHandler\n\nclass DatabaseChangeConsumer(BaseConsumer):\n consumer_id = 'database_changes'\n channels = ['data_change']\n \n def handle_notification(self, handler: NotificationHandler):\n table = handler.get_table()\n action = handler.get_action() # INSERT, UPDATE, DELETE\n \n if table == 'users' and action == 'INSERT':\n user_data = handler.get_new_data()\n self.send_welcome_email(user_data)\n elif table == 'orders' and action == 'UPDATE':\n old_data = handler.get_old_data()\n new_data = handler.get_new_data()\n \n if old_data['status'] != new_data['status']:\n self.send_status_update(new_data)\n```\n\n## Common Patterns\n\n### Cache Invalidation\n\n```python\nclass CacheInvalidationConsumer(BaseConsumer):\n consumer_id = 'cache_invalidator'\n channels = ['data_change']\n \n def handle_notification(self, handler: NotificationHandler):\n if handler.is_database_change():\n cache_key = f\"{handler.get_table()}:{handler.get_record_id()}\"\n cache.delete(cache_key)\n```\n\n### Webhook Integration\n\n```python\nclass WebhookConsumer(BaseConsumer):\n consumer_id = 'webhook_sender'\n channels = ['data_change']\n \n def handle_notification(self, handler: NotificationHandler):\n requests.post(settings.WEBHOOK_URL, json=handler.data)\n```\n\n### Analytics Tracking\n\n```python\nclass AnalyticsConsumer(BaseConsumer):\n consumer_id = 'analytics_processor'\n channels = ['data_change']\n \n def handle_notification(self, handler: NotificationHandler):\n analytics.track(handler.get_record_id(), handler.get_action(), handler.data)\n```\n\n## Management Commands\n\n### pgwatch_listen\n\n**Basic usage:**\n```bash\n# Run all discovered consumers\npython manage.py pgwatch_listen\n\n# List available consumers\npython manage.py pgwatch_listen --list-consumers\n```\n\n**Common filtering options:**\n```bash\n# Run specific consumers\npython manage.py pgwatch_listen --consumers webhook_sender cache_invalidator\n\n# Run consumers from specific apps\npython manage.py pgwatch_listen --apps myapp otherapp\n\n# Exclude heavy processors\npython manage.py pgwatch_listen --exclude-consumers analytics_processor\n```\n\n**Advanced options:**\n- `--timeout=30`: Listening timeout in seconds\n- `--max-batch-size=100`: Playback batch size\n- `--skip-playback`: Skip missed notifications, only process new ones\n- `--reconnect-delay=5`: Delay before reconnecting after error\n\n### pgwatch_cleanup\n\n```bash\n# Clean up old notifications (keep 7 days)\npython manage.py pgwatch_cleanup --days=7\n\n# Preview what will be deleted\npython manage.py pgwatch_cleanup --days=7 --dry-run\n```\n\n### Deployment Patterns\n\n**Single process (default):**\n```bash\npython manage.py pgwatch_listen # All consumers in one process\n```\n\n**Parallel processes:**\n```bash\n# Split heavy consumers into separate processes\npython manage.py pgwatch_listen --consumers cache_invalidator &\npython manage.py pgwatch_listen --consumers webhook_sender &\n```\n\n**Load balancing:**\n```python\n# Create multiple worker consumers\nclass Worker1Consumer(BaseConsumer):\n consumer_id = 'worker_1'\n channels = ['work_queue']\n \nclass Worker2Consumer(BaseConsumer):\n consumer_id = 'worker_2'\n channels = ['work_queue']\n```\n\n## Advanced Usage\n\n### Error Handling\n\n```python\nclass RobustConsumer(BaseConsumer):\n consumer_id = 'robust_consumer'\n channels = ['data_change']\n \n def handle_notification(self, handler: NotificationHandler):\n try:\n self.process_notification(handler)\n except RetryableError as e:\n logger.error(f\"Retryable error: {e}\")\n raise # Will retry\n except PermanentError as e:\n logger.error(f\"Permanent error: {e}\")\n # Don't re-raise - marks as processed\n```\n\n### Filtering Notifications\n\n```python\nclass FilteredConsumer(BaseConsumer):\n consumer_id = 'filtered_consumer'\n channels = ['data_change']\n \n def handle_notification(self, handler: NotificationHandler):\n # Filter by table\n if handler.get_table() not in ['users', 'orders']:\n return\n \n # Skip replayed notifications\n if handler.is_replay:\n return\n \n self.process_notification(handler)\n```\n\n### Custom Notifications\n\n```python\nfrom django_pgwatch.examples import send_custom_notification\n\nsend_custom_notification('user_events', 'password_reset', {\n 'user_id': 123,\n 'ip_address': '192.168.1.1',\n 'requested_at': '2024-01-01T10:00:00Z'\n})\n```\n\n## Admin Interface\n\nThe app provides a Django admin interface for monitoring notifications:\n\n- View all notification logs\n- See which consumers have processed each notification\n- Clean up old notifications\n- Reprocess notifications (clear consumer tracking)\n- View summary statistics\n\nAccess at `/admin/django_pgwatch/notificationlog/`\n\n### Database Functions\n\n**smart_notify(channel_name, payload_data)** - Persists and sends notifications:\n```sql\nSELECT smart_notify('my_channel', '{\"event\": \"test\"}'::jsonb);\n```\n\n**notify_data_change()** - Trigger function for database changes:\n```sql\nCREATE TRIGGER my_table_changes\n AFTER INSERT OR UPDATE OR DELETE ON my_table\n FOR EACH ROW EXECUTE FUNCTION notify_data_change();\n```\n\n## Architecture & Performance\n\n### How It Works\n\n1. **Notification Storage**: All notifications stored in `notification_log` table\n2. **Consumer Tracking**: Each consumer tracks processed notifications\n3. **Playback**: On startup, process missed notifications\n4. **Real-time**: Listen for new notifications via PostgreSQL LISTEN/NOTIFY\n5. **Gap Detection**: Automatic detection of missed notifications\n\n### Performance Features\n\n- **Batch Processing**: Configurable batch sizes for playback\n- **Parallel Processing**: Multiple consumers process simultaneously\n- **Database Optimization**: Indexes on channel and timestamps\n- **Large Payloads**: Automatic handling of payloads >8KB\n- **Cleanup**: Configurable retention periods\n\n### Monitoring\n\n- Django admin interface for notification tracking\n- Consumer progress and error monitoring\n- Database table size monitoring\n- Alerting for unprocessed notifications\n\n## Testing\n\n### TODO: Test Coverage Needed\n\nThe following areas need comprehensive test coverage:\n\n**Auto-Discovery Tests:**\n- [ ] Test consumer discovery from multiple apps\n- [ ] Test filtering by app names (`--apps`)\n- [ ] Test filtering by consumer IDs (`--consumers`) \n- [ ] Test excluding consumers (`--exclude-consumers`)\n- [ ] Test error handling for non-existent apps/consumers\n- [ ] Test `--list-consumers` output formatting\n\n**Consumer Base Class Tests:**\n- [ ] Test BaseConsumer class attribute inheritance\n- [ ] Test constructor parameter override of class attributes\n- [ ] Test validation error when no consumer_id provided\n- [ ] Test channel configuration precedence\n\n**Database Integration Tests:**\n- [ ] Test trigger creation and notification sending\n- [ ] Test consumer playback of missed notifications\n- [ ] Test real-time notification processing\n- [ ] Test consumer restart and gap detection\n- [ ] Test large payload handling\n\n**Error Handling Tests:**\n- [ ] Test consumer exception handling (retryable vs permanent)\n- [ ] Test database connection failures and reconnection\n- [ ] Test malformed notification payloads\n- [ ] Test consumer shutdown and cleanup\n\n**Management Command Tests:**\n- [ ] Test command argument parsing and validation\n- [ ] Test signal handling for graceful shutdown\n- [ ] Test consumer process lifecycle management\n- [ ] Test output formatting and logging\n\n**Multi-Consumer Tests:**\n- [ ] Test multiple consumers processing same notifications\n- [ ] Test consumer isolation (one failure doesn't affect others)\n- [ ] Test consumer-specific progress tracking\n- [ ] Test parallel vs sequential execution modes\n\n## Contributing\n\nThis is an internal Avela Education tool. For issues or feature requests, contact the development team.\n\n## License\n\nInternal use only - Avela Education\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Django app for PostgreSQL LISTEN/NOTIFY with persistence and playback",
"version": "1.0.0",
"project_urls": {
"Bug Tracker": "https://github.com/edmenendez/django-pgwatch/issues",
"Documentation": "https://github.com/edmenendez/django-pgwatch#readme",
"Homepage": "https://github.com/edmenendez/django-pgwatch",
"Repository": "https://github.com/edmenendez/django-pgwatch"
},
"split_keywords": [
"django",
" postgresql",
" notify",
" listen",
" database",
" triggers"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "83bdc081f8425fb1bfd6956dc78fb3b51cbdf930bb24647b789bb0e1f93d63da",
"md5": "2a674dd90a82427fe9046150ced3c5db",
"sha256": "ed4b23f75efd7a8dfc1b12de8a4898334d1fb20552e16031657ec7be3e11a8b3"
},
"downloads": -1,
"filename": "django_pgwatch-1.0.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "2a674dd90a82427fe9046150ced3c5db",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 25406,
"upload_time": "2025-08-13T20:12:51",
"upload_time_iso_8601": "2025-08-13T20:12:51.732388Z",
"url": "https://files.pythonhosted.org/packages/83/bd/c081f8425fb1bfd6956dc78fb3b51cbdf930bb24647b789bb0e1f93d63da/django_pgwatch-1.0.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "1b7ffe741c728db100985a85691447434c566ae38d2364b5ddb195e98b9a7340",
"md5": "356ca8a2b3fe9989e92e0d0d40290f38",
"sha256": "c20b1f66993c67529ac5ff9a4d193808bf6176fc728b9e0f48f0d9132ac79181"
},
"downloads": -1,
"filename": "django_pgwatch-1.0.0.tar.gz",
"has_sig": false,
"md5_digest": "356ca8a2b3fe9989e92e0d0d40290f38",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 30172,
"upload_time": "2025-08-13T20:12:52",
"upload_time_iso_8601": "2025-08-13T20:12:52.873540Z",
"url": "https://files.pythonhosted.org/packages/1b/7f/fe741c728db100985a85691447434c566ae38d2364b5ddb195e98b9a7340/django_pgwatch-1.0.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-13 20:12:52",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "edmenendez",
"github_project": "django-pgwatch",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "django-pgwatch"
}