# dbop-core
**DB-agnostic retry runner** for Python database operations.
You bring the driver or ORM — `dbop-core` gives you:
* **Retries with backoff and jitter**
* **Attempt scopes** (transaction / SAVEPOINT wrappers)
* **Per-attempt hooks** (e.g. set timeouts, apply metadata)
* **Transient error classification**
Lightweight and composable — the core doesn’t know your driver.
Adapters live under `contrib/` (SQLAlchemy, psycopg, asyncpg, aiomysql, aiosqlite, and generic DB-API).
---
## When to Use
Use `dbop-core` when you need **resilience for a single logical DB step**:
* Occasional **deadlocks** or **lock wait timeouts**
* **Slow statements** that risk blocking your pool
* **SAVEPOINT-style retries** inside an outer transaction
* **Per-attempt timeouts** without rewriting your app logic
It’s not a migration tool or pooler — just a precise **execution runner** for safe retries.
---
## Features
* ✅ **Async-first core API**, works with sync *and* async drivers
* 🔁 **Retry policy**: max retries, exponential backoff, jitter, caps
* 🧩 **Attempt scopes**: pluggable context managers (transaction/savepoint)
* ⚙️ **Per-attempt hooks**: run custom setup (timeouts, instrumentation)
* 🧠 **Transient classifier**: decide whether an exception should retry
---
## Installation
```bash
pip install dbop-core
# optional extras for contrib adapters
pip install "dbop-core[sqlalchemy]"
pip install "dbop-core[psycopg]"
pip install "dbop-core[asyncpg]"
pip install "dbop-core[aiomysql]"
pip install "dbop-core[aiosqlite]"
```
**Compatibility:** Python 3.9 – 3.13
---
## Quickstart
```python
import asyncio
from dbop_core.core import execute, RetryPolicy

async def op(x):
    return x * 2

async def main():
    result = await execute(op, args=(21,), policy=RetryPolicy())
    assert result == 42

asyncio.run(main())
```
---
## Core API (Essentials)
```python
await execute(
    op,                        # callable: sync or async
    args=(), kwargs=None,
    retry_on=(Exception,),     # types to retry
    classifier=None,           # fn(exc) -> bool; True = retry
    raises=True,               # if False, return default on final failure
    default=None,
    policy=RetryPolicy(),      # backoff/jitter settings
    attempt_scope=None,        # sync AttemptScope
    attempt_scope_async=None,  # async AttemptScope
    pre_attempt=None,          # async setup hook
    read_only=False,           # passed to scopes
    overall_timeout_s=None,    # per-attempt timeout
)
```
**Semantics**
* Only exceptions in `retry_on` are candidates for retry.
* If `classifier` is provided, it takes precedence per exception (`True` → retry, `False` → stop).
* `overall_timeout_s` cancels the attempt; if `raises=False`, you get `default`.
* `pre_attempt` is always async — even for sync drivers (wrap your sync setup with `async def pre(): ...`).
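
The interplay of `retry_on` and `classifier` described above can be pictured as a small decision function. This is a sketch of one reading of those rules, not the library's code; `should_retry` is a hypothetical name introduced only for illustration.

```python
from typing import Callable, Optional, Tuple, Type


def should_retry(
    exc: BaseException,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    classifier: Optional[Callable[[BaseException], bool]] = None,
) -> bool:
    """Hypothetical helper: decide whether one failed attempt is retried."""
    # Exceptions outside retry_on are never candidates for retry.
    if not isinstance(exc, retry_on):
        return False
    # When a classifier is given, it has the final say for this exception.
    if classifier is not None:
        return bool(classifier(exc))
    # No classifier: every exception matching retry_on is retried.
    return True
```

Under this reading, a classifier can only narrow the set of retried exceptions; it never widens it beyond `retry_on`.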
---
### Execution Flow (Conceptual Diagram)
Below is a simplified view of what happens inside `execute()` during retries.
```text
┌──────────────────────────────────────────────────────────────┐
│                     execute() lifecycle                      │
└──────────────────────────────────────────────────────────────┘
        │
        ▼
[1] start execute()
        │
        ▼
[2] initialize RetryPolicy
      - max_retries, delay, jitter, etc.
      - retry_on exception types
        │
        ▼
[3] for each attempt (1..N):
        │
        ├─► [3.1] pre_attempt()
        │         (async setup hook)
        │         e.g., apply_timeouts, reset state
        │
        ├─► [3.2] attempt_scope / attempt_scope_async
        │         (transaction or SAVEPOINT wrapper)
        │
        ├─► [3.3] call op(*args, **kwargs)
        │         (sync or async function)
        │
        ├─► [3.4] if success → return result
        │
        ├─► [3.5] if exception:
        │         ├─ check type in retry_on
        │         ├─ run classifier(exc)
        │         ├─ if transient → sleep(backoff) → retry
        │         └─ else → re-raise (or return default)
        │
        ▼
[4] if all retries failed:
      - return default (if raises=False)
      - or raise last exception
```
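
The flow in the diagram can be approximated with nothing but `asyncio`. The sketch below is illustrative only and is not `dbop-core`'s implementation: `run_with_retries` is a hypothetical name, the attempt scope of step [3.2] is left out for brevity, and the "full jitter" backoff is an assumed choice.

```python
import asyncio
import inspect
import random


async def run_with_retries(op, *, max_retries=3, initial_delay=0.05,
                           max_delay=0.5, retry_on=(Exception,),
                           classifier=None, pre_attempt=None,
                           raises=True, default=None):
    """Illustrative retry loop; not dbop-core's actual implementation."""
    delay = initial_delay
    last_exc = None
    for attempt in range(max_retries + 1):       # first try plus retries
        try:
            if pre_attempt is not None:
                await pre_attempt()              # [3.1] per-attempt setup
            result = op()                        # [3.3] call the operation
            if inspect.isawaitable(result):
                result = await result            # async ops are awaited
            return result                        # [3.4] success
        except retry_on as exc:                  # [3.5] candidate for retry
            last_exc = exc
            transient = classifier(exc) if classifier else True
            if not transient or attempt == max_retries:
                break
            # Sleep a random fraction of the current delay, then double it
            # up to the cap.
            await asyncio.sleep(random.uniform(0, delay))
            delay = min(delay * 2, max_delay)
    if raises:                                   # [4] final failure
        raise last_exc
    return default
```

Exceptions whose type is not in `retry_on` propagate immediately, because the `except` clause never catches them.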
**Key concepts:**
* `attempt_scope` isolates one DB operation (transaction or savepoint).
If the attempt fails, the scope rolls back and prepares for retry.
* `pre_attempt` runs before each try — perfect for **timeouts**, **instrumentation**, or **context tagging**.
* `RetryPolicy` determines how long to wait and how many times to retry.
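
To make "how long to wait" concrete, here is a minimal sketch of a capped exponential schedule with jitter. The parameter names `max_retries`, `initial_delay`, and `max_delay` mirror the `RetryPolicy` arguments used in this README; `multiplier`, `jitter`, and `rng` are assumptions added for illustration, and the real policy may compute delays differently.

```python
import random


def backoff_delays(max_retries=3, initial_delay=0.05, max_delay=0.2,
                   multiplier=2.0, jitter=0.1, rng=random.random):
    """Hypothetical schedule: capped exponential delays with +/- jitter."""
    delays = []
    base = initial_delay
    for _ in range(max_retries):
        capped = min(base, max_delay)
        # Spread each delay by up to +/- `jitter` (a fraction of the delay).
        spread = capped * jitter * (2 * rng() - 1)
        delays.append(max(0.0, capped + spread))
        base *= multiplier
    return delays


# With the jitter pinned to zero (rng always returns 0.5), the schedule is
# the plain doubling sequence capped at max_delay.
print(backoff_delays(rng=lambda: 0.5))
```

Jitter matters when many workers fail at the same moment: without it they would all retry in lockstep and collide again.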
---
### Design Philosophy
Database operations often need **fine-grained resilience** — but frameworks usually give you an all-or-nothing approach:
* Retry at the HTTP or ORM layer (too coarse).
* Manual retry loops around transactions (too error-prone).
* Connection poolers that retry implicitly (too opaque).
`dbop-core` exists to make retries **explicit, minimal, and driver-agnostic**.
It focuses on **one unit of work** — one statement, one transaction, one savepoint — and lets *you* decide:
* ✅ *When to retry* (`classifier`, `retry_on`)
* ✅ *How to retry* (`RetryPolicy`, exponential backoff + jitter)
* ✅ *Where to isolate* (`attempt_scope` / `attempt_scope_async`)
* ✅ *What to prepare* before each try (`pre_attempt` hook)
Everything else — connection pooling, ORM sessions, schema migration — stays out of scope.
This separation keeps `dbop-core` **composable**, **transparent**, and **safe** to embed anywhere in your stack — from raw DB-API connections to async SQLAlchemy sessions or FastAPI background tasks.
> In short: **`dbop-core` doesn’t manage your database. It helps you survive it.**
---
**Execution modes:**
| Driver Type | Scope used | Hook type | Example Adapter |
| ----------- | --------------------- | ------------------------ | ------------------------------------- |
| Sync | `attempt_scope` | `apply_timeouts_sync()` | DB-API, SQLAlchemy |
| Async | `attempt_scope_async` | `apply_timeouts_async()` | asyncpg, psycopg, aiomysql, aiosqlite |
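
For the sync row, a scope is essentially a context manager that isolates one attempt. The following is a rough sketch of a SAVEPOINT-based scope using only the standard-library `sqlite3` module; `savepoint_scope` is a hypothetical name and this is not the contrib adapter's code.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def savepoint_scope(conn, name="attempt"):
    """Hypothetical sync scope: wrap one attempt in a SAVEPOINT."""
    conn.execute(f"SAVEPOINT {name}")
    try:
        yield conn
    except Exception:
        # Undo only this attempt's work, then discard the savepoint.
        conn.execute(f"ROLLBACK TO SAVEPOINT {name}")
        conn.execute(f"RELEASE SAVEPOINT {name}")
        raise
    else:
        conn.execute(f"RELEASE SAVEPOINT {name}")


# isolation_level=None puts sqlite3 in autocommit mode, so the module does
# not open implicit transactions around our savepoints.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t(x INT)")

with savepoint_scope(conn):
    conn.execute("INSERT INTO t VALUES (1)")      # kept

try:
    with savepoint_scope(conn):
        conn.execute("INSERT INTO t VALUES (2)")  # rolled back
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

rows = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
```

Because the failed attempt is rolled back to the savepoint, a retry starts from the same state as the first try.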
---
## Contrib Adapters
| Adapter | Sync/Async | Backend | File |
| ------------------------- | ---------- | --------------------- | ------------------------------------- |
| DB-API (generic) | Sync | Postgres/MySQL/SQLite | `contrib/dbapi_adapter.py` |
| SQLAlchemy (Session) | Sync | Any | `contrib/sqlalchemy_adapter.py` |
| SQLAlchemy (AsyncSession) | Async | Any | `contrib/sqlalchemy_adapter_async.py` |
| psycopg 3 | Async | Postgres | `contrib/psycopg_adapter.py` |
| asyncpg | Async | Postgres | `contrib/asyncpg_adapter.py` |
| aiomysql | Async | MySQL/MariaDB | `contrib/aiomysql_adapter.py` |
| aiosqlite | Async | SQLite | `contrib/aiosqlite_adapter.py` |
---
### SQLAlchemy (Sync Example)
```python
import asyncio
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from dbop_core.core import execute, RetryPolicy
from dbop_core.contrib.sqlalchemy_adapter import attempt_scope_sync

engine = create_engine("sqlite+pysqlite:///:memory:")
Session = sessionmaker(bind=engine)

def setup(sess):
    sess.execute(text("CREATE TABLE IF NOT EXISTS kv(k TEXT PRIMARY KEY, v TEXT)"))
def put(sess, k, v):
    sess.execute(text("INSERT OR REPLACE INTO kv VALUES (:k,:v)"), {"k": k, "v": v})
def get(sess, k):
    return sess.execute(text("SELECT v FROM kv WHERE k=:k"), {"k": k}).scalar()

async def main():
    pol = RetryPolicy(max_retries=3, initial_delay=0.05, max_delay=0.2)
    with Session() as sess:
        with sess.begin(): setup(sess)

        with sess.begin():
            await execute(lambda: put(sess, "hello", "world"),
                          attempt_scope=lambda r=False: attempt_scope_sync(sess, read_only=r),
                          policy=pol)

        with sess.begin():
            val = await execute(lambda: get(sess, "hello"),
                                attempt_scope=lambda r=False: attempt_scope_sync(sess, read_only=r),
                                policy=pol, read_only=True)
        print(val)

asyncio.run(main())
```
---
### psycopg (Postgres, Async)
```python
import asyncio
from functools import partial
from psycopg import AsyncConnection
from dbop_core.core import execute, RetryPolicy
from dbop_core.classify import dbapi_classifier
from dbop_core.contrib.psycopg_adapter import attempt_scope_async, apply_timeouts_async

DSN = "postgresql://postgres:postgres@localhost:5432/dbop"

async def pre(conn):  # per-attempt setup
    await apply_timeouts_async(conn, lock_timeout_s=3, stmt_timeout_s=10)

async def run():
    # AsyncConnection.connect() is a coroutine, so await it before entering the context
    async with await AsyncConnection.connect(DSN) as conn:
        pol = RetryPolicy(max_retries=5, initial_delay=0.05, max_delay=0.5)

        await execute(
            lambda: conn.execute("INSERT INTO items(name) VALUES ('gamma') ON CONFLICT DO NOTHING"),
            classifier=dbapi_classifier,
            attempt_scope_async=lambda r=False: attempt_scope_async(conn, read_only=r),
            pre_attempt=partial(pre, conn),
            policy=pol,
        )

        async def count_items():
            # conn.execute() returns a cursor; fetch the scalar explicitly
            cur = await conn.execute("SELECT COUNT(*) FROM items")
            row = await cur.fetchone()
            return row[0]

        count = await execute(
            count_items,
            classifier=dbapi_classifier,
            attempt_scope_async=lambda r=False: attempt_scope_async(conn, read_only=r),
            pre_attempt=partial(pre, conn),
            policy=pol, read_only=True,
        )
        print("count:", count)

asyncio.run(run())
```
---
### Generic DB-API (Sync, e.g. SQLite)
```python
import asyncio, sqlite3
from dbop_core.core import execute, RetryPolicy
from dbop_core.contrib.dbapi_adapter import attempt_scope_sync, apply_timeouts_sync

conn = sqlite3.connect(":memory:")

def create(): conn.execute("CREATE TABLE IF NOT EXISTS t(x INT)")
def insert(): conn.execute("INSERT INTO t(x) VALUES (1)")
def count(): return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]

async def pre():
    # pre_attempt must be async, even though the setup itself is sync
    apply_timeouts_sync(conn, backend="sqlite", lock_timeout_s=3)

async def main():
    create()
    pol = RetryPolicy(max_retries=2, initial_delay=0.05, max_delay=0.2)

    await execute(lambda: insert(),
                  attempt_scope=lambda r=False: attempt_scope_sync(conn, read_only=r, backend="sqlite"),
                  pre_attempt=pre, policy=pol)

    n = await execute(lambda: count(),
                      attempt_scope=lambda r=False: attempt_scope_sync(conn, read_only=r, backend="sqlite"),
                      pre_attempt=pre, policy=pol, read_only=True)
    print("rows:", n)

asyncio.run(main())
```
---
## Timeout Mapping (per attempt)
| Backend | Mechanism |
| ----------------- | -------------------------------------------------------------- |
| **PostgreSQL** | `SET LOCAL lock_timeout`, `SET LOCAL statement_timeout` |
| **MySQL/MariaDB** | `innodb_lock_wait_timeout`, `MAX_EXECUTION_TIME` (best-effort) |
| **SQLite** | `PRAGMA busy_timeout` (connection-level) |
Use your adapter’s `apply_timeouts_*` in `pre_attempt()`.
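
As a rough picture of what such a hook does for SQLite, the sketch below sets `PRAGMA busy_timeout` from an async function using only the standard library. `make_pre_attempt` is a hypothetical helper, not part of `dbop-core`, and the adapter's own `apply_timeouts_*` functions may work differently.

```python
import asyncio
import sqlite3


def make_pre_attempt(conn, lock_timeout_s=3.0):
    """Hypothetical factory: returns an async hook that sets busy_timeout."""
    async def pre():
        # PRAGMA values cannot be bound as parameters, so format an
        # integer number of milliseconds directly into the statement.
        conn.execute(f"PRAGMA busy_timeout = {int(lock_timeout_s * 1000)}")
    return pre


conn = sqlite3.connect(":memory:")
pre = make_pre_attempt(conn, lock_timeout_s=3)
asyncio.run(pre())  # a retry runner would await this before each attempt

timeout_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
print(timeout_ms)
```

Since `busy_timeout` is connection-level, re-applying it on every attempt is cheap and keeps the setting correct even if other code on the same connection changed it.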
---
## Transient Classification
`dbapi_classifier` detects common transient patterns:
| Backend | Typical Transient Codes / Messages |
| ------------- | ------------------------------------------------ |
| Postgres | `40P01` (deadlock), `55P03` (lock not available) |
| MySQL/MariaDB | `1213`, `1205`, connection lost |
| SQLite | `database is locked` |
| Generic | Operational/timeouts from DB-API |
You can always plug in your own classifier:
`classifier(exc) -> bool`.
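
A custom classifier might look like the sketch below. It is not `dbapi_classifier`; `my_classifier` and the two constant sets are hypothetical names, and the attribute lookups reflect common driver conventions rather than anything guaranteed by `dbop-core`.

```python
import sqlite3

TRANSIENT_SQLSTATES = {"40P01", "55P03"}  # deadlock, lock not available
TRANSIENT_MYSQL_ERRNOS = {1213, 1205}     # deadlock, lock wait timeout


def my_classifier(exc):
    """Hypothetical classifier: True means 'transient, worth retrying'."""
    # Assumption: Postgres drivers expose the SQLSTATE as `sqlstate`
    # (psycopg 3, asyncpg) or `pgcode` (psycopg2).
    sqlstate = getattr(exc, "sqlstate", None) or getattr(exc, "pgcode", None)
    if sqlstate in TRANSIENT_SQLSTATES:
        return True
    # Assumption: MySQL drivers put the numeric error code in args[0].
    if exc.args and isinstance(exc.args[0], int):
        if exc.args[0] in TRANSIENT_MYSQL_ERRNOS:
            return True
    # SQLite reports lock contention through the message text.
    if isinstance(exc, sqlite3.OperationalError):
        return "database is locked" in str(exc).lower()
    return False
```

Pass it as `classifier=my_classifier`; anything it returns `False` for is treated as permanent and not retried.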
---
## Examples
```bash
cd examples
cp .env.sample .env # configure DSNs
# SQLite (local)
make install-sqlite-local && make run-sqlite
# Postgres (Docker)
make pg-up && make install-psycopg-local && make run-psycopg
make install-asyncpg-local && make run-asyncpg
make pg-down
# MySQL (Docker)
make mysql-up && make install-mysql-local && make run-mysql
make mysql-down
```
---
## Roadmap
* OTLP tracing (spans around retries)
* Instrumentation hooks (OpenTelemetry / Prometheus)
* More contrib adapters (`databases`, `gino`, etc.)
* Extended cookbook examples
---
## Changelog
See [CHANGELOG.md](CHANGELOG.md)
---
## License
MIT
---
## Author
**Youssef Khaya**
[LinkedIn](https://www.linkedin.com/in/youssef-khaya-88a1a128)
[GitHub](https://github.com/yokha/dbop-core)
---