# SimpleBroker

Version 2.7.1 · Uploaded 2025-10-24 22:06:24 · Requires Python >=3.10 · License: MIT

  [![CI](https://github.com/VanL/simplebroker/actions/workflows/test.yml/badge.svg)](https://github.com/VanL/simplebroker/actions/workflows/test.yml)
  [![codecov](https://codecov.io/gh/VanL/simplebroker/branch/main/graph/badge.svg)](https://codecov.io/gh/VanL/simplebroker)
  [![PyPI version](https://badge.fury.io/py/simplebroker.svg)](https://badge.fury.io/py/simplebroker)
  [![Python versions](https://img.shields.io/pypi/pyversions/simplebroker.svg)](https://pypi.org/project/simplebroker/)

*A lightweight message queue backed by SQLite. No setup required, just works.*

```bash
$ pipx install simplebroker
$ broker write tasks "ship it 🚀"
$ broker read tasks
ship it 🚀
```

SimpleBroker is a zero-configuration, no-dependency message queue that runs anywhere Python runs. It's designed to be simple enough to understand in an afternoon, yet powerful enough for real work.

## Table of Contents

- [Features](#features)
- [Use Cases](#use-cases)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Command Reference](#command-reference)
  - [Global Options](#global-options)
  - [Commands](#commands)
  - [Command Options](#command-options)
  - [Exit Codes](#exit-codes)
- [Critical Safety Notes](#critical-safety-notes)
  - [Safe Message Handling](#safe-message-handling)
  - [Robust Message Handling with `watch`](#robust-message-handling-with-watch)
- [Core Concepts](#core-concepts)
  - [Timestamps as Message IDs](#timestamps-as-message-ids)
  - [JSON for Safe Processing](#json-for-safe-processing)
  - [Checkpoint-based Processing](#checkpoint-based-processing)
- [Common Patterns](#common-patterns)
- [Real-time Queue Watching](#real-time-queue-watching)
- [Python API](#python-api)
- [Performance & Tuning](#performance--tuning)
- [Project Scoping](#project-scoping)
  - [Basic Project Scoping](#basic-project-scoping)
  - [Project Database Names](#project-database-names)
  - [Error Behavior](#error-behavior-when-no-project-database-found)
  - [Project Initialization](#project-initialization)
  - [Precedence Rules](#precedence-rules)
  - [Security Notes](#security-notes)
  - [Common Use Cases](#common-use-cases)
- [Architecture & Technical Details](#architecture--technical-details)
- [Development & Contributing](#development--contributing)
- [License](#license)

## Features

- **Zero configuration** - No servers, daemons, or complex setup
- **SQLite-backed** - Rock-solid reliability with true ACID guarantees
- **Concurrent safe** - Multiple processes can read/write simultaneously
- **Simple CLI** - Intuitive commands that work with pipes and scripts
- **Portable** - Each directory gets its own isolated `.broker.db`
- **Fast** - 1000+ messages/second throughput
- **Lightweight** - ~2500 lines of code, no external dependencies
- **Real-time** - Built-in watcher for event-driven workflows

## Use Cases

- **Shell Scripting:** Decouple stages of a complex script
- **Background Jobs:** Manage tasks for cron jobs or systemd services
- **Development:** Simple message queue for local development without Docker
- **Data Pipelines:** Pass file paths or data chunks between processing steps
- **CI/CD Pipelines:** Coordinate build stages without external dependencies
- **Log Processing:** Buffer logs before aggregation or analysis
- **Simple IPC:** Communication between processes on the same machine

**Good for:** Scripts, cron jobs, small services, development  
**Not for:** Distributed systems, pub/sub, high-frequency trading

## Installation

```bash
# Use pipx for global installation (recommended)
pipx install simplebroker

# Or install with uv to use as a library
uv add simplebroker

# Or with pip
pip install simplebroker
```

The CLI is available as both `broker` and `simplebroker`.

**Requirements:**
- Python 3.10+
- SQLite 3.35+ (released March 2021) - required for `DELETE...RETURNING` support
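
The SQLite version is determined by the library your Python build links against, not by SimpleBroker itself, so it can be worth checking up front. A minimal sketch using only the standard library; the helper name `sqlite_supports_returning` is illustrative and not part of SimpleBroker:

```python
import sqlite3

# Minimum SQLite version stated in the requirements above.
MIN_SQLITE = (3, 35, 0)


def sqlite_supports_returning() -> bool:
    """Return True if the SQLite linked into this Python is at least 3.35."""
    return sqlite3.sqlite_version_info >= MIN_SQLITE


if __name__ == "__main__":
    print(f"SQLite {sqlite3.sqlite_version}: supported = {sqlite_supports_returning()}")
```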

## Quick Start

```bash
# Write a message
$ broker write myqueue "Hello, World!"

# Read the message (removes it)
$ broker read myqueue
Hello, World!

# Write from stdin
$ echo "another message" | broker write myqueue -

# Read all messages at once
$ broker read myqueue --all

# Peek without removing
$ broker peek myqueue

# Move messages between queues
$ broker move myqueue processed
$ broker move errors retry --all

# List all queues
$ broker list
myqueue: 3
processed: 1

# Broadcast to all queues
$ broker broadcast "System maintenance at 5pm"
# Target only matching queues using fnmatch-style globs
$ broker broadcast --pattern 'jobs-*' "Pipeline paused"

# Clean up when done
$ broker --cleanup
```

## Command Reference

### Global Options

- `-d, --dir PATH` - Use PATH instead of current directory
- `-f, --file NAME` - Database filename or absolute path (default: `.broker.db`)
  - If an absolute path is provided, the directory is extracted automatically
  - Cannot be used with `-d` if the directories don't match
- `-q, --quiet` - Suppress non-error output
- `--cleanup` - Delete the database file and exit
- `--vacuum` - Remove claimed messages and exit
- `--status` - Show global message count, last timestamp, and DB size (`--status --json` for JSON output)
- `--version` - Show version information
- `--help` - Show help message

### Commands

| Command | Description |
|---------|-------------|
| `write <queue> <message\|->` | Add message to queue (use `-` for stdin) |
| `read <queue> [options]` | Remove and return message(s) |
| `peek <queue> [options]` | Return message(s) without removing |
| `move <source> <dest> [options]` | Atomically transfer messages between queues |
| `list [--stats]` | Show queues and message counts |
| `delete <queue> [-m <id>]` | Delete queue or specific message (marks for removal; use `--vacuum` to reclaim space) |
| `delete --all` | Delete all queues (marks for removal; use `--vacuum` to reclaim space) |
| `broadcast <message\|->` | Send message to all existing queues |
| `watch <queue> [options]` | Watch queue for new messages |
| `alias <add\|remove\|list\|->` | Manage queue aliases |
| `init [--force]` | Initialize SimpleBroker database in current directory (does not accept `-d` or `-f` flags) |

#### Queue Aliases

Use aliases when two agents refer to the same underlying queue with different names. Aliases are stored in the database, persist across processes, and update atomically.

```bash
$ broker alias add task1.outbox agent1-to-agent2
$ broker alias add task2.inbox agent1-to-agent2
$ broker write @task1.outbox "Job ready"
$ broker read @task2.inbox
Job ready
$ broker alias list
task1.outbox -> agent1-to-agent2
task2.inbox -> agent1-to-agent2
$ broker alias list --target agent1-to-agent2
task1.outbox -> agent1-to-agent2
task2.inbox -> agent1-to-agent2
$ broker write task1.outbox "goes to literal queue"
$ broker read task1.outbox
goes to literal queue
$ broker alias remove task1.outbox
```

- Plain queue names (`task1.outbox`) always refer to the literal queue. Use the
  `@` prefix (`@task1.outbox`) to opt into alias resolution; if the alias is not
  defined, the command fails.
- Alias names are plain queue names (no `@` prefix); when *using* an alias on the CLI, prefix it with `@`.
- Use `alias list --target <queue>` to see which aliases point to a specific queue (reverse lookup).
- A target must be a real queue name (not another alias). Attempts to alias an alias or create cycles raise `ValueError`.
- Removing an alias does not affect stored messages; they remain under the canonical queue name.
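
The rules above can be modelled in a few lines of plain Python. This is a toy in-memory sketch for illustration only: the `AliasTable` class, its method names, and the `KeyError` for an undefined alias are assumptions, not SimpleBroker's implementation or API.

```python
class AliasTable:
    """Toy in-memory model of the alias rules (not SimpleBroker's code)."""

    def __init__(self) -> None:
        self._aliases: dict[str, str] = {}

    def add(self, alias: str, target: str) -> None:
        # A target must be a real queue name, never another alias.
        if target in self._aliases:
            raise ValueError(f"{target!r} is an alias and cannot be a target")
        # An alias pointing at itself would be a one-step cycle.
        if alias == target:
            raise ValueError("an alias cannot point to itself")
        self._aliases[alias] = target

    def remove(self, alias: str) -> None:
        # Removing an alias only drops the mapping; messages are untouched.
        del self._aliases[alias]

    def resolve(self, name: str) -> str:
        """Map a CLI queue argument to the queue it addresses."""
        if not name.startswith("@"):
            return name  # plain names always refer to the literal queue
        alias = name[1:]
        if alias not in self._aliases:
            raise KeyError(f"alias {alias!r} is not defined")
        return self._aliases[alias]

    def aliases_for(self, target: str) -> list[str]:
        """Reverse lookup, like `alias list --target <queue>`."""
        return sorted(a for a, t in self._aliases.items() if t == target)
```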

### Command Options

**Common options for read/peek/move:**
- `--all` - Process all messages (CLI moves up to 1,000,000 per invocation; rerun for larger queues or use the Python API generators)
- `--json` - Output as line-delimited JSON (includes timestamps)
- `-t, --timestamps` - Include timestamps in output
- `-m <id>` - Target specific message by its 19-digit timestamp ID
- `--since <timestamp>` - Process messages newer than timestamp

**Watch options:**
- `--peek` - Monitor without consuming
- `--move <dest>` - Continuously drain to destination queue
- `--quiet` - Suppress startup message

**Timestamp formats for `--since`:**
- ISO 8601: `2024-01-15T14:30:00Z` or `2024-01-15` (midnight UTC)
- Unix seconds: `1705329000` or `1705329000s`
- Unix milliseconds: `1705329000000ms`
- Unix nanoseconds/Native hybrid: `1837025672140161024` or `1837025672140161024ns`

**Best practice:** SimpleBroker uses heuristics to tell these formats apart, which is convenient for interactive use. When referring to a specific point in time, add an explicit suffix (`s`, `ms`, or `ns`) to avoid ambiguity.
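
To see how the explicit suffixes relate to each other, the following sketch converts a suffixed value to a UTC `datetime`. It is an illustration only: `suffixed_to_datetime` is a hypothetical helper, it treats `ns` values as plain Unix nanoseconds, and it does not reproduce SimpleBroker's heuristics for unsuffixed input.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Units per second for each explicit suffix. The two-letter suffixes come
# first because "ms" and "ns" also end in "s".
UNITS_PER_SECOND = {"ms": 1_000, "ns": 1_000_000_000, "s": 1}


def suffixed_to_datetime(value: str) -> datetime:
    """Convert '1705329000s', '1705329000000ms', or '...ns' to a UTC datetime."""
    for suffix, per_second in UNITS_PER_SECOND.items():
        if value.endswith(suffix):
            number = int(value[: -len(suffix)])
            # Integer arithmetic avoids float rounding on large values.
            micros = number * 1_000_000 // per_second
            return EPOCH + timedelta(microseconds=micros)
    raise ValueError("expected an explicit s, ms, or ns suffix")


if __name__ == "__main__":
    print(suffixed_to_datetime("1705329000s"))  # 2024-01-15 14:30:00+00:00
```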

### Exit Codes
- `0` - Success (returns 0 even when no messages match filters like `--since`)
- `1` - General error (e.g., database access error, invalid arguments)
- `2` - Queue empty, no messages match the criteria, or the provided message ID has an invalid format

**Note:** The `delete` command marks messages as "claimed" for performance. Use `--vacuum` to permanently remove them.
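
When driving the CLI from a script, it helps to treat exit code `2` as "nothing to do" rather than as a failure. A minimal sketch based on the codes listed above; `ReadOutcome`, `classify_exit`, and `read_one` are illustrative names, and `read_one` assumes the `broker` executable is on `PATH`:

```python
import subprocess
from enum import Enum


class ReadOutcome(Enum):
    SUCCESS = "success"
    EMPTY = "empty"
    ERROR = "error"


def classify_exit(code: int) -> ReadOutcome:
    """Map a broker exit code to a coarse outcome."""
    if code == 0:
        return ReadOutcome.SUCCESS
    if code == 2:
        return ReadOutcome.EMPTY  # queue empty or nothing matched
    return ReadOutcome.ERROR  # 1 and anything unexpected


def read_one(queue: str) -> tuple[ReadOutcome, str]:
    """Run `broker read <queue>` and return (outcome, stdout without the final newline)."""
    proc = subprocess.run(
        ["broker", "read", queue], capture_output=True, text=True
    )
    return classify_exit(proc.returncode), proc.stdout.rstrip("\n")
```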

## Critical Safety Notes

### Safe Message Handling

Messages can contain any characters including newlines, control characters, and shell metacharacters:
- **Shell injection risks** - When piping output to shell commands, malicious message content could execute unintended commands
- **Special characters** - Messages containing newlines or other special characters can break shell pipelines that expect single-line output
- **Queue names** - Limited to alphanumeric + underscore/hyphen/period (cannot start with hyphen or period)
- **Message size** - Limited to 10MB

**Always use `--json` for safe handling** - see examples below.

### Robust message handling with `watch`

When using `watch` in its default consuming mode, messages are **permanently removed** from the queue *before* your script or handler processes them. If your script fails or crashes, **the message is lost**. For critical data, you must use a safe processing pattern (move or peek-then-delete) that ensures that your data is not removed until you can acknowledge receipt. Example:

```bash
#!/bin/bash
# safe-worker.sh - A robust worker using the peek-and-acknowledge pattern

# Watch in peek mode, which does not remove messages
broker watch tasks --peek --json | while IFS= read -r line; do
    message=$(echo "$line" | jq -r '.message')
    timestamp=$(echo "$line" | jq -r '.timestamp')
    
    echo "Processing message ID: $timestamp"
    if process_task "$message"; then
        # Success: remove the specific message by its unique ID
        broker delete tasks -m "$timestamp"
    else
        echo "Failed to process, message remains in queue for retry." >&2
        # Optional: move to a dead-letter queue
        # echo "$message" | broker write failed_tasks -
    fi
done
```

## Core Concepts

### Timestamps as Message IDs
Every message receives a unique 64-bit number that serves as both its timestamp and its message ID. Timestamps are always included in JSON output and can be added to regular output with the `-t`/`--timestamps` flag.

Timestamps are:
- **Unique** - No collisions even with concurrent writers (enforced by database constraint)
- **Time-ordered** - Natural chronological sorting
- **Efficient** - 64-bit integers, not UUIDs
- **Meaningful** - Can extract creation time from the ID

The format:
- High 52 bits: microseconds since Unix epoch
- Low 12 bits: logical counter for sub-microsecond ordering
- Similar to Twitter's Snowflake IDs or UUID7
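
As an illustration of the layout described above, the following sketch packs and unpacks an ID assuming exactly a 52-bit time part and a 12-bit counter. The names `pack_id` and `unpack_id` are hypothetical, and SimpleBroker's real encoder may differ in detail.

```python
TIME_BITS = 52
COUNTER_BITS = 12
COUNTER_MASK = (1 << COUNTER_BITS) - 1  # 0xFFF


def pack_id(time_part: int, counter: int) -> int:
    """Combine a time value (high bits) and a logical counter (low bits)."""
    if not 0 <= time_part < (1 << TIME_BITS):
        raise ValueError("time part does not fit in 52 bits")
    if not 0 <= counter <= COUNTER_MASK:
        raise ValueError("counter does not fit in 12 bits")
    return (time_part << COUNTER_BITS) | counter


def unpack_id(message_id: int) -> tuple[int, int]:
    """Split an ID back into (time_part, counter)."""
    return message_id >> COUNTER_BITS, message_id & COUNTER_MASK
```

Because the time part occupies the high bits, comparing two IDs as plain integers orders them by time first and by counter second.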


### JSON for Safe Processing

Messages with newlines or special characters can break shell pipelines. Use `--json` to avoid shell issues:

```bash
# Problem: newlines break line counting
# ($'...' is bash ANSI-C quoting, so \n becomes a real newline)
$ broker write alerts $'ERROR: Database connection failed\nRetrying in 5 seconds...'
$ broker read alerts | wc -l
2  # Wrong! One message counted as two

# Solution: JSON output (line-delimited)
$ broker write alerts $'ERROR: Database connection failed\nRetrying in 5 seconds...'
$ broker read alerts --json
{"message": "ERROR: Database connection failed\nRetrying in 5 seconds...", "timestamp": 1837025672140161024}

# Parse safely with jq
$ broker read alerts --json | jq -r '.message'
ERROR: Database connection failed
Retrying in 5 seconds...
```
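
The same output can be consumed from Python. A minimal sketch, assuming each output line is one JSON object with the `message` and `timestamp` keys shown above; `parse_broker_json` is an illustrative helper, not part of SimpleBroker:

```python
import json


def parse_broker_json(output: str) -> list[tuple[str, int]]:
    """Parse line-delimited JSON into (message, timestamp) pairs."""
    records = []
    # Split on "\n" only: newlines inside a message are escaped in the JSON,
    # so every physical line is exactly one record.
    for line in output.split("\n"):
        if not line.strip():
            continue
        obj = json.loads(line)
        records.append((obj["message"], obj["timestamp"]))
    return records
```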

### Checkpoint-based Processing

Use `--since` for resumable processing:

```bash
# Save checkpoint after processing
$ result=$(broker read tasks --json)
$ checkpoint=$(echo "$result" | jq '.timestamp')

# Resume from checkpoint
$ broker read tasks --all --since "$checkpoint"

# Or use human-readable timestamps
$ broker read tasks --all --since "2024-01-15T14:30:00Z"
```

## Common Patterns

<details>
<summary>Basic Worker Loop</summary>

```bash
while msg=$(broker read work 2>/dev/null); do
    echo "Processing: $msg"
    # do work...
done
```
</details>

<details>
<summary>Multiple Queues</summary>

```bash
# Different queues for different purposes
$ broker write emails "send welcome to user@example.com"
$ broker write logs "2023-12-01 system started"
$ broker write metrics "cpu_usage:0.75"

$ broker list
emails: 1
logs: 1
metrics: 1
```
</details>

<details>
<summary>Fan-out with Broadcast</summary>

```bash
# Send to all queues at once
$ broker broadcast "shutdown signal"

# Each worker reads from its own queue
$ broker read worker1  # -> "shutdown signal"
$ broker read worker2  # -> "shutdown signal"
```

**Note:** Broadcast sends to all *existing* queues at execution time. There's a small race window for queues created during broadcast.

**Alias interaction:** Broadcast operations ignore aliases and work only on literal queue names. Pattern matching with `--pattern` matches queue names, not alias names.
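
To preview which queues a `--pattern` glob would select, Python's `fnmatch` module follows the same glob style. A small sketch; `matching_queues` is an illustrative helper, and case-sensitive matching is an assumption about SimpleBroker's behavior:

```python
from fnmatch import fnmatchcase


def matching_queues(queues: list[str], pattern: str) -> list[str]:
    """Return the queue names that match an fnmatch-style glob."""
    return [name for name in queues if fnmatchcase(name, pattern)]


if __name__ == "__main__":
    print(matching_queues(["jobs-build", "jobs-test", "emails"], "jobs-*"))
```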
</details>

<details>
<summary>Unix Tool Integration</summary>

```bash
# Store command output
$ df -h | broker write monitoring -

# Process files through a queue
$ find . -name "*.log" | while IFS= read -r f; do
    broker write logfiles "$f"
done

# Parallel processing with xargs
$ broker read logfiles --all | xargs -P 4 -I {} process_log {}

# Remote queue via SSH
$ echo "remote task" | ssh server "cd /app && broker write tasks -"
$ ssh server "cd /app && broker read tasks"


# Inspect the stored output without removing it
$ broker peek monitoring

# Use absolute paths for databases in specific locations
$ broker -f /var/lib/myapp/queue.db write tasks "backup database"
$ broker -f /var/lib/myapp/queue.db read tasks

# Reserving work using move
$ msg_json=$(broker move todo in-process --json 2>/dev/null)
$ if [ -n "$msg_json" ]; then
      # --json emits one object per line with "message" and "timestamp" keys
      msg_id=$(echo "$msg_json" | jq -r '.timestamp')
      msg_data=$(echo "$msg_json" | jq -r '.message')

      echo "Processing message $msg_id: $msg_data"

      # Process the message here
      # ...

      # Delete after successful processing
      broker delete in-process -m "$msg_id"
  else
      echo "No messages to process"
  fi
```
</details>

<details>
<summary>Dead Letter Queue Pattern</summary>

```bash
# Process messages, moving failures to DLQ
while msg=$(broker read tasks); do
    if ! process_task "$msg"; then
        echo "$msg" | broker write dlq -
    fi
done

# Retry failed messages
broker move dlq tasks --all
```
</details>

<details>
<summary>Resilient Worker with Checkpointing</summary>

```bash
#!/bin/bash
# resilient-worker.sh - Process messages with checkpoint recovery

QUEUE="events"
CHECKPOINT_FILE="/var/lib/myapp/checkpoint"
BATCH_SIZE=100

# Load last checkpoint (default to 0 if first run)
last_checkpoint=$(cat "$CHECKPOINT_FILE" 2>/dev/null || echo 0)
echo "Starting from checkpoint: $last_checkpoint"

while true; do
    # Check if there are messages newer than our checkpoint
    if ! broker peek "$QUEUE" --json --since "$last_checkpoint" >/dev/null 2>&1; then
        echo "No new messages, sleeping..."
        sleep 5
        continue
    fi
    
    echo "Processing new messages..."
    
    # Process messages one at a time to avoid data loss
    processed=0
    while [ $processed -lt $BATCH_SIZE ]; do
        # Peek at exactly one message newer than checkpoint (does not remove it)
        message_data=$(broker peek "$QUEUE" --json --since "$last_checkpoint" 2>/dev/null)

        # Check if we got a message
        if [ -z "$message_data" ]; then
            echo "No more messages to process"
            break
        fi

        # Extract message and timestamp
        message=$(echo "$message_data" | jq -r '.message')
        timestamp=$(echo "$message_data" | jq -r '.timestamp')

        # Process the message
        echo "Processing: $message"
        if ! process_event "$message"; then
            echo "Error processing message, will retry on next run"
            # Exit without deleting or updating checkpoint - the message stays queued for retry
            exit 1
        fi

        # Remove the message ONLY after successful processing
        broker delete "$QUEUE" -m "$timestamp"

        # Atomically update checkpoint
        echo "$timestamp" > "$CHECKPOINT_FILE.tmp"
        mv "$CHECKPOINT_FILE.tmp" "$CHECKPOINT_FILE"
        
        # Update our local variable for next iteration
        last_checkpoint="$timestamp"
        processed=$((processed + 1))
    done
    
    if [ $processed -eq 0 ]; then
        echo "No messages processed, sleeping..."
        sleep 5
    else
        echo "Batch complete, processed $processed messages"
    fi
done
```

Key features:
- **No data loss from pipe buffering** - Reads messages one at a time
- **Atomic checkpoint updates** - Uses temp file + rename for crash safety
- **Per-message checkpointing** - Updates checkpoint after each successful message
- **Batch processing** - Processes up to BATCH_SIZE messages at a time for efficiency
- **Failure recovery** - On error, exits without updating checkpoint so failed message is retried
</details>
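
The temp-file-plus-rename step used for the checkpoint can be written in Python as well. A minimal sketch using only the standard library; `save_checkpoint` and `load_checkpoint` are illustrative helpers, not part of SimpleBroker:

```python
import os
import tempfile
from pathlib import Path


def save_checkpoint(path: Path, timestamp: int) -> None:
    """Atomically replace the checkpoint file with a new timestamp."""
    # Create the temp file in the same directory so the final rename
    # stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".")
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write(f"{timestamp}\n")
            tmp.flush()
            os.fsync(tmp.fileno())  # make sure the data is on disk first
        os.replace(tmp_name, path)  # atomic rename over the old checkpoint
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_checkpoint(path: Path) -> int:
    """Return the saved timestamp, or 0 on the first run."""
    try:
        return int(path.read_text().strip())
    except FileNotFoundError:
        return 0
```

A reader of the checkpoint file sees either the old value or the new one, never a partially written file.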

## Real-time Queue Watching

The `watch` command provides three modes for monitoring queues:

1. **Consume** (default): Process and remove messages from the queue
2. **Peek** (`--peek`): Monitor messages without removing them
3. **Move** (`--move DEST`): Drain ALL messages to another queue

```bash
# Start watching a queue (consumes messages)
$ broker watch tasks

# Watch without consuming (peek mode)
$ broker watch tasks --peek

# Watch with JSON output (timestamps always included)
$ broker watch tasks --json
{"message": "task 1", "timestamp": 1837025672140161024}

# Continuously drain one queue to another
$ broker watch source_queue --move destination_queue
```

The watcher uses an efficient polling strategy:
- **Burst mode**: First 100 checks with zero delay for immediate message pickup
- **Smart backoff**: Gradually increases polling interval to 0.1s maximum
- **Low overhead**: Uses SQLite's data_version to detect changes without querying
- **Graceful shutdown**: Handles Ctrl-C (SIGINT) cleanly
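
The polling strategy can be pictured with the sketch below. It is not SimpleBroker's watcher code: the linear ramp and its `step` parameter are assumptions made for illustration, since the text above only states the burst count and the 0.1s cap. `PRAGMA data_version` is a standard SQLite pragma whose value changes when another connection commits a change.

```python
import itertools
import sqlite3
from collections.abc import Iterator


def polling_delays(
    initial_checks: int = 100, max_interval: float = 0.1, step: float = 0.001
) -> Iterator[float]:
    """Yield sleep durations: a zero-delay burst, then a ramp capped at max_interval."""
    for _ in range(initial_checks):
        yield 0.0
    delay = 0.0
    while True:
        delay = min(delay + step, max_interval)
        yield delay


def data_version(conn: sqlite3.Connection) -> int:
    """Return SQLite's data_version for this connection (a cheap change check)."""
    return conn.execute("PRAGMA data_version").fetchone()[0]


if __name__ == "__main__":
    # First few delays with a short burst, to show the shape of the schedule.
    print(list(itertools.islice(polling_delays(initial_checks=2), 5)))
```

A watcher built this way would sleep for the next delay, compare `data_version` with the last value it saw, and query for messages only when the value changed.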

### Move Mode (`--move`)

The `--move` option provides continuous queue-to-queue message migration:

```bash
# Like: tail -f /var/log/app.log | tee -a /var/log/processed.log
$ broker watch source_queue --move dest_queue
```

Key characteristics:
- **Drains entire queue**: Moves ALL messages from source to destination
- **Atomic operation**: Each message is atomically moved before being displayed
- **No filtering**: Incompatible with `--since` (would leave messages stranded)
- **Concurrent safe**: Multiple move watchers can run safely without data loss

## Python API

SimpleBroker also provides a Python API for more advanced use cases:

```python
from simplebroker import Queue, QueueWatcher
from simplebroker.db import DBConnection
import logging

# Basic usage
with Queue("tasks") as q:
    q.write("process order 123")
    message = q.read()  # Returns: "process order 123"

# Safe peek-and-acknowledge pattern (recommended for critical data)
def process_message(message: str, timestamp: int):
    """Process message and acknowledge only on success."""
    logging.info(f"Processing: {message}")
    
    # Simulate processing that might fail
    if "error" in message:
        raise ValueError("Simulated processing failure")
    
    # If we get here, processing succeeded
    # Now explicitly acknowledge by deleting the message
    with Queue("tasks") as q:
        q.delete(message_id=timestamp)
    logging.info(f"Message {timestamp} acknowledged")

def handle_error(exception: Exception, message: str, timestamp: int) -> bool:
    """Log error and optionally move to dead-letter queue."""
    logging.error(f"Failed to process message {timestamp}: {exception}")
    # Message remains in queue for retry since we're using peek=True
    
    # Optional: After N retries, move to dead-letter queue
    # Queue("errors").write(f"{timestamp}:{message}:{exception}")
    
    return True  # Continue watching

# Use peek=True for safe mode - messages aren't removed until explicitly acknowledged
watcher = QueueWatcher(
    queue=Queue("tasks"),
    handler=process_message,
    error_handler=handle_error,
    peek=True  # True = safe mode - just observe, don't consume
)

# Start watching (blocks until stopped)
try:
    watcher.watch()
except KeyboardInterrupt:
    print("Watcher stopped by user")
```

### Generating timestamps without writing

Sometimes you need a broker-compatible timestamp/ID before enqueueing a message (for logging, correlation IDs, or backpressure planning). You can ask SimpleBroker to generate one without writing a row:

```python
from simplebroker import Queue
from simplebroker.db import DBConnection

with DBConnection("/path/to/.broker.db") as conn:
    db = conn.get_connection()
    ts = db.generate_timestamp()  # alias: db.get_ts()

queue = Queue("tasks", db_path="/path/to/.broker.db")
ts2 = queue.generate_timestamp()  # alias: queue.get_ts()

print(ts2 > ts)  # Monotonic within a database
```

Notes:
- Timestamps are monotonic per database and match what `Queue.write()` uses internally.
- Generating a timestamp does not reserve a slot; it simply gives you the next ID.

### Thread-Based Background Processing

Use `run_in_thread()` to run watchers in background threads:

```python
from pathlib import Path
from simplebroker import QueueWatcher

def handle_message(msg: str, ts: int):
    print(f"Processing: {msg}")

# Create watcher with database path (recommended for thread safety)
watcher = QueueWatcher(
    Path("my.db"),
    "orders",
    handle_message
)

# Start in background thread
thread = watcher.run_in_thread()

# Do other work...

# Stop when done
watcher.stop()
thread.join()
```

### Context Manager Support

For cleaner resource management, watchers can be used as context managers which automatically start the thread and ensure proper cleanup:

```python
import time
from simplebroker import QueueWatcher

def handle_message(msg: str, ts: int):
    print(f"Received: {msg}")

# Automatic thread management with context manager
with QueueWatcher("my.db", "notifications", handle_message) as watcher:
    # Thread is started automatically
    # Do other work while watcher processes messages
    time.sleep(10)
    
# Thread is automatically stopped and joined when exiting the context
# Ensures proper cleanup even if an exception occurs
```

SimpleBroker is synchronous by design for simplicity, but can be easily integrated with async applications:

```python
import asyncio
import concurrent.futures
from simplebroker import Queue

class AsyncQueue:
    """Async wrapper for SimpleBroker Queue using thread pool executor."""
    
    def __init__(self, queue_name: str, db_path: str = ".broker.db"):
        self.queue_name = queue_name
        self.db_path = db_path
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        
    async def write(self, message: str) -> None:
        """Write message asynchronously."""
        loop = asyncio.get_running_loop()
        def _write():
            with Queue(self.queue_name, self.db_path) as q:
                q.write(message)
        await loop.run_in_executor(self._executor, _write)
    
    async def read(self) -> str | None:
        """Read message asynchronously."""
        loop = asyncio.get_running_loop()
        def _read():
            with Queue(self.queue_name, self.db_path) as q:
                return q.read()
        return await loop.run_in_executor(self._executor, _read)

# Usage
async def main():
    tasks_queue = AsyncQueue("tasks")
    
    # Write messages concurrently
    await asyncio.gather(
        tasks_queue.write("Task 1"),
        tasks_queue.write("Task 2"),
        tasks_queue.write("Task 3")
    )
    
    # Read messages
    while msg := await tasks_queue.read():
        print(f"Got: {msg}")
```

For advanced use cases requiring direct database access, you can use the `DBConnection` context manager:

```python
from simplebroker.db import DBConnection

# Safe database connection management for cross-queue operations
with DBConnection("myapp.db") as conn:
    db = conn.get_connection()
    
    # Perform cross-queue operations
    queues = db.get_queue_stats()
    for queue_name, stats in queues.items():
        print(f"{queue_name}: {stats['pending']} pending")
    
    # Broadcast to all queues
    db.broadcast("System maintenance at 5pm")
```

**Key async integration strategies:**

1. **Use Queue API**: Prefer the high-level Queue class for single-queue operations
2. **Thread Pool Executor**: Run SimpleBroker's sync methods in threads
3. **One Queue Per Operation**: Create fresh Queue instances for thread safety
4. **DBConnection for Advanced Use**: Use DBConnection context manager for cross-queue operations

See [`examples/async_wrapper.py`](examples/async_wrapper.py) for a complete async wrapper implementation including:
- Async context manager for proper cleanup
- Background watcher with asyncio coordination
- Streaming message consumption
- Concurrent queue operations

### Advanced: Custom Extensions

**Note:** This is an advanced example showing how to extend SimpleBroker's internals. Most users should use the standard Queue API.

```python
from simplebroker.db import BrokerDB, DBConnection
from simplebroker import Queue

class PriorityQueueSystem:
    """Example: Priority queue system using multiple standard queues."""
    
    def __init__(self, db_path: str = ".broker.db"):
        self.db_path = db_path
    
    def write_with_priority(self, base_queue: str, message: str, priority: int = 0):
        """Write message with priority (higher = more important)."""
        queue_name = f"{base_queue}_p{priority}"
        with Queue(queue_name, self.db_path) as q:
            q.write(message)
    
    def read_highest_priority(self, base_queue: str) -> str | None:
        """Read from highest priority queue first."""
        # Check queues in priority order
        for priority in range(9, -1, -1):
            queue_name = f"{base_queue}_p{priority}"
            with Queue(queue_name, self.db_path) as q:
                msg = q.read()
                if msg is not None:
                    return msg
        return None

# For even more advanced use requiring database subclassing:
class CustomBroker(BrokerDB):
    """Example of extending BrokerDB directly (advanced users only)."""
    
    def custom_operation(self):
        # Access internal database methods
        with self._lock:
            # Your custom SQL operations here
            pass
```

See [`examples/`](examples/) directory for more patterns including async processing and custom runners.

## Performance & Tuning

- **Throughput**: 1000+ messages/second on typical hardware
- **Latency**: <10ms for write, <10ms for read
- **Scalability**: Tested with 100k+ messages per queue
- **Optimization**: Use `--all` for bulk operations

### Environment Variables

<details>
<summary>Click to see all configuration options</summary>

**Core Settings:**
- `BROKER_BUSY_TIMEOUT` - SQLite busy timeout in milliseconds (default: 5000)
- `BROKER_CACHE_MB` - SQLite page cache size in megabytes (default: 10)
  - Larger cache improves performance for repeated queries and large scans
  - Recommended: 10-50 MB for typical workloads, 100+ MB for heavy use
- `BROKER_SYNC_MODE` - SQLite synchronous mode: FULL, NORMAL, or OFF (default: FULL)
  - `FULL`: Maximum durability, safe against power loss (default)
  - `NORMAL`: ~25% faster writes, safe against app crashes, small risk on power loss
  - `OFF`: Fastest but unsafe - only for testing or non-critical data
- `BROKER_WAL_AUTOCHECKPOINT` - WAL auto-checkpoint threshold in pages (default: 1000)
  - Controls when SQLite automatically moves WAL data to the main database
  - Default of 1000 pages ≈ 1MB (with 1KB page size)
  - Increase for high-traffic scenarios to reduce checkpoint frequency
  - Set to 0 to disable automatic checkpoints (manual control only)

**Read Performance:**
- `BROKER_READ_COMMIT_INTERVAL` - Number of messages to read before committing in `--all` mode (default: 1)
  - Default of 1 provides exactly-once delivery guarantee
  - Increase for better performance with at-least-once delivery guarantee

**Vacuum Settings:**
- `BROKER_AUTO_VACUUM` - Enable automatic vacuum of claimed messages (default: true)
- `BROKER_VACUUM_THRESHOLD` - Number of claimed messages before auto-vacuum triggers (default: 10000)
- `BROKER_VACUUM_BATCH_SIZE` - Number of messages to delete per vacuum batch (default: 1000)
- `BROKER_VACUUM_LOCK_TIMEOUT` - Seconds before a vacuum lock is considered stale (default: 300)

**Watcher Tuning:**
- `BROKER_INITIAL_CHECKS` - Number of checks with zero delay (default: 100)
- `BROKER_MAX_INTERVAL` - Maximum polling interval in seconds (default: 0.1)

**Database Naming:**
- `BROKER_DEFAULT_DB_NAME` - Name of the broker database file (default: `.broker.db`)
  - Corresponds to the `-f`/`--file` command line argument
  - Can be a compound path including a single directory (e.g., `.subdirectory/broker.db`)
  - Applies to all scopes

Example configurations:
```bash
# High-throughput configuration
export BROKER_SYNC_MODE=NORMAL
export BROKER_READ_COMMIT_INTERVAL=100
export BROKER_INITIAL_CHECKS=1000

# Low-latency configuration  
export BROKER_MAX_INTERVAL=0.01
export BROKER_CACHE_MB=50

# Power-saving configuration
export BROKER_INITIAL_CHECKS=50
export BROKER_MAX_INTERVAL=0.5

# Project scoping configuration
export BROKER_PROJECT_SCOPE=true
export BROKER_DEFAULT_DB_NAME=project-queue.db
```
</details>

## Project Scoping

SimpleBroker provides flexible database scoping modes to handle different use cases:

**Directory Scope (Default):** Each directory gets its own independent `.broker.db`  
**Project Scope:** Git-like upward search for shared project database  
**Global Scope:** Use a specific location for all broker operations

This allows multiple scripts and processes to share broker databases according to your needs.

### Basic Project Scoping

Enable project scoping by setting the environment variable:

```bash
export BROKER_PROJECT_SCOPE=true
```

With project scoping enabled, SimpleBroker searches upward from the current directory to find an existing `.broker.db` file. If found, it uses that database instead of creating a new one in the current directory.

```bash
# Project structure:
# /home/user/myproject/.broker.db  ← Project database
# /home/user/myproject/scripts/
# /home/user/myproject/scripts/worker.py

cd /home/user/myproject/scripts
export BROKER_PROJECT_SCOPE=true
broker write tasks "process data"  # Uses /home/user/myproject/.broker.db
```

**Benefits:**
- **Shared state**: All project scripts use the same message queue
- **Location independence**: Works from any subdirectory
- **Zero configuration**: Just set the environment variable
- **Git-like behavior**: Intuitive for developers familiar with version control
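
The upward search described above can be sketched in a few lines of Python. This is an illustration of the idea, not SimpleBroker's implementation: `find_project_db` and its parameters are hypothetical names, and the real search applies additional validation and permission checks.

```python
from pathlib import Path
from typing import Optional


def find_project_db(start: Path,
                    db_name: str = ".broker.db",
                    max_levels: int = 100) -> Optional[Path]:
    """Walk from `start` toward the filesystem root looking for `db_name`.

    `db_name` may be a compound name such as ".project/queue.db".
    Returns the first match, or None if the root is reached without one.
    """
    current = start.resolve()
    for _ in range(max_levels):
        candidate = current / db_name
        if candidate.is_file():
            return candidate
        if current.parent == current:  # reached the filesystem root
            return None
        current = current.parent
    return None  # gave up after max_levels directories
```

Calling `find_project_db(Path.cwd())` from `/home/user/myproject/scripts` would return `/home/user/myproject/.broker.db` in the layout shown above.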

### Global Scope

Use a specific directory for all broker operations. Must be an absolute path.

```bash
export BROKER_DEFAULT_DB_LOCATION=/var/lib/myapp
# Uses: /var/lib/myapp/.broker.db for all operations
```

**Use cases:**
- **System-wide queues**: Central message broker for multiple applications
- **Shared storage**: Use network-mounted directories for distributed access
- **Privilege separation**: Store databases in controlled system directories

**Note:** `BROKER_DEFAULT_DB_LOCATION` corresponds to the `-d/--dir` command line argument and is ignored when `BROKER_PROJECT_SCOPE=true`.

### Project Database Names

Control the database filename used in any scoping mode:

```bash
export BROKER_DEFAULT_DB_NAME=project-queue.db
export BROKER_PROJECT_SCOPE=true
```
Now project scoping searches for `project-queue.db` instead of `.broker.db`.

To better support Git-like operation, `BROKER_DEFAULT_DB_NAME` can be a compound name that includes a single subdirectory:

```bash
export BROKER_DEFAULT_DB_NAME=.project/queue.db
export BROKER_PROJECT_SCOPE=true
```
Now project scoping searches for `.project/queue.db` instead of `.broker.db`.

**Use cases:**
- **Multiple projects**: Use different names to avoid conflicts
- **Descriptive names**: `analytics.db`, `build-queue.db`, etc.
- **Environment separation**: `dev-queue.db` vs `prod-queue.db`
- **Using config directories**: `.config/broker.db` vs `.broker.db`

**Note:** If no project database is found during the upward search, SimpleBroker will error out and ask you to run `broker init` to create one.

### Error Behavior When No Project Database Found

When project scoping is enabled but no project database is found, SimpleBroker will error out with a clear message:

```bash
export BROKER_PROJECT_SCOPE=true
cd /tmp/isolated_directory
broker write tasks "test message"
# Error: No SimpleBroker database found in project scope.
# Run 'broker init' to create a project database.
```

**This is intentional behavior** - SimpleBroker requires explicit initialization to avoid accidentally creating databases in unexpected locations.

### Project Initialization

Use `broker init` to create a project database in the current directory:

```bash
cd /home/user/myproject
broker init
# Creates /home/user/myproject/.broker.db
```

**With custom database name:**
```bash
export BROKER_DEFAULT_DB_NAME=project-queue.db
cd /home/user/myproject
broker init
# Creates /home/user/myproject/project-queue.db

# Force reinitialize existing database
broker init --force
```

**Important:** `broker init` always creates the database in the current working directory and does not accept `-d` or `-f` flags. It only respects `BROKER_DEFAULT_DB_NAME` for custom filenames.

**Directory structure examples:**
```bash
# Web application
webapp/
├── .broker.db          ← Project queue (created by: broker init)
├── frontend/
│   └── build.py        ← Uses ../.broker.db
├── backend/
│   └── worker.py       ← Uses ../.broker.db
└── scripts/
    └── deploy.sh       ← Uses ../.broker.db

# Data pipeline
pipeline/
├── queues.db           ← Custom name (BROKER_DEFAULT_DB_NAME=queues.db)
├── extract/
│   └── scraper.py      ← Uses ../queues.db
├── transform/
│   └── processor.py    ← Uses ../queues.db
└── load/
    └── uploader.py     ← Uses ../queues.db
```

### Precedence Rules

Database path resolution follows strict precedence rules:

1. **Explicit CLI flags** (`-f`, `-d`) - Always override all other settings
2. **Project scoping** (`BROKER_PROJECT_SCOPE=true`) - Git-like upward search, errors if not found
3. **Global scope** (`BROKER_DEFAULT_DB_LOCATION`) - Used when project scoping disabled
4. **Built-in defaults** - Current directory + `.broker.db`

**Environment variable interactions:**
- `BROKER_DEFAULT_DB_NAME` applies to all scoping modes
- `BROKER_DEFAULT_DB_LOCATION` ignored when `BROKER_PROJECT_SCOPE=true`
- `BROKER_PROJECT_SCOPE=true` takes precedence over global scope settings

**Examples:**

```bash
export BROKER_PROJECT_SCOPE=true
export BROKER_DEFAULT_DB_NAME=project.db

# 1. Explicit absolute path (highest precedence)
broker -f /explicit/path/queue.db write test "msg"
# Uses: /explicit/path/queue.db

# 2. Explicit directory + filename
broker -d /explicit/dir -f custom.db write test "msg"  
# Uses: /explicit/dir/custom.db

# 3. Project scoping finds existing database
# (assuming /home/user/myproject/project.db exists)
cd /home/user/myproject/subdir
broker write test "msg"
# Uses: /home/user/myproject/project.db

# 4. Project scoping enabled but no database found (errors out)
cd /tmp/isolated
broker write test "msg"
# Error: No SimpleBroker database found. Run 'broker init' to create one.

# 5. Built-in defaults (no project scoping)
unset BROKER_PROJECT_SCOPE BROKER_DEFAULT_DB_NAME
broker write test "msg"
# Uses: /tmp/isolated/.broker.db
```

**Decision flowchart:**
```
CLI flags (-f absolute path)?
├─ YES → Use absolute path
└─ NO → CLI flags (-d + -f)?
   ├─ YES → Use directory + filename
   └─ NO → BROKER_PROJECT_SCOPE=true?
      ├─ NO → Use env defaults or built-in defaults
      └─ YES → Search upward for database
         ├─ FOUND → Use project database
         └─ NOT FOUND → Error with message to run 'broker init'
```
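
The same decision order can be written as a small function. This is a sketch under stated assumptions, not SimpleBroker's resolver: `resolve_db_path` and `NoProjectDatabase` are hypothetical names, only the literal value `true` is treated as enabling project scope, and the handling of a lone `-d` or a lone relative `-f` is a guess.

```python
from pathlib import Path
from typing import Mapping, Optional


class NoProjectDatabase(Exception):
    """Raised when project scoping is on but the upward search finds nothing."""


def resolve_db_path(cwd: Path,
                    env: Mapping[str, str],
                    cli_dir: Optional[str] = None,
                    cli_file: Optional[str] = None) -> Path:
    """Pick a database path following the documented precedence order."""
    name = env.get("BROKER_DEFAULT_DB_NAME", ".broker.db")

    # 1. Explicit CLI flags always win.
    if cli_file is not None and Path(cli_file).is_absolute():
        return Path(cli_file)
    if cli_dir is not None or cli_file is not None:
        # Assumption: a missing flag falls back to cwd / the default name.
        return Path(cli_dir or cwd) / (cli_file or name)

    # 2. Project scoping: upward search, error if nothing is found.
    if env.get("BROKER_PROJECT_SCOPE", "").lower() == "true":
        for directory in (cwd, *cwd.parents):
            candidate = directory / name
            if candidate.is_file():
                return candidate
        raise NoProjectDatabase("Run 'broker init' to create a project database.")

    # 3. Global scope, then 4. the built-in default (current directory).
    base = env.get("BROKER_DEFAULT_DB_LOCATION")
    return (Path(base) if base else cwd) / name
```

For example, `resolve_db_path(Path.cwd(), os.environ)` (after `import os`) would apply the rules to the current shell environment. Note that `BROKER_DEFAULT_DB_LOCATION` is never consulted once project scoping is enabled, matching the interaction rules listed above.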

### Security Notes

Project scoping includes several security measures to prevent unauthorized access:

**Boundary detection:**
- Stops at filesystem root (`/` on Unix, `C:\` on Windows)
- Respects filesystem mount boundaries

**Database validation:**
- Only uses files with SimpleBroker magic string
- Validates database schema and structure
- Rejects corrupted or foreign databases

**Permission checking:**
- Respects file system access controls
- Skips directories with permission issues
- Validates read/write access before using database

**Traversal limits:**
- Maximum 100 directory levels to prevent infinite loops
- Prevents symlink loop exploitation
- Uses existing path resolution security
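
As an illustration of the database validation step, the sketch below rejects files that are not SQLite databases or that lack a `messages` table with the documented columns. It is a heuristic written for this README, not SimpleBroker's validator: `looks_like_broker_db` is a hypothetical name, and the SimpleBroker magic string mentioned above is not checked because its value is not given here.

```python
import sqlite3
from pathlib import Path

SQLITE_HEADER = b"SQLite format 3\x00"  # first 16 bytes of every SQLite 3 file
EXPECTED_COLUMNS = {"id", "queue", "body", "ts", "claimed"}


def looks_like_broker_db(path: Path) -> bool:
    """Return True if `path` is a readable SQLite file with the documented table shape."""
    try:
        with path.open("rb") as fh:
            if fh.read(16) != SQLITE_HEADER:
                return False  # not a SQLite database at all
        # Open read-only so validation can never create or modify a file.
        conn = sqlite3.connect(path.resolve().as_uri() + "?mode=ro", uri=True)
        try:
            rows = conn.execute("PRAGMA table_info(messages)").fetchall()
        finally:
            conn.close()
    except (OSError, sqlite3.Error):
        return False  # unreadable, corrupted, or permission denied
    columns = {row[1] for row in rows}  # row[1] is the column name
    return EXPECTED_COLUMNS <= columns
```

A candidate found during the upward search would be used only if a check like this passes; otherwise the search would continue in the parent directory.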

**Warnings:**

⚠️ **Project scoping allows accessing databases in parent directories.** Only enable in trusted environments where this behavior is desired.

⚠️ **Database sharing:** Multiple processes will share the same database when project scoping is enabled. Ensure your application handles concurrent access appropriately.

⚠️ **No automatic fallback:** When project scoping is enabled but no database is found, SimpleBroker will error out rather than creating a database automatically. You must run `broker init` to create a project database.

**Best practices:**
```bash
# Safe: Enable only in controlled environments
if [[ "$PWD" == /home/user/myproject/* ]]; then
    export BROKER_PROJECT_SCOPE=true
fi

# Safe: Use explicit paths for sensitive operations
broker -f /secure/path/queue.db write secrets "sensitive data"

# Safe: Validate environment before enabling
if [[ -r "/home/user/myproject/.broker.db" ]]; then
    export BROKER_PROJECT_SCOPE=true
fi
```

### Common Use Cases

**Build systems:**
```bash
# Root project queue for build coordination
cd /project && broker init
export BROKER_PROJECT_SCOPE=true

# Frontend build (any subdirectory)
cd /project/frontend
broker write build-tasks "compile assets"

# Backend build (different subdirectory)  
cd /project/backend
broker read build-tasks  # Gets "compile assets"
```

**Data pipelines:**
```bash
# Pipeline coordination
export BROKER_PROJECT_SCOPE=true
export BROKER_DEFAULT_DB_NAME=pipeline.db
cd /data-pipeline && broker init

# Extract phase
cd /data-pipeline/extractors
broker write raw-data "/path/to/file1.csv"

# Transform phase  
cd /data-pipeline/transformers
broker read raw-data  # Gets "/path/to/file1.csv"
broker write clean-data "/path/to/processed1.json"

# Load phase
cd /data-pipeline/loaders  
broker read clean-data  # Gets "/path/to/processed1.json"
```

**Development workflows:**
```bash
# Development environment setup
cd ~/myproject
export BROKER_PROJECT_SCOPE=true
export BROKER_DEFAULT_DB_NAME=dev-queue.db
broker init

# Testing from any location
cd ~/myproject/tests
broker write test-data "integration-test-1"

# Application reads from any location
cd ~/myproject/src
broker read test-data  # Gets "integration-test-1"
```

**CI/CD integration:**
```bash
#!/bin/bash
# Build script (run from any project subdirectory)
export BROKER_PROJECT_SCOPE=true
export BROKER_DEFAULT_DB_NAME=ci-queue.db

# Ensure project queue exists
if ! broker list >/dev/null 2>&1; then
    broker init
fi

# Add build tasks
broker write builds "compile-frontend"
broker write builds "run-tests" 
broker write builds "build-docker"
broker write builds "deploy-staging"
```

**Multi-service coordination:**
```bash
# Service discovery queue
export BROKER_PROJECT_SCOPE=true
export BROKER_DEFAULT_DB_NAME=services.db
cd /app && broker init  # One-time setup: creates /app/services.db

# Service A registers itself
cd /app/service-a
broker write registry "service-a:healthy:port:8080"

# Service B discovers Service A
cd /app/service-b  
broker peek registry  # Sees "service-a:healthy:port:8080"
```

## Architecture & Technical Details

<details>
<summary>Database Schema and Internals</summary>

SimpleBroker uses a single SQLite database with Write-Ahead Logging (WAL) enabled:

```sql
CREATE TABLE messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,  -- Ensures strict FIFO ordering
    queue TEXT NOT NULL,
    body TEXT NOT NULL,
    ts INTEGER NOT NULL UNIQUE,            -- Unique hybrid timestamp serves as message ID
    claimed INTEGER DEFAULT 0              -- For read optimization
);
```

**Key design decisions:**
- The `id` column guarantees global FIFO ordering across all processes
- The `ts` column serves as the public message identifier with uniqueness enforced
- WAL mode enables concurrent readers and writers
- Claim-based deletion enables ~3x faster reads
</details>

<details>
<summary>Concurrency and Delivery Guarantees</summary>

**Exactly-Once Delivery:** Read and move operations use atomic `DELETE...RETURNING` statements, so by default each message is delivered exactly once to a single consumer.

**FIFO Ordering:** Messages are always read in the exact order they were written to the database, regardless of which process wrote them. This is guaranteed by SQLite's autoincrement primary key and by the fact that SQLite serializes write transactions (only one writer holds the database lock at a time).

**Message Lifecycle:**
1. **Write Phase**: Message inserted with unique timestamp
2. **Claim Phase**: Read marks message as "claimed" (fast, logical delete)
3. **Vacuum Phase**: Background process permanently removes claimed messages

This optimization is transparent - messages are still delivered exactly once.
</details>
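
The three lifecycle phases can be sketched with Python's built-in `sqlite3` module and the `messages` schema documented above. This is an illustration, not SimpleBroker's actual SQL: the helper names (`write`, `read`, `vacuum`), the hand-picked `ts` values, and the `BEGIN IMMEDIATE` plus `UPDATE` claim step are assumptions chosen so the example runs on any SQLite version.

```python
import sqlite3
from typing import Optional

SCHEMA = """
CREATE TABLE messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue TEXT NOT NULL,
    body TEXT NOT NULL,
    ts INTEGER NOT NULL UNIQUE,
    claimed INTEGER DEFAULT 0
);
"""


def write(conn: sqlite3.Connection, queue: str, body: str, ts: int) -> None:
    """Write phase: insert a message with a unique timestamp."""
    conn.execute("INSERT INTO messages (queue, body, ts) VALUES (?, ?, ?)",
                 (queue, body, ts))


def read(conn: sqlite3.Connection, queue: str) -> Optional[str]:
    """Claim phase: mark the oldest unclaimed message as claimed and return it."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock so select + update is atomic
    try:
        row = conn.execute(
            "SELECT id, body FROM messages "
            "WHERE queue = ? AND claimed = 0 ORDER BY id LIMIT 1",
            (queue,),
        ).fetchone()
        if row is not None:
            conn.execute("UPDATE messages SET claimed = 1 WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return None if row is None else row[1]


def vacuum(conn: sqlite3.Connection) -> int:
    """Vacuum phase: permanently remove claimed rows; returns how many were removed."""
    return conn.execute("DELETE FROM messages WHERE claimed = 1").rowcount


# isolation_level=None puts the connection in autocommit mode, so the only
# multi-statement transaction is the explicit one inside read().
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.executescript(SCHEMA)
write(conn, "tasks", "first", 1)   # ts values here are placeholders
write(conn, "tasks", "second", 2)
first = read(conn, "tasks")
second = read(conn, "tasks")
empty = read(conn, "tasks")
removed = vacuum(conn)
```

Because claimed rows are skipped by the `claimed = 0` filter, a second reader can never receive a message that has already been handed out, even before the vacuum phase runs.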

<details>
<summary>Security Considerations</summary>

- **Queue names**: Validated (alphanumeric + underscore + hyphen + period only)
- **Message size**: Limited to 10MB
- **Database files**: Created with 0600 permissions (user-only)
- **SQL injection**: Prevented via parameterized queries
- **Message content**: Not validated - can contain any text including shell metacharacters
</details>
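
The queue-name and message-size rules can be expressed as two small checks. This is a sketch of the documented constraints rather than SimpleBroker's own validation code: the function names are hypothetical, "alphanumeric" is assumed to mean ASCII letters and digits, and the 10MB limit is assumed to be 10 × 1024 × 1024 bytes of UTF-8.

```python
import re

# Letters, digits, underscore, hyphen, and period only;
# the first character may not be a hyphen or a period.
QUEUE_NAME_RE = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_.-]*")
MAX_MESSAGE_BYTES = 10 * 1024 * 1024  # assumed interpretation of "10MB"


def is_valid_queue_name(name: str) -> bool:
    """Return True if `name` uses only the allowed characters."""
    return QUEUE_NAME_RE.fullmatch(name) is not None


def is_valid_message(body: str) -> bool:
    """Return True if the UTF-8 encoded message fits within the size limit."""
    return len(body.encode("utf-8")) <= MAX_MESSAGE_BYTES
```

Note that only the queue name is restricted. Message bodies pass through unchanged, which is why consumers should still treat them as untrusted input.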

## Development & Contributing

SimpleBroker uses [`uv`](https://github.com/astral-sh/uv) for package management and [`ruff`](https://github.com/astral-sh/ruff) for linting.

```bash
# Clone the repository
git clone git@github.com:VanL/simplebroker.git
cd simplebroker

# Install development environment
uv sync --all-extras

# Run tests
uv run pytest              # Fast tests only
uv run pytest -m ""        # All tests including slow ones

# Lint and format
uv run ruff check --fix simplebroker tests
uv run ruff format simplebroker tests
uv run mypy simplebroker
```

**Contributing guidelines:**
1. Keep it simple - the entire codebase should stay understandable
2. Maintain backward compatibility
3. Add tests for new features
4. Update documentation
5. Run linting and tests before submitting PRs

## License

MIT © 2025 Van Lindberg

## Acknowledgments

Built with Python, SQLite, and the Unix philosophy.

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "simplebroker",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "broker, cli, message-queue, queue, sqlite",
    "author": null,
    "author_email": "Van Lindberg <van.lindberg@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/14/7a/c855ed7cb0b954e68047b0bd26c01170edb516bb8fa16e3871957ad62b73/simplebroker-2.7.1.tar.gz",
    "platform": null,
    "description": "# SimpleBroker\n\n  [![CI](https://github.com/VanL/simplebroker/actions/workflows/test.yml/badge.svg)](https://github.com/VanL/simplebroker/actions/workflows/test.yml)\n  [![codecov](https://codecov.io/gh/VanL/simplebroker/branch/main/graph/badge.svg)](https://codecov.io/gh/VanL/simplebroker)\n  [![PyPI version](https://badge.fury.io/py/simplebroker.svg)](https://badge.fury.io/py/simplebroker)\n  [![Python versions](https://img.shields.io/pypi/pyversions/simplebroker.svg)](https://pypi.org/project/simplebroker/)\n\n*A lightweight message queue backed by SQLite. No setup required, just works.*\n\n```bash\n$ pipx install simplebroker\n$ broker write tasks \"ship it \ud83d\ude80\"\n$ broker read tasks\nship it \ud83d\ude80\n```\n\nSimpleBroker is a zero-configuration, no-dependency message queue that runs anywhere Python runs. It's designed to be simple enough to understand in an afternoon, yet powerful enough for real work.\n\n## Table of Contents\n\n- [Features](#features)\n- [Use Cases](#use-cases)\n- [Installation](#installation)\n- [Quick Start](#quick-start)\n- [Command Reference](#command-reference)\n  - [Global Options](#global-options)\n  - [Commands](#commands)\n  - [Command Options](#command-options)\n  - [Exit Codes](#exit-codes)\n- [Critical Safety Notes](#\ufe0f-critical-safety-notes)\n  - [Potential Data Loss with `watch`](#potential-data-loss-with-watch)\n  - [Safe Message Handling](#safe-message-handling)\n- [Core Concepts](#core-concepts)\n  - [Timestamps as Message IDs](#timestamps-as-message-ids)\n  - [JSON for Safe Processing](#json-for-safe-processing)\n  - [Checkpoint-based Processing](#checkpoint-based-processing)\n- [Common Patterns](#common-patterns)\n- [Real-time Queue Watching](#real-time-queue-watching)\n- [Python API](#python-api)\n- [Performance & Tuning](#performance--tuning)\n- [Project Scoping](#project-scoping)\n  - [Basic Project Scoping](#basic-project-scoping)\n  - [Project Database 
Names](#project-database-names)\n  - [Error Behavior](#error-behavior-when-no-project-database-found)\n  - [Project Initialization](#project-initialization)\n  - [Precedence Rules](#precedence-rules)\n  - [Security Notes](#security-notes)\n  - [Common Use Cases](#common-use-cases)\n- [Architecture & Technical Details](#architecture--technical-details)\n- [Development & Contributing](#development--contributing)\n- [License](#license)\n\n## Features\n\n- **Zero configuration** - No servers, daemons, or complex setup\n- **SQLite-backed** - Rock-solid reliability with true ACID guarantees\n- **Concurrent safe** - Multiple processes can read/write simultaneously\n- **Simple CLI** - Intuitive commands that work with pipes and scripts\n- **Portable** - Each directory gets its own isolated `.broker.db`\n- **Fast** - 1000+ messages/second throughput\n- **Lightweight** - ~2500 lines of code, no external dependencies\n- **Real-time** - Built-in watcher for event-driven workflows\n\n## Use Cases\n\n- **Shell Scripting:** Decouple stages of a complex script\n- **Background Jobs:** Manage tasks for cron jobs or systemd services\n- **Development:** Simple message queue for local development without Docker\n- **Data Pipelines:** Pass file paths or data chunks between processing steps\n- **CI/CD Pipelines:** Coordinate build stages without external dependencies\n- **Log Processing:** Buffer logs before aggregation or analysis\n- **Simple IPC:** Communication between processes on the same machine\n\n**Good for:** Scripts, cron jobs, small services, development  \n**Not for:** Distributed systems, pub/sub, high-frequency trading\n\n## Installation\n\n```bash\n# Use pipx for global installation (recommended)\npipx install simplebroker\n\n# Or install with uv to use as a library\nuv add simplebroker\n\n# Or with pip\npip install simplebroker\n```\n\nThe CLI is available as both `broker` and `simplebroker`.\n\n**Requirements:**\n- Python 3.10+\n- SQLite 3.35+ (released March 2021) - 
required for `DELETE...RETURNING` support\n\n## Quick Start\n\n```bash\n# Write a message\n$ broker write myqueue \"Hello, World!\"\n\n# Read the message (removes it)\n$ broker read myqueue\nHello, World!\n\n# Write from stdin\n$ echo \"another message\" | broker write myqueue -\n\n# Read all messages at once\n$ broker read myqueue --all\n\n# Peek without removing\n$ broker peek myqueue\n\n# Move messages between queues\n$ broker move myqueue processed\n$ broker move errors retry --all\n\n# list all queues\n$ broker list\nmyqueue: 3\nprocessed: 1\n\n# Broadcast to all queues\n$ broker broadcast \"System maintenance at 5pm\"\n# Target only matching queues using fnmatch-style globs\n$ broker broadcast --pattern 'jobs-*' \"Pipeline paused\"\n\n# Clean up when done\n$ broker --cleanup\n```\n\n## Command Reference\n\n### Global Options\n\n- `-d, --dir PATH` - Use PATH instead of current directory\n- `-f, --file NAME` - Database filename or absolute path (default: `.broker.db`)\n  - If an absolute path is provided, the directory is extracted automatically\n  - Cannot be used with `-d` if the directories don't match\n- `-q, --quiet` - Suppress non-error output\n- `--cleanup` - Delete the database file and exit\n- `--vacuum` - Remove claimed messages and exit\n- `--status` - Show global message count, last timestamp, and DB size (`--status --json` for JSON output)\n- `--version` - Show version information\n- `--help` - Show help message\n\n### Commands\n\n| Command | Description |\n|---------|-------------|\n| `write <queue> <message\\|->` | Add message to queue (use `-` for stdin) |\n| `read <queue> [options]` | Remove and return message(s) |\n| `peek <queue> [options]` | Return message(s) without removing |\n| `move <source> <dest> [options]` | Atomically transfer messages between queues |\n| `list [--stats]` | Show queues and message counts |\n| `delete <queue> [-m <id>]` | Delete queue or specific message (marks for removal; use `--vacuum` to reclaim space) |\n| 
`delete --all` | Delete all queues (marks for removal; use `--vacuum` to reclaim space) |\n| `broadcast <message\\|->` | Send message to all existing queues |\n| `watch <queue> [options]` | Watch queue for new messages |\n| `alias <add|remove|list|->` | Manage queue aliases |\n| `init [--force]` | Initialize SimpleBroker database in current directory (does not accept `-d` or `-f` flags) |\n\n#### Queue Aliases\n\nUse aliases when two agents refer to the same underlying queue with different names. Aliases are stored in the database, persist across processes, and update atomically.\n\n```bash\n$ broker alias add task1.outbox agent1-to-agent2\n$ broker alias add task2.inbox agent1-to-agent2\n$ broker write @task1.outbox \"Job ready\"\n$ broker read @task2.inbox\nJob ready\n$ broker alias list\ntask1.outbox -> agent1-to-agent2\ntask2.inbox -> agent1-to-agent2\n$ broker alias list --target agent1-to-agent2\ntask1.outbox -> agent1-to-agent2\ntask2.inbox -> agent1-to-agent2\n$ broker write task1.outbox \"goes to literal queue\"\n$ broker read task1.outbox\ngoes to literal queue\n$ broker alias remove task1.outbox\n```\n\n- Plain queue names (`task1.outbox`) always refer to the literal queue. Use the\n  `@` prefix (`@task1.outbox`) to opt into alias resolution\u2014if the alias is not\n  defined the command fails.\n- Alias names are plain queue names (no `@` prefix); when *using* an alias on the CLI, prefix it with `@`.\n- Use `alias list --target <queue>` to see which aliases point to a specific queue (reverse lookup).\n- A target must be a real queue name (not another alias). 
Attempts to alias an alias or create cycles raise `ValueError`.\n- Removing an alias does not affect stored messages; they remain under the canonical queue name.\n\n### Command Options\n\n**Common options for read/peek/move:**\n- `--all` - Process all messages (CLI moves up to 1,000,000 per invocation; rerun for larger queues or use the Python API generators)\n- `--json` - Output as line-delimited JSON (includes timestamps)\n- `-t, --timestamps` - Include timestamps in output\n- `-m <id>` - Target specific message by its 19-digit timestamp ID\n- `--since <timestamp>` - Process messages newer than timestamp\n\n**Watch options:**\n- `--peek` - Monitor without consuming\n- `--move <dest>` - Continuously drain to destination queue\n- `--quiet` - Suppress startup message\n\n**Timestamp formats for `--since`:**\n- ISO 8601: `2024-01-15T14:30:00Z` or `2024-01-15` (midnight UTC)\n- Unix seconds: `1705329000` or `1705329000s`\n- Unix milliseconds: `1705329000000ms`\n- Unix nanoseconds/Native hybrid: `1837025672140161024` or `1837025672140161024ns`\n\n**Best practice:** Heuristics are used to distinguish between different values for interactive use, but explicit suffixes (s/ms/ns) are recommended for clarity if referring to particular times. \n\n### Exit Codes\n- `0` - Success (returns 0 even when no messages match filters like `--since`)\n- `1` - General error (e.g., database access error, invalid arguments)\n- `2` - Queue empty, no matching messages, or invalid message ID format (only when queue is actually empty, no messages match the criteria, or the provided message ID has an invalid format)\n\n**Note:** The `delete` command marks messages as \"claimed\" for performance. 
Use `--vacuum` to permanently remove them.\n\n## Critical Safety Notes\n\n### Safe Message Handling\n\nMessages can contain any characters including newlines, control characters, and shell metacharacters:\n- **Shell injection risks** - When piping output to shell commands, malicious message content could execute unintended commands\n- **Special characters** - Messages containing newlines or other special characters can break shell pipelines that expect single-line output\n- **Queue names** - Limited to alphanumeric + underscore/hyphen/period (cannot start with hyphen or period)\n- **Message size** - Limited to 10MB\n\n**Always use `--json` for safe handling** - see examples below.\n\n### Robust message handling with `watch`\n\nWhen using `watch` in its default consuming mode, messages are **permanently removed** from the queue *before* your script or handler processes them. If your script fails or crashes, **the message is lost**. For critical data, you must use a safe processing pattern (move or peek-then-delete) that ensures that your data is not removed until you can acknowledge receipt. 
Example:\n\n```bash\n#!/bin/bash\n# safe-worker.sh - A robust worker using the peek-and-acknowledge pattern\n\n# Watch in peek mode, which does not remove messages\nbroker watch tasks --peek --json | while IFS= read -r line; do\n    message=$(echo \"$line\" | jq -r '.message')\n    timestamp=$(echo \"$line\" | jq -r '.timestamp')\n    \n    echo \"Processing message ID: $timestamp\"\n    if process_task \"$message\"; then\n        # Success: remove the specific message by its unique ID\n        broker delete tasks -m \"$timestamp\"\n    else\n        echo \"Failed to process, message remains in queue for retry.\" >&2\n        # Optional: move to a dead-letter queue\n        # echo \"$message\" | broker write failed_tasks -\n    fi\ndone\n```\n\n## Core Concepts\n\n### Timestamps as Message IDs\nEvery message receives a unique 64-bit number that serves dual purposes as a timestamp and unique message ID. Timestamps are always included in JSON output. \nTimestamps can be included in regular output by passing the -t/--timestamps flag. \n\nTimestamps are:\n- **Unique** - No collisions even with concurrent writers (enforced by database constraint)\n- **Time-ordered** - Natural chronological sorting\n- **Efficient** - 64-bit integers, not UUIDs\n- **Meaningful** - Can extract creation time from the ID\n\nThe format:\n- High 52 bits: microseconds since Unix epoch\n- Low 12 bits: logical counter for sub-microsecond ordering\n- Similar to Twitter's Snowflake IDs or UUID7\n\n\n### JSON for Safe Processing\n\nMessages with newlines or special characters can break shell pipelines. Use `--json` to avoid shell issues:\n\n```bash\n# Problem: newlines break line counting\n$ broker write alerts \"ERROR: Database connection failed\\nRetrying in 5 seconds...\"\n$ broker read alerts | wc -l\n2  # Wrong! 
One message counted as two\n\n# Solution: JSON output (line-delimited)\n$ broker write alerts \"ERROR: Database connection failed\\nRetrying in 5 seconds...\"\n$ broker read alerts --json\n{\"message\": \"ERROR: Database connection failed\\nRetrying in 5 seconds...\", \"timestamp\": 1837025672140161024}\n\n# Parse safely with jq\n$ broker read alerts --json | jq -r '.message'\nERROR: Database connection failed\nRetrying in 5 seconds...\n```\n\n### Checkpoint-based Processing\n\nUse `--since` for resumable processing:\n\n```bash\n# Save checkpoint after processing\n$ result=$(broker read tasks --json)\n$ checkpoint=$(echo \"$result\" | jq '.timestamp')\n\n# Resume from checkpoint\n$ broker read tasks --all --since \"$checkpoint\"\n\n# Or use human-readable timestamps\n$ broker read tasks --all --since \"2024-01-15T14:30:00Z\"\n```\n\n## Common Patterns\n\n<details>\n<summary>Basic Worker Loop</summary>\n\n```bash\nwhile msg=$(broker read work 2>/dev/null); do\n    echo \"Processing: $msg\"\n    # do work...\ndone\n```\n</details>\n\n<details>\n<summary>Multiple Queues</summary>\n\n```bash\n# Different queues for different purposes\n$ broker write emails \"send welcome to user@example.com\"\n$ broker write logs \"2023-12-01 system started\"\n$ broker write metrics \"cpu_usage:0.75\"\n\n$ broker list\nemails: 1\nlogs: 1\nmetrics: 1\n```\n</details>\n\n<details>\n<summary>Fan-out with Broadcast</summary>\n\n```bash\n# Send to all queues at once\n$ broker broadcast \"shutdown signal\"\n\n# Each worker reads from its own queue\n$ broker read worker1  # -> \"shutdown signal\"\n$ broker read worker2  # -> \"shutdown signal\"\n```\n\n**Note:** Broadcast sends to all *existing* queues at execution time. There's a small race window for queues created during broadcast.\n\n**Alias interaction:** Broadcast operations ignore aliases and work only on literal queue names. 
Pattern matching with `--pattern` matches queue names, not alias names.\n</details>\n\n<details>\n<summary>Unix Tool Integration</summary>\n\n```bash\n# Store command output\n$ df -h | broker write monitoring -\n\n# Process files through a queue\n$ find . -name \"*.log\" | while read f; do\n    broker write logfiles \"$f\"\ndone\n\n# Parallel processing with xargs\n$ broker read logfiles --all | xargs -P 4 -I {} process_log {}\n\n# Remote queue via SSH\n$ echo \"remote task\" | ssh server \"cd /app && broker write tasks -\"\n$ ssh server \"cd /app && broker read tasks\"\n\n\n### Integration with Unix Tools\n\n```bash\n# Pipe to queue\n$ df -h | broker write monitoring -\n\n# Store command output\n$ df -h | broker write monitoring -\n$ broker peek monitoring\n\n# Process files through a queue\n$ find . -name \"*.log\" | while read f; do\n    broker write logfiles \"$f\"\ndone\n\n# Parallel processing with xargs\n$ broker read logfiles --all | xargs -P 4 -I {} process_log {}\n\n# Use absolute paths for databases in specific locations\n$ broker -f /var/lib/myapp/queue.db write tasks \"backup database\"\n$ broker -f /var/lib/myapp/queue.db read tasks\n\n# Reserving work using move\n$ msg_json=$(broker move todo in-process --json 2>/dev/null)\n  if [ -n \"$msg_json\" ]; then\n      msg_id=$(echo \"$msg_json\" | jq -r '.[0].id')\n      msg_data=$(echo \"$msg_json\" | jq -r '.[0].data')\n\n      echo \"Processing message $msg_id: $msg_data\"\n\n      # Process the message here\n      # ...\n\n      # Delete after successful processing\n      broker delete in-process -m \"$msg_id\"\n  else\n      echo \"No messages to process\"\n  fi\n```\n</details>\n\n<details>\n<summary>Dead Letter Queue Pattern</summary>\n\n```bash\n# Process messages, moving failures to DLQ\nwhile msg=$(broker read tasks); do\n    if ! 
process_task \"$msg\"; then\n        echo \"$msg\" | broker write dlq -\n    fi\ndone\n\n# Retry failed messages\nbroker move dlq tasks --all\n```\n</details>\n\n<details>\n<summary>Resilient Worker with Checkpointing</summary>\n\n```bash\n#!/bin/bash\n# resilient-worker.sh - Process messages with checkpoint recovery\n\nQUEUE=\"events\"\nCHECKPOINT_FILE=\"/var/lib/myapp/checkpoint\"\nBATCH_SIZE=100\n\n# Load last checkpoint (default to 0 if first run)\nlast_checkpoint=$(cat \"$CHECKPOINT_FILE\" 2>/dev/null || echo 0)\necho \"Starting from checkpoint: $last_checkpoint\"\n\nwhile true; do\n    # Check if there are messages newer than our checkpoint\n    if ! broker peek \"$QUEUE\" --json --since \"$last_checkpoint\" >/dev/null 2>&1; then\n        echo \"No new messages, sleeping...\"\n        sleep 5\n        continue\n    fi\n    \n    echo \"Processing new messages...\"\n    \n    # Process messages one at a time to avoid data loss\n    processed=0\n    while [ $processed -lt $BATCH_SIZE ]; do\n        # Read exactly one message newer than checkpoint\n        message_data=$(broker read \"$QUEUE\" --json --since \"$last_checkpoint\" 2>/dev/null)\n        \n        # Check if we got a message\n        if [ -z \"$message_data\" ]; then\n            echo \"No more messages to process\"\n            break\n        fi\n        \n        # Extract message and timestamp\n        message=$(echo \"$message_data\" | jq -r '.message')\n        timestamp=$(echo \"$message_data\" | jq -r '.timestamp')\n        \n        # Process the message\n        echo \"Processing: $message\"\n        if ! 
process_event \"$message\"; then\n            echo \"Error processing message, will retry on next run\"\n            # Exit without updating checkpoint - failed message will be reprocessed\n            exit 1\n        fi\n        \n        # Atomically update checkpoint ONLY after successful processing\n        echo \"$timestamp\" > \"$CHECKPOINT_FILE.tmp\"\n        mv \"$CHECKPOINT_FILE.tmp\" \"$CHECKPOINT_FILE\"\n        \n        # Update our local variable for next iteration\n        last_checkpoint=\"$timestamp\"\n        processed=$((processed + 1))\n    done\n    \n    if [ $processed -eq 0 ]; then\n        echo \"No messages processed, sleeping...\"\n        sleep 5\n    else\n        echo \"Batch complete, processed $processed messages\"\n    fi\ndone\n```\n\nKey features:\n- **No data loss from pipe buffering** - Reads messages one at a time\n- **Atomic checkpoint updates** - Uses temp file + rename for crash safety\n- **Per-message checkpointing** - Updates checkpoint after each successful message\n- **Batch processing** - Processes up to BATCH_SIZE messages at a time for efficiency\n- **Failure recovery** - On error, exits without updating checkpoint so failed message is retried\n</details>\n\n## Real-time Queue Watching\n\nThe `watch` command provides three modes for monitoring queues:\n\n1. **Consume** (default): Process and remove messages from the queue\n2. **Peek** (`--peek`): Monitor messages without removing them\n3. 
**Move** (`--move DEST`): Drain ALL messages to another queue\n\n```bash\n# Start watching a queue (consumes messages)\n$ broker watch tasks\n\n# Watch without consuming (peek mode)\n$ broker watch tasks --peek\n\n# Watch with JSON output (timestamps always included)\n$ broker watch tasks --json\n{\"message\": \"task 1\", \"timestamp\": 1837025672140161024}\n\n# Continuously drain one queue to another\n$ broker watch source_queue --move destination_queue\n```\n\nThe watcher uses an efficient polling strategy:\n- **Burst mode**: First 100 checks with zero delay for immediate message pickup\n- **Smart backoff**: Gradually increases polling interval to 0.1s maximum\n- **Low overhead**: Uses SQLite's data_version to detect changes without querying\n- **Graceful shutdown**: Handles Ctrl-C (SIGINT) cleanly\n\n### Move Mode (`--move`)\n\nThe `--move` option provides continuous queue-to-queue message migration:\n\n```bash\n# Like: tail -f /var/log/app.log | tee -a /var/log/processed.log\n$ broker watch source_queue --move dest_queue\n```\n\nKey characteristics:\n- **Drains entire queue**: Moves ALL messages from source to destination\n- **Atomic operation**: Each message is atomically moved before being displayed\n- **No filtering**: Incompatible with `--since` (would leave messages stranded)\n- **Concurrent safe**: Multiple move watchers can run safely without data loss\n\n## Python API\n\nSimpleBroker also provides a Python API for more advanced use cases:\n\n```python\nfrom simplebroker import Queue, QueueWatcher\nfrom simplebroker.db import DBConnection\nimport logging\n\n# Basic usage\nwith Queue(\"tasks\") as q:\n    q.write(\"process order 123\")\n    message = q.read()  # Returns: \"process order 123\"\n\n# Safe peek-and-acknowledge pattern (recommended for critical data)\ndef process_message(message: str, timestamp: int):\n    \"\"\"Process message and acknowledge only on success.\"\"\"\n    logging.info(f\"Processing: {message}\")\n    \n    # Simulate processing 
that might fail\n    if \"error\" in message:\n        raise ValueError(\"Simulated processing failure\")\n    \n    # If we get here, processing succeeded\n    # Now explicitly acknowledge by deleting the message\n    with Queue(\"tasks\") as q:\n        q.delete(message_id=timestamp)\n    logging.info(f\"Message {timestamp} acknowledged\")\n\ndef handle_error(exception: Exception, message: str, timestamp: int) -> bool:\n    \"\"\"Log error and optionally move to dead-letter queue.\"\"\"\n    logging.error(f\"Failed to process message {timestamp}: {exception}\")\n    # Message remains in queue for retry since we're using peek=True\n    \n    # Optional: After N retries, move to dead-letter queue\n    # Queue(\"errors\").write(f\"{timestamp}:{message}:{exception}\")\n    \n    return True  # Continue watching\n\n# Use peek=True for safe mode - messages aren't removed until explicitly acknowledged\nwatcher = QueueWatcher(\n    queue=Queue(\"tasks\"),\n    handler=process_message,\n    error_handler=handle_error,\n    peek=True  # True = safe mode - just observe, don't consume\n)\n\n# Start watching (blocks until stopped)\ntry:\n    watcher.watch()\nexcept KeyboardInterrupt:\n    print(\"Watcher stopped by user\")\n```\n\n### Generating timestamps without writing\n\nSometimes you need a broker-compatible timestamp/ID before enqueueing a message (for logging, correlation IDs, or backpressure planning). 
You can ask SimpleBroker to generate one without writing a row:\n\n```python\nfrom simplebroker import Queue\nfrom simplebroker.db import DBConnection\n\nwith DBConnection(\"/path/to/.broker.db\") as conn:\n    db = conn.get_connection()\n    ts = db.generate_timestamp()  # alias: db.get_ts()\n\nqueue = Queue(\"tasks\", db_path=\"/path/to/.broker.db\")\nts2 = queue.generate_timestamp()  # alias: queue.get_ts()\n\nprint(ts2 > ts)  # Monotonic within a database\n```\n\nNotes:\n- Timestamps are monotonic per database and match what `Queue.write()` uses internally.\n- Generating a timestamp does not reserve a slot; it simply gives you the next ID.\n\n### Thread-Based Background Processing\n\nUse `run_in_thread()` to run watchers in background threads:\n\n```python\nfrom pathlib import Path\nfrom simplebroker import QueueWatcher\n\ndef handle_message(msg: str, ts: int):\n    print(f\"Processing: {msg}\")\n\n# Create watcher with database path (recommended for thread safety)\nwatcher = QueueWatcher(\n    Path(\"my.db\"),\n    \"orders\",\n    handle_message\n)\n\n# Start in background thread\nthread = watcher.run_in_thread()\n\n# Do other work...\n\n# Stop when done\nwatcher.stop()\nthread.join()\n```\n\n### Context Manager Support\n\nFor cleaner resource management, watchers can be used as context managers, which automatically start the thread and ensure proper cleanup:\n\n```python\nimport time\nfrom simplebroker import QueueWatcher\n\ndef handle_message(msg: str, ts: int):\n    print(f\"Received: {msg}\")\n\n# Automatic thread management with context manager\nwith QueueWatcher(\"my.db\", \"notifications\", handle_message) as watcher:\n    # Thread is started automatically\n    # Do other work while watcher processes 
messages\n    time.sleep(10)\n    \n# Thread is automatically stopped and joined when exiting the context\n# Ensures proper cleanup even if an exception occurs\n```\n\nSimpleBroker is synchronous by design for simplicity, but can be easily integrated with async applications:\n\n```python\nimport asyncio\nimport concurrent.futures\nfrom simplebroker import Queue\n\nclass AsyncQueue:\n    \"\"\"Async wrapper for SimpleBroker Queue using thread pool executor.\"\"\"\n    \n    def __init__(self, queue_name: str, db_path: str = \".broker.db\"):\n        self.queue_name = queue_name\n        self.db_path = db_path\n        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)\n        \n    async def write(self, message: str) -> None:\n        \"\"\"Write message asynchronously.\"\"\"\n        loop = asyncio.get_event_loop()\n        def _write():\n            with Queue(self.queue_name, self.db_path) as q:\n                q.write(message)\n        await loop.run_in_executor(self._executor, _write)\n    \n    async def read(self) -> str | None:\n        \"\"\"Read message asynchronously.\"\"\"\n        loop = asyncio.get_event_loop()\n        def _read():\n            with Queue(self.queue_name, self.db_path) as q:\n                return q.read()\n        return await loop.run_in_executor(self._executor, _read)\n\n# Usage\nasync def main():\n    tasks_queue = AsyncQueue(\"tasks\")\n    \n    # Write messages concurrently\n    await asyncio.gather(\n        tasks_queue.write(\"Task 1\"),\n        tasks_queue.write(\"Task 2\"),\n        tasks_queue.write(\"Task 3\")\n    )\n    \n    # Read messages\n    while msg := await tasks_queue.read():\n        print(f\"Got: {msg}\")\n```\n\nFor advanced use cases requiring direct database access, you can use the `DBConnection` context manager:\n\n```python\nfrom simplebroker.db import DBConnection\n\n# Safe database connection management for cross-queue operations\nwith DBConnection(\"myapp.db\") as conn:\n    db 
= conn.get_connection()\n    \n    # Perform cross-queue operations\n    queues = db.get_queue_stats()\n    for queue_name, stats in queues.items():\n        print(f\"{queue_name}: {stats['pending']} pending\")\n    \n    # Broadcast to all queues\n    db.broadcast(\"System maintenance at 5pm\")\n```\n\n**Key async integration strategies:**\n\n1. **Use Queue API**: Prefer the high-level Queue class for single-queue operations\n2. **Thread Pool Executor**: Run SimpleBroker's sync methods in threads\n3. **One Queue Per Operation**: Create fresh Queue instances for thread safety\n4. **DBConnection for Advanced Use**: Use DBConnection context manager for cross-queue operations\n\nSee [`examples/async_wrapper.py`](examples/async_wrapper.py) for a complete async wrapper implementation including:\n- Async context manager for proper cleanup\n- Background watcher with asyncio coordination\n- Streaming message consumption\n- Concurrent queue operations\n\n### Advanced: Custom Extensions\n\n**Note:** This is an advanced example showing how to extend SimpleBroker's internals. 
Most users should use the standard Queue API.\n\n```python\nfrom simplebroker.db import BrokerDB, DBConnection\nfrom simplebroker import Queue\n\nclass PriorityQueueSystem:\n    \"\"\"Example: Priority queue system using multiple standard queues.\"\"\"\n    \n    def __init__(self, db_path: str = \".broker.db\"):\n        self.db_path = db_path\n    \n    def write_with_priority(self, base_queue: str, message: str, priority: int = 0):\n        \"\"\"Write message with priority (higher = more important).\"\"\"\n        queue_name = f\"{base_queue}_p{priority}\"\n        with Queue(queue_name, self.db_path) as q:\n            q.write(message)\n    \n    def read_highest_priority(self, base_queue: str) -> str | None:\n        \"\"\"Read from highest priority queue first.\"\"\"\n        # Check queues in priority order\n        for priority in range(9, -1, -1):\n            queue_name = f\"{base_queue}_p{priority}\"\n            with Queue(queue_name, self.db_path) as q:\n                msg = q.read()\n                if msg:\n                    return msg\n        return None\n\n# For even more advanced use requiring database subclassing:\nclass CustomBroker(BrokerDB):\n    \"\"\"Example of extending BrokerDB directly (advanced users only).\"\"\"\n    \n    def custom_operation(self):\n        # Access internal database methods\n        with self._lock:\n            # Your custom SQL operations here\n            pass\n```\n\nSee [`examples/`](examples/) directory for more patterns including async processing and custom runners.\n\n## Performance & Tuning\n\n- **Throughput**: 1000+ messages/second on typical hardware\n- **Latency**: <10ms for write, <10ms for read\n- **Scalability**: Tested with 100k+ messages per queue\n- **Optimization**: Use `--all` for bulk operations\n\n### Environment Variables\n\n<details>\n<summary>Click to see all configuration options</summary>\n\n**Core Settings:**\n- `BROKER_BUSY_TIMEOUT` - SQLite busy timeout in milliseconds (default: 
5000)\n- `BROKER_CACHE_MB` - SQLite page cache size in megabytes (default: 10)\n  - Larger cache improves performance for repeated queries and large scans\n  - Recommended: 10-50 MB for typical workloads, 100+ MB for heavy use\n- `BROKER_SYNC_MODE` - SQLite synchronous mode: FULL, NORMAL, or OFF (default: FULL)\n  - `FULL`: Maximum durability, safe against power loss (default)\n  - `NORMAL`: ~25% faster writes, safe against app crashes, small risk on power loss\n  - `OFF`: Fastest but unsafe - only for testing or non-critical data\n- `BROKER_WAL_AUTOCHECKPOINT` - WAL auto-checkpoint threshold in pages (default: 1000)\n  - Controls when SQLite automatically moves WAL data to the main database\n  - Default of 1000 pages \u2248 1MB (with 1KB page size)\n  - Increase for high-traffic scenarios to reduce checkpoint frequency\n  - Set to 0 to disable automatic checkpoints (manual control only)\n\n**Read Performance:**\n- `BROKER_READ_COMMIT_INTERVAL` - Number of messages to read before committing in `--all` mode (default: 1)\n  - Default of 1 provides exactly-once delivery guarantee\n  - Increase for better performance with at-least-once delivery guarantee\n\n**Vacuum Settings:**\n- `BROKER_AUTO_VACUUM` - Enable automatic vacuum of claimed messages (default: true)\n- `BROKER_VACUUM_THRESHOLD` - Number of claimed messages before auto-vacuum triggers (default: 10000)\n- `BROKER_VACUUM_BATCH_SIZE` - Number of messages to delete per vacuum batch (default: 1000)\n- `BROKER_VACUUM_LOCK_TIMEOUT` - Seconds before a vacuum lock is considered stale (default: 300)\n\n**Watcher Tuning:**\n- `BROKER_INITIAL_CHECKS` - Number of checks with zero delay (default: 100)\n- `BROKER_MAX_INTERVAL` - Maximum polling interval in seconds (default: 0.1)\n\n**Database Naming:**\n- `BROKER_DEFAULT_DB_NAME` - name of the broker database file (default: .broker.db)\n- Corresponds to the -f/--file command line argument\n- Can be a compound path including a single directory (e.g., 
\".subdirectory/broker.db\")\n- Applies to all scopes \n\nExample configurations:\n```bash\n# High-throughput configuration\nexport BROKER_SYNC_MODE=NORMAL\nexport BROKER_READ_COMMIT_INTERVAL=100\nexport BROKER_INITIAL_CHECKS=1000\n\n# Low-latency configuration  \nexport BROKER_MAX_INTERVAL=0.01\nexport BROKER_CACHE_MB=50\n\n# Power-saving configuration\nexport BROKER_INITIAL_CHECKS=50\nexport BROKER_MAX_INTERVAL=0.5\n\n# Project scoping configuration\nexport BROKER_PROJECT_SCOPE=true\nexport BROKER_DEFAULT_DB_NAME=project-queue.db\n```\n</details>\n\n## Project Scoping\n\nSimpleBroker provides flexible database scoping modes to handle different use cases:\n\n**Directory Scope (Default):** Each directory gets its own independent `.broker.db`  \n**Project Scope:** Git-like upward search for shared project database  \n**Global Scope:** Use a specific location for all broker operations\n\nThis allows multiple scripts and processes to share broker databases according to your needs.\n\n### Basic Project Scoping\n\nEnable project scoping by setting the environment variable:\n\n```bash\nexport BROKER_PROJECT_SCOPE=true\n```\n\nWith project scoping enabled, SimpleBroker searches upward from the current directory to find an existing `.broker.db` file. 
If found, it uses that database instead of creating a new one in the current directory.\n\n```bash\n# Project structure:\n# /home/user/myproject/.broker.db  \u2190 Project database\n# /home/user/myproject/scripts/\n# /home/user/myproject/scripts/worker.py\n\ncd /home/user/myproject/scripts\nexport BROKER_PROJECT_SCOPE=true\nbroker write tasks \"process data\"  # Uses /home/user/myproject/.broker.db\n```\n\n**Benefits:**\n- **Shared state**: All project scripts use the same message queue\n- **Location independence**: Works from any subdirectory\n- **Zero configuration**: Just set the environment variable\n- **Git-like behavior**: Intuitive for developers familiar with version control\n\n### Global Scope\n\nUse a specific directory for all broker operations. Must be an absolute path.\n\n```bash\nexport BROKER_DEFAULT_DB_LOCATION=/var/lib/myapp\n# Uses: /var/lib/myapp/.broker.db for all operations\n```\n\n**Use cases:**\n- **System-wide queues**: Central message broker for multiple applications\n- **Shared storage**: Use network-mounted directories for distributed access\n- **Privilege separation**: Store databases in controlled system directories\n\n**Note:** `BROKER_DEFAULT_DB_LOCATION` corresponds to the `-d/--dir` command line argument and is ignored when `BROKER_PROJECT_SCOPE=true`.\n\n### Project Database Names\n\nControl the database filename used in any scoping mode:\n\n```bash\nexport BROKER_DEFAULT_DB_NAME=project-queue.db\nexport BROKER_PROJECT_SCOPE=true\n```\nNow project scoping searches for `project-queue.db` instead of `.broker.db`.\n\nTo better support git-like operation, the BROKER_DEFAULT_DB_NAME can be a compound name including a single subdirectory:\n\n```bash\nexport BROKER_DEFAULT_DB_NAME=.project/queue.db\nexport BROKER_PROJECT_SCOPE=true\n```\nNow project scoping searches for `.project/queue.db` instead of `.broker.db`.\n\n**Use cases:**\n- **Multiple projects**: Use different names to avoid conflicts\n- **Descriptive names**: `analytics.db`, 
`build-queue.db`, etc.\n- **Environment separation**: `dev-queue.db` vs `prod-queue.db`\n- **Using config directories**: `.config/broker.db` vs `.broker.db`\n\n**Note:** If no project database is found during the upward search, SimpleBroker will error out and ask you to run `broker init` to create one.\n\n### Error Behavior When No Project Database Found\n\nWhen project scoping is enabled but no project database is found, SimpleBroker will error out with a clear message:\n\n```bash\nexport BROKER_PROJECT_SCOPE=true\ncd /tmp/isolated_directory\nbroker write tasks \"test message\"\n# Error: No SimpleBroker database found in project scope.\n# Run 'broker init' to create a project database.\n```\n\n**This is intentional behavior** - SimpleBroker requires explicit initialization to avoid accidentally creating databases in unexpected locations.\n\n### Project Initialization\n\nUse `broker init` to create a project database in the current directory:\n\n```bash\ncd /home/user/myproject\nbroker init\n# Creates /home/user/myproject/.broker.db\n```\n\n**With custom database name:**\n```bash\nexport BROKER_DEFAULT_DB_NAME=project-queue.db\ncd /home/user/myproject\nbroker init\n# Creates /home/user/myproject/project-queue.db\n\n# Force reinitialize existing database\nbroker init --force\n```\n\n**Important:** `broker init` always creates the database in the current working directory and does not accept `-d` or `-f` flags. 
It only respects `BROKER_DEFAULT_DB_NAME` for custom filenames.\n\n**Directory structure examples:**\n```bash\n# Web application\nwebapp/\n\u251c\u2500\u2500 .broker.db          \u2190 Project queue (created by: broker init)\n\u251c\u2500\u2500 frontend/\n\u2502   \u2514\u2500\u2500 build.py        \u2190 Uses ../.broker.db\n\u251c\u2500\u2500 backend/\n\u2502   \u2514\u2500\u2500 worker.py       \u2190 Uses ../.broker.db\n\u2514\u2500\u2500 scripts/\n    \u2514\u2500\u2500 deploy.sh       \u2190 Uses ../.broker.db\n\n# Data pipeline\npipeline/\n\u251c\u2500\u2500 queues.db           \u2190 Custom name (BROKER_DEFAULT_DB_NAME=queues.db)\n\u251c\u2500\u2500 extract/\n\u2502   \u2514\u2500\u2500 scraper.py      \u2190 Uses ../queues.db\n\u251c\u2500\u2500 transform/\n\u2502   \u2514\u2500\u2500 processor.py    \u2190 Uses ../queues.db\n\u2514\u2500\u2500 load/\n    \u2514\u2500\u2500 uploader.py     \u2190 Uses ../queues.db\n```\n\n### Precedence Rules\n\nDatabase path resolution follows strict precedence rules:\n\n1. **Explicit CLI flags** (`-f`, `-d`) - Always override all other settings\n2. **Project scoping** (`BROKER_PROJECT_SCOPE=true`) - Git-like upward search, errors if not found\n3. **Global scope** (`BROKER_DEFAULT_DB_LOCATION`) - Used when project scoping is disabled\n4. **Built-in defaults** - Current directory + `.broker.db`\n\n**Environment variable interactions:**\n- `BROKER_DEFAULT_DB_NAME` applies to all scoping modes\n- `BROKER_DEFAULT_DB_LOCATION` is ignored when `BROKER_PROJECT_SCOPE=true`\n- `BROKER_PROJECT_SCOPE=true` takes precedence over global scope settings\n\n**Examples:**\n\n```bash\nexport BROKER_PROJECT_SCOPE=true\nexport BROKER_DEFAULT_DB_NAME=.config/project.db\n\n# 1. Explicit absolute path (highest precedence)\nbroker -f /explicit/path/queue.db write test \"msg\"\n# Uses: /explicit/path/queue.db\n\n# 2. Explicit directory + filename\nbroker -d /explicit/dir -f custom.db write test \"msg\"\n# Uses: /explicit/dir/custom.db\n\n# 3. 
Project scoping finds existing database\n# (assuming /home/user/myproject/.config/project.db exists)\ncd /home/user/myproject/subdir\nbroker write test \"msg\"\n# Uses: /home/user/myproject/.config/project.db\n\n# 4. Project scoping enabled but no database found (errors out)\ncd /tmp/isolated\nbroker write test \"msg\"\n# Error: No SimpleBroker database found. Run 'broker init' to create one.\n\n# 5. Built-in defaults (no project scoping)\nunset BROKER_PROJECT_SCOPE BROKER_DEFAULT_DB_NAME\nbroker write test \"msg\"\n# Uses: /tmp/isolated/.broker.db\n```\n\n**Decision flowchart:**\n```\nCLI flags (-f absolute path)?\n\u251c\u2500 YES \u2192 Use absolute path\n\u2514\u2500 NO \u2192 CLI flags (-d + -f)?\n   \u251c\u2500 YES \u2192 Use directory + filename\n   \u2514\u2500 NO \u2192 BROKER_PROJECT_SCOPE=true?\n      \u251c\u2500 NO \u2192 Use env defaults or built-in defaults\n      \u2514\u2500 YES \u2192 Search upward for database\n         \u251c\u2500 FOUND \u2192 Use project database\n         \u2514\u2500 NOT FOUND \u2192 Error with message to run 'broker init'\n```\n\n### Security Notes\n\nProject scoping includes several security measures to prevent unauthorized access:\n\n**Boundary detection:**\n- Stops at filesystem root (`/` on Unix, `C:\\` on Windows)\n- Respects filesystem mount boundaries\n- Maximum 100 directory levels to prevent infinite loops\n\n**Database validation:**\n- Only uses files with SimpleBroker magic string\n- Validates database schema and structure\n- Rejects corrupted or foreign databases\n\n**Permission checking:**\n- Respects file system access controls\n- Skips directories with permission issues\n- Validates read/write access before using database\n\n**Traversal limits:**\n- Maximum 100 directory levels to prevent infinite loops\n- Prevents symlink loop exploitation\n- Uses existing path resolution security\n\n**Warnings:**\n\n\u26a0\ufe0f **Project scoping allows accessing databases in parent directories.** Only enable in trusted 
environments where this behavior is desired.\n\n\u26a0\ufe0f **Database sharing:** Multiple processes will share the same database when project scoping is enabled. Ensure your application handles concurrent access appropriately.\n\n\u26a0\ufe0f **No automatic fallback:** When project scoping is enabled but no database is found, SimpleBroker will error out rather than creating a database automatically. You must run `broker init` to create a project database.\n\n**Best practices:**\n```bash\n# Safe: Enable only in controlled environments\nif [[ \"$PWD\" == /home/user/myproject/* ]]; then\n    export BROKER_PROJECT_SCOPE=true\nfi\n\n# Safe: Use explicit paths for sensitive operations\nbroker -f /secure/path/queue.db write secrets \"sensitive data\"\n\n# Safe: Validate environment before enabling\nif [[ -r \"/home/user/myproject/.broker.db\" ]]; then\n    export BROKER_PROJECT_SCOPE=true\nfi\n```\n\n### Common Use Cases\n\n**Build systems:**\n```bash\n# Root project queue for build coordination\ncd /project && broker init\nexport BROKER_PROJECT_SCOPE=true\n\n# Frontend build (any subdirectory)\ncd /project/frontend\nbroker write build-tasks \"compile assets\"\n\n# Backend build (different subdirectory)  \ncd /project/backend\nbroker read build-tasks  # Gets \"compile assets\"\n```\n\n**Data pipelines:**\n```bash\n# Pipeline coordination\nexport BROKER_PROJECT_SCOPE=true\nexport BROKER_DEFAULT_DB_NAME=pipeline.db\ncd /data-pipeline && broker init\n\n# Extract phase\ncd /data-pipeline/extractors\nbroker write raw-data \"/path/to/file1.csv\"\n\n# Transform phase  \ncd /data-pipeline/transformers\nbroker read raw-data  # Gets \"/path/to/file1.csv\"\nbroker write clean-data \"/path/to/processed1.json\"\n\n# Load phase\ncd /data-pipeline/loaders  \nbroker read clean-data  # Gets \"/path/to/processed1.json\"\n```\n\n**Development workflows:**\n```bash\n# Development environment setup\ncd ~/myproject\nexport BROKER_PROJECT_SCOPE=true\nexport 
BROKER_DEFAULT_DB_NAME=dev-queue.db\nbroker init\n\n# Testing from any location\ncd ~/myproject/tests\nbroker write test-data \"integration-test-1\"\n\n# Application reads from any location\ncd ~/myproject/src\nbroker read test-data  # Gets \"integration-test-1\"\n```\n\n**CI/CD integration:**\n```bash\n#!/bin/bash\n# Build script (in any project subdirectory)\nexport BROKER_PROJECT_SCOPE=true\nexport BROKER_DEFAULT_DB_NAME=ci-queue.db\n\n# Ensure project queue exists\nif ! broker list >/dev/null 2>&1; then\n    broker init\nfi\n\n# Add build tasks\nbroker write builds \"compile-frontend\"\nbroker write builds \"run-tests\"\nbroker write builds \"build-docker\"\nbroker write builds \"deploy-staging\"\n```\n\n**Multi-service coordination:**\n```bash\n# Service discovery queue\nexport BROKER_PROJECT_SCOPE=true\nexport BROKER_DEFAULT_DB_NAME=services.db\n\n# Service A registers itself\ncd /app/service-a\nbroker write registry \"service-a:healthy:port:8080\"\n\n# Service B discovers Service A\ncd /app/service-b\nbroker peek registry  # Sees \"service-a:healthy:port:8080\"\n```\n\n## Architecture & Technical Details\n\n<details>\n<summary>Database Schema and Internals</summary>\n\nSimpleBroker uses a single SQLite database with Write-Ahead Logging (WAL) enabled:\n\n```sql\nCREATE TABLE messages (\n    id INTEGER PRIMARY KEY AUTOINCREMENT,  -- Ensures strict FIFO ordering\n    queue TEXT NOT NULL,\n    body TEXT NOT NULL,\n    ts INTEGER NOT NULL UNIQUE,            -- Unique hybrid timestamp serves as message ID\n    claimed INTEGER DEFAULT 0              -- For read optimization\n);\n```\n\n**Key design decisions:**\n- The `id` column guarantees global FIFO ordering across all processes\n- The `ts` column serves as the public message identifier with uniqueness enforced\n- WAL mode enables concurrent readers and writers\n- Claim-based deletion enables ~3x faster reads\n</details>\n\n<details>\n<summary>Concurrency and Delivery Guarantees</summary>\n\n**Exactly-Once 
Delivery:** Read and move operations use atomic `DELETE...RETURNING` operations. A message is delivered exactly once to a consumer by default.\n\n**FIFO Ordering:** Messages are always read in the exact order they were written to the database, regardless of which process wrote them. This is guaranteed by SQLite's autoincrement and row-level locking.\n\n**Message Lifecycle:**\n1. **Write Phase**: Message inserted with unique timestamp\n2. **Claim Phase**: Read marks message as \"claimed\" (fast, logical delete)\n3. **Vacuum Phase**: Background process permanently removes claimed messages\n\nThis optimization is transparent - messages are still delivered exactly once.\n</details>\n\n<details>\n<summary>Security Considerations</summary>\n\n- **Queue names**: Validated (alphanumeric + underscore + hyphen + period only)\n- **Message size**: Limited to 10MB\n- **Database files**: Created with 0600 permissions (user-only)\n- **SQL injection**: Prevented via parameterized queries\n- **Message content**: Not validated - can contain any text including shell metacharacters\n</details>\n\n## Development & Contributing\n\nSimpleBroker uses [`uv`](https://github.com/astral-sh/uv) for package management and [`ruff`](https://github.com/astral-sh/ruff) for linting.\n\n```bash\n# Clone the repository\ngit clone git@github.com:VanL/simplebroker.git\ncd simplebroker\n\n# Install development environment\nuv sync --all-extras\n\n# Run tests\nuv run pytest              # Fast tests only\nuv run pytest -m \"\"        # All tests including slow ones\n\n# Lint and format\nuv run ruff check --fix simplebroker tests\nuv run ruff format simplebroker tests\nuv run mypy simplebroker\n```\n\n**Contributing guidelines:**\n1. Keep it simple - the entire codebase should stay understandable\n2. Maintain backward compatibility\n3. Add tests for new features\n4. Update documentation\n5. 
Run linting and tests before submitting PRs\n\n## License\n\nMIT \u00a9 2025 Van Lindberg\n\n## Acknowledgments\n\nBuilt with Python, SQLite, and the Unix philosophy.\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "A lightweight message queue backed by SQLite",
    "version": "2.7.1",
    "project_urls": {
        "Documentation": "https://github.com/VanL/simplebroker#readme",
        "Homepage": "https://github.com/VanL/simplebroker",
        "Issues": "https://github.com/VanL/simplebroker/issues",
        "Repository": "https://github.com/VanL/simplebroker.git"
    },
    "split_keywords": [
        "broker",
        " cli",
        " message-queue",
        " queue",
        " sqlite"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "bb67933b5c577ee08ba2646e5017d9fff3fe91cdd4ead21b46b3e6af8c426ee8",
                "md5": "df7aed205eddbea6610cd0bade150966",
                "sha256": "6daf18dac3b6cb7ff27d2138d78d801814cc5234ba9524ab5599df293f025344"
            },
            "downloads": -1,
            "filename": "simplebroker-2.7.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "df7aed205eddbea6610cd0bade150966",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 118876,
            "upload_time": "2025-10-24T22:06:23",
            "upload_time_iso_8601": "2025-10-24T22:06:23.863072Z",
            "url": "https://files.pythonhosted.org/packages/bb/67/933b5c577ee08ba2646e5017d9fff3fe91cdd4ead21b46b3e6af8c426ee8/simplebroker-2.7.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "147ac855ed7cb0b954e68047b0bd26c01170edb516bb8fa16e3871957ad62b73",
                "md5": "bf94511fe1125b68db09e74ca9ed4e2e",
                "sha256": "cbd8f9dfa2092117ad4a1c9bf87e8f1c1b9b09611b31a8693125eb1d0c6ac784"
            },
            "downloads": -1,
            "filename": "simplebroker-2.7.1.tar.gz",
            "has_sig": false,
            "md5_digest": "bf94511fe1125b68db09e74ca9ed4e2e",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 108722,
            "upload_time": "2025-10-24T22:06:24",
            "upload_time_iso_8601": "2025-10-24T22:06:24.937358Z",
            "url": "https://files.pythonhosted.org/packages/14/7a/c855ed7cb0b954e68047b0bd26c01170edb516bb8fa16e3871957ad62b73/simplebroker-2.7.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-24 22:06:24",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "VanL",
    "github_project": "simplebroker#readme",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "simplebroker"
}
        