beque 0.1.0: Asynchronous In-Memory Batch Queue Processor

* Author: Punit Arani <punitsai36@gmail.com>
* Requires-Python: >=3.8
* Keywords: async, batch, processing, queue
* Uploaded: 2025-08-30 03:45:06
# Beque - Asynchronous In-Memory Batch Queue Processor

**Beque** (pronounced "Beck") is a lightweight, high-performance Python library for asynchronous batch processing. It accumulates items in memory and flushes them to an async sink when either:

* `max_batch_size` items are queued, or
* `flush_interval` seconds have passed since the last successful flush

Flushing is serialized and items are never lost: on failure, the batch is re-queued in original order.
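The two triggers reduce to a single flush condition. A minimal sketch of that policy (the function name and signature here are illustrative, not part of Beque's API):

```python
import time
from typing import Optional

def should_flush(queued: int, last_flush: float, *,
                 max_batch_size: int = 100,
                 flush_interval: float = 10.0,
                 now: Optional[float] = None) -> bool:
    """Flush when the batch is full, or when items are waiting and
    flush_interval seconds have passed since the last successful flush."""
    now = time.monotonic() if now is None else now
    return queued >= max_batch_size or (
        queued > 0 and now - last_flush >= flush_interval
    )
```

Either condition firing drains the current queue contents as one batch.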

## Features

* **Async/await support**: Built for modern Python asyncio applications
* **Batch processing**: Efficiently processes items in configurable batches
* **Time-based flushing**: Automatic flushing based on configurable intervals
* **Error resilience**: Failed batches are re-queued and retried
* **Thread-safe**: Safe for concurrent access across async tasks
* **Generic typing**: Full type safety with Python's generic typing system
* **Comprehensive stats**: Built-in statistics and monitoring
* **Zero dependencies**: No external dependencies required

## Installation

```bash
pip install beque
```

## Quick Start

```python
import asyncio
from beque import Beque

async def process_batch(items):
    """Your async batch processing function."""
    print(f"Processing {len(items)} items: {items}")
    # Simulate async work (database write, API call, etc.)
    await asyncio.sleep(0.1)

async def main():
    # Create a Beque that flushes every 5 items or every 10 seconds
    async with Beque(
        on_flush=process_batch, 
        max_batch_size=5, 
        flush_interval=10.0
    ) as queue:
        
        # Add items to the queue
        for i in range(12):
            await queue.add(f"item-{i}")
            await asyncio.sleep(0.5)
        
        # Items are automatically flushed in batches
        # Final flush happens when exiting the context manager

asyncio.run(main())
```

## API Reference

### Beque Class

```python
class Beque(Generic[T]):
    def __init__(
        self,
        *,
        on_flush: Callable[[List[T]], Awaitable[None]],
        max_batch_size: int = 100,
        flush_interval: float = 10.0,
        name: str = "Beque",
        logger: Optional[logging.Logger] = None,
    )
```

**Parameters:**

* `on_flush`: Async function that processes batches of items
* `max_batch_size`: Maximum items in a batch before auto-flush (default: 100)
* `flush_interval`: Seconds between time-based flushes (default: 10.0)
* `name`: Name for logging and identification (default: "Beque")
* `logger`: Custom logger instance (optional)

### Methods

#### `async add(item: T) -> None`

Add a single item to the queue.

#### `async add_many(items: List[T]) -> None`

Add multiple items to the queue atomically.

#### `async flush(*, force: bool = True) -> None`

Manually trigger a flush. If `force=True`, flushes all queued items.

#### `stats -> dict`

Get current statistics:

```python
{
    "flushes": int,      # Total successful flushes
    "items": int,        # Total items processed  
    "failures": int,     # Total flush failures
    "queued": int,       # Current items in queue
    "last_flush_time": float,  # Timestamp of last flush
    "running": bool      # Whether queue is active
}
```
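The stats dict lends itself to simple health metrics; for instance, a flush failure rate over all attempts (a sketch assuming the fields listed above):

```python
def flush_failure_rate(stats: dict) -> float:
    """Fraction of flush attempts that failed (0.0 if none attempted)."""
    attempts = stats["flushes"] + stats["failures"]
    return stats["failures"] / attempts if attempts else 0.0
```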

### Context Manager Usage

Beque is designed to be used as an async context manager:

```python
async with Beque(on_flush=handler) as queue:
    await queue.add(item)
    # Automatic cleanup and final flush on exit
```

Or manually:

```python
queue = Beque(on_flush=handler)
await queue.start()
try:
    await queue.add(item)
finally:
    await queue.stop()  # Ensures final flush
```

## Advanced Examples

### Database Batch Inserts

```python
import asyncio
from beque import Beque

class DatabaseWriter:
    async def write_users(self, user_batch):
        # Simulate batch database insert
        print(f"INSERT INTO users VALUES {user_batch}")
        await asyncio.sleep(0.1)  # Simulated I/O

async def main():
    db = DatabaseWriter()
    
    async with Beque(
        on_flush=db.write_users,
        max_batch_size=10,
        flush_interval=5.0
    ) as user_queue:
        
        # Simulate receiving user data
        for i in range(25):
            user_data = {"id": i, "name": f"user-{i}"}
            await user_queue.add(user_data)
            await asyncio.sleep(0.2)

asyncio.run(main())
```

### Error Handling and Recovery

```python
import asyncio
import random
from beque import Beque

async def flaky_processor(batch):
    """Processor that occasionally fails."""
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Processing failed!")
    
    print(f"Successfully processed: {batch}")
    await asyncio.sleep(0.1)

async def main():
    async with Beque(
        on_flush=flaky_processor,
        max_batch_size=3,
        flush_interval=2.0
    ) as queue:
        
        for i in range(10):
            await queue.add(f"task-{i}")
            await asyncio.sleep(0.5)
        
        # Check stats to see failure/retry information
        print("Final stats:", queue.stats)

asyncio.run(main())
```

### Multiple Concurrent Producers

```python
import asyncio
from beque import Beque

async def log_processor(batch):
    print(f"Logged {len(batch)} events: {batch}")
    await asyncio.sleep(0.05)

async def producer(queue, producer_id):
    """Simulate concurrent event producers."""
    for i in range(10):
        event = f"producer-{producer_id}-event-{i}"
        await queue.add(event)
        await asyncio.sleep(0.1)

async def main():
    async with Beque(
        on_flush=log_processor,
        max_batch_size=5,
        flush_interval=1.0
    ) as event_queue:
        
        # Start multiple concurrent producers
        await asyncio.gather(
            producer(event_queue, 1),
            producer(event_queue, 2),
            producer(event_queue, 3),
        )

asyncio.run(main())
```

## Type Safety

Beque is fully typed and supports generic type parameters:

```python
from beque import Beque
from typing import Dict, List

async def process_dicts(batch: List[Dict[str, int]]) -> None:
    for item in batch:
        print(f"Processing: {item}")

# Type-safe queue for dictionaries
queue: Beque[Dict[str, int]] = Beque(on_flush=process_dicts)
```

## Performance Characteristics

* **Memory**: O(n) where n is the number of queued items
* **Throughput**: Optimized for high-frequency additions with batched processing
* **Latency**: Configurable via `flush_interval` and `max_batch_size`
* **Concurrency**: Thread-safe with asyncio locks, supports many concurrent producers

## Error Handling

Beque provides robust error handling:

1. **Flush failures**: Batches are re-queued in original order
2. **Automatic retry**: Failed batches will be retried on next flush opportunity
3. **Graceful shutdown**: Context manager ensures final flush even on exceptions
4. **Statistics tracking**: Monitor success/failure rates via `stats` property
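The re-queue guarantee (point 1) can be modeled with a plain list: the failed batch goes back at the head of the queue, ahead of anything added while the flush was in flight, so overall ordering is preserved. This is an illustrative model, not Beque's actual implementation:

```python
from typing import List, TypeVar

T = TypeVar("T")

def requeue_failed(queue: List[T], failed_batch: List[T]) -> List[T]:
    """Re-insert a failed batch at the head, preserving original order."""
    return failed_batch + queue

# "a".."c" were dequeued for a flush that raised; "d" arrived meanwhile
queue = requeue_failed(["d"], ["a", "b", "c"])
# queue is now ["a", "b", "c", "d"]
```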

## Logging

Beque provides structured logging at various levels:

* **INFO**: Start/stop events with configuration
* **DEBUG**: Individual flush operations
* **ERROR**: Flush failures with full context

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("my_app.queue")

async with Beque(on_flush=handler, logger=logger) as queue:
    # Custom logger will be used for all queue events
    pass
```

## Requirements

* Python 3.8+
* No external dependencies

## License

MIT License - see LICENSE file for details.
