finalsa-sqs-consumer


Namefinalsa-sqs-consumer JSON
Version 3.0.0 PyPI version JSON
download
home_pageNone
SummaryHigh-performance SQS message consumer with worker-based concurrency, dependency injection, and async support for Python applications
upload_time2025-07-10 02:25:16
maintainerNone
docs_urlNone
authorNone
requires_python>=3.10
licenseMIT License Copyright (c) 2025 Luis Diego Jiménez Delgado Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
keywords async aws concurrency consumer dependency-injection fastapi finalsa interceptors message-queue sqs timeout worker
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Finalsa SQS Consumer

A Python package for creating SQS message consumers in FastAPI applications with built-in dependency injection, interceptors, and async support.

## Features

- **SQS Message Consumption**: Simple, decorator-based SQS message handling
- **Worker-based Concurrency**: Concurrent message processing with configurable worker pools (like uvicorn)
- **Dependency Injection**: Built-in dependency injection system with `SqsDepends`
- **Async Support**: Full async/await support for message processing
- **Interceptors**: Pre/post-processing hooks for message handling
- **Signal Handling**: Graceful shutdown handling
- **Testing Support**: Built-in testing utilities
- **Type Safety**: Full type hints and validation

## Installation

```bash
pip install finalsa-sqs-consumer
```

## Quick Start

```python
from finalsa.sqs.consumer import SqsApp, SqsDepends

# Create app instance with worker-based concurrency
app = SqsApp(
    app_name="my-consumer",
    queue_url="https://sqs.region.amazonaws.com/account/queue-name",
    max_number_of_messages=10,
    workers=8  # 8 concurrent workers for high throughput
)

# Define a simple handler
@app.handler("user.created")
async def handle_user_created(message: dict):
    print(f"User created: {message}")

# Define handler with dependencies
@app.handler("order.created")
async def handle_order_created(
    message: dict,
    db_service: DatabaseService = SqsDepends(DatabaseService)
):
    await db_service.process_order(message)

# Run the consumer with concurrent workers
if __name__ == "__main__":
    app.run()  # Starts 8 worker processes
```

## Core Components

### SqsApp

Main application class that manages message consumption and routing.

```python
app = SqsApp(
    app_name="my-app",           # Application identifier
    queue_url="...",             # SQS queue URL
    max_number_of_messages=10,   # Max messages per batch
    workers=8,                   # Number of concurrent workers (like uvicorn)
    message_timeout=300.0,       # Message processing timeout in seconds (default: 5 minutes)
    interceptors=[]              # List of interceptor classes
)
```

**Worker-based Processing:**
- Messages are distributed to a pool of concurrent workers
- Each worker processes messages independently
- Similar to uvicorn's worker model for high throughput
- Automatic load balancing across workers
- Graceful shutdown of all workers

**Message Timeout:**
- Configurable timeout for processing individual messages
- Prevents workers from being blocked by long-running handlers
- Timed-out messages are logged and returned to queue for retry
- Default timeout: 300 seconds (5 minutes)

### Message Handlers

Register handlers for specific message topics:

```python
@app.handler("topic.name")
async def my_handler(message: dict, context: dict = None):
    # Process message
    pass
```
```

### Dependency Injection

Use `SqsDepends` for dependency injection:

```python
class MyService:
    def process(self, data): ...

@app.handler("topic")
async def handler(
    message: dict,
    service: MyService = SqsDepends(MyService)
):
    service.process(message)
```

### Interceptors

Create custom interceptors for cross-cutting concerns:

```python
from finalsa.sqs.consumer import AsyncConsumerInterceptor

class LoggingInterceptor(AsyncConsumerInterceptor):
    async def before_consume(self, topic: str, message: dict):
        print(f"Processing {topic}: {message}")
    
    async def after_consume(self, topic: str, result):
        print(f"Completed {topic}")

app = SqsApp(interceptors=[LoggingInterceptor])
```

## Message Timeout Configuration

Configure timeout limits for message processing to prevent workers from being blocked:

### Basic Timeout Configuration

```python
# Fast operations (API calls, simple DB operations)
fast_app = SqsApp(
    app_name="fast-processor",
    queue_url="...",
    workers=5,
    message_timeout=30.0  # 30 seconds
)

# Data processing operations
data_app = SqsApp(
    app_name="data-processor", 
    queue_url="...",
    workers=3,
    message_timeout=300.0  # 5 minutes (default)
)

# Heavy computation operations
heavy_app = SqsApp(
    app_name="heavy-processor",
    queue_url="...", 
    workers=2,
    message_timeout=1800.0  # 30 minutes
)
```

### Timeout Behavior

When a message handler exceeds the timeout:
- The handler execution is cancelled
- An error is logged with timeout details
- The message is **not** deleted from SQS (remains available for retry)
- The worker becomes available for new messages immediately

### Timeout Guidelines

- **Fast operations (5-60 seconds)**: API calls, simple DB operations, cache updates
- **Medium operations (1-10 minutes)**: File processing, image processing, data aggregation
- **Heavy operations (10-60 minutes)**: Large file processing, ML inference, complex data analysis

### Example Handler with Timeout

```python
# This handler has a 2-minute timeout
app = SqsApp(message_timeout=120.0)

@app.handler("data.process")
async def process_data(message: dict):
    # This operation must complete within 2 minutes
    # or it will be cancelled and logged as timeout
    await heavy_data_processing(message)
```

## Testing

Use `SqsAppTest` for testing message handlers:

```python
from finalsa.sqs.consumer import SqsAppTest

def test_user_handler():
    test_app = SqsAppTest(app)
    
    # Test handler
    result = test_app.test_handler(
        "user.created",
        {"user_id": 123, "name": "John"}
    )
    
    assert result is not None
```

## Error Handling

The library provides specific exceptions:

- `TopicNotFoundException`: Handler not found for topic
- `InvalidMessageException`: Message format validation failed
- `TopicAlreadyRegisteredException`: Duplicate topic registration

## Configuration

### Environment Variables

- `AWS_REGION`: AWS region for SQS
- `AWS_ACCESS_KEY_ID`: AWS access key
- `AWS_SECRET_ACCESS_KEY`: AWS secret key

### Message Format

Expected SQS message format:

```json
{
  "topic": "user.created",
  "data": {
    "user_id": 123,
    "name": "John Doe"
  },
  "metadata": {
    "correlation_id": "uuid",
    "timestamp": "2024-01-01T00:00:00Z"
  }
}
```

## Advanced Usage

### Custom Signal Handling

```python
from finalsa.sqs.consumer import SignalHandler

signal_handler = SignalHandler(logger)
# Automatic graceful shutdown on SIGTERM/SIGINT
```

### Concurrent Processing

Configure workers for high-throughput message processing:

```python
# High throughput configuration
app = SqsApp(
    app_name="high-throughput-service",
    queue_url="...",
    max_number_of_messages=10,  # Receive multiple messages per batch
    workers=16                  # 16 concurrent workers
)

@app.handler("bulk.process")
async def process_bulk_data(message: dict):
    # Each message processed by available worker
    await process_large_dataset(message)
```

### Multiple Workers

```python
app = SqsApp(workers=10)  # Process messages with 10 concurrent workers
```

**Benefits of Worker-based Processing:**
- **Concurrent Execution**: Multiple messages processed simultaneously
- **Fault Isolation**: Worker failures don't affect other workers
- **Load Balancing**: Messages automatically distributed to available workers
- **Graceful Shutdown**: All workers stop cleanly on termination signals
- **Better Throughput**: Ideal for I/O-bound operations like database calls

### Batch Processing

```python
app = SqsApp(max_number_of_messages=10)  # Receive up to 10 messages per batch
```

## Development

### Running Tests

```bash
pytest
```

### Linting

```bash
ruff check .
```

### Coverage

```bash
coverage run -m pytest
coverage report
```

## License

MIT License - see LICENSE.md for details.

## Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests
5. Run linting and tests
6. Submit a pull request

## Requirements

- Python 3.10+
- AWS credentials configured
- SQS queue access

## Related Packages

- `finalsa-common-models`: Shared data models
- `finalsa-sqs-client`: SQS client implementation
- `finalsa-sns-client`: SNS client for notifications
- `finalsa-dependency-injector`: Dependency injection framework

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "finalsa-sqs-consumer",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "async, aws, concurrency, consumer, dependency-injection, fastapi, finalsa, interceptors, message-queue, sqs, timeout, worker",
    "author": null,
    "author_email": "Luis Jimenez <luis@finalsa.com>",
    "download_url": "https://files.pythonhosted.org/packages/3b/a6/917a9563dfd019e5c66da0f73087f43c431c9e6d6155d75f3ab410324b8d/finalsa_sqs_consumer-3.0.0.tar.gz",
    "platform": null,
    "description": "# Finalsa SQS Consumer\n\nA Python package for creating SQS message consumers in FastAPI applications with built-in dependency injection, interceptors, and async support.\n\n## Features\n\n- **SQS Message Consumption**: Simple, decorator-based SQS message handling\n- **Worker-based Concurrency**: Concurrent message processing with configurable worker pools (like uvicorn)\n- **Dependency Injection**: Built-in dependency injection system with `SqsDepends`\n- **Async Support**: Full async/await support for message processing\n- **Interceptors**: Pre/post-processing hooks for message handling\n- **Signal Handling**: Graceful shutdown handling\n- **Testing Support**: Built-in testing utilities\n- **Type Safety**: Full type hints and validation\n\n## Installation\n\n```bash\npip install finalsa-sqs-consumer\n```\n\n## Quick Start\n\n```python\nfrom finalsa.sqs.consumer import SqsApp, SqsDepends\n\n# Create app instance with worker-based concurrency\napp = SqsApp(\n    app_name=\"my-consumer\",\n    queue_url=\"https://sqs.region.amazonaws.com/account/queue-name\",\n    max_number_of_messages=10,\n    workers=8  # 8 concurrent workers for high throughput\n)\n\n# Define a simple handler\n@app.handler(\"user.created\")\nasync def handle_user_created(message: dict):\n    print(f\"User created: {message}\")\n\n# Define handler with dependencies\n@app.handler(\"order.created\")\nasync def handle_order_created(\n    message: dict,\n    db_service: DatabaseService = SqsDepends(DatabaseService)\n):\n    await db_service.process_order(message)\n\n# Run the consumer with concurrent workers\nif __name__ == \"__main__\":\n    app.run()  # Starts 8 worker processes\n```\n\n## Core Components\n\n### SqsApp\n\nMain application class that manages message consumption and routing.\n\n```python\napp = SqsApp(\n    app_name=\"my-app\",           # Application identifier\n    queue_url=\"...\",             # SQS queue URL\n    max_number_of_messages=10,   # Max messages per batch\n    workers=8,                   # Number of concurrent workers (like uvicorn)\n    message_timeout=300.0,       # Message processing timeout in seconds (default: 5 minutes)\n    interceptors=[]              # List of interceptor classes\n)\n```\n\n**Worker-based Processing:**\n- Messages are distributed to a pool of concurrent workers\n- Each worker processes messages independently\n- Similar to uvicorn's worker model for high throughput\n- Automatic load balancing across workers\n- Graceful shutdown of all workers\n\n**Message Timeout:**\n- Configurable timeout for processing individual messages\n- Prevents workers from being blocked by long-running handlers\n- Timed-out messages are logged and returned to queue for retry\n- Default timeout: 300 seconds (5 minutes)\n\n### Message Handlers\n\nRegister handlers for specific message topics:\n\n```python\n@app.handler(\"topic.name\")\nasync def my_handler(message: dict, context: dict = None):\n    # Process message\n    pass\n```\n```\n\n### Dependency Injection\n\nUse `SqsDepends` for dependency injection:\n\n```python\nclass MyService:\n    def process(self, data): ...\n\n@app.handler(\"topic\")\nasync def handler(\n    message: dict,\n    service: MyService = SqsDepends(MyService)\n):\n    service.process(message)\n```\n\n### Interceptors\n\nCreate custom interceptors for cross-cutting concerns:\n\n```python\nfrom finalsa.sqs.consumer import AsyncConsumerInterceptor\n\nclass LoggingInterceptor(AsyncConsumerInterceptor):\n    async def before_consume(self, topic: str, message: dict):\n        print(f\"Processing {topic}: {message}\")\n    \n    async def after_consume(self, topic: str, result):\n        print(f\"Completed {topic}\")\n\napp = SqsApp(interceptors=[LoggingInterceptor])\n```\n\n## Message Timeout Configuration\n\nConfigure timeout limits for message processing to prevent workers from being blocked:\n\n### Basic Timeout Configuration\n\n```python\n# Fast operations (API calls, simple DB operations)\nfast_app = SqsApp(\n    app_name=\"fast-processor\",\n    queue_url=\"...\",\n    workers=5,\n    message_timeout=30.0  # 30 seconds\n)\n\n# Data processing operations\ndata_app = SqsApp(\n    app_name=\"data-processor\", \n    queue_url=\"...\",\n    workers=3,\n    message_timeout=300.0  # 5 minutes (default)\n)\n\n# Heavy computation operations\nheavy_app = SqsApp(\n    app_name=\"heavy-processor\",\n    queue_url=\"...\", \n    workers=2,\n    message_timeout=1800.0  # 30 minutes\n)\n```\n\n### Timeout Behavior\n\nWhen a message handler exceeds the timeout:\n- The handler execution is cancelled\n- An error is logged with timeout details\n- The message is **not** deleted from SQS (remains available for retry)\n- The worker becomes available for new messages immediately\n\n### Timeout Guidelines\n\n- **Fast operations (5-60 seconds)**: API calls, simple DB operations, cache updates\n- **Medium operations (1-10 minutes)**: File processing, image processing, data aggregation\n- **Heavy operations (10-60 minutes)**: Large file processing, ML inference, complex data analysis\n\n### Example Handler with Timeout\n\n```python\n# This handler has a 2-minute timeout\napp = SqsApp(message_timeout=120.0)\n\n@app.handler(\"data.process\")\nasync def process_data(message: dict):\n    # This operation must complete within 2 minutes\n    # or it will be cancelled and logged as timeout\n    await heavy_data_processing(message)\n```\n\n## Testing\n\nUse `SqsAppTest` for testing message handlers:\n\n```python\nfrom finalsa.sqs.consumer import SqsAppTest\n\ndef test_user_handler():\n    test_app = SqsAppTest(app)\n    \n    # Test handler\n    result = test_app.test_handler(\n        \"user.created\",\n        {\"user_id\": 123, \"name\": \"John\"}\n    )\n    \n    assert result is not None\n```\n\n## Error Handling\n\nThe library provides specific exceptions:\n\n- `TopicNotFoundException`: Handler not found for topic\n- `InvalidMessageException`: Message format validation failed\n- `TopicAlreadyRegisteredException`: Duplicate topic registration\n\n## Configuration\n\n### Environment Variables\n\n- `AWS_REGION`: AWS region for SQS\n- `AWS_ACCESS_KEY_ID`: AWS access key\n- `AWS_SECRET_ACCESS_KEY`: AWS secret key\n\n### Message Format\n\nExpected SQS message format:\n\n```json\n{\n  \"topic\": \"user.created\",\n  \"data\": {\n    \"user_id\": 123,\n    \"name\": \"John Doe\"\n  },\n  \"metadata\": {\n    \"correlation_id\": \"uuid\",\n    \"timestamp\": \"2024-01-01T00:00:00Z\"\n  }\n}\n```\n\n## Advanced Usage\n\n### Custom Signal Handling\n\n```python\nfrom finalsa.sqs.consumer import SignalHandler\n\nsignal_handler = SignalHandler(logger)\n# Automatic graceful shutdown on SIGTERM/SIGINT\n```\n\n### Concurrent Processing\n\nConfigure workers for high-throughput message processing:\n\n```python\n# High throughput configuration\napp = SqsApp(\n    app_name=\"high-throughput-service\",\n    queue_url=\"...\",\n    max_number_of_messages=10,  # Receive multiple messages per batch\n    workers=16                  # 16 concurrent workers\n)\n\n@app.handler(\"bulk.process\")\nasync def process_bulk_data(message: dict):\n    # Each message processed by available worker\n    await process_large_dataset(message)\n```\n\n### Multiple Workers\n\n```python\napp = SqsApp(workers=10)  # Process messages with 10 concurrent workers\n```\n\n**Benefits of Worker-based Processing:**\n- **Concurrent Execution**: Multiple messages processed simultaneously\n- **Fault Isolation**: Worker failures don't affect other workers\n- **Load Balancing**: Messages automatically distributed to available workers\n- **Graceful Shutdown**: All workers stop cleanly on termination signals\n- **Better Throughput**: Ideal for I/O-bound operations like database calls\n\n### Batch Processing\n\n```python\napp = SqsApp(max_number_of_messages=10)  # Receive up to 10 messages per batch\n```\n\n## Development\n\n### Running Tests\n\n```bash\npytest\n```\n\n### Linting\n\n```bash\nruff check .\n```\n\n### Coverage\n\n```bash\ncoverage run -m pytest\ncoverage report\n```\n\n## License\n\nMIT License - see LICENSE.md for details.\n\n## Contributing\n\n1. Fork the repository\n2. Create a feature branch\n3. Make your changes\n4. Add tests\n5. Run linting and tests\n6. Submit a pull request\n\n## Requirements\n\n- Python 3.10+\n- AWS credentials configured\n- SQS queue access\n\n## Related Packages\n\n- `finalsa-common-models`: Shared data models\n- `finalsa-sqs-client`: SQS client implementation\n- `finalsa-sns-client`: SNS client for notifications\n- `finalsa-dependency-injector`: Dependency injection framework\n",
    "bugtrack_url": null,
    "license": "MIT License  Copyright (c) 2025 Luis Diego Jim\u00e9nez Delgado  Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the \"Software\"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:  The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.  THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.",
    "summary": "High-performance SQS message consumer with worker-based concurrency, dependency injection, and async support for Python applications",
    "version": "3.0.0",
    "project_urls": {
        "Homepage": "https://github.com/finalsa/finalsa-sqs-consumer"
    },
    "split_keywords": [
        "async",
        " aws",
        " concurrency",
        " consumer",
        " dependency-injection",
        " fastapi",
        " finalsa",
        " interceptors",
        " message-queue",
        " sqs",
        " timeout",
        " worker"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "e8b979d1468492a77c1d1f4f462762f86673d5c2709483a974f0f7aed670cda5",
                "md5": "b1f26364331d7a303327cee1fc522ced",
                "sha256": "861ff3e194114466a27ba5cde03a3f1d31e312ccc800cda29f5fd887db94111c"
            },
            "downloads": -1,
            "filename": "finalsa_sqs_consumer-3.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "b1f26364331d7a303327cee1fc522ced",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 20442,
            "upload_time": "2025-07-10T02:25:15",
            "upload_time_iso_8601": "2025-07-10T02:25:15.218333Z",
            "url": "https://files.pythonhosted.org/packages/e8/b9/79d1468492a77c1d1f4f462762f86673d5c2709483a974f0f7aed670cda5/finalsa_sqs_consumer-3.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "3ba6917a9563dfd019e5c66da0f73087f43c431c9e6d6155d75f3ab410324b8d",
                "md5": "5a0a9be1ad6ffebe6d4043d041ca5e83",
                "sha256": "8b8aaac197f39a4db9b045fe658e3576f02e2e810d8dc42b096a2833be5aa455"
            },
            "downloads": -1,
            "filename": "finalsa_sqs_consumer-3.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "5a0a9be1ad6ffebe6d4043d041ca5e83",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 13887,
            "upload_time": "2025-07-10T02:25:16",
            "upload_time_iso_8601": "2025-07-10T02:25:16.171243Z",
            "url": "https://files.pythonhosted.org/packages/3b/a6/917a9563dfd019e5c66da0f73087f43c431c9e6d6155d75f3ab410324b8d/finalsa_sqs_consumer-3.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-07-10 02:25:16",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "finalsa",
    "github_project": "finalsa-sqs-consumer",
    "github_not_found": true,
    "lcname": "finalsa-sqs-consumer"
}
        
Elapsed time: 0.43214s