<div align="center">
<img alt="logo" width="175" src="https://raw.githubusercontent.com/Fatal1ty/operetta/7e3e80a54dfc91673dacb36a64c1379aae36a042/img/logo.png">
###### Design Python services right
[](https://github.com/Fatal1ty/operetta/actions)
[](https://pypi.python.org/pypi/operetta)
[](https://pypi.python.org/pypi/operetta)
[](https://opensource.org/licenses/Apache-2.0)
</div>
# Operetta
A lightweight framework for building Python applications that is not tied to a specific transport protocol. It is built on top of [aiomisc](https://github.com/aiokitchen/aiomisc) (service lifecycle, entrypoint) and [dishka](https://github.com/reagento/dishka) (dependency injection). On top of that, the following integrations are available:
- [AIOHTTP](https://github.com/aio-libs/aiohttp): declarative handlers with DI for request body, query, and path params; automatic OpenAPI generation with [Swagger](https://github.com/swagger-api/swagger-ui) and [Redoc](https://github.com/Redocly/redoc).
- PostgreSQL via [asyncpg](https://github.com/MagicStack/asyncpg): a database adapter and DI provider for a connection pool.
- PostgreSQL with HA via [hasql](https://github.com/aiokitchen/hasql): a pool with balancing, failover and the same adapter layer.
## Table of contents
- [Highlights](#highlights)
- [Installation](#installation)
- [Quickstart (HTTP API)](#quickstart-http-api)
- [How it works under the hood](#how-it-works-under-the-hood)
- [Quickstart (non-HTTP app)](#quickstart-non-http-app)
- [Services and DI](#services-and-di)
- [AIOHTTP](#aiohttp)
- [Configuration](#configuration)
- [Error handling and response format](#error-handling-and-response-format)
- [PostgreSQL](#postgresql)
- [Single-node PostgreSQL (asyncpg)](#single-node-postgresql-asyncpg)
- [High-availability PostgreSQL cluster (hasql)](#high-availability-postgresql-cluster-hasql)
- [Advanced setup](#advanced-setup)
## Highlights
- Services as units of functionality: each service starts/stops via [aiomisc](https://github.com/aiokitchen/aiomisc) and may provide DI providers.
- Single DI container ([dishka](https://github.com/reagento/dishka)) for the whole app; separate [scopes](https://dishka.readthedocs.io/en/stable/advanced/scopes.html) for `APP` and `REQUEST`.
- [AIOHTTP](https://github.com/aio-libs/aiohttp) integration:
- Handler parameter annotations: `FromBody[T]`, `FromQuery[T]`, `FromPath[T]`.
- Automatic parsing and validation via [mashumaro](https://github.com/Fatal1ty/mashumaro); friendly error details.
- Unified JSON envelope for responses.
- OpenAPI generation with static assets for Swagger/Redoc.
- PostgreSQL integrations ([asyncpg](https://github.com/MagicStack/asyncpg)/[hasql](https://github.com/aiokitchen/hasql)): interface adapter `PostgresDatabaseAdapter` + transactional `PostgresTransactionDatabaseAdapter` for repositories and units of work.
## Installation
Requires Python 3.11+.
- Base:
```bash
pip install operetta
```
- With AIOHTTP and OpenAPI:
```bash
pip install 'operetta[aiohttp]'
```
- With PostgreSQL via asyncpg:
```bash
pip install 'operetta[asyncpg]'
```
- With PostgreSQL HA via hasql:
```bash
pip install 'operetta[hasql]'
```
## Quickstart (HTTP API)
A minimal AIOHTTP app with DI and autogenerated OpenAPI. You are free to organize your project structure and files as you prefer.
```python
from dataclasses import dataclass, asdict
from aiohttp import web
from operetta.app import Application
from operetta.integrations.aiohttp.annotations import (
FromBody,
FromPath,
FromQuery,
)
from operetta.integrations.aiohttp.response import success_response
from operetta.integrations.aiohttp.service import AIOHTTPService
@dataclass
class CreateUserBody:
name: str
email: str
@dataclass
class UserDto:
id: int
name: str
email: str
async def create_user(
_: web.Request, body: FromBody[CreateUserBody]
) -> web.StreamResponse:
# ... create a user ...
user = UserDto(id=1, name=body.name, email=body.email)
return success_response(asdict(user))
async def get_user(
_: web.Request,
user_id: FromPath[int],
detailed: FromQuery[bool] = False,
) -> UserDto:
# ... load a user ...
user = UserDto(id=user_id, name="Alice", email="alice@example.com")
return user
routes = [
web.post("/users", create_user),
web.get("/users/{user_id}", get_user),
]
app = Application(
AIOHTTPService(
address="127.0.0.1",
port=8080,
routes=routes,
docs_title="Demo API",
docs_servers=("http://127.0.0.1:8080",),
docs_default_type="swagger", # or "redoc"
),
di_providers=[], # your dishka providers if needed
warmup_dependencies=True,
)
if __name__ == "__main__":
app.run()
```
Short example: raising DDD errors in handlers
```python
from operetta.ddd import NotFoundError, AuthorizationError
async def get_user(_: web.Request, user_id: FromPath[int]) -> User:
# Example auth check
if not has_access_to_user(user_id):
raise AuthorizationError(details=[{"permission": "users:read"}])
user = await repo.get_user(user_id)
if user is None:
raise NotFoundError(details=[{"id": user_id}])
return user
```
Open the docs at:
- OpenAPI spec: `/static/openapi/openapi.yaml` (static files path is configurable).
- Swagger UI: `/docs/swagger` (and redirect from `/docs`).
- Redoc: `/docs/redoc`.
### How it works under the hood
- [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) at app creation time:
- Wraps your routes by inspecting handler signatures and [`FromBody`/`FromQuery`/`FromPath`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py) annotations.
- Injects parsed values into the handler call.
- If the return type is not a `StreamResponse`, serializes result into [`SuccessResponse[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/response.py) and returns JSON ([format details](#error-handling-and-response-format)).
- Builds the OpenAPI spec via [openapify](https://github.com/Fatal1ty/openapify) and serves it as static.
- Attaches system middleware: DDD error mapping to HTTP and a global unhandled error catcher.
- DI is configured via [dishka integration with AIOHTTP](https://dishka.readthedocs.io/en/stable/integrations/aiohttp.html); the container is created by [`DIService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/di.py) and wired into the app.
- Each request gets a new DI scope (`REQUEST`) for per-request dependencies.
- Handler parameters may be any DI-resolvable type (e.g., services, database adapters) in addition to `FromBody/FromQuery/FromPath` via `FromDishka`.
## Quickstart (non-HTTP app)
Operetta is not tied to HTTP. You can write background services/workers on [aiomisc](https://github.com/aiokitchen/aiomisc) and use DI:
```python
import asyncio
import contextlib
from operetta.app import Application
from operetta.service.base import Service
class Worker(Service):
async def start(self):
# example: a periodic task
self._task = asyncio.create_task(self._job())
async def stop(self, exception: Exception | None = None):
self._task.cancel()
with contextlib.suppress(Exception):
await self._task
async def _job(self):
while True:
# get dependencies if needed:
# db = await self.get_dependency(PostgresDatabaseAdapter)
await asyncio.sleep(1)
app = Application(Worker(), warmup_dependencies=True)
app.run()
```
## Services and DI
- Base service class: [`operetta.service.base.Service`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/base.py) (inherits `aiomisc.Service`).
- DI container: created inside `DIService` (see [`operetta/service/di.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/di.py)).
- Providers are collected from:
- the `Application` itself (argument `di_providers`),
- application services implementing `get_di_providers()`.
- Supports dependency warmup (`warmup=True`) for APP/REQUEST factories.
- Retrieve a dependency from a service via `await service.get_dependency(Type)`.
To load config from YAML, use [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py):
```python
from operetta import Application
from operetta.service.configuration import YAMLConfigurationService
config_service = YAMLConfigurationService() # reads --config path from CLI
app = Application(config_service)
```
Two values are provided to DI: [`ApplicationDictConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/types.py) (raw dict) and a config object (if you provide `config_cls`/`config_factory`).
Custom config class (mashumaro DataClassDictMixin):
```python
from dataclasses import dataclass
from mashumaro import DataClassDictMixin
from operetta import Application
from operetta.service.configuration import YAMLConfigurationService
# Define your typed config mapped to YAML structure
@dataclass
class AppConfig(DataClassDictMixin):
# You can use nested dataclasses as well; here kept minimal
creds: dict[str, str] | None = None
# Build service that parses YAML into AppConfig using mashumaro
config_service = YAMLConfigurationService(
config_cls=AppConfig,
config_factory=AppConfig.from_dict,
)
# Both ApplicationDictConfig (raw dict) and AppConfig are available in DI
app = Application(config_service)
```
## AIOHTTP
A first-class integration for building HTTP APIs with declarative handler parameters, DI, and autogenerated OpenAPI/Swagger/Redoc.
- Handler parameter annotations: [`FromBody[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py), [`FromQuery[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py), [`FromPath[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py) (plus DI via `FromDishka`).
- Unified JSON responses out of the box.
- Automatic OpenAPI spec generation and static docs at `/docs` (Swagger or Redoc).
### Configuration
You can configure [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) in three complementary ways:
- Via constructor (`__init__`) arguments — explicit values have the highest priority.
- Via YAML file ([`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) + [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py)/[`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py)) — good for ops-driven setups; overrides defaults but not explicit `__init__` values.
- Via custom DI providers — e.g., environment variables or secrets managers.
Precedence rule:
- `__init__` → DI ([`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py)) → internal defaults
> [!TIP]
> - [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) is a helper that installs [`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py) into DI.
> - This provider reads `ApplicationDictConfig['api']` and decodes it into [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py).
> - YAML is not required. You can provide [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py) via any DI provider.
YAML keys (all optional) live under the `api:` section:
```yaml
api:
address: 0.0.0.0 # bind address
port: 8081 # listen port
static_endpoint_prefix: /static/
static_files_root: ./var/static # where to serve static files and openapi spec
docs_default_path: /docs
docs_swagger_path: /docs/swagger
docs_redoc_path: /docs/redoc
docs_title: Demo API
docs_servers:
- http://127.0.0.1:8081
docs_default_type: swagger # swagger | redoc | null (no redirect from /docs)
docs_remove_path_prefix: /v1/
# Optional OpenAPI cosmetics
docs_tag_descriptions:
users: Operations with users
docs_tag_groups:
Management:
- users
```
Wire it up:
```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.aiohttp import (
AIOHTTPService,
AIOHTTPConfigurationService,
)
app = Application(
YAMLConfigurationService(), # loads --config path and exposes dict to DI
AIOHTTPConfigurationService(), # registers AIOHTTPServiceConfigProvider reading the api: section
AIOHTTPService(
routes=[],
# You may still override settings here (constructor wins over YAML):
# port=9090,
# docs_default_type="redoc",
),
)
```
Custom config provider example (env-vars):
```python
import os
from dishka import Provider, Scope, provide
from operetta import Application
from operetta.integrations.aiohttp.config import AIOHTTPServiceConfig
from operetta.integrations.aiohttp import AIOHTTPService
class EnvAiohttpConfigProvider(Provider):
scope = Scope.APP
@provide
def get_config(self) -> AIOHTTPServiceConfig:
return AIOHTTPServiceConfig(
address=os.getenv("HTTP_ADDRESS", "0.0.0.0"),
port=int(os.getenv("HTTP_PORT", "8080")),
)
app = Application(
AIOHTTPService(routes=[]),
di_providers=[EnvAiohttpConfigProvider()],
)
```
Under the hood [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) tries to resolve [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py) from DI on start; if available, it merges values with the precedence above and continues startup as usual.
### Error handling and response format
- Successful responses are automatically wrapped into `{ "success": true, "data": ..., "error": null }`.
- Errors use `{ "success": false, "data": null, "error": { message, code, details } }`.
- Standard AIOHTTP errors and domain/application/infrastructure errors (see [`operetta.ddd.errors`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/errors.py)) are mapped by middleware from [`integrations/aiohttp/middlewares.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py).
- Parsing errors for body/params use types from [`integrations/aiohttp/errors.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/errors.py) (e.g., `InvalidJSONBodyError`, `InvalidQueryParamsError`, `InvalidPathParamsError`, ...).
Recommended way to raise errors in your app
- Import DDD exceptions from a single place:
```python
from operetta.ddd import (
NotFoundError,
AlreadyExistsError,
ConflictError,
ValidationError,
AuthenticationError,
AuthorizationError,
RelatedResourceNotFoundError,
DependencyUnavailableError,
)
```
- Raise with optional structured details (a sequence of JSON-serializable objects):
```python
raise NotFoundError(
details=[{"resource": "User", "id": user_id}]
)
```
HTTP mapping of DDD exceptions (handled by middleware)
| DDD exception | HTTP status | HTTP error | code |
|-------------------------------------------------------------------------------------------------------------------|-------------|--------------------------|-----------------------|
| AuthenticationError | 401 | UnauthorizedError | UNAUTHORIZED |
| AuthorizationError, PermissionDeniedError | 403 | ForbiddenError | FORBIDDEN |
| NotFoundError | 404 | ResourceNotFoundError | RESOURCE_NOT_FOUND |
| AlreadyExistsError | 409 | DuplicateRequestError | DUPLICATE_RESOURCE |
| ConflictError, InvalidOperationError | 409 | ConflictError | CONFLICT |
| ValidationError, RelatedResourceNotFoundError | 422 | UnprocessableEntityError | UNPROCESSABLE_ENTITY |
| DeadlineExceededError | 504 | GatewayTimeoutError | GATEWAY_TIMEOUT |
| DependencyThrottledError, DependencyUnavailableError, SubsystemUnavailableError, SystemResourceLimitExceededError | 503 | ServiceUnavailableError | SERVICE_UNAVAILABLE |
| DependencyFailureError | 502 | BadGatewayError | BAD_GATEWAY |
| StorageIntegrityError, TransportIntegrityError, InfrastructureError (fallback) | 500 | ServerError | INTERNAL_SERVER_ERROR |
Response envelope reference
- Success:
```json
{ "success": true, "data": { "id": 1, "name": "Alice" }, "error": null }
```
- Error:
```json
{
"success": false,
"data": null,
"error": {
"message": "Resource not found",
"code": "RESOURCE_NOT_FOUND",
"details": [ { "resource": "User", "id": 123 } ]
}
}
```
Advanced
- You can throw HTTP-specific errors directly if you need full control over the client response: see [`operetta.integrations.aiohttp.errors`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/errors.py) (e.g., `ForbiddenError`, `UnauthorizedError`, `UnprocessableEntityError`).
- Two middlewares are installed by default:
- [`ddd_errors_middleware`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py) maps DDD exceptions to HTTP errors above.
- [`unhandled_error_middleware`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py) catches all other exceptions and returns a generic 500 with a safe message.
## PostgreSQL
Operetta provides a thin, uniform abstraction over PostgreSQL so your application code does not depend on a particular driver or pool manager. You write repositories and units of work against two interfaces:
- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) — a general-purpose adapter for any operations (fetch, fetch_one, execute, ...) without explicit transaction control.
- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) — the same API for all operations plus transaction control methods (start/commit/rollback) when you need to run multiple steps in a single transaction.
There are two interchangeable backends:
- [asyncpg](https://github.com/MagicStack/asyncpg) — a straightforward single-pool setup.
- [hasql](https://github.com/aiokitchen/hasql) (asyncpg HA) — a high-availability pool manager with balancing/failover.
Both backends expose the same interfaces via DI, so switching is configuration-only. DI scopes are chosen to match typical usage:
- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) is provided with scope=APP (shared pool).
- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) is provided with scope=REQUEST (per-request/operation handle for transactional work).
Configuration is provided via DI:
- Connection config types: [`AsyncpgPostgresDatabaseConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) (for asyncpg) and [`AsyncpgHAPostgresDatabaseConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/config.py) (for asyncpg HA).
- Pool factory kwargs type: [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) (to pass `init` or other pool options to the driver/manager).
- Built-in config providers — [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py) and [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py) — read settings from `ApplicationDictConfig['postgres']`, which is loaded by [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) from your YAML file.
- A built-in pool kwargs provider returns an empty [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) by default; you can override it to customize connection initialization (see [Advanced setup](#advanced-setup)).
Typical pattern:
- Use [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) when you don't need explicit transaction management: it's suitable for any reads and writes.
- When you need transactional boundaries, get [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py), call `start_transaction()`/`commit_transaction()` (or `rollback_transaction()` on error), and run your operations within that transaction.
Configuration can be loaded from YAML via [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) under the `postgres:` key. Optional connection initialization (e.g., custom codecs or `search_path`) can be provided through [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) in DI; this works for both `asyncpg` and `hasql` variants.
### Single-node PostgreSQL (asyncpg)
Provides:
- Providers: [`AsyncpgPostgresDatabaseProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py), [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py).
- Convenience services to plug into `Application`:
- [`AsyncpgPostgresDatabaseService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) — pool and adapters,
- [`AsyncpgPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) — loads config from `ApplicationDictConfig`.
- Adapters:
- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=APP — general-purpose adapter for any operations (fetch/fetch_one/execute, ...).
- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=REQUEST (handy for HTTP requests) — same API plus transaction control methods (start/commit/rollback).
YAML config example:
```yaml
postgres:
user: app
password: secret
database: appdb
host: 127.0.0.1:5432
# optional pool params:
min_size: 5
max_size: 20
max_queries: 50000
max_inactive_connection_lifetime: 300
```
Plug into the app:
```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.service import (
AsyncpgPostgresDatabaseService,
AsyncpgPostgresDatabaseConfigurationService,
)
app = Application(
YAMLConfigurationService(),
AsyncpgPostgresDatabaseConfigurationService(),
AsyncpgPostgresDatabaseService(),
)
```
Use in a repository:
```python
from dataclasses import dataclass
from operetta.ddd.infrastructure.db.postgres.adapters.interface import (
PostgresDatabaseAdapter,
PostgresTransactionDatabaseAdapter,
)
@dataclass
class User:
id: int
name: str
class UserRepository:
def __init__(self, db: PostgresDatabaseAdapter):
self._db = db
async def get_by_id(self, user_id: int) -> User | None:
row = await self._db.fetch_one(
"SELECT id, name FROM users WHERE id=$1", user_id
)
return User(id=row["id"], name=row["name"]) if row else None
class UnitOfWork:
def __init__(self, tx: PostgresTransactionDatabaseAdapter):
self._tx = tx
async def __aenter__(self):
await self._tx.start_transaction()
return self
async def __aexit__(self, exc_type, exc, tb):
if exc:
await self._tx.rollback_transaction()
else:
await self._tx.commit_transaction()
```
### High-availability PostgreSQL cluster (hasql)
If you run an HA cluster (multiple nodes), use the hasql integration.
Provides:
- Providers: [`AsyncpgHAPostgresDatabaseProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py), [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py).
- Convenience services to plug into `Application`:
- [`AsyncpgHAPostgresDatabaseService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) — pool and adapters,
- [`AsyncpgHAPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) — loads config from `ApplicationDictConfig`.
- Adapters:
- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=APP — general-purpose adapter for any operations (fetch/fetch_one/execute, ...).
- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=REQUEST (handy for HTTP requests) — same API plus transaction control methods (start/commit/rollback).
YAML config example:
```yaml
postgres:
user: app
password: secret
database: appdb
hosts:
- 10.0.0.1:5432
- 10.0.0.2:5432
min_masters: 1
min_replicas: 1
# optional:
acquire_timeout: 5
refresh_delay: 5
refresh_timeout: 5
fallback_master: false
master_as_replica_weight: 1.0
balancer_policy: greedy # or round_robin / random_weighted
stopwatch_window_size: 100
```
Plug into the app:
```python
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg_ha.service import (
AsyncpgHAPostgresDatabaseService,
AsyncpgHAPostgresDatabaseConfigurationService,
)
app = Application(
YAMLConfigurationService(),
AsyncpgHAPostgresDatabaseConfigurationService(),
AsyncpgHAPostgresDatabaseService(),
)
```
> [!TIP]
> DI exposes the same adapter interfaces, so repository and unit of work code stays unchanged.
### Advanced setup
You can pass an `init` callable for connections (e.g., register codecs, set search_path) via DI. Below is an example provider from a real project that registers a custom JSONB codec for asyncpg HA (hasql):
```python
import json
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.service.configuration import YAMLConfigurationService
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.service import (
AsyncpgHAPostgresDatabaseConfigurationService,
AsyncpgHAPostgresDatabaseService,
)
class AsyncpgJSONCodecProvider(Provider):
scope = Scope.APP
@provide(override=True)
def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
async def set_custom_codecs(conn):
await conn.set_type_codec(
"jsonb",
encoder=json.dumps,
decoder=json.loads,
schema="pg_catalog",
)
return AsyncpgPoolFactoryKwargs(init=set_custom_codecs)
app = Application(
YAMLConfigurationService(),
AsyncpgHAPostgresDatabaseConfigurationService(),
AsyncpgHAPostgresDatabaseService(),
di_providers=[AsyncpgJSONCodecProvider()],
)
```
> [!IMPORTANT]\
> If you use the built-in [`AsyncpgPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) or
> [`AsyncpgHAPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py), they already register a
> default provider for [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py). To customize pool options,
> declare your provider with `@provide(override=True)` so it overrides the
> built-in one; otherwise container validation will fail.\
> When you provide your own [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) and there is an existing
> default provider from those services, `override=True` is mandatory.
Define your own config providers (e.g., from environment variables) if you don't want to use YAML-based ones:
```python
import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.asyncpg.config import (
AsyncpgPostgresDatabaseConfig,
AsyncpgPoolFactoryKwargs,
)
from operetta.integrations.asyncpg.service import AsyncpgPostgresDatabaseService
class EnvAsyncpgConfigProvider(Provider):
scope = Scope.APP
@provide
def get_db_config(self) -> AsyncpgPostgresDatabaseConfig:
return AsyncpgPostgresDatabaseConfig(
user=os.getenv("PGUSER", "app"),
database=os.getenv("PGDATABASE", "appdb"),
host=os.getenv("PGHOST", "127.0.0.1:5432"),
password=os.getenv("PGPASSWORD"),
)
@provide
def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
return {}
app = Application(
AsyncpgPostgresDatabaseService(),
di_providers=[EnvAsyncpgConfigProvider()],
)
```
Example of an environment-based HA config provider:
```python
import os
from dishka import Provider, Scope, provide
from operetta.app import Application
from operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs
from operetta.integrations.asyncpg_ha.config import AsyncpgHAPostgresDatabaseConfig
from operetta.integrations.asyncpg_ha.service import (
AsyncpgHAPostgresDatabaseConfigurationService,
AsyncpgHAPostgresDatabaseService,
)
class EnvHasqlConfigProvider(Provider):
scope = Scope.APP
@provide
def get_db_config(self) -> AsyncpgHAPostgresDatabaseConfig:
hosts = os.getenv("PGHOSTS", "10.0.0.1:5432,10.0.0.2:5432").split(",")
return AsyncpgHAPostgresDatabaseConfig(
user=os.getenv("PGUSER", "app"),
database=os.getenv("PGDATABASE", "appdb"),
hosts=[h.strip() for h in hosts if h.strip()],
password=os.getenv("PGPASSWORD"),
min_masters=int(os.getenv("PG_MIN_MASTERS", "1")),
min_replicas=int(os.getenv("PG_MIN_REPLICAS", "1")),
)
@provide
def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:
return {}
app = Application(
AsyncpgHAPostgresDatabaseService(),
di_providers=[EnvHasqlConfigProvider()],
)
```
Raw data
{
"_id": null,
"home_page": null,
"name": "operetta",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.11",
"maintainer_email": null,
"keywords": "application, framework, ddd, domain-driven design, asyncio, microservices, microservice, dependency-injection, dependency injection, di, web, api, rest, aiohttp, http, openapi, openapi3, swagger, swagger-ui, redoc, postgres, postgresql, database, asyncpg, hasql, aiomisc",
"author": "Alexander Tikhonov",
"author_email": "random.gauss@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/15/61/e5d6e300fa8d9fbe94d73fc2e6378deaf50950f367756cdb7da2de3c7522/operetta-0.0.9.tar.gz",
"platform": null,
"description": "<div align=\"center\">\n\n<img alt=\"logo\" width=\"175\" src=\"https://raw.githubusercontent.com/Fatal1ty/operetta/7e3e80a54dfc91673dacb36a64c1379aae36a042/img/logo.png\">\n\n###### Design Python services right\n\n[](https://github.com/Fatal1ty/operetta/actions)\n[](https://pypi.python.org/pypi/operetta)\n[](https://pypi.python.org/pypi/operetta)\n[](https://opensource.org/licenses/Apache-2.0)\n</div>\n\n# Operetta\n\nA lightweight framework for building Python applications that is not tied to a specific transport protocol. It is built on top of [aiomisc](https://github.com/aiokitchen/aiomisc) (service lifecycle, entrypoint) and [dishka](https://github.com/reagento/dishka) (dependency injection). On top of that, the following integrations are available:\n\n- [AIOHTTP](https://github.com/aio-libs/aiohttp): declarative handlers with DI for request body, query, and path params; automatic OpenAPI generation with [Swagger](https://github.com/swagger-api/swagger-ui) and [Redoc](https://github.com/Redocly/redoc).\n- PostgreSQL via [asyncpg](https://github.com/MagicStack/asyncpg): a database adapter and DI provider for a connection pool.\n- PostgreSQL with HA via [hasql](https://github.com/aiokitchen/hasql): a pool with balancing, failover and the same adapter layer.\n\n\n## Table of contents\n\n- [Highlights](#highlights)\n- [Installation](#installation)\n- [Quickstart (HTTP API)](#quickstart-http-api)\n - [How it works under the hood](#how-it-works-under-the-hood)\n- [Quickstart (non-HTTP app)](#quickstart-non-http-app)\n- [Services and DI](#services-and-di)\n- [AIOHTTP](#aiohttp)\n - [Configuration](#configuration)\n - [Error handling and response format](#error-handling-and-response-format)\n- [PostgreSQL](#postgresql)\n - [Single-node PostgreSQL (asyncpg)](#single-node-postgresql-asyncpg)\n - [High-availability PostgreSQL cluster (hasql)](#high-availability-postgresql-cluster-hasql)\n - [Advanced setup](#advanced-setup)\n\n## Highlights\n\n- Services as units of functionality: each service starts/stops via [aiomisc](https://github.com/aiokitchen/aiomisc) and may provide DI providers.\n- Single DI container ([dishka](https://github.com/reagento/dishka)) for the whole app; separate [scopes](https://dishka.readthedocs.io/en/stable/advanced/scopes.html) for `APP` and `REQUEST`.\n- [AIOHTTP](https://github.com/aio-libs/aiohttp) integration:\n - Handler parameter annotations: `FromBody[T]`, `FromQuery[T]`, `FromPath[T]`.\n - Automatic parsing and validation via [mashumaro](https://github.com/Fatal1ty/mashumaro); friendly error details.\n - Unified JSON envelope for responses.\n - OpenAPI generation with static assets for Swagger/Redoc.\n- PostgreSQL integrations ([asyncpg](https://github.com/MagicStack/asyncpg)/[hasql](https://github.com/aiokitchen/hasql)): interface adapter `PostgresDatabaseAdapter` + transactional `PostgresTransactionDatabaseAdapter` for repositories and units of work.\n\n## Installation\n\nRequires Python 3.11+.\n\n- Base:\n\n```bash\npip install operetta\n```\n\n- With AIOHTTP and OpenAPI:\n\n```bash\npip install 'operetta[aiohttp]'\n```\n\n- With PostgreSQL via asyncpg:\n\n```bash\npip install 'operetta[asyncpg]'\n```\n\n- With PostgreSQL HA via hasql:\n\n```bash\npip install 'operetta[hasql]'\n```\n\n## Quickstart (HTTP API)\n\nA minimal AIOHTTP app with DI and autogenerated OpenAPI. You are free to organize your project structure and files as you prefer.\n\n```python\nfrom dataclasses import dataclass, asdict\nfrom aiohttp import web\nfrom operetta.app import Application\nfrom operetta.integrations.aiohttp.annotations import (\n FromBody,\n FromPath,\n FromQuery,\n)\nfrom operetta.integrations.aiohttp.response import success_response\nfrom operetta.integrations.aiohttp.service import AIOHTTPService\n\n\n@dataclass\nclass CreateUserBody:\n name: str\n email: str\n\n\n@dataclass\nclass UserDto:\n id: int\n name: str\n email: str\n\n\nasync def create_user(\n _: web.Request, body: FromBody[CreateUserBody]\n) -> web.StreamResponse:\n # ... create a user ...\n user = UserDto(id=1, name=body.name, email=body.email)\n return success_response(asdict(user))\n\n\nasync def get_user(\n _: web.Request,\n user_id: FromPath[int],\n detailed: FromQuery[bool] = False,\n) -> UserDto:\n # ... load a user ...\n user = UserDto(id=user_id, name=\"Alice\", email=\"alice@example.com\")\n return user\n\n\nroutes = [\n web.post(\"/users\", create_user),\n web.get(\"/users/{user_id}\", get_user),\n]\n\napp = Application(\n AIOHTTPService(\n address=\"127.0.0.1\",\n port=8080,\n routes=routes,\n docs_title=\"Demo API\",\n docs_servers=(\"http://127.0.0.1:8080\",),\n docs_default_type=\"swagger\", # or \"redoc\"\n ),\n di_providers=[], # your dishka providers if needed\n warmup_dependencies=True,\n)\n\nif __name__ == \"__main__\":\n app.run()\n```\n\nShort example: raising DDD errors in handlers\n\n```python\nfrom operetta.ddd import NotFoundError, AuthorizationError\n\nasync def get_user(_: web.Request, user_id: FromPath[int]) -> User:\n # Example auth check\n if not has_access_to_user(user_id):\n raise AuthorizationError(details=[{\"permission\": \"users:read\"}])\n\n user = await repo.get_user(user_id)\n if user is None:\n raise NotFoundError(details=[{\"id\": user_id}])\n\n return user\n```\n\nOpen the docs at:\n\n- OpenAPI spec: `/static/openapi/openapi.yaml` (static files path is configurable).\n- Swagger UI: `/docs/swagger` (and redirect from `/docs`).\n- Redoc: `/docs/redoc`.\n\n### How it works under the hood\n\n- [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) at app creation time:\n - Wraps your routes by inspecting handler signatures and [`FromBody`/`FromQuery`/`FromPath`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py) annotations.\n - Injects parsed values into the handler call.\n - If the return type is not a `StreamResponse`, serializes result into [`SuccessResponse[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/response.py) and returns JSON ([format details](#error-handling-and-response-format)).\n - Builds the OpenAPI spec via [openapify](https://github.com/Fatal1ty/openapify) and serves it as static.\n - Attaches system middleware: DDD error mapping to HTTP and a global unhandled error catcher.\n- DI is configured via [dishka integration with AIOHTTP](https://dishka.readthedocs.io/en/stable/integrations/aiohttp.html); the container is created by [`DIService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/di.py) and wired into the app.\n - Each request gets a new DI scope (`REQUEST`) for per-request dependencies.\n - Handler parameters may be any DI-resolvable type (e.g., services, database adapters) in addition to `FromBody/FromQuery/FromPath` via `FromDishka`.\n\n## Quickstart (non-HTTP app)\n\nOperetta is not tied to HTTP. You can write background services/workers on [aiomisc](https://github.com/aiokitchen/aiomisc) and use DI:\n\n```python\nimport asyncio\nimport contextlib\nfrom operetta.app import Application\nfrom operetta.service.base import Service\n\nclass Worker(Service):\n async def start(self):\n # example: a periodic task\n self._task = asyncio.create_task(self._job())\n\n async def stop(self, exception: Exception | None = None):\n self._task.cancel()\n with contextlib.suppress(Exception):\n await self._task\n\n async def _job(self):\n while True:\n # get dependencies if needed:\n # db = await self.get_dependency(PostgresDatabaseAdapter)\n await asyncio.sleep(1)\n\napp = Application(Worker(), warmup_dependencies=True)\napp.run()\n```\n\n## Services and DI\n\n- Base service class: [`operetta.service.base.Service`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/base.py) (inherits `aiomisc.Service`).\n- DI container: created inside `DIService` (see [`operetta/service/di.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/di.py)).\n - Providers are collected from:\n - the `Application` itself (argument `di_providers`),\n - application services implementing `get_di_providers()`.\n - Supports dependency warmup (`warmup=True`) for APP/REQUEST factories.\n- Retrieve a dependency from a service via `await service.get_dependency(Type)`.\n\nTo load config from YAML, use [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py):\n\n```python\nfrom operetta import Application\nfrom operetta.service.configuration import YAMLConfigurationService\n\nconfig_service = YAMLConfigurationService() # reads --config path from CLI\napp = Application(config_service)\n```\n\nTwo values are provided to DI: [`ApplicationDictConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/types.py) (raw dict) and a config object (if you provide `config_cls`/`config_factory`).\n\nCustom config class (mashumaro DataClassDictMixin):\n\n```python\nfrom dataclasses import dataclass\nfrom mashumaro import DataClassDictMixin\nfrom operetta import Application\nfrom operetta.service.configuration import YAMLConfigurationService\n\n# Define your typed config mapped to YAML structure\n@dataclass\nclass AppConfig(DataClassDictMixin):\n # You can use nested dataclasses as well; here kept minimal\n creds: dict[str, str] | None = None\n\n# Build service that parses YAML into AppConfig using mashumaro\nconfig_service = YAMLConfigurationService(\n config_cls=AppConfig,\n config_factory=AppConfig.from_dict,\n)\n\n# Both ApplicationDictConfig (raw dict) and AppConfig are available in DI\napp = Application(config_service)\n```\n\n## AIOHTTP\n\nA first-class integration for building HTTP APIs with declarative handler parameters, DI, and autogenerated OpenAPI/Swagger/Redoc.\n\n- Handler parameter annotations: [`FromBody[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py), [`FromQuery[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py), [`FromPath[T]`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/annotations.py) (plus DI via `FromDishka`).\n- Unified JSON responses out of the box.\n- Automatic OpenAPI spec generation and static docs at `/docs` (Swagger or Redoc).\n\n### Configuration\n\nYou can configure [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) in three complementary ways:\n\n- Via constructor (`__init__`) arguments \u2014 explicit values have the highest priority.\n- Via YAML file ([`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) + [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py)/[`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py)) \u2014 good for ops-driven setups; overrides defaults but not explicit `__init__` values.\n- Via custom DI providers \u2014 e.g., environment variables or secrets managers.\n\nPrecedence rule:\n- `__init__` \u2192 DI ([`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py)) \u2192 internal defaults\n\n> [!TIP]\n> - [`AIOHTTPConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) is a helper that installs [`AIOHTTPServiceConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/providers.py) into DI.\n> - This provider reads `ApplicationDictConfig['api']` and decodes it into [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py).\n> - YAML is not required. You can provide [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py) via any DI provider.\n\nYAML keys (all optional) live under the `api:` section:\n\n```yaml\napi:\n address: 0.0.0.0 # bind address\n port: 8081 # listen port\n static_endpoint_prefix: /static/\n static_files_root: ./var/static # where to serve static files and openapi spec\n docs_default_path: /docs\n docs_swagger_path: /docs/swagger\n docs_redoc_path: /docs/redoc\n docs_title: Demo API\n docs_servers:\n - http://127.0.0.1:8081\n docs_default_type: swagger # swagger | redoc | null (no redirect from /docs)\n docs_remove_path_prefix: /v1/\n # Optional OpenAPI cosmetics\n docs_tag_descriptions:\n users: Operations with users\n docs_tag_groups:\n Management:\n - users\n```\n\nWire it up:\n\n```python\nfrom operetta.app import Application\nfrom operetta.service.configuration import YAMLConfigurationService\nfrom operetta.integrations.aiohttp import (\n AIOHTTPService,\n AIOHTTPConfigurationService,\n)\n\napp = Application(\n YAMLConfigurationService(), # loads --config path and exposes dict to DI\n AIOHTTPConfigurationService(), # registers AIOHTTPServiceConfigProvider reading the api: section\n AIOHTTPService(\n routes=[],\n # You may still override settings here (constructor wins over YAML):\n # port=9090,\n # docs_default_type=\"redoc\",\n ),\n)\n```\n\nCustom config provider example (env-vars):\n\n```python\nimport os\nfrom dishka import Provider, Scope, provide\nfrom operetta import Application\nfrom operetta.integrations.aiohttp.config import AIOHTTPServiceConfig\nfrom operetta.integrations.aiohttp import AIOHTTPService\n\nclass EnvAiohttpConfigProvider(Provider):\n scope = Scope.APP\n\n @provide\n def get_config(self) -> AIOHTTPServiceConfig:\n return AIOHTTPServiceConfig(\n address=os.getenv(\"HTTP_ADDRESS\", \"0.0.0.0\"),\n port=int(os.getenv(\"HTTP_PORT\", \"8080\")),\n )\n\napp = Application(\n AIOHTTPService(routes=[]),\n di_providers=[EnvAiohttpConfigProvider()],\n)\n```\n\nUnder the hood [`AIOHTTPService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/service.py) tries to resolve [`AIOHTTPServiceConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/config.py) from DI on start; if available, it merges values with the precedence above and continues startup as usual.\n\n### Error handling and response format\n\n- Successful responses are automatically wrapped into `{ \"success\": true, \"data\": ..., \"error\": null }`.\n- Errors use `{ \"success\": false, \"data\": null, \"error\": { message, code, details } }`.\n- Standard AIOHTTP errors and domain/application/infrastructure errors (see [`operetta.ddd.errors`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/errors.py)) are mapped by middleware from [`integrations/aiohttp/middlewares.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py).\n- Parsing errors for body/params use types from [`integrations/aiohttp/errors.py`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/errors.py) (e.g., `InvalidJSONBodyError`, `InvalidQueryParamsError`, `InvalidPathParamsError`, ...).\n\nRecommended way to raise errors in your app\n\n- Import DDD exceptions from a single place:\n\n ```python\n from operetta.ddd import (\n NotFoundError,\n AlreadyExistsError,\n ConflictError,\n ValidationError,\n AuthenticationError,\n AuthorizationError,\n RelatedResourceNotFoundError,\n DependencyUnavailableError,\n )\n ```\n\n- Raise with optional structured details (a sequence of JSON-serializable objects):\n\n ```python\n raise NotFoundError(\n details=[{\"resource\": \"User\", \"id\": user_id}]\n )\n ```\n\nHTTP mapping of DDD exceptions (handled by middleware)\n\n| DDD exception | HTTP status | HTTP error | code |\n|-------------------------------------------------------------------------------------------------------------------|-------------|--------------------------|-----------------------|\n| AuthenticationError | 401 | UnauthorizedError | UNAUTHORIZED |\n| AuthorizationError, PermissionDeniedError | 403 | ForbiddenError | FORBIDDEN |\n| NotFoundError | 404 | ResourceNotFoundError | RESOURCE_NOT_FOUND |\n| AlreadyExistsError | 409 | DuplicateRequestError | DUPLICATE_RESOURCE |\n| ConflictError, InvalidOperationError | 409 | ConflictError | CONFLICT |\n| ValidationError, RelatedResourceNotFoundError | 422 | UnprocessableEntityError | UNPROCESSABLE_ENTITY |\n| DeadlineExceededError | 504 | GatewayTimeoutError | GATEWAY_TIMEOUT |\n| DependencyThrottledError, DependencyUnavailableError, SubsystemUnavailableError, SystemResourceLimitExceededError | 503 | ServiceUnavailableError | SERVICE_UNAVAILABLE |\n| DependencyFailureError | 502 | BadGatewayError | BAD_GATEWAY |\n| StorageIntegrityError, TransportIntegrityError, InfrastructureError (fallback) | 500 | ServerError | INTERNAL_SERVER_ERROR |\n\nResponse envelope reference\n\n- Success:\n\n ```json\n { \"success\": true, \"data\": { \"id\": 1, \"name\": \"Alice\" }, \"error\": null }\n ```\n\n- Error:\n\n ```json\n {\n \"success\": false,\n \"data\": null,\n \"error\": {\n \"message\": \"Resource not found\",\n \"code\": \"RESOURCE_NOT_FOUND\",\n \"details\": [ { \"resource\": \"User\", \"id\": 123 } ]\n }\n }\n ```\n\nAdvanced\n\n- You can throw HTTP-specific errors directly if you need full control over the client response: see [`operetta.integrations.aiohttp.errors`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/errors.py) (e.g., `ForbiddenError`, `UnauthorizedError`, `UnprocessableEntityError`).\n- Two middlewares are installed by default:\n - [`ddd_errors_middleware`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py) maps DDD exceptions to HTTP errors above.\n - [`unhandled_error_middleware`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/aiohttp/middlewares.py) catches all other exceptions and returns a generic 500 with a safe message.\n\n## PostgreSQL\n\nOperetta provides a thin, uniform abstraction over PostgreSQL so your application code does not depend on a particular driver or pool manager. You write repositories and units of work against two interfaces:\n\n- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) \u2014 a general-purpose adapter for any operations (fetch, fetch_one, execute, ...) without explicit transaction control.\n- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) \u2014 the same API for all operations plus transaction control methods (start/commit/rollback) when you need to run multiple steps in a single transaction.\n\nThere are two interchangeable backends:\n- [asyncpg](https://github.com/MagicStack/asyncpg) \u2014 a straightforward single-pool setup.\n- [hasql](https://github.com/aiokitchen/hasql) (asyncpg HA) \u2014 a high-availability pool manager with balancing/failover.\n\nBoth backends expose the same interfaces via DI, so switching is configuration-only. DI scopes are chosen to match typical usage:\n- [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) is provided with scope=APP (shared pool).\n- [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) is provided with scope=REQUEST (per-request/operation handle for transactional work).\n\nConfiguration is provided via DI:\n- Connection config types: [`AsyncpgPostgresDatabaseConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) (for asyncpg) and [`AsyncpgHAPostgresDatabaseConfig`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/config.py) (for asyncpg HA).\n- Pool factory kwargs type: [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) (to pass `init` or other pool options to the driver/manager).\n- Built-in config providers \u2014 [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py) and [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py) \u2014 read settings from `ApplicationDictConfig['postgres']`, which is loaded by [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) from your YAML file.\n- A built-in pool kwargs provider returns an empty [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) by default; you can override it to customize connection initialization (see [Advanced setup](#advanced-setup)).\n\nTypical pattern:\n- Use [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) when you don't need explicit transaction management: it's suitable for any reads and writes.\n- When you need transactional boundaries, get [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py), call `start_transaction()`/`commit_transaction()` (or `rollback_transaction()` on error), and run your operations within that transaction.\n\nConfiguration can be loaded from YAML via [`YAMLConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/service/configuration.py) under the `postgres:` key. Optional connection initialization (e.g., custom codecs or `search_path`) can be provided through [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) in DI; this works for both `asyncpg` and `hasql` variants.\n\n### Single-node PostgreSQL (asyncpg)\n\nProvides:\n- Providers: [`AsyncpgPostgresDatabaseProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py), [`AsyncpgPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/providers.py).\n- Convenience services to plug into `Application`:\n - [`AsyncpgPostgresDatabaseService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) \u2014 pool and adapters,\n - [`AsyncpgPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) \u2014 loads config from `ApplicationDictConfig`.\n- Adapters:\n - [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=APP \u2014 general-purpose adapter for any operations (fetch/fetch_one/execute, ...).\n - [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=REQUEST (handy for HTTP requests) \u2014 same API plus transaction control methods (start/commit/rollback).\n\nYAML config example:\n\n```yaml\npostgres:\n user: app\n password: secret\n database: appdb\n host: 127.0.0.1:5432\n # optional pool params:\n min_size: 5\n max_size: 20\n max_queries: 50000\n max_inactive_connection_lifetime: 300\n```\n\nPlug into the app:\n\n```python\nfrom operetta.app import Application\nfrom operetta.service.configuration import YAMLConfigurationService\nfrom operetta.integrations.asyncpg.service import (\n AsyncpgPostgresDatabaseService,\n AsyncpgPostgresDatabaseConfigurationService,\n)\n\napp = Application(\n YAMLConfigurationService(),\n AsyncpgPostgresDatabaseConfigurationService(),\n AsyncpgPostgresDatabaseService(),\n)\n```\n\nUse in a repository:\n\n```python\nfrom dataclasses import dataclass\nfrom operetta.ddd.infrastructure.db.postgres.adapters.interface import (\n PostgresDatabaseAdapter,\n PostgresTransactionDatabaseAdapter,\n)\n\n@dataclass\nclass User:\n id: int\n name: str\n\nclass UserRepository:\n def __init__(self, db: PostgresDatabaseAdapter):\n self._db = db\n\n async def get_by_id(self, user_id: int) -> User | None:\n row = await self._db.fetch_one(\n \"SELECT id, name FROM users WHERE id=$1\", user_id\n )\n return User(id=row[\"id\"], name=row[\"name\"]) if row else None\n\nclass UnitOfWork:\n def __init__(self, tx: PostgresTransactionDatabaseAdapter):\n self._tx = tx\n\n async def __aenter__(self):\n await self._tx.start_transaction()\n return self\n\n async def __aexit__(self, exc_type, exc, tb):\n if exc:\n await self._tx.rollback_transaction()\n else:\n await self._tx.commit_transaction()\n```\n\n### High-availability PostgreSQL cluster (hasql)\n\nIf you run an HA cluster (multiple nodes), use the hasql integration.\n\nProvides:\n- Providers: [`AsyncpgHAPostgresDatabaseProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py), [`AsyncpgHAPostgresDatabaseConfigProvider`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/providers.py).\n- Convenience services to plug into `Application`:\n - [`AsyncpgHAPostgresDatabaseService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) \u2014 pool and adapters,\n - [`AsyncpgHAPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py) \u2014 loads config from `ApplicationDictConfig`.\n- Adapters:\n - [`PostgresDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=APP \u2014 general-purpose adapter for any operations (fetch/fetch_one/execute, ...).\n - [`PostgresTransactionDatabaseAdapter`](https://github.com/Fatal1ty/operetta/blob/main/operetta/ddd/infrastructure/db/postgres/adapters/interface.py) with scope=REQUEST (handy for HTTP requests) \u2014 same API plus transaction control methods (start/commit/rollback).\n\nYAML config example:\n\n```yaml\npostgres:\n user: app\n password: secret\n database: appdb\n hosts:\n - 10.0.0.1:5432\n - 10.0.0.2:5432\n min_masters: 1\n min_replicas: 1\n # optional:\n acquire_timeout: 5\n refresh_delay: 5\n refresh_timeout: 5\n fallback_master: false\n master_as_replica_weight: 1.0\n balancer_policy: greedy # or round_robin / random_weighted\n stopwatch_window_size: 100\n```\n\nPlug into the app:\n\n```python\nfrom operetta.app import Application\nfrom operetta.service.configuration import YAMLConfigurationService\nfrom operetta.integrations.asyncpg_ha.service import (\n AsyncpgHAPostgresDatabaseService,\n AsyncpgHAPostgresDatabaseConfigurationService,\n)\n\napp = Application(\n YAMLConfigurationService(),\n AsyncpgHAPostgresDatabaseConfigurationService(),\n AsyncpgHAPostgresDatabaseService(),\n)\n```\n\n> [!TIP]\n> DI exposes the same adapter interfaces, so repository and unit of work code stays unchanged.\n\n### Advanced setup\n\nYou can pass an `init` callable for connections (e.g., register codecs, set search_path) via DI. Below is an example provider from a real project that registers a custom JSONB codec for asyncpg HA (hasql):\n\n```python\nimport json\nfrom dishka import Provider, Scope, provide\nfrom operetta.app import Application\nfrom operetta.service.configuration import YAMLConfigurationService\nfrom operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs\nfrom operetta.integrations.asyncpg_ha.service import (\n AsyncpgHAPostgresDatabaseConfigurationService,\n AsyncpgHAPostgresDatabaseService,\n)\n\nclass AsyncpgJSONCodecProvider(Provider):\n scope = Scope.APP\n\n @provide(override=True)\n def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:\n async def set_custom_codecs(conn):\n await conn.set_type_codec(\n \"jsonb\",\n encoder=json.dumps,\n decoder=json.loads,\n schema=\"pg_catalog\",\n )\n return AsyncpgPoolFactoryKwargs(init=set_custom_codecs)\n\napp = Application(\n YAMLConfigurationService(),\n AsyncpgHAPostgresDatabaseConfigurationService(),\n AsyncpgHAPostgresDatabaseService(),\n di_providers=[AsyncpgJSONCodecProvider()],\n)\n```\n\n> [!IMPORTANT]\\\n> If you use the built-in [`AsyncpgPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/service.py) or\n> [`AsyncpgHAPostgresDatabaseConfigurationService`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg_ha/service.py), they already register a\n> default provider for [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py). To customize pool options,\n> declare your provider with `@provide(override=True)` so it overrides the\n> built-in one; otherwise container validation will fail.\\\n> When you provide your own [`AsyncpgPoolFactoryKwargs`](https://github.com/Fatal1ty/operetta/blob/main/operetta/integrations/asyncpg/config.py) and there is an existing\n> default provider from those services, `override=True` is mandatory.\n\nDefine your own config providers (e.g., from environment variables) if you don't want to use YAML-based ones:\n\n```python\nimport os\nfrom dishka import Provider, Scope, provide\nfrom operetta.app import Application\nfrom operetta.integrations.asyncpg.config import (\n AsyncpgPostgresDatabaseConfig,\n AsyncpgPoolFactoryKwargs,\n)\nfrom operetta.integrations.asyncpg.service import AsyncpgPostgresDatabaseService\n\nclass EnvAsyncpgConfigProvider(Provider):\n scope = Scope.APP\n\n @provide\n def get_db_config(self) -> AsyncpgPostgresDatabaseConfig:\n return AsyncpgPostgresDatabaseConfig(\n user=os.getenv(\"PGUSER\", \"app\"),\n database=os.getenv(\"PGDATABASE\", \"appdb\"),\n host=os.getenv(\"PGHOST\", \"127.0.0.1:5432\"),\n password=os.getenv(\"PGPASSWORD\"),\n )\n\n @provide\n def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:\n return {}\n\napp = Application(\n AsyncpgPostgresDatabaseService(),\n di_providers=[EnvAsyncpgConfigProvider()],\n)\n```\n\nExample of an environment-based HA config provider:\n\n```python\nimport os\nfrom dishka import Provider, Scope, provide\nfrom operetta.app import Application\nfrom operetta.integrations.asyncpg.config import AsyncpgPoolFactoryKwargs\nfrom operetta.integrations.asyncpg_ha.config import AsyncpgHAPostgresDatabaseConfig\nfrom operetta.integrations.asyncpg_ha.service import (\n AsyncpgHAPostgresDatabaseConfigurationService,\n AsyncpgHAPostgresDatabaseService,\n)\n\n\nclass EnvHasqlConfigProvider(Provider):\n scope = Scope.APP\n\n @provide\n def get_db_config(self) -> AsyncpgHAPostgresDatabaseConfig:\n hosts = os.getenv(\"PGHOSTS\", \"10.0.0.1:5432,10.0.0.2:5432\").split(\",\")\n return AsyncpgHAPostgresDatabaseConfig(\n user=os.getenv(\"PGUSER\", \"app\"),\n database=os.getenv(\"PGDATABASE\", \"appdb\"),\n hosts=[h.strip() for h in hosts if h.strip()],\n password=os.getenv(\"PGPASSWORD\"),\n min_masters=int(os.getenv(\"PG_MIN_MASTERS\", \"1\")),\n min_replicas=int(os.getenv(\"PG_MIN_REPLICAS\", \"1\")),\n )\n\n @provide\n def get_pool_factory_kwargs(self) -> AsyncpgPoolFactoryKwargs:\n return {}\n\n\napp = Application(\n AsyncpgHAPostgresDatabaseService(),\n di_providers=[EnvHasqlConfigProvider()],\n)\n```\n",
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": null,
"version": "0.0.9",
"project_urls": null,
"split_keywords": [
"application",
" framework",
" ddd",
" domain-driven design",
" asyncio",
" microservices",
" microservice",
" dependency-injection",
" dependency injection",
" di",
" web",
" api",
" rest",
" aiohttp",
" http",
" openapi",
" openapi3",
" swagger",
" swagger-ui",
" redoc",
" postgres",
" postgresql",
" database",
" asyncpg",
" hasql",
" aiomisc"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "d8af4e67a61a9d46ddc564113cf83872efb7905f3329d5e2993b514c75a59226",
"md5": "9d23ada0cc8b1690d4ed7f99327d04e7",
"sha256": "e9874b01f5a4a1cbf735650629c3395f5c7c6a93e04c1e999f5376ef6a984218"
},
"downloads": -1,
"filename": "operetta-0.0.9-py3-none-any.whl",
"has_sig": false,
"md5_digest": "9d23ada0cc8b1690d4ed7f99327d04e7",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.11",
"size": 43318,
"upload_time": "2025-10-10T19:51:39",
"upload_time_iso_8601": "2025-10-10T19:51:39.080052Z",
"url": "https://files.pythonhosted.org/packages/d8/af/4e67a61a9d46ddc564113cf83872efb7905f3329d5e2993b514c75a59226/operetta-0.0.9-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "1561e5d6e300fa8d9fbe94d73fc2e6378deaf50950f367756cdb7da2de3c7522",
"md5": "f925b7b77ce60a71e0e4ace3a56901fc",
"sha256": "076cd9be5f0bc2cc5f92f56e2ffee04f5748964914ce37ddb97aa61c87639056"
},
"downloads": -1,
"filename": "operetta-0.0.9.tar.gz",
"has_sig": false,
"md5_digest": "f925b7b77ce60a71e0e4ace3a56901fc",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.11",
"size": 37586,
"upload_time": "2025-10-10T19:51:40",
"upload_time_iso_8601": "2025-10-10T19:51:40.547832Z",
"url": "https://files.pythonhosted.org/packages/15/61/e5d6e300fa8d9fbe94d73fc2e6378deaf50950f367756cdb7da2de3c7522/operetta-0.0.9.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-10 19:51:40",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "operetta"
}