**dbop-core 1.0.0** (PyPI metadata)

* **Summary:** DB-agnostic operation runner with retries/backoff and pluggable scopes/classifiers
* **Author:** Youssef Khaya
* **License:** MIT
* **Requires Python:** >=3.9
* **Keywords:** retry, backoff, database, resilience, asyncio
* **Uploaded:** 2025-10-19 16:33:40
* **Requirements:** none recorded
# dbop-core

**DB-agnostic retry runner** for Python database operations.
You bring the driver or ORM — `dbop-core` gives you:

* **Retries with backoff and jitter**
* **Attempt scopes** (transaction / SAVEPOINT wrappers)
* **Per-attempt hooks** (e.g. set timeouts, apply metadata)
* **Transient error classification**

Lightweight and composable — the core doesn’t know your driver.
Adapters live under `contrib/` (SQLAlchemy, psycopg, asyncpg, aiomysql, aiosqlite, and generic DB-API).

---

## When to Use

Use `dbop-core` when you need **resilience for a single logical DB step**:

* Occasional **deadlocks** or **lock wait timeouts**
* **Slow statements** that risk blocking your pool
* **SAVEPOINT-style retries** inside an outer transaction
* **Per-attempt timeouts** without rewriting your app logic

It’s not a migration tool or pooler — just a precise **execution runner** for safe retries.

---

## Features

* ✅ **Async-first core API**, works with sync *and* async drivers
* 🔁 **Retry policy**: max retries, exponential backoff, jitter, caps (sketched below)
* 🧩 **Attempt scopes**: pluggable context managers (transaction/savepoint)
* ⚙️ **Per-attempt hooks**: run custom setup (timeouts, instrumentation)
* 🧠 **Transient classifier**: decide whether an exception should retry
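
To make "exponential backoff, jitter, caps" concrete, here is the usual delay shape (a sketch of the general technique, not necessarily `RetryPolicy`'s exact formula):

```python
import random

def backoff_delay(attempt: int, initial_delay: float, max_delay: float) -> float:
    # Exponential growth capped at max_delay, with full jitter.
    capped = min(initial_delay * (2 ** attempt), max_delay)
    return random.uniform(0, capped)

# With initial_delay=0.05 and max_delay=0.5, the caps for attempts 0..4
# are 0.05, 0.10, 0.20, 0.40, 0.50 seconds (before jitter).
```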

---

## Installation

```bash
pip install dbop-core

# optional extras for contrib adapters
pip install "dbop-core[sqlalchemy]"
pip install "dbop-core[psycopg]"
pip install "dbop-core[asyncpg]"
pip install "dbop-core[aiomysql]"
pip install "dbop-core[aiosqlite]"
```

**Compatibility:** Python 3.9 – 3.13

---

## Quickstart

```python
import asyncio

from dbop_core.core import execute, RetryPolicy

async def op(x):
    return x * 2

async def main():
    result = await execute(op, args=(21,), policy=RetryPolicy())
    assert result == 42

asyncio.run(main())
```

---

## Core API (Essentials)

```python
await execute(
    op,                        # callable: sync or async
    args=(), kwargs=None,
    retry_on=(Exception,),     # types to retry
    classifier=None,           # fn(exc) -> bool; True = retry
    raises=True,               # if False, return default on final failure
    default=None,
    policy=RetryPolicy(),      # backoff/jitter settings
    attempt_scope=None,        # sync AttemptScope
    attempt_scope_async=None,  # async AttemptScope
    pre_attempt=None,          # async setup hook
    read_only=False,           # passed to scopes
    overall_timeout_s=None,    # per-attempt timeout
)
```

**Semantics**

* Only exceptions in `retry_on` are candidates for retry.
* If `classifier` is provided, it takes precedence per exception (`True` → retry, `False` → stop).
* `overall_timeout_s` cancels the attempt; if `raises=False`, you get `default`.
* `pre_attempt` is always async — even for sync drivers (wrap your sync setup with `async def pre(): ...`).
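
A hedged illustration of these semantics (the `TransientGlitch` type, `flaky` op, and call counter are invented for the example; the `execute()` parameters come from the signature above):

```python
import asyncio
from dbop_core.core import execute, RetryPolicy

class TransientGlitch(Exception):
    """Hypothetical transient failure, for illustration only."""

calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientGlitch("try again")
    return "ok"

def classifier(exc) -> bool:
    # True -> retry, False -> stop, regardless of retry_on membership.
    return isinstance(exc, TransientGlitch)

async def main():
    result = await execute(
        flaky,
        retry_on=(Exception,),
        classifier=classifier,
        raises=False,        # on final failure, return `default` instead of raising
        default="fallback",
        policy=RetryPolicy(max_retries=5, initial_delay=0.05, max_delay=0.2),
    )
    print(result)  # "ok" after two retried attempts

asyncio.run(main())
```
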
---
### Execution Flow (Conceptual Diagram)

Below is a simplified view of what happens inside `execute()` during retries.

```text
 ┌──────────────────────────────────────────────────────────────┐
 │                     execute() lifecycle                      │
 └──────────────────────────────────────────────────────────────┘
            │
            ▼
    [1] start execute()
            │
            │
            ▼
    [2] initialize RetryPolicy
        - max_retries, delay, jitter, etc.
        - retry_on exception types
            │
            ▼
    [3] for each attempt (1..N):
            │
            ├─► [3.1] pre_attempt()
            │        (async setup hook)
            │        e.g., apply_timeouts, reset state
            │
            ├─► [3.2] attempt_scope / attempt_scope_async
            │        (transaction or SAVEPOINT wrapper)
            │
            ├─► [3.3] call op(*args, **kwargs)
            │        (sync or async function)
            │
            ├─► [3.4] if success → return result
            │
            ├─► [3.5] if exception:
            │       ├─ check type in retry_on
            │       ├─ run classifier(exc)
            │       ├─ if transient → sleep(backoff) → retry
            │       └─ else → re-raise (or return default)
            │
            ▼
    [4] if all retries failed:
        - return default (if raises=False)
        - or raise last exception
```

**Key concepts:**

* `attempt_scope` isolates one DB operation (transaction or savepoint).
  If the attempt fails, the scope rolls back and prepares for retry.
* `pre_attempt` runs before each try — perfect for **timeouts**, **instrumentation**, or **context tagging**.
* `RetryPolicy` determines how long to wait and how many times to retry.
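
For instance, a `pre_attempt` hook used for lightweight instrumentation might look like this sketch (the logger and counter are illustrative, not part of the API):

```python
import logging
import time

log = logging.getLogger("dbop")
attempts = {"n": 0}

async def pre_attempt():
    # Runs before every try, including the first.
    attempts["n"] += 1
    log.info("attempt %d starting at %.3f", attempts["n"], time.monotonic())
```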

---

### Design Philosophy

Database operations often need **fine-grained resilience** — but frameworks usually give you an all-or-nothing approach:

* Retry at the HTTP or ORM layer (too coarse).
* Manual retry loops around transactions (too error-prone).
* Connection poolers that retry implicitly (too opaque).

`dbop-core` exists to make retries **explicit, minimal, and driver-agnostic**.
It focuses on **one unit of work** — one statement, one transaction, one savepoint — and lets *you* decide:

* ✅ *When to retry* (`classifier`, `retry_on`)
* ✅ *How to retry* (`RetryPolicy`, exponential backoff + jitter)
* ✅ *Where to isolate* (`attempt_scope` / `attempt_scope_async`)
* ✅ *What to prepare* before each try (`pre_attempt` hook)

Everything else — connection pooling, ORM sessions, schema migration — stays out of scope.
This separation keeps `dbop-core` **composable**, **transparent**, and **safe** to embed anywhere in your stack — from raw DB-API connections to async SQLAlchemy sessions or FastAPI background tasks.

> In short: **`dbop-core` doesn’t manage your database. It helps you survive it.**

---

**Execution modes:**

| Driver Type | Scope used            | Hook type                | Example Adapter                       |
| ----------- | --------------------- | ------------------------ | ------------------------------------- |
| Sync        | `attempt_scope`       | `apply_timeouts_sync()`  | DB-API, SQLAlchemy                    |
| Async       | `attempt_scope_async` | `apply_timeouts_async()` | asyncpg, psycopg, aiomysql, aiosqlite |
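
The wiring differs only in which scope parameter you pass. A minimal sketch, with hypothetical `sync_txn_scope` / `async_txn_scope` factories standing in for a contrib adapter's scopes:

```python
from contextlib import asynccontextmanager, contextmanager
from dbop_core.core import execute

@contextmanager
def sync_txn_scope(conn, read_only=False):
    # Hypothetical stand-in for a contrib adapter's sync attempt scope.
    yield

@asynccontextmanager
async def async_txn_scope(conn, read_only=False):
    # Hypothetical stand-in for a contrib adapter's async attempt scope.
    yield

async def run_with_sync_driver(conn, op):
    # Sync driver: supply the scope via attempt_scope.
    return await execute(
        op,
        attempt_scope=lambda read_only=False: sync_txn_scope(conn, read_only=read_only),
    )

async def run_with_async_driver(conn, op):
    # Async driver: supply the scope via attempt_scope_async.
    return await execute(
        op,
        attempt_scope_async=lambda read_only=False: async_txn_scope(conn, read_only=read_only),
    )
```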

---

## Contrib Adapters

| Adapter                   | Sync/Async | Backend               | File                                  |
| ------------------------- | ---------- | --------------------- | ------------------------------------- |
| DB-API (generic)          | Sync       | Postgres/MySQL/SQLite | `contrib/dbapi_adapter.py`            |
| SQLAlchemy (Session)      | Sync       | Any                   | `contrib/sqlalchemy_adapter.py`       |
| SQLAlchemy (AsyncSession) | Async      | Any                   | `contrib/sqlalchemy_adapter_async.py` |
| psycopg 3                 | Async      | Postgres              | `contrib/psycopg_adapter.py`          |
| asyncpg                   | Async      | Postgres              | `contrib/asyncpg_adapter.py`          |
| aiomysql                  | Async      | MySQL/MariaDB         | `contrib/aiomysql_adapter.py`         |
| aiosqlite                 | Async      | SQLite                | `contrib/aiosqlite_adapter.py`        |

---

### SQLAlchemy (Sync Example)

```python
import asyncio
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from dbop_core.core import execute, RetryPolicy
from dbop_core.contrib.sqlalchemy_adapter import attempt_scope_sync

engine = create_engine("sqlite+pysqlite:///:memory:")
Session = sessionmaker(bind=engine)

def setup(sess):
    sess.execute(text("CREATE TABLE IF NOT EXISTS kv(k TEXT PRIMARY KEY, v TEXT)"))
def put(sess, k, v):
    sess.execute(text("INSERT OR REPLACE INTO kv VALUES (:k,:v)"), {"k": k, "v": v})
def get(sess, k):
    return sess.execute(text("SELECT v FROM kv WHERE k=:k"), {"k": k}).scalar()

async def main():
    pol = RetryPolicy(max_retries=3, initial_delay=0.05, max_delay=0.2)
    with Session() as sess:
        with sess.begin(): setup(sess)

        with sess.begin():
            await execute(lambda: put(sess, "hello", "world"),
                attempt_scope=lambda r=False: attempt_scope_sync(sess, read_only=r),
                policy=pol)

        with sess.begin():
            val = await execute(lambda: get(sess, "hello"),
                attempt_scope=lambda r=False: attempt_scope_sync(sess, read_only=r),
                policy=pol, read_only=True)
            print(val)

asyncio.run(main())
```

---

### psycopg (Postgres, Async)

```python
import asyncio
from functools import partial

from psycopg import AsyncConnection
from dbop_core.core import execute, RetryPolicy
from dbop_core.classify import dbapi_classifier
from dbop_core.contrib.psycopg_adapter import attempt_scope_async, apply_timeouts_async

DSN = "postgresql://postgres:postgres@localhost:5432/dbop"

async def pre(conn):  # per-attempt setup
    await apply_timeouts_async(conn, lock_timeout_s=3, stmt_timeout_s=10)

async def count_items(conn):
    # conn.execute() returns a cursor, not a value; fetch the scalar explicitly.
    cur = await conn.execute("SELECT COUNT(*) FROM items")
    return (await cur.fetchone())[0]

async def run():
    async with await AsyncConnection.connect(DSN) as conn:
        pol = RetryPolicy(max_retries=5, initial_delay=0.05, max_delay=0.5)

        await execute(
            lambda: conn.execute("INSERT INTO items(name) VALUES ('gamma') ON CONFLICT DO NOTHING"),
            classifier=dbapi_classifier,
            attempt_scope_async=lambda r=False: attempt_scope_async(conn, read_only=r),
            pre_attempt=partial(pre, conn),
            policy=pol,
        )

        count = await execute(
            lambda: count_items(conn),
            classifier=dbapi_classifier,
            attempt_scope_async=lambda r=False: attempt_scope_async(conn, read_only=r),
            pre_attempt=partial(pre, conn),
            policy=pol, read_only=True,
        )
        print("count:", count)

asyncio.run(run())
```

---

### Generic DB-API (Sync, e.g. SQLite)

```python
import asyncio, sqlite3
from dbop_core.core import execute, RetryPolicy
from dbop_core.contrib.dbapi_adapter import attempt_scope_sync, apply_timeouts_sync

conn = sqlite3.connect(":memory:")

def create(): conn.execute("CREATE TABLE IF NOT EXISTS t(x INT)")
def insert(): conn.execute("INSERT INTO t(x) VALUES (1)")
def count(): return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]

async def pre():
    apply_timeouts_sync(conn, backend="sqlite", lock_timeout_s=3)

async def main():
    create()
    pol = RetryPolicy(max_retries=2, initial_delay=0.05, max_delay=0.2)

    await execute(lambda: insert(),
        attempt_scope=lambda r=False: attempt_scope_sync(conn, read_only=r, backend="sqlite"),
        pre_attempt=pre, policy=pol)

    n = await execute(lambda: count(),
        attempt_scope=lambda r=False: attempt_scope_sync(conn, read_only=True, backend="sqlite"),
        pre_attempt=pre, policy=pol, read_only=True)
    print("rows:", n)

asyncio.run(main())
```

---

## Timeout Mapping (per attempt)

| Backend           | Mechanism                                                      |
| ----------------- | -------------------------------------------------------------- |
| **PostgreSQL**    | `SET LOCAL lock_timeout`, `SET LOCAL statement_timeout`        |
| **MySQL/MariaDB** | `innodb_lock_wait_timeout`, `MAX_EXECUTION_TIME` (best-effort) |
| **SQLite**        | `PRAGMA busy_timeout` (connection-level)                       |

Use your adapter’s `apply_timeouts_*` in `pre_attempt()`.
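
Conceptually, the Postgres case reduces to two `SET LOCAL` statements. A hedged sketch (not the adapter's actual code) for a psycopg-style async connection inside an open transaction:

```python
async def set_pg_timeouts(conn, lock_timeout_s: float, stmt_timeout_s: float):
    # SET LOCAL is transaction-scoped: the settings vanish on rollback,
    # which is exactly what you want for per-attempt timeouts.
    # SET does not accept bind parameters, so interpolate validated integers.
    lock_ms = int(lock_timeout_s * 1000)
    stmt_ms = int(stmt_timeout_s * 1000)
    await conn.execute(f"SET LOCAL lock_timeout = '{lock_ms}ms'")
    await conn.execute(f"SET LOCAL statement_timeout = '{stmt_ms}ms'")
```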

---

## Transient Classification

`dbapi_classifier` detects common transient patterns:

| Backend       | Typical Transient Codes / Messages               |
| ------------- | ------------------------------------------------ |
| Postgres      | `40P01` (deadlock), `55P03` (lock not available) |
| MySQL/MariaDB | `1213`, `1205`, connection lost                  |
| SQLite        | `database is locked`                             |
| Generic       | Operational/timeouts from DB-API                 |

You can always plug in your own classifier:
`classifier(exc) -> bool`.
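
A sketch of such a classifier targeting the codes above (attribute names vary by driver, so it probes a couple of common ones; this is illustrative, not `dbapi_classifier` itself):

```python
def my_classifier(exc: BaseException) -> bool:
    # Postgres drivers commonly expose the SQLSTATE as .sqlstate or .pgcode.
    sqlstate = getattr(exc, "sqlstate", None) or getattr(exc, "pgcode", None)
    if sqlstate in {"40P01", "55P03"}:
        return True
    # SQLite's busy signal surfaces as a message, not a code.
    return "database is locked" in str(exc).lower()
```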

---

## Examples

```bash
cd examples
cp .env.sample .env  # configure DSNs

# SQLite (local)
make install-sqlite-local && make run-sqlite

# Postgres (Docker)
make pg-up && make install-psycopg-local && make run-psycopg
make install-asyncpg-local && make run-asyncpg
make pg-down

# MySQL (Docker)
make mysql-up && make install-mysql-local && make run-mysql
make mysql-down
```

---

## Roadmap

* OTLP tracing (spans around retries)
* Instrumentation hooks (OpenTelemetry / Prometheus)
* More contrib adapters (`databases`, `gino`, etc.)
* Extended cookbook examples

---

## Changelog

See [CHANGELOG.md](CHANGELOG.md)

---

## License

MIT

---

## Author

**Youssef Khaya**
[LinkedIn](https://www.linkedin.com/in/youssef-khaya-88a1a128)
[GitHub](https://github.com/yokha/dbop-core)

---

### Optional badges for later

Once you publish to PyPI and GitHub Actions:

```markdown
[![PyPI version](https://img.shields.io/pypi/v/dbop-core.svg)](https://pypi.org/project/dbop-core/)
[![Build Status](https://github.com/yokha/dbop-core/actions/workflows/test.yml/badge.svg)](https://github.com/yokha/dbop-core/actions)
[![Coverage Status](https://img.shields.io/codecov/c/github/yokha/dbop-core.svg)](https://codecov.io/gh/yokha/dbop-core)
```

---

            
