# Psycopg Toolkit
[Build](https://github.com/descoped/psycopg-toolkit/actions/workflows/build-test-native.yml)
[Coverage](https://codecov.io/gh/descoped/psycopg-toolkit)
[Python](https://www.python.org/downloads/)
[License: MIT](https://opensource.org/licenses/MIT)
[Releases](https://github.com/descoped/psycopg-toolkit/releases)
A robust PostgreSQL database toolkit providing enterprise-grade connection pooling and database management capabilities for Python applications.
## Features
- Async-first design with connection pooling via `psycopg-pool`
- Comprehensive transaction management with savepoint support
- Type-safe repository pattern with Pydantic model validation
- **JSONB support with automatic field detection and psycopg JSON adapters**
- **Native pgvector support with automatic vector field detection**
- PostgreSQL array field preservation (TEXT[], INTEGER[])
- Automatic date/timestamp conversion for Pydantic models
- SQL query builder with SQL injection protection
- Database schema and test data lifecycle management
- Automatic retry mechanism with exponential backoff
- Granular exception hierarchy for error handling
- Connection health monitoring and validation
- Database initialization callback system
- Statement timeout configuration
- Fully typed with modern Python type hints
## Installation
```bash
pip install psycopg-toolkit
```
## Quick Start
```python
from psycopg_toolkit import Database, DatabaseSettings
from uuid import uuid4
# Configure database
settings = DatabaseSettings(
    host="localhost",
    port=5432,
    dbname="your_database",
    user="your_user",
    password="your_password"
)

async def main():
    # Initialize database
    db = Database(settings)
    await db.init_db()

    # Get transaction manager
    tm = await db.get_transaction_manager()

    # Execute in transaction
    async with tm.transaction() as conn:
        async with conn.cursor() as cur:
            user_id = uuid4()
            await cur.execute(
                "INSERT INTO users (id, email) VALUES (%s, %s)",
                (user_id, "user@example.com")
            )

    # Clean up
    await db.cleanup()
```
## Core Components
### Database Management
```python
# Health check
is_healthy = await db.check_pool_health()

# Connection management
async with db.connection() as conn:
    async with conn.cursor() as cur:
        await cur.execute("SELECT version()")
```
### Transaction Management
```python
# Basic transaction
async with tm.transaction() as conn:
    # Operations automatically rolled back on error
    pass

# With savepoint
async with tm.transaction(savepoint="user_creation") as conn:
    # Nested transaction using savepoint
    pass
```
### Repository Pattern
```python
from uuid import UUID

from psycopg import AsyncConnection
from pydantic import BaseModel

from psycopg_toolkit import BaseRepository

class User(BaseModel):
    id: UUID
    email: str

class UserRepository(BaseRepository[User]):
    def __init__(self, conn: AsyncConnection):
        super().__init__(
            db_connection=conn,
            table_name="users",
            model_class=User,
            primary_key="id"
        )

# Usage
async with tm.transaction() as conn:
    repo = UserRepository(conn)
    user = await repo.get_by_id(user_id)
```
### JSONB Support
```python
from typing import Dict, List, Any
from pydantic import BaseModel
from psycopg_toolkit import BaseRepository
class UserProfile(BaseModel):
    id: int
    name: str
    # These fields are automatically detected as JSONB
    metadata: Dict[str, Any]
    preferences: Dict[str, str]
    tags: List[str]

class UserRepository(BaseRepository[UserProfile, int]):
    def __init__(self, conn):
        super().__init__(
            db_connection=conn,
            table_name="user_profiles",
            model_class=UserProfile,
            primary_key="id"
            # auto_detect_json=True by default
        )

# Usage - JSON fields handled automatically
user = UserProfile(
    id=1,
    name="John Doe",
    metadata={"created_at": "2024-01-01", "source": "web"},
    preferences={"theme": "dark", "language": "en"},
    tags=["premium", "beta_tester"]
)

# JSONB fields automatically serialized/deserialized
created_user = await repo.create(user)
retrieved_user = await repo.get_by_id(1)
```
### PostgreSQL Arrays and Date Fields
```python
from typing import Any, Dict, List
from uuid import UUID, uuid4

from pydantic import BaseModel

from psycopg_toolkit import BaseRepository

class User(BaseModel):
    id: UUID
    username: str
    roles: List[str]          # PostgreSQL TEXT[] array
    permissions: List[str]    # PostgreSQL TEXT[] array
    metadata: Dict[str, Any]  # JSONB field
    birthdate: str            # ISO date string (from DATE)
    created_at: str           # ISO datetime string (from TIMESTAMP)
    updated_at: str           # ISO datetime string (from TIMESTAMPTZ)
    last_login: str | None    # Optional timestamp field

class UserRepository(BaseRepository[User, UUID]):
    def __init__(self, conn):
        super().__init__(
            db_connection=conn,
            table_name="users",
            model_class=User,
            primary_key="id",
            # Preserve PostgreSQL arrays instead of JSONB
            array_fields={"roles", "permissions"},
            # Auto-convert ALL date/timestamp fields to/from strings
            date_fields={"birthdate", "created_at", "updated_at", "last_login"}
        )

# PostgreSQL arrays are preserved, dates are auto-converted
user = User(
    id=uuid4(),
    username="john",
    roles=["admin", "user"],           # Stored as TEXT[]
    permissions=["read", "write"],     # Stored as TEXT[]
    metadata={"dept": "IT"},           # Stored as JSONB
    birthdate="1990-01-01",            # Converts to/from PostgreSQL DATE
    created_at="2024-01-01T12:00:00",  # Converts to/from TIMESTAMP
    updated_at="2024-01-01T12:00:00",  # Converts to/from TIMESTAMPTZ
    last_login=None                    # Nullable timestamp field
)
```
### Schema Management
```python
from psycopg import AsyncConnection

from psycopg_toolkit.core.transaction import SchemaManager

class UserSchemaManager(SchemaManager[None]):
    async def create_schema(self, conn: AsyncConnection) -> None:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id UUID PRIMARY KEY,
                email TEXT UNIQUE NOT NULL
            )
        """)

    async def drop_schema(self, conn: AsyncConnection) -> None:
        await conn.execute("DROP TABLE IF EXISTS users")

# Usage
async with tm.with_schema(UserSchemaManager()) as _:
    # Schema available here
    pass  # Automatically dropped after
```
## Error Handling
```python
from psycopg_toolkit import (
    DatabaseConnectionError,
    DatabasePoolError,
    DatabaseNotAvailable,
    RecordNotFoundError
)

try:
    async with tm.transaction() as conn:
        repo = UserRepository(conn)
        user = await repo.get_by_id(user_id)
except DatabaseConnectionError as e:
    print(f"Connection error: {e.original_error}")
except RecordNotFoundError:
    print(f"User {user_id} not found")
```
## Testing with Async Libraries
### Understanding the pytest and Async Boundary
When testing async libraries like `async-task-worker` or database libraries (e.g., `psycopg-toolkit`), you'll encounter a fundamental challenge: pytest's session-scoped fixtures are synchronous, while modern libraries are increasingly async. This section explains the problem and provides the recommended patterns for handling this boundary.
#### The Core Problem
pytest's fixture system was designed before async became prevalent in Python:
- **Session fixtures are synchronous**: They run once per test session and must be sync functions
- **Modern libraries are async**: Database pools, task workers, and connections require `await`
- **Resource efficiency matters**: You want to reuse expensive resources (containers, pools) across tests
This creates an awkward boundary where you need async resources in sync fixture setup.
#### The Standard Solution: `asyncio.run()`
The accepted pattern in the Python testing community is using `asyncio.run()` in session fixtures. While it may feel inelegant, this is the recommended approach documented in pytest-asyncio's own examples.
```python
import pytest
import asyncio
from async_task_worker import AsyncTaskWorker

@pytest.fixture(scope="session")
def worker():
    """Session-scoped worker for test efficiency.

    This uses asyncio.run() to bridge the sync/async boundary.
    This is the standard pattern for async resources in session fixtures.
    """
    # Create the worker
    worker = AsyncTaskWorker(max_workers=5)

    # Start it using asyncio.run() - the standard workaround
    asyncio.run(worker.start())

    yield worker

    # Cleanup
    asyncio.run(worker.stop())

# Your async tests can then use the worker normally
@pytest.mark.asyncio
async def test_task_execution(worker):
    task_id = await worker.add_task(my_task, data)
    result = await worker.get_task_future(task_id)
    assert result == expected
```
### Complete Testing Pattern
Here's a comprehensive example showing best practices for testing async-task-worker:
```python
import pytest
import pytest_asyncio
import asyncio
from datetime import datetime
from async_task_worker import AsyncTaskWorker, task, TaskStatus

# ============================================================
# Session Fixtures (Synchronous with asyncio.run())
# ============================================================

@pytest.fixture(scope="session")
def event_loop_policy():
    """Use a specific event loop policy for tests."""
    return asyncio.get_event_loop_policy()

@pytest.fixture(scope="session")
def worker():
    """Create a session-scoped worker instance.

    Note: We use asyncio.run() here because session fixtures
    must be synchronous. This is the standard pattern.
    """
    worker = AsyncTaskWorker(
        max_workers=3,
        task_timeout=10,
        cache_enabled=True,
        cache_ttl=60
    )

    # Start the worker using asyncio.run()
    asyncio.run(worker.start())

    yield worker

    # Cleanup
    asyncio.run(worker.stop())

# ============================================================
# Function-Scoped Async Fixtures (Can use async/await)
# ============================================================

@pytest_asyncio.fixture
async def clean_worker(worker):
    """Provides a clean worker state for each test."""
    # Clear any existing tasks
    tasks = worker.get_all_tasks()
    for task in tasks:
        if task.status in ("pending", "running"):
            await worker.cancel_task(task.id)

    # Clear cache if enabled
    if worker.cache:
        await worker.clear_cache()

    yield worker

    # Post-test cleanup if needed
    # ...
# ============================================================
# Test Task Definitions
# ============================================================

@task("test_computation")
async def test_computation(value, delay=0.1, progress_callback=None):
    """A test task that simulates computation."""
    steps = 5
    result = value

    for i in range(steps):
        await asyncio.sleep(delay)
        result = result * 2

        if progress_callback:
            progress_callback((i + 1) / steps)

    return result

@task("test_failing_task")
async def test_failing_task(should_fail=True):
    """A task that can be configured to fail."""
    if should_fail:
        raise ValueError("Task failed as expected")
    return "success"

# ============================================================
# Test Cases
# ============================================================

@pytest.mark.asyncio
async def test_basic_task_execution(clean_worker):
    """Test basic task execution flow."""
    # Add a task
    task_id = await clean_worker.add_task(
        test_computation,
        value=2,
        delay=0.01
    )

    # Wait for completion
    result = await clean_worker.get_task_future(task_id)

    # Verify result (2 * 2^5 = 64)
    assert result == 64

    # Check task info
    info = clean_worker.get_task_info(task_id)
    assert info.status == TaskStatus.COMPLETED
    assert info.progress == 1.0
@pytest.mark.asyncio
async def test_task_cancellation(clean_worker):
    """Test task cancellation."""
    # Start a long-running task
    task_id = await clean_worker.add_task(
        test_computation,
        value=1,
        delay=1.0  # Long delay
    )

    # Cancel it
    cancelled = await clean_worker.cancel_task(task_id)
    assert cancelled

    # Verify status
    info = clean_worker.get_task_info(task_id)
    assert info.status == TaskStatus.CANCELLED

@pytest.mark.asyncio
async def test_cache_functionality(clean_worker):
    """Test caching of task results."""
    # First execution
    task_id1 = await clean_worker.add_task(
        test_computation,
        value=3,
        delay=0.01,
        use_cache=True
    )
    result1 = await clean_worker.get_task_future(task_id1)

    # Second execution with same args (should hit cache)
    start_time = datetime.now()
    task_id2 = await clean_worker.add_task(
        test_computation,
        value=3,
        delay=0.01,  # This delay won't happen due to cache
        use_cache=True
    )
    result2 = await clean_worker.get_task_future(task_id2)
    elapsed = (datetime.now() - start_time).total_seconds()

    # Results should match
    assert result1 == result2

    # Second execution should be much faster (cache hit)
    assert elapsed < 0.05  # Much less than the task delay

@pytest.mark.asyncio
async def test_concurrent_tasks(clean_worker):
    """Test concurrent task execution."""
    # Add multiple tasks
    task_ids = []
    for i in range(5):
        task_id = await clean_worker.add_task(
            test_computation,
            value=i,
            delay=0.01
        )
        task_ids.append(task_id)

    # Wait for all to complete
    futures = clean_worker.get_task_futures(task_ids)
    results = await asyncio.gather(*futures)

    # Verify all completed
    assert len(results) == 5
    for i, result in enumerate(results):
        assert result == i * (2 ** 5)
# ============================================================
# Testing Utilities
# ============================================================

class TaskMonitor:
    """Utility class for monitoring task execution in tests."""

    def __init__(self, worker: AsyncTaskWorker):
        self.worker = worker
        self.events = []

    async def monitor_task(self, task_id: str, timeout: float = 5.0):
        """Monitor a task until completion or timeout."""
        start_time = asyncio.get_event_loop().time()

        while asyncio.get_event_loop().time() - start_time < timeout:
            info = self.worker.get_task_info(task_id)
            self.events.append({
                "time": asyncio.get_event_loop().time() - start_time,
                "status": info.status,
                "progress": info.progress
            })

            if info.status not in ("pending", "running"):
                return info

            await asyncio.sleep(0.1)

        raise TimeoutError(f"Task {task_id} did not complete within {timeout}s")

@pytest.mark.asyncio
async def test_with_monitor(clean_worker):
    """Example of using the TaskMonitor utility."""
    monitor = TaskMonitor(clean_worker)

    task_id = await clean_worker.add_task(
        test_computation,
        value=5,
        delay=0.1
    )

    # Monitor the task
    final_info = await monitor.monitor_task(task_id)

    # Check the monitoring captured progress
    assert len(monitor.events) > 0
    assert final_info.status == TaskStatus.COMPLETED

    # Verify progress increased over time
    progresses = [e["progress"] for e in monitor.events]
    assert progresses[-1] == 1.0
```
### Testing with Database Libraries (e.g., psycopg-toolkit)
The same pattern applies when testing with async database libraries:
```python
import pytest
import pytest_asyncio
import asyncio
from psycopg_toolkit import Database

@pytest.fixture(scope="session")
def db():
    """Session-scoped database connection.

    Uses asyncio.run() to handle async operations in a sync fixture.
    This is the standard pattern for pytest with async resources.
    """
    db = Database("postgresql://user:pass@localhost/test")

    # Create the pool synchronously using asyncio.run()
    asyncio.run(db.get_pool())

    yield db

    # Cleanup
    asyncio.run(db.cleanup())

@pytest_asyncio.fixture
async def db_connection(db):
    """Function-scoped connection from the session pool."""
    async with db.connection() as conn:
        yield conn
    # Automatic cleanup via context manager
```
### Key Takeaways
1. **The `asyncio.run()` pattern is correct**: Don't feel bad about using it in session fixtures; it's the standard solution
2. **Session vs function scope**: Use session-scoped fixtures for expensive resources (pools, workers) and function-scoped fixtures for per-test setup and cleanup
3. **The awkwardness is pytest's limitation, not your design**: Your async library doesn't need sync wrappers
4. **Document the pattern**: Help users understand why `asyncio.run()` appears in test fixtures
### Additional Testing Resources
- **pytest-asyncio**: The standard pytest plugin for async testing
- **pytest-timeout**: Useful for preventing hanging async tests
- **asyncio-test**: Utilities for testing asyncio code
- **Testing async generators**: Use `async for` in tests or `aioitertools` for utilities (see the sketch below)
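As a quick illustration of that last point, here is a minimal sketch of driving an async generator from a pytest-asyncio test. The `count_up` generator is defined inline purely for the example; it is not part of psycopg-toolkit or async-task-worker.
```python
import pytest

async def count_up(limit: int):
    """Tiny async generator used only for this illustration."""
    for i in range(limit):
        yield i

@pytest.mark.asyncio
async def test_async_generator_yields_all_values():
    # Consume the generator with `async for`, the same way production code would.
    values = [item async for item in count_up(3)]
    assert values == [0, 1, 2]
```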
### Common Pitfalls and Solutions
#### Pitfall 1: Event Loop Conflicts
```python
# Wrong - Creates a new event loop manually
def test_something():
    loop = asyncio.new_event_loop()
    loop.run_until_complete(my_async_func())

# Right - Use pytest-asyncio
@pytest.mark.asyncio
async def test_something():
    await my_async_func()
```
#### Pitfall 2: Forgetting Cleanup
```python
# Wrong - No cleanup
@pytest.fixture
def worker():
    w = AsyncTaskWorker()
    asyncio.run(w.start())
    return w

# Right - Proper cleanup
@pytest.fixture
def worker():
    w = AsyncTaskWorker()
    asyncio.run(w.start())
    yield w
    asyncio.run(w.stop())
```
#### Pitfall 3: Mixing Sync and Async Incorrectly
```python
# Wrong - Can't await in a sync function
def test_task(worker):
    result = await worker.add_task(...)  # SyntaxError

# Right - Mark as an async test
@pytest.mark.asyncio
async def test_task(worker):
    result = await worker.add_task(...)
```
By following these patterns, you can effectively test async-task-worker and similar async libraries while maintaining clean, efficient test suites.
## Documentation
- [Database Management](docs/database.md)
- [Transaction Management](docs/transaction_manager.md)
- [Base Repository](docs/base_repository.md)
- [JSONB Support](docs/jsonb_support.md)
- [PsycopgHelper](docs/psycopg_helper.md)
## Running Tests
```bash
# Install dependencies
uv sync --all-groups
# Run all tests except performance tests (default)
uv run pytest
# Run only performance tests
uv run pytest -m performance
# Run all tests including performance
uv run pytest -m ""
# Run specific test categories
uv run pytest tests/unit/ # Only unit tests
uv run pytest -m performance # Only performance tests
# Run with coverage
uv run pytest --cov=src/psycopg_toolkit --cov-report=html
```
### Test Categories
The test suite is organized into three categories:
- **Unit tests**: Fast, isolated tests that don't require a database (in `tests/unit/`)
- **Integration tests**: Tests that require a real PostgreSQL database (in `tests/` root)
- **Performance tests**: Benchmarks and performance measurements (marked with `@pytest.mark.performance`)
Performance tests are excluded by default to keep the regular test runs fast. Use the `-m performance` flag to run them explicitly.
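How that default exclusion is wired up depends on the project's pytest configuration; a common approach is an `addopts` marker expression such as `-m "not performance"`. Purely as an illustrative sketch (not necessarily how this repository configures it), a similar effect can be approximated with a `conftest.py` collection hook:
```python
# conftest.py - illustrative sketch only, not the project's actual configuration
import pytest

def pytest_collection_modifyitems(config, items):
    # If the user passed an explicit -m expression (including -m ""),
    # defer to pytest's own marker filtering.
    if "-m" in config.invocation_params.args:
        return
    skip_perf = pytest.mark.skip(reason="performance test; run with `-m performance`")
    for item in items:
        if "performance" in item.keywords:
            item.add_marker(skip_perf)
```
With this hook, a plain `pytest` run reports performance tests as skipped, while passing any `-m` expression restores pytest's normal marker selection.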
## Contributing
1. Fork the repository
2. Create a feature branch
3. Add tests for new features
4. Ensure all tests pass
5. Submit a pull request
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.