fastsqs


Namefastsqs JSON
Version 0.4.0 PyPI version JSON
download
home_pagehttps://github.com/lafayettegabe/fastsqs
SummaryFastAPI-like, modern async SQS message processing for Python with advanced features
upload_time2025-09-08 05:25:47
maintainerNone
docs_urlNone
authorGabriel LaFayette
requires_python>=3.8
licenseMIT
keywords sqs aws lambda serverless async fastapi pydantic message-processing
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # FastSQS

**FastAPI-like, production-ready async SQS message processing for Python.**

[![PyPI version](https://img.shields.io/pypi/v/fastsqs.svg)](https://pypi.org/project/fastsqs/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE)

---

## Version 0.4.0 - Enhanced Enterprise Features

> ⚠️ **Pre-1.0 Release Warning**: This library is under active development. Breaking changes may occur until version 1.0.0. Pin your version in production.

### 🚀 New in Version 0.4.0

- **Middleware Presets**: Quick setup with production, development, and minimal presets
- **Enhanced Telemetry**: Advanced logging with Elasticsearch integration support
- **Load Balancing**: Intelligent message distribution across handlers
- **Queue Metrics**: Comprehensive queue performance monitoring
- **Custom Middleware Framework**: Simplified custom middleware creation with examples

### 🏗️ Enterprise Features (0.3.x)

- **Idempotency**: Prevent duplicate message processing with memory or DynamoDB storage
- **Advanced Error Handling**: Exponential backoff, circuit breaker, and DLQ management  
- **Visibility Timeout Management**: Automatic monitoring and extension for long-running processes
- **Parallelization**: Concurrent processing with semaphore-based limiting and thread pools

## Key Features

- 🚀 **FastAPI-like API:** Familiar decorator-based routing with automatic type inference
- 🔒 **Pydantic Validation:** Automatic message validation and serialization using SQSEvent models
- 🔄 **Auto Async/Sync:** Write handlers as sync or async functions - framework handles both automatically
- ⚡ **Middleware Presets:** One-line setup for production, development, or minimal configurations
- 🛡️ **Enterprise Middleware:** Idempotency, error handling, circuit breakers, and DLQ management
- 📊 **Telemetry & Metrics:** Built-in performance monitoring and Elasticsearch logging support
- 🔧 **Load Balancing:** Intelligent message distribution and resource management
- 🦾 **Partial Batch Failure:** Native support for AWS Lambda batch failure responses
- 🔀 **FIFO & Standard Queues:** Full support for both SQS queue types with proper ordering
- 🎯 **Flexible Matching:** Automatic field name normalization (camelCase ↔ snake_case)
- 🏗️ **Nested Routing:** QueueRouter support for complex routing scenarios
- 🐍 **Type Safety:** Full type hints and editor support throughout

---

## Requirements

- Python 3.8+
- [Pydantic](https://docs.pydantic.dev/) (installed automatically)

---

## Installation

```bash
# Basic installation
pip install fastsqs

# With DynamoDB support (for production idempotency)
pip install fastsqs[dynamodb]

# With telemetry support (for advanced logging)
pip install fastsqs[telemetry]

# With all optional features
pip install fastsqs[all]
```

---

## Quick Start

### Basic FastAPI-like Example

```python
from fastsqs import FastSQS, SQSEvent

class UserCreated(SQSEvent):
    user_id: str
    email: str
    name: str

class OrderProcessed(SQSEvent):
    order_id: str
    amount: float

# Create FastSQS app
app = FastSQS(debug=True)

# Route messages using SQSEvent models
@app.route(UserCreated)
async def handle_user_created(msg: UserCreated):
    print(f"User created: {msg.name} ({msg.email})")

@app.route(OrderProcessed)
def handle_order_processed(msg: OrderProcessed):
    print(f"Order {msg.order_id}: ${msg.amount}")

# Default handler for unmatched messages
@app.default()
def handle_unknown(payload, ctx):
    print(f"Unknown message: {payload}")

# AWS Lambda handler
def lambda_handler(event, context):
    return app.handler(event, context)
```

### Example SQS Message Payloads

```json
{
  "type": "user_created",
  "user_id": "123",
  "email": "user@example.com",
  "name": "John Doe"
}
```

```json
{
  "type": "order_processed",
  "order_id": "ord-456",
  "amount": 99.99
}
```

---

## Advanced Features

### Middleware Presets (New in 0.4.0)

```python
# Production-ready setup in one line
app.use_preset("production", 
    dynamodb_table="my-idempotency-table",
    max_concurrent=10,
    retry_attempts=3
)

# Development setup
app.use_preset("development")

# Minimal setup
app.use_preset("minimal")
```

### Manual Middleware Configuration

```python
# FIFO Queue Support
app = FastSQS(queue_type=QueueType.FIFO)

# Individual middleware
from fastsqs.middleware import (
    TimingMsMiddleware, LoggingMiddleware, 
    IdempotencyMiddleware, ErrorHandlingMiddleware,
    ParallelizationMiddleware, QueueMetricsMiddleware
)
app.add_middleware(TimingMsMiddleware())
app.add_middleware(LoggingMiddleware())
app.add_middleware(IdempotencyMiddleware())
app.add_middleware(QueueMetricsMiddleware())

# Field Matching - automatically handles camelCase ↔ snake_case
class UserEvent(SQSEvent):
    user_id: str  # Matches: user_id, userId, USER_ID
    first_name: str  # Matches: first_name, firstName
```

---

## How it Works

1. **Message Parsing:** JSON validated and normalized
2. **Route Matching:** Type-based routing to handlers  
3. **Handler Execution:** Sync/async functions supported
4. **Error Handling:** Failed messages → SQS retry/DLQ

---

## Error Handling & Performance

- **Predictable Errors:** All failures result in batch item failures for SQS retry
- **Parallel Processing:** Concurrent message handling (respects FIFO ordering)
- **Type Safety:** Full Pydantic validation with IDE support
- **Memory Efficient:** Minimal overhead per message

---

## Documentation & Contributing

- **Examples:** See `examples/` directory for complete working examples
- **Contributing:** Issues and PRs welcome!
- **License:** MIT

---

**Ready to build type-safe, FastAPI-like SQS processors? Try FastSQS today!**

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/lafayettegabe/fastsqs",
    "name": "fastsqs",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "sqs aws lambda serverless async fastapi pydantic message-processing",
    "author": "Gabriel LaFayette",
    "author_email": "gabriel.lafayette@proton.me",
    "download_url": "https://files.pythonhosted.org/packages/b0/a5/45ea426da9f77fd0878cbf63e69cbeb9761c1824cf299a1e574dd70f17dd/fastsqs-0.4.0.tar.gz",
    "platform": null,
    "description": "# FastSQS\n\n**FastAPI-like, production-ready async SQS message processing for Python.**\n\n[![PyPI version](https://img.shields.io/pypi/v/fastsqs.svg)](https://pypi.org/project/fastsqs/)\n[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE)\n\n---\n\n## Version 0.4.0 - Enhanced Enterprise Features\n\n> \u26a0\ufe0f **Pre-1.0 Release Warning**: This library is under active development. Breaking changes may occur until version 1.0.0. Pin your version in production.\n\n### \ud83d\ude80 New in Version 0.4.0\n\n- **Middleware Presets**: Quick setup with production, development, and minimal presets\n- **Enhanced Telemetry**: Advanced logging with Elasticsearch integration support\n- **Load Balancing**: Intelligent message distribution across handlers\n- **Queue Metrics**: Comprehensive queue performance monitoring\n- **Custom Middleware Framework**: Simplified custom middleware creation with examples\n\n### \ud83c\udfd7\ufe0f Enterprise Features (0.3.x)\n\n- **Idempotency**: Prevent duplicate message processing with memory or DynamoDB storage\n- **Advanced Error Handling**: Exponential backoff, circuit breaker, and DLQ management  \n- **Visibility Timeout Management**: Automatic monitoring and extension for long-running processes\n- **Parallelization**: Concurrent processing with semaphore-based limiting and thread pools\n\n## Key Features\n\n- \ud83d\ude80 **FastAPI-like API:** Familiar decorator-based routing with automatic type inference\n- \ud83d\udd12 **Pydantic Validation:** Automatic message validation and serialization using SQSEvent models\n- \ud83d\udd04 **Auto Async/Sync:** Write handlers as sync or async functions - framework handles both automatically\n- \u26a1 **Middleware Presets:** One-line setup for production, development, or minimal configurations\n- \ud83d\udee1\ufe0f **Enterprise Middleware:** Idempotency, error handling, circuit breakers, and DLQ management\n- \ud83d\udcca **Telemetry & Metrics:** Built-in performance monitoring and Elasticsearch logging support\n- \ud83d\udd27 **Load Balancing:** Intelligent message distribution and resource management\n- \ud83e\uddbe **Partial Batch Failure:** Native support for AWS Lambda batch failure responses\n- \ud83d\udd00 **FIFO & Standard Queues:** Full support for both SQS queue types with proper ordering\n- \ud83c\udfaf **Flexible Matching:** Automatic field name normalization (camelCase \u2194 snake_case)\n- \ud83c\udfd7\ufe0f **Nested Routing:** QueueRouter support for complex routing scenarios\n- \ud83d\udc0d **Type Safety:** Full type hints and editor support throughout\n\n---\n\n## Requirements\n\n- Python 3.8+\n- [Pydantic](https://docs.pydantic.dev/) (installed automatically)\n\n---\n\n## Installation\n\n```bash\n# Basic installation\npip install fastsqs\n\n# With DynamoDB support (for production idempotency)\npip install fastsqs[dynamodb]\n\n# With telemetry support (for advanced logging)\npip install fastsqs[telemetry]\n\n# With all optional features\npip install fastsqs[all]\n```\n\n---\n\n## Quick Start\n\n### Basic FastAPI-like Example\n\n```python\nfrom fastsqs import FastSQS, SQSEvent\n\nclass UserCreated(SQSEvent):\n    user_id: str\n    email: str\n    name: str\n\nclass OrderProcessed(SQSEvent):\n    order_id: str\n    amount: float\n\n# Create FastSQS app\napp = FastSQS(debug=True)\n\n# Route messages using SQSEvent models\n@app.route(UserCreated)\nasync def handle_user_created(msg: UserCreated):\n    print(f\"User created: {msg.name} ({msg.email})\")\n\n@app.route(OrderProcessed)\ndef handle_order_processed(msg: OrderProcessed):\n    print(f\"Order {msg.order_id}: ${msg.amount}\")\n\n# Default handler for unmatched messages\n@app.default()\ndef handle_unknown(payload, ctx):\n    print(f\"Unknown message: {payload}\")\n\n# AWS Lambda handler\ndef lambda_handler(event, context):\n    return app.handler(event, context)\n```\n\n### Example SQS Message Payloads\n\n```json\n{\n  \"type\": \"user_created\",\n  \"user_id\": \"123\",\n  \"email\": \"user@example.com\",\n  \"name\": \"John Doe\"\n}\n```\n\n```json\n{\n  \"type\": \"order_processed\",\n  \"order_id\": \"ord-456\",\n  \"amount\": 99.99\n}\n```\n\n---\n\n## Advanced Features\n\n### Middleware Presets (New in 0.4.0)\n\n```python\n# Production-ready setup in one line\napp.use_preset(\"production\", \n    dynamodb_table=\"my-idempotency-table\",\n    max_concurrent=10,\n    retry_attempts=3\n)\n\n# Development setup\napp.use_preset(\"development\")\n\n# Minimal setup\napp.use_preset(\"minimal\")\n```\n\n### Manual Middleware Configuration\n\n```python\n# FIFO Queue Support\napp = FastSQS(queue_type=QueueType.FIFO)\n\n# Individual middleware\nfrom fastsqs.middleware import (\n    TimingMsMiddleware, LoggingMiddleware, \n    IdempotencyMiddleware, ErrorHandlingMiddleware,\n    ParallelizationMiddleware, QueueMetricsMiddleware\n)\napp.add_middleware(TimingMsMiddleware())\napp.add_middleware(LoggingMiddleware())\napp.add_middleware(IdempotencyMiddleware())\napp.add_middleware(QueueMetricsMiddleware())\n\n# Field Matching - automatically handles camelCase \u2194 snake_case\nclass UserEvent(SQSEvent):\n    user_id: str  # Matches: user_id, userId, USER_ID\n    first_name: str  # Matches: first_name, firstName\n```\n\n---\n\n## How it Works\n\n1. **Message Parsing:** JSON validated and normalized\n2. **Route Matching:** Type-based routing to handlers  \n3. **Handler Execution:** Sync/async functions supported\n4. **Error Handling:** Failed messages \u2192 SQS retry/DLQ\n\n---\n\n## Error Handling & Performance\n\n- **Predictable Errors:** All failures result in batch item failures for SQS retry\n- **Parallel Processing:** Concurrent message handling (respects FIFO ordering)\n- **Type Safety:** Full Pydantic validation with IDE support\n- **Memory Efficient:** Minimal overhead per message\n\n---\n\n## Documentation & Contributing\n\n- **Examples:** See `examples/` directory for complete working examples\n- **Contributing:** Issues and PRs welcome!\n- **License:** MIT\n\n---\n\n**Ready to build type-safe, FastAPI-like SQS processors? Try FastSQS today!**\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "FastAPI-like, modern async SQS message processing for Python with advanced features",
    "version": "0.4.0",
    "project_urls": {
        "Homepage": "https://github.com/lafayettegabe/fastsqs"
    },
    "split_keywords": [
        "sqs",
        "aws",
        "lambda",
        "serverless",
        "async",
        "fastapi",
        "pydantic",
        "message-processing"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "864e5ee63c014d14e2ab9889caed917f6bafdc67d81a21430b3d2e415b6e86d2",
                "md5": "5f8a64a7d04b987262201d442ad8803d",
                "sha256": "56504105cee70386bbcb562f281d2736766f3a0aab09e91ed83fc2118218b20e"
            },
            "downloads": -1,
            "filename": "fastsqs-0.4.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "5f8a64a7d04b987262201d442ad8803d",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 43743,
            "upload_time": "2025-09-08T05:25:46",
            "upload_time_iso_8601": "2025-09-08T05:25:46.674872Z",
            "url": "https://files.pythonhosted.org/packages/86/4e/5ee63c014d14e2ab9889caed917f6bafdc67d81a21430b3d2e415b6e86d2/fastsqs-0.4.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "b0a545ea426da9f77fd0878cbf63e69cbeb9761c1824cf299a1e574dd70f17dd",
                "md5": "b79697a888ef551b0e58a1e07787ba72",
                "sha256": "2f233322dd83f8e5f1e4ea340f8f6391ec43e91fac26390967a83b9ce718bda3"
            },
            "downloads": -1,
            "filename": "fastsqs-0.4.0.tar.gz",
            "has_sig": false,
            "md5_digest": "b79697a888ef551b0e58a1e07787ba72",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 38519,
            "upload_time": "2025-09-08T05:25:47",
            "upload_time_iso_8601": "2025-09-08T05:25:47.873568Z",
            "url": "https://files.pythonhosted.org/packages/b0/a5/45ea426da9f77fd0878cbf63e69cbeb9761c1824cf299a1e574dd70f17dd/fastsqs-0.4.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-09-08 05:25:47",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "lafayettegabe",
    "github_project": "fastsqs",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "fastsqs"
}
        
Elapsed time: 0.46332s