- Name: operetta
- Version: 0.0.9
- Upload time: 2025-10-10 19:51:40
- Author: Alexander Tikhonov
- Requires Python: `<4.0,>=3.11`
- License: Apache-2.0
- Keywords: application, framework, ddd, domain-driven design, asyncio, microservices, microservice, dependency-injection, dependency injection, di, web, api, rest, aiohttp, http, openapi, openapi3, swagger, swagger-ui, redoc, postgres, postgresql, database, asyncpg, hasql, aiomisc

<div align="center">

<img alt="logo" width="175" src="https://raw.githubusercontent.com/Fatal1ty/operetta/7e3e80a54dfc91673dacb36a64c1379aae36a042/img/logo.png">

###### Design Python services right

[![Build Status](https://github.com/Fatal1ty/operetta/workflows/tests/badge.svg)](https://github.com/Fatal1ty/operetta/actions)
[![Latest Version](https://img.shields.io/pypi/v/operetta.svg)](https://pypi.python.org/pypi/operetta)
[![Python Version](https://img.shields.io/pypi/pyversions/operetta.svg)](https://pypi.python.org/pypi/operetta)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
</div>

# Operetta

A lightweight framework for building Python applications that is not tied to a specific transport protocol. It is built on top of [aiomisc](https://github.com/aiokitchen/aiomisc) (service lifecycle, entrypoint) and [dishka](https://github.com/reagento/dishka) (dependency injection). On top of that, the following integrations are available:

- [AIOHTTP](https://github.com/aio-libs/aiohttp): declarative handlers with DI for request body, query, and path params; automatic OpenAPI generation with [Swagger](https://github.com/swagger-api/swagger-ui) and [Redoc](https://github.com/Redocly/redoc).
- PostgreSQL via [asyncpg](https://github.com/MagicStack/asyncpg): a database adapter and DI provider for a connection pool.
- PostgreSQL with HA via [hasql](https://github.com/aiokitchen/hasql): a pool with balancing, failover and the same adapter layer.


## Table of contents

- [Highlights](#highlights)
- [Installation](#installation)
- [Quickstart (HTTP API)](#quickstart-http-api)
  - [How it works under the hood](#how-it-works-under-the-hood)
- [Quickstart (non-HTTP app)](#quickstart-non-http-app)
- [Services and DI](#services-and-di)
- [AIOHTTP](#aiohttp)
  - [Configuration](#configuration)
  - [Error handling and response format](#error-handling-and-response-format)
- [PostgreSQL](#postgresql)
  - [Single-node PostgreSQL (asyncpg)](#single-node-postgresql-asyncpg)
  - [High-availability PostgreSQL cluster (hasql)](#high-availability-postgresql-cluster-hasql)
  - [Advanced setup](#advanced-setup)

## Highlights

- Services as units of functionality: each service starts/stops via [aiomisc](https://github.com/aiokitchen/aiomisc) and may provide DI providers.
- Single DI container ([dishka](https://github.com/reagento/dishka)) for the whole app; separate [scopes](https://dishka.readthedocs.io/en/stable/advanced/scopes.html) for `APP` and `REQUEST`.
- [AIOHTTP](https://github.com/aio-libs/aiohttp) integration:
  - Handler parameter annotations: `FromBody[T]`, `FromQuery[T]`, `FromPath[T]`.
  - Automatic parsing and validation via [mashumaro](https://github.com/Fatal1ty/mashumaro); friendly error details.
  - Unified JSON envelope for responses.
  - OpenAPI generation with static assets for Swagger/Redoc.
- PostgreSQL integrations ([asyncpg](https://github.com/MagicStack/asyncpg)/[hasql](https://github.com/aiokitchen/hasql)): interface adapter `PostgresDatabaseAdapter` + transactional `PostgresTransactionDatabaseAdapter` for repositories and units of work.

## Installation

Requires Python 3.11+.

- Base:

```bash
pip install operetta
```

- With AIOHTTP and OpenAPI:

```bash
pip install 'operetta[aiohttp]'
```

- With PostgreSQL via asyncpg:

```bash
pip install 'operetta[asyncpg]'
```

- With PostgreSQL HA via hasql:

```bash
pip install 'operetta[hasql]'
```

## Quickstart (HTTP API)

A minimal AIOHTTP app with DI and autogenerated OpenAPI. You are free to organize your project structure and files as you prefer.

```python
from dataclasses import dataclass, asdict
from aiohttp import web
from operetta.app import Application
from operetta.integrations.aiohttp.annotations import (
    FromBody,
    FromPath,
    FromQuery,
)
from operetta.integrations.aiohttp.response import success_response
from operetta.integrations.aiohttp.service import AIOHTTPService


@dataclass
class CreateUserBody:
    name: str
    email: str


@dataclass
class UserDto:
    id: int
    name: str
    email: str


async def create_user(
    _: web.Request, body: FromBody[CreateUserBody]
) -> web.StreamResponse:
    # ... create a user ...
    user = UserDto(id=1, name=body.name, email=body.email)
    return success_response(asdict(user))


async def get_user(
    _: web.Request,
    user_id: FromPath[int],
    detailed: FromQuery[bool] = False,
) -> UserDto:
    # ... load a user ...
    user = UserDto(id=user_id, name="Alice", email="alice@example.com")
    return user


routes = [
    web.post("/users", create_user),
    web.get("/users/{user_id}", get_user),
]

app = Application(
    AIOHTTPService(
        address="127.0.0.1",
        port=8080,
        routes=routes,
        docs_title="Demo API",
        docs_servers=("http://127.0.0.1:8080",),
        docs_default_type="swagger",  # or "redoc"
    ),
    di_providers=[],  # your dishka providers if needed
    warmup_dependencies=True,
)

if __name__ == "__main__":
    app.run()
```

Short example: raising DDD errors in handlers

```python
from aiohttp import web
from operetta.ddd import NotFoundError, AuthorizationError
from operetta.integrations.aiohttp.annotations import FromPath

# `User`, `repo`, and `has_access_to_user` stand in for your own application code.


async def get_user(_: web.Request, user_id: FromPath[int]) -> User:
    # Example auth check
    if not has_access_to_user(user_id):
        raise AuthorizationError(details=[{"permission": "users:read"}])

    user = await repo.get_user(user_id)
    if user is None:
        raise NotFoundError(details=[{"id": user_id}])

    return user
```

Open the docs at:

- OpenAPI spec: `/static/openapi/openapi.yaml` (static files path is configurable).
- Swagger UI: `/docs/swagger` (and redirect from `/docs`).
- Redoc: `/docs/redoc`.

### How it works under the hood

- [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) at app creation time:
  - Wraps your routes by inspecting handler signatures and [`FromBody`/`FromQuery`/`FromPath`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py) annotations.
  - Injects parsed values into the handler call.
  - If the return type is not a `StreamResponse`, serializes the result into [`SuccessResponse[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/response.py) and returns it as JSON ([format details](#error-handling-and-response-format)).
  - Builds the OpenAPI spec via [openapify](https://github.com/Fatal1ty/openapify) and serves it as static.
  - Attaches system middleware: DDD error mapping to HTTP and a global unhandled error catcher.
- DI is configured via [dishka integration with AIOHTTP](https://dishka.readthedocs.io/en/stable/integrations/aiohttp.html); the container is created by [`DIService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/di.py) and wired into the app.
  - Each request gets a new DI scope (`REQUEST`) for per-request dependencies.
  - In addition to `FromBody`/`FromQuery`/`FromPath`, handler parameters may be any DI-resolvable type (e.g., services, database adapters) injected via `FromDishka`.
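
To make the signature-inspection step more concrete, here is a minimal stdlib-only sketch of how a wrapper could discover which handler parameters come from the path or the query string. The marker classes `Body`, `Query`, `Path` and the helper `classify_params` are hypothetical stand-ins built on `typing.Annotated`; they are not Operetta's actual `FromBody`/`FromQuery`/`FromPath` implementation.

```python
import inspect
from typing import Annotated, get_args, get_origin, get_type_hints


# Hypothetical marker classes (assumption: not part of Operetta).
class Body: ...
class Query: ...
class Path: ...


def classify_params(handler) -> dict[str, tuple[str, type]]:
    """Map each annotated parameter name to (source, target type)."""
    hints = get_type_hints(handler, include_extras=True)
    result: dict[str, tuple[str, type]] = {}
    for name in inspect.signature(handler).parameters:
        hint = hints.get(name)
        if get_origin(hint) is not Annotated:
            continue  # plain parameters (e.g. the request) are left alone
        base, *markers = get_args(hint)
        for marker in markers:
            if marker in (Body, Query, Path):
                result[name] = (marker.__name__.lower(), base)
    return result


async def get_user(
    request: object,
    user_id: Annotated[int, Path],
    detailed: Annotated[bool, Query] = False,
) -> dict: ...


PARAM_SOURCES = classify_params(get_user)
print(PARAM_SOURCES)
```

A wrapper built this way would know, before any request arrives, which values to parse from the request and under which target type.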

## Quickstart (non-HTTP app)

Operetta is not tied to HTTP. You can write background services/workers on [aiomisc](https://github.com/aiokitchen/aiomisc) and use DI:

```python
import asyncio
import contextlib
from operetta.app import Application
from operetta.service.base import Service

class Worker(Service):
    async def start(self):
        # example: a periodic task
        self._task = asyncio.create_task(self._job())

    async def stop(self, exception: Exception | None = None):
        self._task.cancel()
        # Awaiting a cancelled task raises CancelledError, a BaseException subclass.
        with contextlib.suppress(asyncio.CancelledError):
            await self._task

    async def _job(self):
        while True:
            # get dependencies if needed:
            # db = await self.get_dependency(PostgresDatabaseAdapter)
            await asyncio.sleep(1)

app = Application(Worker(), warmup_dependencies=True)
app.run()
```
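
The start/stop pattern of a background task can be tried out with plain `asyncio`, without Operetta or aiomisc. The sketch below is only an illustration of task cancellation; `PlainWorker` and `demo` are hypothetical names, and the class does not inherit from `operetta.service.base.Service`.

```python
import asyncio
import contextlib


class PlainWorker:
    """Hypothetical stand-in for a service with a periodic background job."""

    def __init__(self) -> None:
        self.ticks = 0
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        self._task = asyncio.create_task(self._job())

    async def stop(self) -> None:
        if self._task is None:
            return
        self._task.cancel()
        # Awaiting a cancelled task raises asyncio.CancelledError.
        with contextlib.suppress(asyncio.CancelledError):
            await self._task

    async def _job(self) -> None:
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)


async def demo() -> PlainWorker:
    worker = PlainWorker()
    await worker.start()
    await asyncio.sleep(0.05)  # let the job run a few iterations
    await worker.stop()
    return worker


worker = asyncio.run(demo())
print(f"job ran {worker.ticks} time(s) before shutdown")
```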

## Services and DI

- Base service class: [`operetta.service.base.Service`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/base.py) (inherits `aiomisc.Service`).
- DI container: created inside `DIService` (see [`operetta/service/di.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/di.py)).
  - Providers are collected from:
    - the `Application` itself (argument `di_providers`),
    - application services implementing `get_di_providers()`.
  - Supports dependency warmup (`warmup=True`) for APP/REQUEST factories.
- Retrieve a dependency from a service via `await service.get_dependency(Type)`.
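
The provider collection described above can be pictured with a small stdlib-only sketch. Everything in it is an assumption made for illustration: `HasDIProviders`, `collect_providers`, and the two service classes are hypothetical, plain strings stand in for real dishka providers, and the ordering (application providers first) is a guess rather than documented behavior.

```python
from typing import Iterable, Protocol, runtime_checkable


@runtime_checkable
class HasDIProviders(Protocol):
    """Hypothetical protocol: a service that contributes DI providers."""

    def get_di_providers(self) -> Iterable[object]: ...


class DatabaseService:
    def get_di_providers(self) -> list[str]:
        return ["db-provider"]


class PlainService:
    """A service that contributes nothing to DI."""


def collect_providers(
    app_providers: Iterable[object], services: Iterable[object]
) -> list[object]:
    collected = list(app_providers)
    for service in services:
        # Only services exposing get_di_providers() contribute.
        if isinstance(service, HasDIProviders):
            collected.extend(service.get_di_providers())
    return collected


providers = collect_providers(
    ["config-provider"], [DatabaseService(), PlainService()]
)
print(providers)
```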

To load config from YAML, use [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py):

```python
from operetta import Application
from operetta.service.configuration import YAMLConfigurationService

config_service = YAMLConfigurationService()  # reads --config path from CLI
app = Application(config_service)
```

Two values are provided to DI: [`ApplicationDictConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/types.py) (raw dict) and a config object (if you provide `config_cls`/`config_factory`).

Custom config class (mashumaro DataClassDictMixin):

```python
from dataclasses import dataclass
from mashumaro import DataClassDictMixin
from operetta import Application
from operetta.service.configuration import YAMLConfigurationService

# Define your typed config mapped to YAML structure
@dataclass
class AppConfig(DataClassDictMixin):
    # You can use nested dataclasses as well; here kept minimal
    creds: dict[str, str] | None = None

# Build service that parses YAML into AppConfig using mashumaro
config_service = YAMLConfigurationService(
    config_cls=AppConfig,
    config_factory=AppConfig.from_dict,
)

# Both ApplicationDictConfig (raw dict) and AppConfig are available in DI
app = Application(config_service)
```

## AIOHTTP

A first-class integration for building HTTP APIs with declarative handler parameters, DI, and autogenerated OpenAPI/Swagger/Redoc.

- Handler parameter annotations: [`FromBody[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py), [`FromQuery[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py), [`FromPath[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py) (plus DI via `FromDishka`).
- Unified JSON responses out of the box.
- Automatic OpenAPI spec generation and static docs at `/docs` (Swagger or Redoc).

### Configuration

You can configure [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) in three complementary ways:

- Via constructor (`__init__`) arguments — explicit values have the highest priority.
- Via YAML file ([`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) + [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py)/[`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py)) — good for ops-driven setups; overrides defaults but not explicit `__init__` values.
- Via custom DI providers — e.g., environment variables or secrets managers.

Precedence rule:
- `__init__` → DI ([`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py)) → internal defaults

> [!TIP]
> - [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) is a helper that installs [`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py) into DI.
> - This provider reads `ApplicationDictConfig['api']` and decodes it into [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py).
> - YAML is not required. You can provide [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py) via any DI provider.

YAML keys (all optional) live under the `api:` section:

```yaml
api:
  address: 0.0.0.0         # bind address
  port: 8081               # listen port
  static_endpoint_prefix: /static/
  static_files_root: ./var/static  # where to serve static files and openapi spec
  docs_default_path: /docs
  docs_swagger_path: /docs/swagger
  docs_redoc_path: /docs/redoc
  docs_title: Demo API
  docs_servers:
    - http://127.0.0.1:8081
  docs_default_type: swagger  # swagger | redoc | null (no redirect from /docs)
  docs_remove_path_prefix: /v1/
  # Optional OpenAPI cosmetics
  docs_tag_descriptions:
    users: Operations with users
  docs_tag_groups:
    Management:
      - users
```

Wire it up:

```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.aiohttp import (
    AIOHTTPService,
    AIOHTTPConfigurationService,
)

app = Application(
    YAMLConfigurationService(),            # loads --config path and exposes dict to DI
    AIOHTTPConfigurationService(),         # registers AIOHTTPServiceConfigProvider reading the api: section
    AIOHTTPService(
        routes=[],
        # You may still override settings here (constructor wins over YAML):
        # port=9090,
        # docs_default_type="redoc",
    ),
)
```

Custom config provider example (env-vars):

```python
import os
from dishka import Provider, Scope, provide
from operetta import Application
from operetta.integrations.aiohttp.config import AIOHTTPServiceConfig
from operetta.integrations.aiohttp import AIOHTTPService

class EnvAiohttpConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self) -> AIOHTTPServiceConfig:
        return AIOHTTPServiceConfig(
            address=os.getenv("HTTP_ADDRESS", "0.0.0.0"),
            port=int(os.getenv("HTTP_PORT", "8080")),
        )

app = Application(
    AIOHTTPService(routes=[]),
    di_providers=[EnvAiohttpConfigProvider()],
)
```

Under the hood [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) tries to resolve [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py) from DI on start; if available, it merges values with the precedence above and continues startup as usual.
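
The precedence rule can be illustrated with a few lines of plain Python. This is a sketch of the idea only, not the code `AIOHTTPService` runs: `merge_settings` is a hypothetical helper, the default values are made up, and treating `None` as "not set" is a simplification (in the real configuration, `docs_default_type` may legitimately be null).

```python
from typing import Any, Mapping


def merge_settings(
    explicit: Mapping[str, Any],
    from_di: Mapping[str, Any],
    defaults: Mapping[str, Any],
) -> dict[str, Any]:
    """Layer settings so that explicit > DI > defaults."""
    merged = dict(defaults)
    # Later layers win, so apply the DI values first and explicit values last.
    for layer in (from_di, explicit):
        merged.update({k: v for k, v in layer.items() if v is not None})
    return merged


settings = merge_settings(
    explicit={"port": 9090},  # e.g. constructor arguments
    from_di={"address": "0.0.0.0", "port": 8081},  # e.g. the YAML api: section
    defaults={"address": "127.0.0.1", "port": 8080, "docs_title": "API"},
)
print(settings)
```

Here `port` comes from the explicit layer, `address` from the DI layer, and `docs_title` falls back to the default.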

### Error handling and response format

- Successful responses are automatically wrapped into `{ "success": true, "data": ..., "error": null }`.
- Errors use `{ "success": false, "data": null, "error": { message, code, details } }`.
- Standard AIOHTTP errors and domain/application/infrastructure errors (see [`operetta.ddd.errors`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/errors.py)) are mapped by middleware from [`integrations/aiohttp/middlewares.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py).
- Parsing errors for body/params use types from [`integrations/aiohttp/errors.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/errors.py) (e.g., `InvalidJSONBodyError`, `InvalidQueryParamsError`, `InvalidPathParamsError`, ...).

Recommended way to raise errors in your app

- Import DDD exceptions from a single place:

  ```python
  from operetta.ddd import (
      NotFoundError,
      AlreadyExistsError,
      ConflictError,
      ValidationError,
      AuthenticationError,
      AuthorizationError,
      RelatedResourceNotFoundError,
      DependencyUnavailableError,
  )
  ```

- Raise with optional structured details (a sequence of JSON-serializable objects):

  ```python
  raise NotFoundError(
      details=[{"resource": "User", "id": user_id}]
  )
  ```

HTTP mapping of DDD exceptions (handled by middleware)

| DDD exception                                                                                                     | HTTP status | HTTP error               | code                  |
|-------------------------------------------------------------------------------------------------------------------|-------------|--------------------------|-----------------------|
| AuthenticationError                                                                                               | 401         | UnauthorizedError        | UNAUTHORIZED          |
| AuthorizationError, PermissionDeniedError                                                                         | 403         | ForbiddenError           | FORBIDDEN             |
| NotFoundError                                                                                                     | 404         | ResourceNotFoundError    | RESOURCE_NOT_FOUND    |
| AlreadyExistsError                                                                                                | 409         | DuplicateRequestError    | DUPLICATE_RESOURCE    |
| ConflictError, InvalidOperationError                                                                              | 409         | ConflictError            | CONFLICT              |
| ValidationError, RelatedResourceNotFoundError                                                                     | 422         | UnprocessableEntityError | UNPROCESSABLE_ENTITY  |
| DeadlineExceededError                                                                                             | 504         | GatewayTimeoutError      | GATEWAY_TIMEOUT       |
| DependencyThrottledError, DependencyUnavailableError, SubsystemUnavailableError, SystemResourceLimitExceededError | 503         | ServiceUnavailableError  | SERVICE_UNAVAILABLE   |
| DependencyFailureError                                                                                            | 502         | BadGatewayError          | BAD_GATEWAY           |
| StorageIntegrityError, TransportIntegrityError, InfrastructureError (fallback)                                    | 500         | ServerError              | INTERNAL_SERVER_ERROR |
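
For quick reference in tests or client code, the table can be transcribed into a lookup keyed by exception name. The snippet below is a plain-data sketch: `ROWS` and `HTTP_MAPPING` are hypothetical names, and the mapping is copied from the table rather than imported from Operetta.

```python
# Each row: (DDD exception names, HTTP status, error code), as in the table above.
ROWS: list[tuple[tuple[str, ...], int, str]] = [
    (("AuthenticationError",), 401, "UNAUTHORIZED"),
    (("AuthorizationError", "PermissionDeniedError"), 403, "FORBIDDEN"),
    (("NotFoundError",), 404, "RESOURCE_NOT_FOUND"),
    (("AlreadyExistsError",), 409, "DUPLICATE_RESOURCE"),
    (("ConflictError", "InvalidOperationError"), 409, "CONFLICT"),
    (("ValidationError", "RelatedResourceNotFoundError"), 422, "UNPROCESSABLE_ENTITY"),
    (("DeadlineExceededError",), 504, "GATEWAY_TIMEOUT"),
    (
        (
            "DependencyThrottledError",
            "DependencyUnavailableError",
            "SubsystemUnavailableError",
            "SystemResourceLimitExceededError",
        ),
        503,
        "SERVICE_UNAVAILABLE",
    ),
    (("DependencyFailureError",), 502, "BAD_GATEWAY"),
    (
        ("StorageIntegrityError", "TransportIntegrityError", "InfrastructureError"),
        500,
        "INTERNAL_SERVER_ERROR",
    ),
]

# Flatten the rows into {exception name: (status, code)}.
HTTP_MAPPING: dict[str, tuple[int, str]] = {
    name: (status, code) for names, status, code in ROWS for name in names
}

print(HTTP_MAPPING["NotFoundError"])
```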

Response envelope reference

- Success:

  ```json
  { "success": true, "data": { "id": 1, "name": "Alice" }, "error": null }
  ```

- Error:

  ```json
  {
    "success": false,
    "data": null,
    "error": {
      "message": "Resource not found",
      "code": "RESOURCE_NOT_FOUND",
      "details": [ { "resource": "User", "id": 123 } ]
    }
  }
  ```
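
When writing client code or tests against this format, it can help to build the two envelope shapes by hand. The helpers below are hypothetical (`success_envelope` and `error_envelope` are not Operetta functions); they only reproduce the JSON structure shown above.

```python
import json
from typing import Any


def success_envelope(data: Any) -> dict[str, Any]:
    """Build the success shape: data is set, error is null."""
    return {"success": True, "data": data, "error": None}


def error_envelope(
    message: str, code: str, details: list[dict[str, Any]]
) -> dict[str, Any]:
    """Build the error shape: data is null, error carries the description."""
    return {
        "success": False,
        "data": None,
        "error": {"message": message, "code": code, "details": details},
    }


ok = success_envelope({"id": 1, "name": "Alice"})
not_found = error_envelope(
    "Resource not found",
    "RESOURCE_NOT_FOUND",
    [{"resource": "User", "id": 123}],
)
print(json.dumps(ok))
print(json.dumps(not_found, indent=2))
```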

Advanced

- You can throw HTTP-specific errors directly if you need full control over the client response: see [`operetta.integrations.aiohttp.errors`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/errors.py) (e.g., `ForbiddenError`, `UnauthorizedError`, `UnprocessableEntityError`).
- Two middlewares are installed by default:
  - [`ddd_errors_middleware`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py) maps DDD exceptions to HTTP errors above.
  - [`unhandled_error_middleware`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py) catches all other exceptions and returns a generic 500 with a safe message.

## PostgreSQL

Operetta provides a thin, uniform abstraction over PostgreSQL so your application code does not depend on a particular driver or pool manager. You write repositories and units of work against two interfaces:

- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) — a general-purpose adapter for any operations (fetch, fetch_one, execute, ...) without explicit transaction control.
- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) — the same API for all operations plus transaction control methods (start/commit/rollback) when you need to run multiple steps in a single transaction.

There are two interchangeable backends:
- [asyncpg](https://github.com/MagicStack/asyncpg) — a straightforward single-pool setup.
- [hasql](https://github.com/aiokitchen/hasql) (asyncpg HA) — a high-availability pool manager with balancing/failover.

Both backends expose the same interfaces via DI, so switching is configuration-only. DI scopes are chosen to match typical usage:
- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) is provided with scope=APP (shared pool).
- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) is provided with scope=REQUEST (per-request/operation handle for transactional work).

Configuration is provided via DI:
- Connection config types: [`AsyncpgPostgresDatabaseConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) (for asyncpg) and [`AsyncpgHAPostgresDatabaseConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/config.py) (for asyncpg HA).
- Pool factory kwargs type: [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) (to pass `init` or other pool options to the driver/manager).
- Built-in config providers — [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py) and [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py) — read settings from `ApplicationDictConfig['postgres']`, which is loaded by [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) from your YAML file.
- A built-in pool kwargs provider returns an empty [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) by default; you can override it to customize connection initialization (see [Advanced setup](#advanced-setup)).

Typical pattern:
- Use [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) when you don't need explicit transaction management: it's suitable for any reads and writes.
- When you need transactional boundaries, get [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py), call `start_transaction()`/`commit_transaction()` (or `rollback_transaction()` on error), and run your operations within that transaction.

Configuration can be loaded from YAML via [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) under the `postgres:` key. Optional connection initialization (e.g., custom codecs or `search_path`) can be provided through [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) in DI; this works for both `asyncpg` and `hasql` variants.

### Single-node PostgreSQL (asyncpg)

Provides:
- Providers: [`AsyncpgPostgresDatabaseProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py), [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py).
- Convenience services to plug into `Application`:
  - [`AsyncpgPostgresDatabaseService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) — pool and adapters,
  - [`AsyncpgPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) — loads config from `ApplicationDictConfig`.
- Adapters:
  - [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=APP — general-purpose adapter for any operations (fetch/fetch_one/execute, ...).
  - [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=REQUEST (handy for HTTP requests) — same API plus transaction control methods (start/commit/rollback).

YAML config example:

```yaml
postgres:
  user: app
  password: secret
  database: appdb
  host: 127.0.0.1:5432
  # optional pool params:
  min_size: 5
  max_size: 20
  max_queries: 50000
  max_inactive_connection_lifetime: 300
```

Plug into the app:

```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.service import (
    AsyncpgPostgresDatabaseService,
    AsyncpgPostgresDatabaseConfigurationService,
)

app = Application(
    YAMLConfigurationService(),
    AsyncpgPostgresDatabaseConfigurationService(),
    AsyncpgPostgresDatabaseService(),
)
```

Use in a repository:

```python
from dataclasses import dataclass
from operetta.ddd.infrastructure.db.postgres.adapters.interface import (
    PostgresDatabaseAdapter,
    PostgresTransactionDatabaseAdapter,
)

@dataclass
class User:
    id: int
    name: str

class UserRepository:
    def __init__(self, db: PostgresDatabaseAdapter):
        self._db = db

    async def get_by_id(self, user_id: int) -> User | None:
        row = await self._db.fetch_one(
            "SELECT id, name FROM users WHERE id=$1", user_id
        )
        return User(id=row["id"], name=row["name"]) if row else None

class UnitOfWork:
    def __init__(self, tx: PostgresTransactionDatabaseAdapter):
        self._tx = tx

    async def __aenter__(self):
        await self._tx.start_transaction()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc:
            await self._tx.rollback_transaction()
        else:
            await self._tx.commit_transaction()
```
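
The commit/rollback behavior of such a unit of work can be checked without a database. In the sketch below, `RecordingTx` is a hypothetical in-memory stand-in that only records which of the three transaction methods were called; it is not an Operetta class, and `UnitOfWork` is repeated so the example runs on its own.

```python
import asyncio


class RecordingTx:
    """Hypothetical fake exposing only the transaction control methods."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def start_transaction(self) -> None:
        self.calls.append("start")

    async def commit_transaction(self) -> None:
        self.calls.append("commit")

    async def rollback_transaction(self) -> None:
        self.calls.append("rollback")


class UnitOfWork:
    def __init__(self, tx: RecordingTx) -> None:
        self._tx = tx

    async def __aenter__(self) -> "UnitOfWork":
        await self._tx.start_transaction()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Roll back if the block raised, commit otherwise.
        if exc:
            await self._tx.rollback_transaction()
        else:
            await self._tx.commit_transaction()


async def demo() -> tuple[list[str], list[str]]:
    ok_tx, failed_tx = RecordingTx(), RecordingTx()
    async with UnitOfWork(ok_tx):
        pass  # work that succeeds
    try:
        async with UnitOfWork(failed_tx):
            raise RuntimeError("boom")  # work that fails
    except RuntimeError:
        pass  # the exception still propagates out of the unit of work
    return ok_tx.calls, failed_tx.calls


ok_calls, failed_calls = asyncio.run(demo())
print(ok_calls, failed_calls)
```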

### High-availability PostgreSQL cluster (hasql)

If you run an HA cluster (multiple nodes), use the hasql integration.

Provides:
- Providers: [`AsyncpgHAPostgresDatabaseProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py), [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py).
- Convenience services to plug into `Application`:
  - [`AsyncpgHAPostgresDatabaseService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) — pool and adapters,
  - [`AsyncpgHAPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) — loads config from `ApplicationDictConfig`.
- Adapters:
  - [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=APP — general-purpose adapter for any operations (fetch/fetch_one/execute, ...).
  - [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=REQUEST (handy for HTTP requests) — same API plus transaction control methods (start/commit/rollback).

YAML config example:

```yaml
postgres:
  user: app
  password: secret
  database: appdb
  hosts:
    - 10.0.0.1:5432
    - 10.0.0.2:5432
  min_masters: 1
  min_replicas: 1
  # optional:
  acquire_timeout: 5
  refresh_delay: 5
  refresh_timeout: 5
  fallback_master: false
  master_as_replica_weight: 1.0
  balancer_policy: greedy  # or round_robin / random_weighted
  stopwatch_window_size: 100
```

Plug into the app:

```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseService,
    AsyncpgHAPostgresDatabaseConfigurationService,
)

app = Application(
    YAMLConfigurationService(),
    AsyncpgHAPostgresDatabaseConfigurationService(),
    AsyncpgHAPostgresDatabaseService(),
)
```

> [!TIP]
> DI exposes the same adapter interfaces, so repository and unit of work code stays unchanged.

### Advanced setup

You can pass an `init` callable for connections (e.g., register codecs, set search_path) via DI. Below is an example provider from a real project that registers a custom JSONB codec for asyncpg HA (hasql):

```python
import json
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseConfigurationService,
    AsyncpgHAPostgresDatabaseService,
)

class AsyncpgJSONCodecProvider(Provider):
    scope = Scope.APP

    @provide(override=True)
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        async def set_custom_codecs(conn):
            await conn.set_type_codec(
                "jsonb",
                encoder=json.dumps,
                decoder=json.loads,
                schema="pg_catalog",
            )
        return AsyncpgPoolFactoryKwargs(init=set_custom_codecs)

app = Application(
    YAMLConfigurationService(),
    AsyncpgHAPostgresDatabaseConfigurationService(),
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[AsyncpgJSONCodecProvider()],
)
```

> [!IMPORTANT]
> The built-in [`AsyncpgPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) and
> [`AsyncpgHAPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) already register a
> default provider for [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py).
> If you use either service and supply your own `AsyncpgPoolFactoryKwargs`, declare your provider with
> `@provide(override=True)` so that it replaces the built-in one; otherwise container validation will fail.

Define your own config providers (e.g., from environment variables) if you don't want to use YAML-based ones:

```python
import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.asyncpg.config import (
    AsyncpgPostgresDatabaseConfig,
    AsyncpgPoolFactoryKwargs,
)
from operetta.integrations.asyncpg.service import AsyncpgPostgresDatabaseService

class EnvAsyncpgConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_db_config(self) -> AsyncpgPostgresDatabaseConfig:
        return AsyncpgPostgresDatabaseConfig(
            user=os.getenv("PGUSER", "app"),
            database=os.getenv("PGDATABASE", "appdb"),
            host=os.getenv("PGHOST", "127.0.0.1:5432"),
            password=os.getenv("PGPASSWORD"),
        )

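    # No built-in configuration service is registered in this example, so
    # there is no default provider to override and a plain @provide suffices.
    @provide
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        return {}  # no custom pool options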

app = Application(
    AsyncpgPostgresDatabaseService(),
    di_providers=[EnvAsyncpgConfigProvider()],
)
```
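
Note that `os.getenv("PGPASSWORD")` returns `None` when the variable is unset. If you would rather fail at startup than pass a missing value on to the driver, a small helper can enforce required variables. This is a sketch under that assumption; `require_env` is a hypothetical helper, not an Operetta API:

```python
import os
from collections.abc import Mapping


def require_env(name: str, env: Mapping[str, str] = os.environ) -> str:
    """Return the value of a required variable or raise a clear error."""
    value = env.get(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return value


# Demonstration with an explicit mapping instead of the real environment.
demo_env = {"PGPASSWORD": "secret"}
password = require_env("PGPASSWORD", demo_env)
```

Inside the provider you could then write `password=require_env("PGPASSWORD")`, so that a missing variable surfaces as an explicit error rather than as a `None` password.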

Example of an environment-based HA config provider:

```python
import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.config import AsyncpgHAPostgresDatabaseConfig
from operetta.integrations.asyncpg_ha.service import (
    AsyncpgHAPostgresDatabaseService,
)


class EnvHasqlConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_db_config(self) -> AsyncpgHAPostgresDatabaseConfig:
        hosts = os.getenv("PGHOSTS", "10.0.0.1:5432,10.0.0.2:5432").split(",")
        return AsyncpgHAPostgresDatabaseConfig(
            user=os.getenv("PGUSER", "app"),
            database=os.getenv("PGDATABASE", "appdb"),
            hosts=[h.strip() for h in hosts if h.strip()],
            password=os.getenv("PGPASSWORD"),
            min_masters=int(os.getenv("PG_MIN_MASTERS", "1")),
            min_replicas=int(os.getenv("PG_MIN_REPLICAS", "1")),
        )

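    # Plain @provide: without the built-in configuration service there is
    # no default provider to override.
    @provide
    def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
        return {}  # no custom pool options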


app = Application(
    AsyncpgHAPostgresDatabaseService(),
    di_providers=[EnvHasqlConfigProvider()],
)
```
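
The host-list parsing in `get_db_config` can be pulled into a standalone function so it is easy to unit test without starting the application. A minimal sketch, assuming each entry should have the `host:port` form used in the examples above; `parse_hosts` is a hypothetical helper:

```python
def parse_hosts(raw: str) -> list[str]:
    """Split a comma-separated host list, dropping blanks and checking ports."""
    hosts = [item.strip() for item in raw.split(",") if item.strip()]
    if not hosts:
        raise ValueError("At least one host is required")
    for host in hosts:
        # rpartition splits on the last colon: ("10.0.0.1", ":", "5432").
        name, sep, port = host.rpartition(":")
        if not (sep and name and port.isdigit()):
            raise ValueError(f"Expected host:port, got {host!r}")
    return hosts


# Stray whitespace and a trailing comma are tolerated.
hosts = parse_hosts("10.0.0.1:5432, 10.0.0.2:5432,")
```

The provider could then call `parse_hosts(os.getenv("PGHOSTS", "10.0.0.1:5432,10.0.0.2:5432"))` and pass the result as `hosts=`.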

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "operetta",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.11",
    "maintainer_email": null,
    "keywords": "application, framework, ddd, domain-driven design, asyncio, microservices, microservice, dependency-injection, dependency injection, di, web, api, rest, aiohttp, http, openapi, openapi3, swagger, swagger-ui, redoc, postgres, postgresql, database, asyncpg, hasql, aiomisc",
    "author": "Alexander Tikhonov",
    "author_email": "random.gauss@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/15/61/e5d6e300fa8d9fbe94d73fc2e6378deaf50950f367756cdb7da2de3c7522/operetta-0.0.9.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": null,
    "version": "0.0.9",
    "project_urls": null,
    "split_keywords": [
        "application",
        " framework",
        " ddd",
        " domain-driven design",
        " asyncio",
        " microservices",
        " microservice",
        " dependency-injection",
        " dependency injection",
        " di",
        " web",
        " api",
        " rest",
        " aiohttp",
        " http",
        " openapi",
        " openapi3",
        " swagger",
        " swagger-ui",
        " redoc",
        " postgres",
        " postgresql",
        " database",
        " asyncpg",
        " hasql",
        " aiomisc"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "d8af4e67a61a9d46ddc564113cf83872efb7905f3329d5e2993b514c75a59226",
                "md5": "9d23ada0cc8b1690d4ed7f99327d04e7",
                "sha256": "e9874b01f5a4a1cbf735650629c3395f5c7c6a93e04c1e999f5376ef6a984218"
            },
            "downloads": -1,
            "filename": "operetta-0.0.9-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "9d23ada0cc8b1690d4ed7f99327d04e7",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.11",
            "size": 43318,
            "upload_time": "2025-10-10T19:51:39",
            "upload_time_iso_8601": "2025-10-10T19:51:39.080052Z",
            "url": "https://files.pythonhosted.org/packages/d8/af/4e67a61a9d46ddc564113cf83872efb7905f3329d5e2993b514c75a59226/operetta-0.0.9-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "1561e5d6e300fa8d9fbe94d73fc2e6378deaf50950f367756cdb7da2de3c7522",
                "md5": "f925b7b77ce60a71e0e4ace3a56901fc",
                "sha256": "076cd9be5f0bc2cc5f92f56e2ffee04f5748964914ce37ddb97aa61c87639056"
            },
            "downloads": -1,
            "filename": "operetta-0.0.9.tar.gz",
            "has_sig": false,
            "md5_digest": "f925b7b77ce60a71e0e4ace3a56901fc",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.11",
            "size": 37586,
            "upload_time": "2025-10-10T19:51:40",
            "upload_time_iso_8601": "2025-10-10T19:51:40.547832Z",
            "url": "https://files.pythonhosted.org/packages/15/61/e5d6e300fa8d9fbe94d73fc2e6378deaf50950f367756cdb7da2de3c7522/operetta-0.0.9.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-10 19:51:40",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "operetta"
}
        