# SimpleBroker
*A lightweight message queue backed by SQLite. No setup required, just works.*
```bash
$ pipx install simplebroker
$ broker write tasks "ship it 🚀"
$ broker read tasks
ship it 🚀
```
SimpleBroker gives you a zero-configuration message queue that runs anywhere Python runs. It's designed to be simple enough to understand in an afternoon, yet powerful enough for real work.
## Features
- **Zero configuration** - No servers, daemons, or complex setup
- **SQLite-backed** - Rock-solid reliability with true ACID guarantees
- **Concurrent safe** - Multiple processes can read/write simultaneously
- **Simple CLI** - Intuitive commands that work with pipes and scripts
- **Portable** - Each directory gets its own isolated `.broker.db`
- **Fast** - 1000+ messages/second throughput
- **Lightweight** - ~1500 lines of code, no external dependencies
## Installation
```bash
# Install with uv
uv add simplebroker
# Or with pip
pip install simplebroker
# Or with pipx for global installation (recommended)
pipx install simplebroker
```
The CLI is available as both `broker` and `simplebroker`.
**Requirements:**
- Python 3.8+
- SQLite 3.35+ (released March 2021) - required for `DELETE...RETURNING` support
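If you're unsure whether your environment qualifies, the `sqlite3` module bundled with Python reports the version of the SQLite library it links against; a quick check:

```python
import sqlite3

# DELETE...RETURNING needs SQLite 3.35.0 or newer
print(sqlite3.sqlite_version)                      # e.g. "3.45.1"
print(sqlite3.sqlite_version_info >= (3, 35, 0))
```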
## Quick Start
```bash
# Create a queue and write a message
$ broker write myqueue "Hello, World!"
# Read the message (removes it)
$ broker read myqueue
Hello, World!
# Write from stdin
$ echo "another message" | broker write myqueue -
# Read all messages at once
$ broker read myqueue --all
# Peek without removing
$ broker peek myqueue
# List all queues
$ broker list
myqueue: 3
# Broadcast to all queues
$ broker broadcast "System maintenance at 5pm"
# Clean up when done
$ broker --cleanup
```
## Command Reference
### Global Options
- `-d, --dir PATH` - Use PATH instead of current directory
- `-f, --file NAME` - Database filename or absolute path (default: `.broker.db`)
  - If an absolute path is provided, the directory is extracted automatically
  - Cannot be used with `-d` if the directories don't match
- `-q, --quiet` - Suppress non-error output (read/peek output is still printed)
- `--cleanup` - Delete the database file and exit
- `--vacuum` - Remove claimed messages and exit
- `--version` - Show version information
- `--help` - Show help message
### Commands
| Command | Description |
|---------|-------------|
| `write <queue> <message>` | Add a message to the queue |
| `write <queue> -` | Add message from stdin |
| `read <queue> [--all] [--json] [-t\|--timestamps] [--since <ts>]` | Remove and return message(s) |
| `peek <queue> [--all] [--json] [-t\|--timestamps] [--since <ts>]` | Return message(s) without removing |
| `list [--stats]` | Show queues with unclaimed messages (use `--stats` to include claimed) |
| `purge <queue>` | Delete all messages in queue |
| `purge --all` | Delete all queues |
| `broadcast <message>` | Send message to all existing queues |
#### Read/Peek Options
- `--all` - Read/peek all messages in the queue
- `--json` - Output in line-delimited JSON (ndjson) format for safe handling of special characters
- `-t, --timestamps` - Include timestamps in output
  - Regular format: `<timestamp>\t<message>` (tab-separated)
  - JSON format: `{"message": "...", "timestamp": <timestamp>}`
- `--since <timestamp>` - Return only messages with timestamp > the given value
  - Accepts multiple formats:
    - Native 64-bit timestamp as returned by `--timestamps` (e.g., `1837025672140161024`)
    - ISO 8601 date/datetime (e.g., `2024-01-15`, `2024-01-15T14:30:00Z`)
      - Date-only strings (`YYYY-MM-DD`) are interpreted as the beginning of that day in UTC (00:00:00Z)
      - Naive datetime strings (without timezone) are assumed to be in UTC
    - Unix timestamp in seconds (e.g., `1705329000` or from `date +%s`)
    - Unix timestamp in milliseconds (e.g., `1705329000000`)
    - **Explicit unit suffixes** (strongly recommended for scripts):
      - `1705329000s` - Unix seconds
      - `1705329000000ms` - Unix milliseconds
      - `1705329000000000000ns` - Unix nanoseconds
      - `1837025672140161024hyb` - Native hybrid timestamp
  - **Best practice**: While automatic detection is convenient for interactive use, we strongly recommend using explicit unit suffixes in scripts and applications to ensure predictable behavior and future-proof your code
  - **Automatic disambiguation**: Integer timestamps without suffixes are interpreted based on magnitude:
    - Values < 2^44 are treated as Unix timestamps (seconds, milliseconds, or nanoseconds)
    - Values ≥ 2^44 are treated as native hybrid timestamps
    - This heuristic works reliably until the year ~2527
    - Native format: high 44 bits are milliseconds since Unix epoch, low 20 bits are a counter
      - Note: `time.time()` returns seconds, so the native format is `int(time.time() * 1000) << 20`
  - Most effective when used with `--all` to process all new messages since a checkpoint
  - Without `--all`, it finds the oldest message in the queue that is newer than `<timestamp>` and returns only that single message
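In Python terms, the native encoding and the magnitude heuristic above look like this (a sketch; `to_native` is an illustrative helper, not part of SimpleBroker's API):

```python
import time

# The disambiguation threshold used by the heuristic above
THRESHOLD = 2 ** 44

def to_native(unix_seconds: float) -> int:
    """Encode wall-clock time as a native hybrid timestamp:
    high 44 bits = milliseconds since the Unix epoch, low 20 bits = counter (0 here)."""
    return int(unix_seconds * 1000) << 20

now = time.time()
assert to_native(now) >= THRESHOLD   # native values clear the threshold...
assert int(now) < THRESHOLD          # ...plain Unix seconds stay far below it
```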
### Exit Codes
- `0` - Success
- `1` - General error
- `2` - Queue is empty
## Examples
### Basic Queue Operations
```bash
# Create a work queue
$ broker write work "process customer 123"
$ broker write work "process customer 456"
# Worker processes tasks
$ while msg=$(broker read work 2>/dev/null); do
    echo "Processing: $msg"
    # do work...
done
```
### Using Multiple Queues
```bash
# Different queues for different purposes
$ broker write emails "send welcome to user@example.com"
$ broker write logs "2023-12-01 system started"
$ broker write metrics "cpu_usage:0.75"
$ broker list
emails: 1
logs: 1
metrics: 1
```
### Fan-out Pattern
```bash
# Send to all queues at once
$ broker broadcast "shutdown signal"
# Each worker reads from its own queue
$ broker read worker1 # -> "shutdown signal"
$ broker read worker2 # -> "shutdown signal"
```
**Note on broadcast behavior**: The `broadcast` command sends a message to all *existing* queues at the moment of execution. There's a small race window - if a new queue is created after the broadcast starts but before it completes, that queue won't receive the message. This is by design to keep the operation simple and atomic.
### Integration with Unix Tools
```bash
# Store command output
$ df -h | broker write monitoring -
$ broker peek monitoring
# Process files through a queue
$ find . -name "*.log" | while read -r f; do
    broker write logfiles "$f"
done
# Parallel processing with xargs
$ broker read logfiles --all | xargs -P 4 -I {} process_log {}
# Use absolute paths for databases in specific locations
$ broker -f /var/lib/myapp/queue.db write tasks "backup database"
$ broker -f /var/lib/myapp/queue.db read tasks
```
### Safe Handling with JSON Output
Messages containing newlines, quotes, or other special characters can break shell pipelines. The `--json` flag provides a safe way to handle any message content:
```bash
# Problem: Messages with newlines break shell processing
# (note the $'...' quoting, which makes \n a real newline)
$ broker write alerts $'ERROR: Database connection failed\nRetrying in 5 seconds...'
$ broker read alerts | wc -l
2  # Wrong! This is one message, not two
# Solution: Use --json for safe handling
$ broker write alerts $'ERROR: Database connection failed\nRetrying in 5 seconds...'
$ broker read alerts --json
{"message": "ERROR: Database connection failed\nRetrying in 5 seconds..."}
# Parse JSON safely in scripts
$ broker read alerts --json | jq -r '.message'
ERROR: Database connection failed
Retrying in 5 seconds...
# Multiple messages with --all --json (outputs ndjson)
$ broker write safe $'Line 1\nLine 2'
$ broker write safe 'Message with "quotes"'
$ broker write safe $'Tab\there'
$ broker read safe --all --json
{"message": "Line 1\nLine 2"}
{"message": "Message with \"quotes\""}
{"message": "Tab\there"}
# Parse each line with jq
$ broker read safe --all --json | jq -r '.message'
Line 1
Line 2
Message with "quotes"
Tab here
```
The JSON output uses line-delimited JSON (ndjson) format:
- Each message is output on its own line as: `{"message": "content"}`
- This format is streaming-friendly and works well with tools like `jq`
This is the recommended approach for handling messages that may contain special characters, as mentioned in the Security Considerations section.
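Consuming the same ndjson stream from Python instead of `jq` is one `json.loads` per line (a sketch; the `stream` literal stands in for actual `broker read --all --json` output):

```python
import json

# Example ndjson, as emitted by `broker read --all --json`
stream = '{"message": "Line 1\\nLine 2"}\n{"message": "Message with \\"quotes\\""}\n'

# One JSON document per line: parse each line independently
messages = [json.loads(line)["message"] for line in stream.splitlines() if line]
assert messages[0] == "Line 1\nLine 2"          # embedded newline survives intact
assert messages[1] == 'Message with "quotes"'
```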
### Timestamps for Message Ordering
The `-t/--timestamps` flag includes message timestamps in the output, useful for debugging and understanding message order:
```bash
# Write some messages
$ broker write events "server started"
$ broker write events "user login"
$ broker write events "file uploaded"
# View with timestamps (non-destructive peek)
$ broker peek events --all --timestamps
1837025672140161024 server started
1837025681658085376 user login
1837025689412308992 file uploaded
# Read with timestamps and JSON for parsing
$ broker read events --all --timestamps --json
{"message": "server started", "timestamp": 1837025672140161024}
{"message": "user login", "timestamp": 1837025681658085376}
{"message": "file uploaded", "timestamp": 1837025689412308992}
# Extract just timestamps with jq
$ broker peek events --all --timestamps --json | jq '.timestamp'
1837025672140161024
1837025681658085376
1837025689412308992
```
Timestamps are 64-bit hybrid values:
- High 44 bits: milliseconds since Unix epoch (equivalent to `int(time.time() * 1000)`)
- Low 20 bits: logical counter for ordering within the same millisecond
- This guarantees unique, monotonically increasing timestamps even for rapid writes
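Going the other direction, a value from `--timestamps` output splits back into its two fields with plain bit operations (a sketch; `decode` is an illustrative helper, not part of SimpleBroker's API):

```python
from datetime import datetime, timezone

def decode(ts):
    """Split a 64-bit hybrid timestamp into (UTC datetime, counter)."""
    millis = ts >> 20          # high 44 bits: milliseconds since the Unix epoch
    counter = ts & 0xFFFFF     # low 20 bits: logical counter
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc), counter

when, counter = decode(1837025672140161024)
print(when.isoformat(), counter)
```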
### Checkpoint-based Processing
The `--since` flag enables checkpoint-based consumption patterns, ideal for resilient processing:
```bash
# Process initial messages
$ broker write tasks "task 1"
$ broker write tasks "task 2"
# Read first task and save its timestamp
$ result=$(broker read tasks --timestamps)
$ checkpoint=$(echo "$result" | cut -f1)
$ echo "Processed: $(echo "$result" | cut -f2)"
# More tasks arrive while processing
$ broker write tasks "task 3"
$ broker write tasks "task 4"
# Resume from checkpoint - only get new messages
$ broker read tasks --all --since "$checkpoint"
task 2
task 3
task 4
# Alternative (run instead of the read above): use human-readable timestamps
$ broker peek tasks --all --since "2024-01-15T14:30:00Z"
task 3
task 4
# Or use Unix timestamp from date command
$ broker peek tasks --all --since "$(date -d '1 hour ago' +%s)"
task 4
```
This pattern is perfect for:
- Resumable batch processing
- Fault-tolerant consumers
- Incremental data pipelines
- Distributed processing with multiple consumers
Note: `read --since` can still exit 0 (success) when nothing is returned, as long as the queue exists and holds messages; they simply didn't match the `--since` filter.
### Robust Worker with Checkpointing
Here's a complete example of a resilient worker that processes messages in batches and can resume from where it left off after failures:
```bash
#!/bin/bash
# resilient-worker.sh - Process messages with checkpoint recovery
QUEUE="events"
CHECKPOINT_FILE="/var/lib/myapp/checkpoint"
BATCH_SIZE=100

# Load last checkpoint (default to 0 if first run)
if [ -f "$CHECKPOINT_FILE" ]; then
    last_checkpoint=$(cat "$CHECKPOINT_FILE")
else
    last_checkpoint=0
fi

echo "Starting from checkpoint: $last_checkpoint"

# Main processing loop
while true; do
    # Peek (non-destructively) at a batch of messages newer than the checkpoint.
    # Because peek leaves messages in the queue, truncating with head is safe
    # and a crash can never lose a message: anything not yet checkpointed is
    # still queued. Processed messages remain in the queue, so purge them
    # periodically once the checkpoint has moved past them.
    output=$(broker peek "$QUEUE" --all --json --timestamps --since "$last_checkpoint" | head -n "$BATCH_SIZE")

    # Check if we got any messages
    if [ -z "$output" ]; then
        echo "No new messages, sleeping..."
        sleep 5
        continue
    fi

    echo "Processing new batch..."

    # Process each message. Feeding the loop via process substitution (instead
    # of a pipe) keeps it in the current shell, so checkpoint updates persist
    # and 'exit 1' really terminates the script.
    while IFS= read -r line; do
        message=$(printf '%s\n' "$line" | jq -r '.message')
        timestamp=$(printf '%s\n' "$line" | jq -r '.timestamp')

        # Process the message (your business logic here)
        echo "Processing: $message"
        if ! process_event "$message"; then
            echo "Error processing message, will retry on next run"
            echo "Checkpoint remains at last successful message: $last_checkpoint"
            # Exit without updating checkpoint - failed message will be reprocessed
            exit 1
        fi

        # Atomically update checkpoint ONLY after successful processing
        echo "$timestamp" > "$CHECKPOINT_FILE.tmp"
        mv "$CHECKPOINT_FILE.tmp" "$CHECKPOINT_FILE"

        # Update our local variable for next iteration
        last_checkpoint="$timestamp"
    done < <(printf '%s\n' "$output")

    echo "Batch complete, checkpoint at: $last_checkpoint"
done
```
Key features of this pattern:
- **Atomic checkpoint updates**: Uses temp file + rename for crash safety
- **Per-message checkpointing**: Updates checkpoint after each successful message (no data loss)
- **Batch processing**: Processes up to BATCH_SIZE messages at a time for efficiency
- **Failure recovery**: On error, exits without updating checkpoint so failed message is retried
- **Efficient polling**: Only queries for new messages (timestamp > checkpoint)
- **Progress tracking**: Checkpoint file persists exact progress across restarts
### Remote Queue via SSH
```bash
# Write to remote queue
$ echo "remote task" | ssh server "cd /app && broker write tasks -"
# Read from remote queue
$ ssh server "cd /app && broker read tasks"
```
## Design Philosophy
SimpleBroker follows the Unix philosophy: do one thing well. It's not trying to replace RabbitMQ or Redis - it's for when you need a queue without the complexity.
**What SimpleBroker is:**
- A simple, reliable message queue
- Perfect for scripts, cron jobs, and small services
- Easy to understand and debug
- Portable between environments
**What SimpleBroker is not:**
- A distributed message broker
- A pub/sub system
- A replacement for production message queues
- Suitable for high-frequency trading
## Technical Details
### Storage
Messages are stored in a SQLite database with Write-Ahead Logging (WAL) enabled for better concurrency. Each message is stored with:
```sql
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT, -- Ensures strict FIFO ordering
queue TEXT NOT NULL,
body TEXT NOT NULL,
ts INTEGER NOT NULL -- Millisecond timestamp + hybrid logical clock
)
```
Strict, global FIFO ordering across all processes is guaranteed by the `id` column, not by the timestamp.
### Concurrency
SQLite's built-in locking handles concurrent access. Multiple processes can safely read and write simultaneously. Messages are delivered **exactly once** by default using atomic `DELETE...RETURNING` operations.
**Delivery Guarantees**
- **Default behavior**: All reads (single and bulk) provide exactly-once delivery with immediate commits
- **Performance optimization**: For bulk reads (`--all`), you can trade safety for speed by setting `BROKER_READ_COMMIT_INTERVAL` to a number greater than 1 to batch messages. If a consumer crashes mid-batch, uncommitted messages remain in the queue and will be redelivered to the next consumer (at-least-once delivery).
**FIFO Ordering Guarantee:**
- **True FIFO ordering across all processes**: Messages are always read in the exact order they were written to the database, regardless of which process wrote them
- **Guaranteed by SQLite's autoincrement**: Each message receives a globally unique, monotonically increasing ID
- **No ordering ambiguity**: Even when multiple processes write simultaneously, SQLite ensures strict serialization
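The atomic read behind this guarantee can be sketched with Python's built-in `sqlite3` module (a simplified illustration of the `DELETE...RETURNING` technique against the schema shown above, not SimpleBroker's actual code; requires SQLite 3.35+):

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit
conn.execute("""CREATE TABLE messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue TEXT NOT NULL,
    body TEXT NOT NULL,
    ts INTEGER NOT NULL)""")
for i, body in enumerate(["first", "second"]):
    conn.execute("INSERT INTO messages (queue, body, ts) VALUES (?, ?, ?)",
                 ("tasks", body, i))

# One statement removes and returns the oldest message atomically:
# no other reader can ever observe a message that is "half delivered".
row = conn.execute(
    "DELETE FROM messages WHERE id = ("
    "  SELECT id FROM messages WHERE queue = ? ORDER BY id LIMIT 1"
    ") RETURNING body",
    ("tasks",),
).fetchone()
print(row[0])  # -> first
```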
### Message Lifecycle and Claim Optimization
SimpleBroker uses a two-phase message lifecycle for performance optimization that's transparent to users:
1. **Claim Phase**: When messages are read, they're marked as "claimed" instead of being immediately deleted. This allows for extremely fast read operations (~3x faster) by avoiding expensive DELETE operations in the critical path.
2. **Vacuum Phase**: Claimed messages are periodically cleaned up by an automatic background process or manually via the `broker --vacuum` command. This ensures the database doesn't grow unbounded while keeping read operations fast.
This optimization is completely transparent - messages are still delivered exactly once, and from the user's perspective, a read message is gone. The cleanup happens automatically based on configurable thresholds.
**Note on `list` command**: By default, `broker list` only shows queues with unclaimed messages. To see all queues including those with only claimed messages awaiting vacuum, use `broker list --stats`. This also displays claim statistics for each queue.
### Performance
- **Throughput**: 1000+ messages/second on typical hardware
- **Latency**: <10ms for write, <10ms for read
- **Scalability**: Tested with 100k+ messages per queue
- **Read optimization**: ~3x faster reads due to the claim-based message lifecycle optimization
**Note on CLI vs Library Usage**: For CLI-only use, process startup cost dominates overall performance. If you need to process 1000+ messages per second, use the library interface directly to avoid the overhead of repeated process creation.
### Security
- Queue names are validated (alphanumeric + underscore + hyphen + period only, can't start with hyphen or period)
- Message size limited to 10MB
- Database files created with 0600 permissions
- SQL injection prevented via parameterized queries
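The queue-name rule above can be expressed as one regular expression (an illustrative check, not SimpleBroker's actual validator):

```python
import re

# Alphanumeric, underscore, hyphen, period; the first character may not be
# a hyphen or a period.
VALID_QUEUE = re.compile(r"^[A-Za-z0-9_][A-Za-z0-9_.-]*$")

assert VALID_QUEUE.match("work-queue.v2")
assert not VALID_QUEUE.match(".hidden")    # starts with a period
assert not VALID_QUEUE.match("-flag")      # starts with a hyphen
assert not VALID_QUEUE.match("bad/name")   # disallowed character
```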
**Security Considerations:**
- **Message bodies are not validated** - they can contain any text including newlines, control characters, and shell metacharacters
- **Shell injection risks** - When piping output to shell commands, malicious message content could execute unintended commands
- **Special characters** - Messages containing newlines or other special characters can break shell pipelines that expect single-line output
- **Recommended practice** - Always sanitize or validate message content before using it in shell commands or other security-sensitive contexts
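When message content must end up on a shell command line, quoting it first is the simplest safeguard; Python's `shlex.quote` does this (a sketch of the recommended practice; `notify-send` is just an illustrative consumer):

```python
import shlex

# A message that would execute a second command if spliced into a shell line raw
message = "disk full; rm -rf /tmp/important"

# Quoted, the whole message becomes one inert argument
command = "notify-send " + shlex.quote(message)
print(command)  # -> notify-send 'disk full; rm -rf /tmp/important'
```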
### Environment Variables
SimpleBroker can be configured via environment variables:
- `BROKER_BUSY_TIMEOUT` - SQLite busy timeout in milliseconds (default: 5000)
- `BROKER_CACHE_MB` - SQLite page cache size in megabytes (default: 10)
  - Larger cache improves performance for repeated queries and large scans
  - Recommended: 10-50 MB for typical workloads, 100+ MB for heavy use
- `BROKER_SYNC_MODE` - SQLite synchronous mode: FULL, NORMAL, or OFF (default: FULL)
  - `FULL`: Maximum durability, safe against power loss (default)
  - `NORMAL`: ~25% faster writes, safe against app crashes, small risk on power loss
  - `OFF`: Fastest but unsafe - only for testing or non-critical data
- `BROKER_READ_COMMIT_INTERVAL` - Number of messages to read before committing in `--all` mode (default: 1)
  - Default of 1 provides exactly-once delivery guarantee (~10,000 messages/second)
  - Increase for better performance with at-least-once delivery guarantee
  - With values > 1, messages are only deleted after being successfully delivered
  - Trade-off: larger batches hold database locks longer, reducing concurrency
- `BROKER_AUTO_VACUUM` - Enable automatic vacuum of claimed messages (default: true)
  - When enabled, vacuum runs automatically when thresholds are exceeded
  - Set to `false` to disable automatic cleanup and run `broker --vacuum` manually
- `BROKER_VACUUM_THRESHOLD` - Number of claimed messages before auto-vacuum triggers (default: 10000)
  - Higher values reduce vacuum frequency but use more disk space
  - Lower values keep the database smaller but run vacuum more often
- `BROKER_VACUUM_BATCH_SIZE` - Number of messages to delete per vacuum batch (default: 1000)
  - Larger batches are more efficient but hold locks longer
  - Smaller batches are more responsive but less efficient
- `BROKER_VACUUM_LOCK_TIMEOUT` - Seconds before a vacuum lock is considered stale (default: 300)
  - Prevents orphaned lock files from blocking vacuum operations
  - Lock files older than this are automatically removed
## Development
SimpleBroker uses [`uv`](https://github.com/astral-sh/uv) for package management and [`ruff`](https://github.com/astral-sh/ruff) for linting and formatting.
```bash
# Clone the repository
git clone git@github.com:VanL/simplebroker.git
cd simplebroker
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Install all dependencies including dev extras
uv sync --all-extras
# Run tests (fast tests only, in parallel)
uv run pytest
# Run all tests including slow ones (with 1000+ subprocess spawns)
uv run pytest -m ""
# Run tests with coverage
uv run pytest --cov=simplebroker --cov-report=term-missing
# Run specific test files
uv run pytest tests/test_smoke.py
# Run tests in a single process (useful for debugging)
uv run pytest -n 0
# Lint and format code
uv run ruff check simplebroker tests # Check for issues
uv run ruff check --fix simplebroker tests # Fix auto-fixable issues
uv run ruff format simplebroker tests # Format code
# Type check
uv run mypy simplebroker
```
### Development Workflow
1. **Before committing**:
```bash
uv run ruff check --fix simplebroker tests
uv run ruff format simplebroker tests
uv run mypy simplebroker
uv run pytest
```
2. **Building packages**:
```bash
uv build # Creates wheel and sdist in dist/
```
3. **Installing locally for testing**:
```bash
uv pip install dist/simplebroker-*.whl
```
## Contributing
Contributions are welcome! Please:
1. Keep it simple - the entire codebase should stay understandable in an afternoon
2. Maintain backward compatibility
3. Add tests for new features
4. Update documentation
5. Run `uv run ruff check` and `uv run pytest` before submitting PRs
### Setting up for development
```bash
# Fork and clone the repo
git clone git@github.com:VanL/simplebroker.git
cd simplebroker
# Install development environment
uv sync --all-extras
# Create a branch for your changes
git checkout -b my-feature
# Make your changes, then validate
uv run ruff check --fix simplebroker tests
uv run ruff format simplebroker tests
uv run pytest
# Push and create a pull request
git push origin my-feature
```
## License
MIT © 2025 Van Lindberg
## Acknowledgments
Built with Python, SQLite, and the Unix philosophy.
Raw data
{
"_id": null,
"home_page": null,
"name": "simplebroker",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "broker, cli, message-queue, queue, sqlite",
"author": null,
"author_email": "Van Lindberg <van.lindberg@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/6a/56/c06514e5b9ced49c9090d194ac9cd8604b0fec283069392b3e090615c6f8/simplebroker-1.3.1.tar.gz",
"platform": null,
"description": "# SimpleBroker\n\n*A lightweight message queue backed by SQLite. No setup required, just works.*\n\n```bash\n$ pipx install simplebroker\n$ broker write tasks \"ship it \ud83d\ude80\"\n$ broker read tasks\nship it \ud83d\ude80\n```\n\nSimpleBroker gives you a zero-configuration message queue that runs anywhere Python runs. It's designed to be simple enough to understand in an afternoon, yet powerful enough for real work.\n\n## Features\n\n- **Zero configuration** - No servers, daemons, or complex setup\n- **SQLite-backed** - Rock-solid reliability with true ACID guarantees \n- **Concurrent safe** - Multiple processes can read/write simultaneously\n- **Simple CLI** - Intuitive commands that work with pipes and scripts\n- **Portable** - Each directory gets its own isolated `.broker.db`\n- **Fast** - 1000+ messages/second throughput\n- **Lightweight** - ~1500 lines of code, no external dependencies\n\n## Installation\n\n```bash\n# Install with uv \nuv add simplebroker\n\n# Or with pip\npip install simplebroker\n\n# Or with pipx for global installation (recommended)\npipx install simplebroker\n```\n\nThe CLI is available as both `broker` and `simplebroker`.\n\n**Requirements:**\n- Python 3.8+\n- SQLite 3.35+ (released March 2021) - required for `DELETE...RETURNING` support\n\n## Quick Start\n\n```bash\n# Create a queue and write a message\n$ broker write myqueue \"Hello, World!\"\n\n# Read the message (removes it)\n$ broker read myqueue\nHello, World!\n\n# Write from stdin\n$ echo \"another message\" | broker write myqueue -\n\n# Read all messages at once\n$ broker read myqueue --all\n\n# Peek without removing\n$ broker peek myqueue\n\n# List all queues\n$ broker list\nmyqueue: 3\n\n# Broadcast to all queues\n$ broker broadcast \"System maintenance at 5pm\"\n\n# Clean up when done\n$ broker --cleanup\n```\n\n## Command Reference\n\n### Global Options\n\n- `-d, --dir PATH` - Use PATH instead of current directory\n- `-f, --file NAME` - Database filename 
or absolute path (default: `.broker.db`)\n - If an absolute path is provided, the directory is extracted automatically\n - Cannot be used with `-d` if the directories don't match\n- `-q, --quiet` - Suppress non-error output (intended reads excepted)\n- `--cleanup` - Delete the database file and exit\n- `--vacuum` - Remove claimed messages and exit\n- `--version` - Show version information\n- `--help` - Show help message\n\n### Commands\n\n| Command | Description |\n|---------|-------------|\n| `write <queue> <message>` | Add a message to the queue |\n| `write <queue> -` | Add message from stdin |\n| `read <queue> [--all] [--json] [-t\\|--timestamps] [--since <ts>]` | Remove and return message(s) |\n| `peek <queue> [--all] [--json] [-t\\|--timestamps] [--since <ts>]` | Return message(s) without removing |\n| `list [--stats]` | Show queues with unclaimed messages (use `--stats` to include claimed) |\n| `purge <queue>` | Delete all messages in queue |\n| `purge --all` | Delete all queues |\n| `broadcast <message>` | Send message to all existing queues |\n\n#### Read/Peek Options\n\n- `--all` - Read/peek all messages in the queue\n- `--json` - Output in line-delimited JSON (ndjson) format for safe handling of special characters\n- `-t, --timestamps` - Include timestamps in output\n - Regular format: `<timestamp>\\t<message>` (tab-separated)\n - JSON format: `{\"message\": \"...\", \"timestamp\": <timestamp>}`\n- `--since <timestamp>` - Return only messages with timestamp > the given value\n - Accepts multiple formats:\n - Native 64-bit timestamp as returned by `--timestamps` (e.g., `1837025672140161024`)\n - ISO 8601 date/datetime (e.g., `2024-01-15`, `2024-01-15T14:30:00Z`)\n - Date-only strings (`YYYY-MM-DD`) are interpreted as the beginning of that day in UTC (00:00:00Z)\n - Naive datetime strings (without timezone) are assumed to be in UTC\n - Unix timestamp in seconds (e.g., `1705329000` or from `date +%s`)\n - Unix timestamp in milliseconds (e.g., 
`1705329000000`)\n - **Explicit unit suffixes** (strongly recommended for scripts):\n - `1705329000s` - Unix seconds\n - `1705329000000ms` - Unix milliseconds \n - `1705329000000000000ns` - Unix nanoseconds\n - `1837025672140161024hyb` - Native hybrid timestamp\n - **Best practice**: While automatic detection is convenient for interactive use, we strongly recommend using explicit unit suffixes in scripts and applications to ensure predictable behavior and future-proof your code\n - **Automatic disambiguation**: Integer timestamps without suffixes are interpreted based on magnitude:\n - Values < 2^44 are treated as Unix timestamps (seconds, milliseconds, or nanoseconds)\n - Values \u2265 2^44 are treated as native hybrid timestamps\n - This heuristic works reliably until the year ~2527\n - Native format: high 44 bits are milliseconds since Unix epoch, low 20 bits are a counter\n - Note: time.time() returns seconds, so the native format is `int(time.time() * 1000) << 20`\n - Most effective when used with `--all` to process all new messages since a checkpoint\n - Without `--all`, it finds the oldest message in the queue that is newer than `<timestamp>` and returns only that single message\n\n### Exit Codes\n\n- `0` - Success\n- `1` - General error\n- `2` - Queue is empty\n\n## Examples\n\n### Basic Queue Operations\n\n```bash\n# Create a work queue\n$ broker write work \"process customer 123\"\n$ broker write work \"process customer 456\"\n\n# Worker processes tasks\n$ while msg=$(broker read work 2>/dev/null); do\n echo \"Processing: $msg\"\n # do work...\ndone\n```\n\n### Using Multiple Queues\n\n```bash\n# Different queues for different purposes\n$ broker write emails \"send welcome to user@example.com\"\n$ broker write logs \"2023-12-01 system started\"\n$ broker write metrics \"cpu_usage:0.75\"\n\n$ broker list\nemails: 1\nlogs: 1\nmetrics: 1\n```\n\n### Fan-out Pattern\n\n```bash\n# Send to all queues at once\n$ broker broadcast \"shutdown signal\"\n\n# Each 
worker reads from its own queue\n$ broker read worker1 # -> \"shutdown signal\"\n$ broker read worker2 # -> \"shutdown signal\"\n```\n\n**Note on broadcast behavior**: The `broadcast` command sends a message to all *existing* queues at the moment of execution. There's a small race window - if a new queue is created after the broadcast starts but before it completes, that queue won't receive the message. This is by design to keep the operation simple and atomic.\n\n### Integration with Unix Tools\n\n```bash\n# Store command output\n$ df -h | broker write monitoring -\n$ broker peek monitoring\n\n# Process files through a queue\n$ find . -name \"*.log\" | while read f; do\n broker write logfiles \"$f\"\ndone\n\n# Parallel processing with xargs\n$ broker read logfiles --all | xargs -P 4 -I {} process_log {}\n\n# Use absolute paths for databases in specific locations\n$ broker -f /var/lib/myapp/queue.db write tasks \"backup database\"\n$ broker -f /var/lib/myapp/queue.db read tasks\n```\n\n### Safe Handling with JSON Output\n\nMessages containing newlines, quotes, or other special characters can break shell pipelines. The `--json` flag provides a safe way to handle any message content:\n\n```bash\n# Problem: Messages with newlines break shell processing\n$ broker write alerts \"ERROR: Database connection failed\\nRetrying in 5 seconds...\"\n$ broker read alerts | wc -l\n2 # Wrong! 
This is one message, not two\n\n# Solution: Use --json for safe handling\n$ broker write alerts \"ERROR: Database connection failed\\nRetrying in 5 seconds...\"\n$ broker read alerts --json\n{\"message\": \"ERROR: Database connection failed\\nRetrying in 5 seconds...\"}\n\n# Parse JSON safely in scripts\n$ broker read alerts --json | jq -r '.message'\nERROR: Database connection failed\nRetrying in 5 seconds...\n\n# Multiple messages with --all --json (outputs ndjson)\n$ broker write safe \"Line 1\\nLine 2\"\n$ broker write safe 'Message with \"quotes\"'\n$ broker write safe \"Tab\\there\"\n$ broker read safe --all --json\n{\"message\": \"Line 1\\nLine 2\"}\n{\"message\": \"Message with \\\"quotes\\\"\"}\n{\"message\": \"Tab\\there\"}\n\n# Parse each line with jq\n$ broker read safe --all --json | jq -r '.message'\nLine 1\nLine 2\nMessage with \"quotes\"\nTab\there\n```\n\nThe JSON output uses line-delimited JSON (ndjson) format:\n- Each message is output on its own line as: `{\"message\": \"content\"}`\n- This format is streaming-friendly and works well with tools like `jq`\n\nThis is the recommended approach for handling messages that may contain special characters, as mentioned in the Security Considerations section.\n\n### Timestamps for Message Ordering\n\nThe `-t/--timestamps` flag includes message timestamps in the output, useful for debugging and understanding message order:\n\n```bash\n# Write some messages\n$ broker write events \"server started\"\n$ broker write events \"user login\"\n$ broker write events \"file uploaded\"\n\n# View with timestamps (non-destructive peek)\n$ broker peek events --all --timestamps\n1837025672140161024\tserver started\n1837025681658085376\tuser login\n1837025689412308992\tfile uploaded\n\n# Read with timestamps and JSON for parsing\n$ broker read events --all --timestamps --json\n{\"message\": \"server started\", \"timestamp\": 1837025672140161024}\n{\"message\": \"user login\", \"timestamp\": 
1837025681658085376}\n{\"message\": \"file uploaded\", \"timestamp\": 1837025689412308992}\n\n# Extract just timestamps with jq\n$ broker peek events --all --timestamps --json | jq '.timestamp'\n1837025672140161024\n1837025681658085376\n1837025689412308992\n```\n\nTimestamps are 64-bit hybrid values:\n- High 44 bits: milliseconds since Unix epoch (equivalent to `int(time.time() * 1000)`)\n- Low 20 bits: logical counter for ordering within the same millisecond\n- This guarantees unique, monotonically increasing timestamps even for rapid writes\n\n### Checkpoint-based Processing\n\nThe `--since` flag enables checkpoint-based consumption patterns, ideal for resilient processing:\n\n```bash\n# Process initial messages\n$ broker write tasks \"task 1\"\n$ broker write tasks \"task 2\"\n\n# Read first task and save its timestamp\n$ result=$(broker read tasks --timestamps)\n$ checkpoint=$(echo \"$result\" | cut -f1)\n$ echo \"Processed: $(echo \"$result\" | cut -f2)\"\n\n# More tasks arrive while processing\n$ broker write tasks \"task 3\"\n$ broker write tasks \"task 4\"\n\n# Resume from checkpoint - only get new messages\n$ broker read tasks --all --since \"$checkpoint\"\ntask 2\ntask 3\ntask 4\n\n# Alternative: Use human-readable timestamps\n$ broker peek tasks --all --since \"2024-01-15T14:30:00Z\"\ntask 3\ntask 4\n\n# Or use Unix timestamp from date command\n$ broker peek tasks --all --since \"$(date -d '1 hour ago' +%s)\"\ntask 4\n```\n\nThis pattern is perfect for:\n- Resumable batch processing\n- Fault-tolerant consumers\n- Incremental data pipelines\n- Distributed processing with multiple consumers\n\nNote that simplebroker may return 0 (SUCCESS) even if no messages are returned if the\nqueue exists and has messages, but none match the --since filter.\n\n### Robust Worker with Checkpointing\n\nHere's a complete example of a resilient worker that processes messages in batches and can resume from where it left off after failures:\n\n```bash\n#!/bin/bash\n# 
```bash
#!/bin/bash
# resilient-worker.sh - Process messages with checkpoint recovery

QUEUE="events"
CHECKPOINT_FILE="/var/lib/myapp/checkpoint"
BATCH_SIZE=100

# Load the last checkpoint (default to 0 on first run)
if [ -f "$CHECKPOINT_FILE" ]; then
    last_checkpoint=$(cat "$CHECKPOINT_FILE")
else
    last_checkpoint=0
fi

echo "Starting from checkpoint: $last_checkpoint"

# Main processing loop
while true; do
    # Peek at a batch of messages newer than the checkpoint.
    # peek is non-destructive, so a crash before the checkpoint is
    # updated simply means the same messages are seen again.
    output=$(broker peek "$QUEUE" --all --json --timestamps --since "$last_checkpoint" | head -n "$BATCH_SIZE")

    # Check if we got any messages
    if [ -z "$output" ]; then
        echo "No new messages, sleeping..."
        sleep 5
        continue
    fi

    echo "Processing new batch..."

    # Process each message (a here-string rather than a pipe keeps the loop
    # in the current shell, so checkpoint updates and 'exit' work as expected)
    while IFS= read -r line; do
        message=$(echo "$line" | jq -r '.message')
        timestamp=$(echo "$line" | jq -r '.timestamp')

        # Process the message (your business logic here)
        echo "Processing: $message"
        if ! process_event "$message"; then
            echo "Error processing message, will retry on next run"
            echo "Checkpoint remains at last successful message: $last_checkpoint"
            # Exit without updating the checkpoint - the failed message
            # will be reprocessed on the next run
            exit 1
        fi

        # Atomically update the checkpoint ONLY after successful processing
        echo "$timestamp" > "$CHECKPOINT_FILE.tmp"
        mv "$CHECKPOINT_FILE.tmp" "$CHECKPOINT_FILE"

        # Update our local variable for the next iteration
        last_checkpoint="$timestamp"
    done <<< "$output"

    echo "Batch complete, checkpoint at: $last_checkpoint"
done
```

Key features of this pattern:
- **Atomic checkpoint updates**: Uses temp file + rename for crash safety
- **Per-message checkpointing**: Updates the checkpoint after each successful message (no data loss)
- **Non-destructive reads**: `peek` leaves messages in the queue, so a crash never loses them; `--since` skips what has already been processed
- **Batch processing**: Handles up to BATCH_SIZE messages at a time for efficiency
- **Failure recovery**: On error, exits without updating the checkpoint so the failed message is retried
- **Efficient polling**: Only queries for new messages (timestamp > checkpoint)
- **Progress tracking**: The checkpoint file persists exact progress across restarts

### Remote Queue via SSH

```bash
# Write to a remote queue
$ echo "remote task" | ssh server "cd /app && broker write tasks -"

# Read from a remote queue
$ ssh server "cd /app && broker read tasks"
```

## Design Philosophy

SimpleBroker follows the Unix philosophy: do one thing well.
It's not trying to replace RabbitMQ or Redis - it's for when you need a queue without the complexity.

**What SimpleBroker is:**
- A simple, reliable message queue
- Perfect for scripts, cron jobs, and small services
- Easy to understand and debug
- Portable between environments

**What SimpleBroker is not:**
- A distributed message broker
- A pub/sub system
- A replacement for production message queues
- Suitable for high-frequency trading

## Technical Details

### Storage

Messages are stored in a SQLite database with Write-Ahead Logging (WAL) enabled for better concurrency. Each message is stored with:

```sql
CREATE TABLE messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,  -- Ensures strict FIFO ordering
    queue TEXT NOT NULL,
    body TEXT NOT NULL,
    ts INTEGER NOT NULL                    -- Millisecond timestamp + hybrid logical clock
)
```

The `id` column guarantees global FIFO ordering across all processes.
Note: FIFO ordering is strictly guaranteed by the `id` column, not the timestamp.

### Concurrency

SQLite's built-in locking handles concurrent access. Multiple processes can safely read and write simultaneously. Messages are delivered **exactly once** by default using atomic `DELETE...RETURNING` operations.

**Delivery Guarantees:**
- **Default behavior**: All reads (single and bulk) provide exactly-once delivery with immediate commits
- **Performance optimization**: For bulk reads (`--all`), you can trade safety for speed by setting `BROKER_READ_COMMIT_INTERVAL` to a number greater than 1 to batch messages. If a consumer crashes mid-batch, uncommitted messages remain in the queue and will be redelivered to the next consumer (at-least-once delivery).
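The atomic pop at the heart of this guarantee can be sketched with Python's built-in `sqlite3` module. This is a simplified illustration of the `DELETE...RETURNING` pattern, not SimpleBroker's actual code, and it requires SQLite 3.35+:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stands in for .broker.db
conn.execute(
    "CREATE TABLE messages ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " queue TEXT NOT NULL, body TEXT NOT NULL)"
)
conn.execute("INSERT INTO messages (queue, body) VALUES ('tasks', 'task 1')")
conn.commit()

def pop(queue):
    """Remove and return the oldest message in one atomic statement."""
    with conn:  # commits on success, rolls back on error
        row = conn.execute(
            "DELETE FROM messages WHERE id ="
            " (SELECT min(id) FROM messages WHERE queue = ?)"
            " RETURNING body",
            (queue,),
        ).fetchone()
    return row[0] if row else None
```

Because the delete and the return happen in the same statement, two consumers calling `pop` concurrently can never receive the same row; SQLite serializes the writes.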
**FIFO Ordering Guarantee:**
- **True FIFO ordering across all processes**: Messages are always read in the exact order they were written to the database, regardless of which process wrote them
- **Guaranteed by SQLite's autoincrement**: Each message receives a globally unique, monotonically increasing ID
- **No ordering ambiguity**: Even when multiple processes write simultaneously, SQLite ensures strict serialization

### Message Lifecycle and Claim Optimization

SimpleBroker uses a two-phase message lifecycle as a performance optimization that is transparent to users:

1. **Claim Phase**: When messages are read, they're marked as "claimed" instead of being immediately deleted. This allows for extremely fast read operations (~3x faster) by avoiding expensive DELETE operations in the critical path.

2. **Vacuum Phase**: Claimed messages are periodically cleaned up by an automatic background process, or manually via the `broker --vacuum` command. This ensures the database doesn't grow unbounded while keeping read operations fast.

This optimization is completely transparent: messages are still delivered exactly once, and from the user's perspective, a read message is gone. Cleanup happens automatically based on configurable thresholds.

**Note on the `list` command**: By default, `broker list` only shows queues with unclaimed messages. To see all queues, including those with only claimed messages awaiting vacuum, use `broker list --stats`. This also displays claim statistics for each queue.
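The claim/vacuum split can be modeled in a few lines of stdlib `sqlite3`. This is a toy sketch of the idea with a simplified schema, not SimpleBroker's actual implementation:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " queue TEXT NOT NULL, body TEXT NOT NULL,"
    " claimed INTEGER NOT NULL DEFAULT 0)"
)

def read(queue):
    """Claim phase: flag the oldest unclaimed row instead of deleting it."""
    with conn:  # the SELECT and UPDATE commit together
        row = conn.execute(
            "SELECT id, body FROM messages"
            " WHERE queue = ? AND claimed = 0 ORDER BY id LIMIT 1",
            (queue,),
        ).fetchone()
        if row is None:
            return None
        conn.execute("UPDATE messages SET claimed = 1 WHERE id = ?", (row[0],))
        return row[1]

def vacuum():
    """Vacuum phase: batch-delete claimed rows outside the read path."""
    with conn:
        conn.execute("DELETE FROM messages WHERE claimed = 1")
```

Flipping a flag is cheaper than deleting a row and rebalancing the B-tree, which is why the expensive `DELETE` work is deferred to a background vacuum.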
### Performance

- **Throughput**: 1000+ messages/second on typical hardware
- **Latency**: <10ms for writes, <10ms for reads
- **Scalability**: Tested with 100k+ messages per queue
- **Read optimization**: ~3x faster reads due to the claim-based message lifecycle optimization

**Note on CLI vs Library Usage**: For CLI-only use, startup cost dominates overall performance. If you need to process 1000+ messages per second, use the library interface directly to avoid the overhead of repeated process creation.

### Security

- Queue names are validated (alphanumeric + underscore + hyphen + period only; cannot start with a hyphen or period)
- Message size limited to 10MB
- Database files created with 0600 permissions
- SQL injection prevented via parameterized queries

**Security Considerations:**
- **Message bodies are not validated** - they can contain any text, including newlines, control characters, and shell metacharacters
- **Shell injection risks** - When piping output to shell commands, malicious message content could execute unintended commands
- **Special characters** - Messages containing newlines or other special characters can break shell pipelines that expect single-line output
- **Recommended practice** - Always sanitize or validate message content before using it in shell commands or other security-sensitive contexts

### Environment Variables

SimpleBroker can be configured via environment variables:

- `BROKER_BUSY_TIMEOUT` - SQLite busy timeout in milliseconds (default: 5000)
- `BROKER_CACHE_MB` - SQLite page cache size in megabytes (default: 10)
  - A larger cache improves performance for repeated queries and large scans
  - Recommended: 10-50 MB for typical workloads, 100+ MB for heavy use
- `BROKER_SYNC_MODE` - SQLite synchronous mode: FULL, NORMAL, or OFF (default: FULL)
  - `FULL`: Maximum durability, safe against power loss (default)
  - `NORMAL`: ~25% faster writes, safe against app crashes, small risk on power loss
  - `OFF`: Fastest but unsafe - only for testing or non-critical data
- `BROKER_READ_COMMIT_INTERVAL` - Number of messages to read before committing in `--all` mode (default: 1)
  - The default of 1 provides an exactly-once delivery guarantee (~10,000 messages/second)
  - Increase for better performance with an at-least-once delivery guarantee
  - With values > 1, messages are only deleted after being successfully delivered
  - Trade-off: larger batches hold database locks longer, reducing concurrency
- `BROKER_AUTO_VACUUM` - Enable automatic vacuum of claimed messages (default: true)
  - When enabled, vacuum runs automatically when thresholds are exceeded
  - Set to `false` to disable automatic cleanup and run `broker --vacuum` manually
- `BROKER_VACUUM_THRESHOLD` - Number of claimed messages before auto-vacuum triggers (default: 10000)
  - Higher values reduce vacuum frequency but use more disk space
  - Lower values keep the database smaller but run vacuum more often
- `BROKER_VACUUM_BATCH_SIZE` - Number of messages to delete per vacuum batch (default: 1000)
  - Larger batches are more efficient but hold locks longer
  - Smaller batches are more responsive but less efficient
- `BROKER_VACUUM_LOCK_TIMEOUT` - Seconds before a vacuum lock is considered stale (default: 300)
  - Prevents orphaned lock files from blocking vacuum operations
  - Lock files older than this are automatically removed

## Development

SimpleBroker uses [`uv`](https://github.com/astral-sh/uv) for package management and [`ruff`](https://github.com/astral-sh/ruff) for linting and formatting.

```bash
# Clone the repository
git clone git@github.com:VanL/simplebroker.git
cd simplebroker

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install all dependencies including dev extras
uv sync --all-extras

# Run tests (fast tests only, in parallel)
uv run pytest
# Run all tests, including slow ones (with 1000+ subprocess spawns)
uv run pytest -m ""

# Run tests with coverage
uv run pytest --cov=simplebroker --cov-report=term-missing

# Run specific test files
uv run pytest tests/test_smoke.py

# Run tests in a single process (useful for debugging)
uv run pytest -n 0

# Lint and format code
uv run ruff check simplebroker tests        # Check for issues
uv run ruff check --fix simplebroker tests  # Fix auto-fixable issues
uv run ruff format simplebroker tests       # Format code

# Type check
uv run mypy simplebroker
```

### Development Workflow

1. **Before committing**:
   ```bash
   uv run ruff check --fix simplebroker tests
   uv run ruff format simplebroker tests
   uv run mypy simplebroker
   uv run pytest
   ```

2. **Building packages**:
   ```bash
   uv build  # Creates wheel and sdist in dist/
   ```

3. **Installing locally for testing**:
   ```bash
   uv pip install dist/simplebroker-*.whl
   ```

## Contributing

Contributions are welcome! Please:

1. Keep it simple - the entire codebase should stay understandable in an afternoon
2. Maintain backward compatibility
3. Add tests for new features
4. Update documentation
5. Run `uv run ruff check` and `uv run pytest` before submitting PRs

### Setting up for development

```bash
# Fork and clone the repo
git clone git@github.com:VanL/simplebroker.git
cd simplebroker

# Install development environment
uv sync --all-extras

# Create a branch for your changes
git checkout -b my-feature

# Make your changes, then validate
uv run ruff check --fix simplebroker tests
uv run ruff format simplebroker tests
uv run pytest

# Push and create a pull request
git push origin my-feature
```

## License

MIT © 2025 Van Lindberg

## Acknowledgments

Built with Python, SQLite, and the Unix philosophy.
"bugtrack_url": null,
"license": "MIT",
"summary": "A lightweight message queue backed by SQLite",
"version": "1.3.1",
"project_urls": {
"Documentation": "https://github.com/VanL/simplebroker#readme",
"Homepage": "https://github.com/VanL/simplebroker",
"Issues": "https://github.com/VanL/simplebroker/issues",
"Repository": "https://github.com/VanL/simplebroker.git"
},
"split_keywords": [
"broker",
" cli",
" message-queue",
" queue",
" sqlite"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "3cc5d41e1b8ebc13071d67b4e1696113f409742e40fb97397b884055b2a94c6b",
"md5": "d1cba4641d0212c341a038f433f755dd",
"sha256": "9b99d5cdbbc15226ffaae95b9bff50491e4847cfb0edc8006a54ceb43e384ed3"
},
"downloads": -1,
"filename": "simplebroker-1.3.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "d1cba4641d0212c341a038f433f755dd",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 40338,
"upload_time": "2025-07-09T23:37:37",
"upload_time_iso_8601": "2025-07-09T23:37:37.805581Z",
"url": "https://files.pythonhosted.org/packages/3c/c5/d41e1b8ebc13071d67b4e1696113f409742e40fb97397b884055b2a94c6b/simplebroker-1.3.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "6a56c06514e5b9ced49c9090d194ac9cd8604b0fec283069392b3e090615c6f8",
"md5": "633ad0b4ce38cbf02d0f12a0f58d129e",
"sha256": "1d069ff4b08f147fed6085383cdf98eacc950c20f68ecc5e47141bc3dd7c815d"
},
"downloads": -1,
"filename": "simplebroker-1.3.1.tar.gz",
"has_sig": false,
"md5_digest": "633ad0b4ce38cbf02d0f12a0f58d129e",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 28710,
"upload_time": "2025-07-09T23:37:39",
"upload_time_iso_8601": "2025-07-09T23:37:39.061259Z",
"url": "https://files.pythonhosted.org/packages/6a/56/c06514e5b9ced49c9090d194ac9cd8604b0fec283069392b3e090615c6f8/simplebroker-1.3.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-09 23:37:39",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "VanL",
"github_project": "simplebroker#readme",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "simplebroker"
}