dramatiq_sqs_batch


- **Name**: dramatiq_sqs_batch
- **Version**: 1.0.0
- **Home page**: https://github.com/jiayun/dramatiq_sqs_batch
- **Summary**: A high-performance batch processing broker for AWS SQS with intelligent message splitting
- **Upload time**: 2025-08-08 15:44:37
- **Author**: Jiayun Zhou
- **Requires Python**: <4.0.0,>=3.8.1
- **License**: MIT
- **Keywords**: aws, sqs, dramatiq, message-queue, batch-processing

# BatchSQSBroker

A high-performance batch processing broker for AWS SQS with intelligent message splitting, retry mechanisms, and comprehensive monitoring. Optimized for handling high-volume task queues with automatic batching, exponential backoff, and thread-safe operations.

## Features

- **Intelligent Batch Processing**: Automatically batches messages up to SQS limits (10 messages, 256KB)
- **Smart Message Splitting**: Splits oversized batches into smaller ones using a greedy algorithm
- **Retry Mechanisms**: Exponential backoff with configurable retry limits
- **Thread-Safe Operations**: Concurrent message processing with proper locking
- **Comprehensive Monitoring**: Built-in metrics for observability
- **Configurable Timeouts**: Per-queue batch intervals and idle timeouts
- **Memory Management**: Buffer overflow protection with backpressure
- **Python 3.8.1+ Support**: Compatible with Python versions from 3.8.1 up to, but not including, 4.0

## Installation

```bash
pip install dramatiq_sqs_batch
```

Or with Poetry:

```bash
poetry add dramatiq_sqs_batch
```

## Quick Start

```python
from batch_sqs_broker import BatchSQSBroker
import dramatiq

# Create broker
broker = BatchSQSBroker(
    namespace="my-app-",
    default_batch_interval=1.0,  # Max 1 second wait
    default_idle_timeout=0.1,    # 100ms idle timeout
)

# Set as default broker
dramatiq.set_broker(broker)

# Define a task
@dramatiq.actor
def my_task(data):
    print(f"Processing: {data}")

# Enqueue messages
for i in range(100):
    my_task.send(f"message-{i}")
```

## Configuration

### Basic Configuration

```python
broker = BatchSQSBroker(
    namespace="myapp-",
    default_batch_interval=2.0,    # Wait up to 2 seconds
    default_idle_timeout=0.5,      # Send after 500ms of no new messages
    batch_size=10,                 # Max 10 messages per batch (SQS limit)
    max_buffer_size_per_queue=1000, # Memory protection
    max_retry_attempts=3,          # Retry failed messages 3 times
)
```

### Per-Queue Configuration

```python
broker = BatchSQSBroker(
    group_batch_intervals={
        "high_priority": 0,      # Send immediately
        "low_priority": 5.0,     # Wait up to 5 seconds
    },
    group_idle_timeouts={
        "high_priority": 0,      # No idle timeout
        "low_priority": 1.0,     # Send after 1s idle
    }
)
```

## Monitoring

```python
# Get overall metrics
metrics = broker.get_metrics()
print(metrics)

# Get specific queue status
status = broker.get_queue_status("my_queue")
print(status)
```

Example output from `get_metrics()`:
```python
{
    "buffer_sizes": {"my_queue": 5},
    "failed_message_counts": {"my_queue": 0},
    "metrics": {
        "messages_sent": {"my_queue": 100},
        "messages_failed": {"my_queue": 2},
        "batch_split_count": {"my_queue": 1},
        "oversized_message_dropped": {"my_queue": 0}
    },
    "max_buffer_size_per_queue": 5000,
    "max_retry_attempts": 3,
    "background_thread_alive": True
}
```
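A metrics dictionary of this shape can be reduced to a per-queue failure ratio for alerting. The sketch below works on a plain dictionary and does not call the broker; `failure_ratios` is a hypothetical helper, and it assumes that `messages_sent` and `messages_failed` are separate counters, which the README does not state.

```python
def failure_ratios(metrics: dict) -> dict:
    """Map each queue to failed / (sent + failed), or 0.0 with no traffic.

    Assumes the two counters do not overlap (an assumption, not a
    documented guarantee).
    """
    counters = metrics.get("metrics", {})
    sent = counters.get("messages_sent", {})
    failed = counters.get("messages_failed", {})
    ratios = {}
    for queue in set(sent) | set(failed):
        total = sent.get(queue, 0) + failed.get(queue, 0)
        ratios[queue] = failed.get(queue, 0) / total if total else 0.0
    return ratios


sample = {
    "metrics": {
        "messages_sent": {"my_queue": 100},
        "messages_failed": {"my_queue": 2},
    }
}
print(failure_ratios(sample))
```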

## Advanced Features

### Manual Queue Management

```python
# Force flush a specific queue
broker.force_flush_queue("urgent_queue")

# Clear queue buffer (emergency use)
cleared_count = broker.clear_queue_buffer("problematic_queue")

# Flush all queues
broker.flush_all()
```

### Graceful Shutdown

```python
# Proper cleanup
broker.close()
```
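One way to make sure `close()` runs even when the surrounding code raises is the standard library's `contextlib.closing`. The sketch below uses a hypothetical `StubBroker` stand-in so that it runs without AWS access; any object with a `close()` method, presumably including a real broker instance, could be passed instead.

```python
from contextlib import closing


class StubBroker:
    """Hypothetical stand-in so the example runs without AWS access."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_with_cleanup(broker, work):
    """Run work(broker) and call broker.close() even if work raises."""
    with closing(broker):
        return work(broker)


stub = StubBroker()
result = run_with_cleanup(stub, lambda b: "done")
print(result, stub.closed)
```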

## How It Solves SQS Challenges

BatchSQSBroker addresses common AWS SQS limitations:

1. **256KB Batch Limit**: Intelligently splits oversized batches using a greedy algorithm
2. **10 Message Limit**: Automatically chunks large message sets
3. **Infinite Retry Loops**: Distinguishes between batch-size and message-size issues
4. **Memory Leaks**: Implements buffer size limits with backpressure
5. **Thread Safety**: Uses proper locking for concurrent access
6. **Performance**: Reduces SQS API calls through intelligent batching
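
The first three points can be illustrated with a minimal greedy packer. This is a sketch under stated assumptions, not the library's implementation: `split_greedy` is a hypothetical function, sizes are measured as UTF-8 body length only (real SQS accounting also counts message attributes), and messages are packed in arrival order.

```python
from typing import List, Tuple

MAX_BATCH_BYTES = 256 * 1024  # batch payload limit cited above
MAX_BATCH_COUNT = 10          # messages per batch


def split_greedy(bodies: List[str],
                 max_bytes: int = MAX_BATCH_BYTES,
                 max_count: int = MAX_BATCH_COUNT
                 ) -> Tuple[List[List[str]], List[str]]:
    """Pack bodies into batches in order; return (batches, oversized)."""
    batches: List[List[str]] = []
    oversized: List[str] = []
    current: List[str] = []
    current_bytes = 0
    for body in bodies:
        size = len(body.encode("utf-8"))
        if size > max_bytes:
            # A single message over the limit can never fit in any batch,
            # so it is set aside instead of being retried forever.
            oversized.append(body)
            continue
        full = len(current) >= max_count
        too_big = current_bytes + size > max_bytes
        if current and (full or too_big):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(body)
        current_bytes += size
    if current:
        batches.append(current)
    return batches, oversized


batches, oversized = split_greedy(["x" * 100] * 25)
print([len(b) for b in batches], len(oversized))
```

Separating the "batch too large" case (start a new batch) from the "message too large" case (set the message aside) is what prevents an endless retry loop.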

## Architecture

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Application   │────│  BatchSQSBroker  │────│   AWS SQS       │
│                 │    │                  │    │                 │
│ dramatiq.send() │    │ • Buffering      │    │ • Batch API     │
│                 │    │ • Batching       │    │ • Message Queue │
│                 │    │ • Retry Logic    │    │                 │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                              │
                              ▼
                      ┌──────────────────┐
                      │ Background Thread │
                      │ • Timeout Check   │
                      │ • Auto Flush      │
                      │ • Retry Failed    │
                      └──────────────────┘
```

## API Reference

### BatchSQSBroker

The main broker class. It extends the `SQSBroker` from dramatiq-sqs with batching capabilities.

#### Parameters

- `default_batch_interval` (float): Maximum wait time before sending batch (default: 1.0s)
- `default_idle_timeout` (float): Send batch after this idle time (default: 0.1s)
- `batch_size` (int): Maximum messages per batch, up to 10 (SQS limit)
- `group_batch_intervals` (dict): Per-queue batch intervals
- `group_idle_timeouts` (dict): Per-queue idle timeouts
- `max_buffer_size_per_queue` (int): Buffer size limit per queue (default: 5000)
- `max_retry_attempts` (int): Maximum retry attempts for failed messages (default: 3)
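
How the two timing parameters and `batch_size` might combine into a single flush decision is sketched below. `should_flush` and its arguments are hypothetical, and the rule that an idle timeout of 0 disables the idle check is an assumption drawn from the "No idle timeout" comment in the per-queue example; the actual broker may behave differently.

```python
def should_flush(now: float,
                 first_buffered_at: float,
                 last_buffered_at: float,
                 buffered_count: int,
                 batch_interval: float,
                 idle_timeout: float,
                 batch_size: int = 10) -> bool:
    """Decide whether a queue's buffer should be sent now (sketch)."""
    if buffered_count == 0:
        return False  # nothing to send
    if buffered_count >= batch_size:
        return True   # batch is full
    if now - first_buffered_at >= batch_interval:
        return True   # oldest message has waited long enough
    if idle_timeout > 0 and now - last_buffered_at >= idle_timeout:
        return True   # no new messages arrived recently
    return False


# Three messages buffered, oldest 0.5s ago, newest 0.2s ago.
print(should_flush(10.0, 9.5, 9.8, 3, batch_interval=1.0, idle_timeout=0.1))
```

With `batch_interval=0` the third check is always true for a non-empty buffer, which matches the "send immediately" behavior described for high-priority queues.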

#### Methods

- `get_metrics()`: Returns comprehensive metrics dictionary
- `get_queue_status(queue_name)`: Returns detailed status for specific queue
- `flush_all()`: Immediately flush all queue buffers
- `force_flush_queue(queue_name)`: Force flush specific queue
- `clear_queue_buffer(queue_name)`: Clear buffer for emergency use
- `close()`: Gracefully shut down broker

### FailedMessage

Dataclass for tracking failed message retry information.

#### Attributes

- `entry` (dict): The SQS message entry
- `retry_count` (int): Current retry count
- `first_failure_time` (float): Timestamp of first failure
- `last_failure_time` (float): Timestamp of last failure
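
A dataclass with these four attributes could look roughly like the following. `FailedMessageSketch`, its defaults, and its two helper methods are illustrative assumptions, not the package's actual definition.

```python
import time
from dataclasses import dataclass, field


@dataclass
class FailedMessageSketch:
    """Illustrative stand-in mirroring the documented attributes."""
    entry: dict
    retry_count: int = 0
    first_failure_time: float = field(default_factory=time.time)
    last_failure_time: float = field(default_factory=time.time)

    def record_failure(self, now: float) -> None:
        """Bump the retry count and refresh the last-failure timestamp."""
        self.retry_count += 1
        self.last_failure_time = now

    def exhausted(self, max_retry_attempts: int) -> bool:
        """True once the retry budget is used up."""
        return self.retry_count >= max_retry_attempts


failed = FailedMessageSketch(
    entry={"Id": "1", "MessageBody": "{}"},
    first_failure_time=100.0,
    last_failure_time=100.0,
)
failed.record_failure(now=105.0)
print(failed.retry_count, failed.exhausted(max_retry_attempts=3))
```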

## Best Practices

1. **Choose appropriate batch intervals**: Balance between latency and throughput
2. **Monitor buffer sizes**: Watch for queue buffer overflows
3. **Set reasonable retry limits**: Avoid infinite retry loops
4. **Use per-queue configuration**: Different queues may need different settings
5. **Implement proper shutdown**: Always call `broker.close()` for graceful cleanup

## Error Handling

BatchSQSBroker includes robust error handling:

- **Oversized Messages**: Messages >256KB are dropped with warnings
- **Buffer Overflow**: Automatic backpressure prevents memory issues
- **Network Errors**: Exponential backoff retry with limits
- **Thread Safety**: Proper locking prevents race conditions
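
Exponential backoff with a limit generally means doubling the wait after each failed attempt, up to a cap. The sketch below shows that general technique; `backoff_delay`, the 1-second base, and the 60-second cap are assumptions and are not taken from the library.

```python
def backoff_delay(retry_count: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay before the next attempt: base * 2**retry_count, capped."""
    return min(cap, base * (2 ** retry_count))


print([backoff_delay(n) for n in range(4)])  # [1.0, 2.0, 4.0, 8.0]
```

Production implementations often add random jitter to the delay so that many clients do not retry at the same instant.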

## Performance Tips

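- Lower `default_idle_timeout` when latency matters more than batch fill rate
- Set a batch interval of `0` to send immediately: use `group_batch_intervals` for a single high-priority queue, or `default_batch_interval=0` to apply it to all queues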
- Monitor `batch_split_count` metric to optimize message sizes
- Adjust `max_buffer_size_per_queue` based on memory constraints

## Requirements

- Python >= 3.8.1, < 4.0
- dramatiq >= 1.12.0
- dramatiq-sqs >= 0.2.0
- boto3 >= 1.20.0

## Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests
5. Submit a pull request

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Changelog

### v1.0.0
- Initial release
- Intelligent batch splitting with greedy algorithm
- Retry mechanisms with exponential backoff
- Comprehensive monitoring and metrics
- Thread-safe operations
- Per-queue configuration support
- Buffer overflow protection

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/jiayun/dramatiq_sqs_batch",
    "name": "dramatiq_sqs_batch",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0.0,>=3.8.1",
    "maintainer_email": null,
    "keywords": "aws, sqs, dramatiq, message-queue, batch-processing",
    "author": "Jiayun Zhou",
    "author_email": "jiayun@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/b0/80/9ce87d7c0dd04a5b772b0b3fdf45f76fffe2399a3753f7654296a264d627/dramatiq_sqs_batch-1.0.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "A high-performance batch processing broker for AWS SQS with intelligent message splitting",
    "version": "1.0.0",
    "project_urls": {
        "Documentation": "https://github.com/jiayun/dramatiq_sqs_batch/blob/main/README.md",
        "Homepage": "https://github.com/jiayun/dramatiq_sqs_batch",
        "Repository": "https://github.com/jiayun/dramatiq_sqs_batch"
    },
    "split_keywords": [
        "aws",
        " sqs",
        " dramatiq",
        " message-queue",
        " batch-processing"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "1d960be50bc3ea45099f83801b1e4fc769e5db464584e28a065cb8d1b835aa0a",
                "md5": "3f6bf71c17867e4b7d085d3aad78b13a",
                "sha256": "2b050b5a855cdb327f1533c8a2893b2fedc50e1abbe0965444595285f4df2a2c"
            },
            "downloads": -1,
            "filename": "dramatiq_sqs_batch-1.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "3f6bf71c17867e4b7d085d3aad78b13a",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0.0,>=3.8.1",
            "size": 10560,
            "upload_time": "2025-08-08T15:44:35",
            "upload_time_iso_8601": "2025-08-08T15:44:35.845372Z",
            "url": "https://files.pythonhosted.org/packages/1d/96/0be50bc3ea45099f83801b1e4fc769e5db464584e28a065cb8d1b835aa0a/dramatiq_sqs_batch-1.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "b0809ce87d7c0dd04a5b772b0b3fdf45f76fffe2399a3753f7654296a264d627",
                "md5": "efea5fed6eb062e4a68d5903b5652599",
                "sha256": "7bb96cb9f545e87c85d243cb97b581e27344d938efeb43ff8911398c7c38e6f8"
            },
            "downloads": -1,
            "filename": "dramatiq_sqs_batch-1.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "efea5fed6eb062e4a68d5903b5652599",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0.0,>=3.8.1",
            "size": 12011,
            "upload_time": "2025-08-08T15:44:37",
            "upload_time_iso_8601": "2025-08-08T15:44:37.256504Z",
            "url": "https://files.pythonhosted.org/packages/b0/80/9ce87d7c0dd04a5b772b0b3fdf45f76fffe2399a3753f7654296a264d627/dramatiq_sqs_batch-1.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-08 15:44:37",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "jiayun",
    "github_project": "dramatiq_sqs_batch",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "dramatiq_sqs_batch"
}
        