psycopg-toolkit

- Name: psycopg-toolkit
- Version: 0.3.1
- Summary: A Python PostgreSQL database utility with connection pooling
- Homepage: https://github.com/descoped/psycopg-toolkit
- Upload time: 2025-11-02 17:39:18
- Requires Python: >=3.11
- License: MIT
- Keywords: async, database, jsonb, pgvector, pool, postgresql, psycopg, repository-pattern
# Psycopg Toolkit

[![Build Status](https://github.com/descoped/psycopg-toolkit/actions/workflows/build-test.yml/badge.svg)](https://github.com/descoped/psycopg-toolkit/actions/workflows/build-test.yml)
[![Coverage](https://codecov.io/gh/descoped/psycopg-toolkit/branch/master/graph/badge.svg)](https://codecov.io/gh/descoped/psycopg-toolkit)
[![Python Version](https://img.shields.io/badge/python-3.11%2B-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Release](https://img.shields.io/github/v/release/descoped/psycopg-toolkit)](https://github.com/descoped/psycopg-toolkit/releases)

A robust PostgreSQL database toolkit providing enterprise-grade connection pooling and database management capabilities for Python applications.

## Features

- Async-first design with connection pooling via `psycopg-pool`
- Comprehensive transaction management with savepoint support
- Type-safe repository pattern with Pydantic model validation
- **JSONB support with automatic field detection and psycopg JSON adapters**
- **Native pgvector support with automatic vector field detection**
- PostgreSQL array field preservation (TEXT[], INTEGER[])
- Automatic date/timestamp conversion for Pydantic models
- SQL query builder with SQL injection protection
- Database schema and test data lifecycle management
- Automatic retry mechanism with exponential backoff
- Granular exception hierarchy for error handling
- Connection health monitoring and validation
- Database initialization callback system
- Statement timeout configuration
- Fully typed with modern Python type hints

## Installation

```bash
pip install psycopg-toolkit
```

## Quick Start

```python
import asyncio
from uuid import uuid4

from psycopg_toolkit import Database, DatabaseSettings

# Configure database
settings = DatabaseSettings(
    host="localhost",
    port=5432,
    dbname="your_database",
    user="your_user",
    password="your_password"
)

async def main():
    # Initialize database
    db = Database(settings)
    await db.init_db()
    
    # Get transaction manager
    tm = await db.get_transaction_manager()
    
    # Execute in transaction
    async with tm.transaction() as conn:
        async with conn.cursor() as cur:
            user_id = uuid4()
            await cur.execute(
                "INSERT INTO users (id, email) VALUES (%s, %s)",
                (user_id, "user@example.com")
            )
    
    # Clean up
    await db.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
```

## Core Components

### Database Management

```python
# Health check
is_healthy = await db.check_pool_health()

# Connection management
async with db.connection() as conn:
    async with conn.cursor() as cur:
        await cur.execute("SELECT version()")
```

### Transaction Management

```python
# Basic transaction
async with tm.transaction() as conn:
    # Operations automatically rolled back on error
    pass

# With savepoint
async with tm.transaction(savepoint="user_creation") as conn:
    # Nested transaction using savepoint
    pass
```
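
Savepoints let an inner block fail without losing the outer transaction's work. A minimal sketch of that pattern, assuming nested `tm.transaction(savepoint=...)` calls reuse the same connection (as the savepoint support implies) and using the `users` table from the Quick Start:

```python
# Sketch only: the duplicate insert below violates the unique email
# constraint; only the savepoint is rolled back, the first insert survives.
async with tm.transaction() as conn:
    await conn.execute(
        "INSERT INTO users (id, email) VALUES (%s, %s)",
        (uuid4(), "first@example.com"),
    )
    try:
        async with tm.transaction(savepoint="second_user"):
            await conn.execute(
                "INSERT INTO users (id, email) VALUES (%s, %s)",
                (uuid4(), "first@example.com"),  # duplicate email
            )
    except Exception:
        pass  # outer transaction continues and commits on exit
```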

### Repository Pattern

```python
from uuid import UUID

from psycopg import AsyncConnection
from pydantic import BaseModel

from psycopg_toolkit import BaseRepository

class User(BaseModel):
    id: UUID
    email: str

class UserRepository(BaseRepository[User, UUID]):
    def __init__(self, conn: AsyncConnection):
        super().__init__(
            db_connection=conn,
            table_name="users",
            model_class=User,
            primary_key="id"
        )

# Usage
async with tm.transaction() as conn:
    repo = UserRepository(conn)
    user = await repo.get_by_id(user_id)
```

### JSONB Support

```python
from typing import Dict, List, Any
from pydantic import BaseModel
from psycopg_toolkit import BaseRepository

class UserProfile(BaseModel):
    id: int
    name: str
    # These fields are automatically detected as JSONB
    metadata: Dict[str, Any]
    preferences: Dict[str, str]
    tags: List[str]

class UserRepository(BaseRepository[UserProfile, int]):
    def __init__(self, conn):
        super().__init__(
            db_connection=conn,
            table_name="user_profiles",
            model_class=UserProfile,
            primary_key="id"
            # auto_detect_json=True by default
        )

# Usage - JSON fields handled automatically
# (obtain `repo = UserRepository(conn)` inside a transaction, as in the example above)
user = UserProfile(
    id=1,
    name="John Doe",
    metadata={"created_at": "2024-01-01", "source": "web"},
    preferences={"theme": "dark", "language": "en"},
    tags=["premium", "beta_tester"]
)

# JSONB fields automatically serialized/deserialized
created_user = await repo.create(user)
retrieved_user = await repo.get_by_id(1)
```

### PostgreSQL Arrays and Date Fields

```python
from typing import Any, Dict, List
from uuid import UUID, uuid4

from pydantic import BaseModel

from psycopg_toolkit import BaseRepository

class User(BaseModel):
    id: UUID
    username: str
    roles: List[str]          # PostgreSQL TEXT[] array
    permissions: List[str]    # PostgreSQL TEXT[] array
    metadata: Dict[str, Any]  # JSONB field
    birthdate: str            # ISO date string (from DATE)
    created_at: str           # ISO datetime string (from TIMESTAMP)
    updated_at: str           # ISO datetime string (from TIMESTAMPTZ)
    last_login: str | None    # Optional timestamp field

class UserRepository(BaseRepository[User, UUID]):
    def __init__(self, conn):
        super().__init__(
            db_connection=conn,
            table_name="users",
            model_class=User,
            primary_key="id",
            # Preserve PostgreSQL arrays instead of JSONB
            array_fields={"roles", "permissions"},
            # Auto-convert ALL date/timestamp fields to/from strings
            date_fields={"birthdate", "created_at", "updated_at", "last_login"}
        )

# PostgreSQL arrays are preserved, dates are auto-converted
user = User(
    id=uuid4(),
    username="john",
    roles=["admin", "user"],      # Stored as TEXT[]
    permissions=["read", "write"], # Stored as TEXT[]
    metadata={"dept": "IT"},       # Stored as JSONB
    birthdate="1990-01-01",           # Converts to/from PostgreSQL DATE
    created_at="2024-01-01T12:00:00", # Converts to/from TIMESTAMP
    updated_at="2024-01-01T12:00:00", # Converts to/from TIMESTAMPTZ
    last_login=None                   # Nullable timestamp field
)
```

### Schema Management

```python
from psycopg import AsyncConnection

from psycopg_toolkit.core.transaction import SchemaManager

class UserSchemaManager(SchemaManager[None]):
    async def create_schema(self, conn: AsyncConnection) -> None:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id UUID PRIMARY KEY,
                email TEXT UNIQUE NOT NULL
            )
        """)

    async def drop_schema(self, conn: AsyncConnection) -> None:
        await conn.execute("DROP TABLE IF EXISTS users")

# Usage
async with tm.with_schema(UserSchemaManager()) as _:
    # Schema available here
    pass  # Automatically dropped after
```
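
In tests, `with_schema` pairs naturally with the repository pattern: the schema is created, used, and dropped within one scope. A short sketch reusing `UserSchemaManager` and the `User`/`UserRepository` classes defined above (the exact interplay of `with_schema` and `transaction` may differ; treat this as illustrative):

```python
# Sketch: create the schema, exercise a repository against it, then let
# with_schema drop it again on exit.
async with tm.with_schema(UserSchemaManager()) as _:
    async with tm.transaction() as conn:
        repo = UserRepository(conn)
        created = await repo.create(User(id=uuid4(), email="schema@example.com"))
        fetched = await repo.get_by_id(created.id)
        assert fetched.email == "schema@example.com"
# users table has been dropped here
```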

## Error Handling

```python
from psycopg_toolkit import (
    DatabaseConnectionError,
    DatabasePoolError,
    DatabaseNotAvailable,
    RecordNotFoundError
)

try:
    async with tm.transaction() as conn:
        repo = UserRepository(conn)
        user = await repo.get_by_id(user_id)
except DatabaseConnectionError as e:
    print(f"Connection error: {e.original_error}")
except RecordNotFoundError:
    print(f"User {user_id} not found")
```
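
The feature list mentions an automatic retry mechanism with exponential backoff. Independent of that built-in behaviour, a hand-rolled retry around transient connection failures looks like the sketch below (attempt count and delays are arbitrary, not library defaults):

```python
import asyncio

async def get_user_with_retry(tm, user_id, attempts: int = 3):
    """Retry a read on transient connection errors with exponential backoff."""
    delay = 0.5
    for attempt in range(1, attempts + 1):
        try:
            async with tm.transaction() as conn:
                return await UserRepository(conn).get_by_id(user_id)
        except DatabaseConnectionError:
            if attempt == attempts:
                raise
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff
```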


## Testing with Async Libraries

### Understanding the pytest and Async Boundary

When testing async libraries like `async-task-worker` or database libraries (e.g., `psycopg-toolkit`), you'll encounter a fundamental challenge: pytest's session-scoped fixtures are synchronous, while modern libraries are increasingly async. This section explains the problem and provides the recommended patterns for handling this boundary.

#### The Core Problem

pytest's fixture system was designed before async became prevalent in Python:
- **Session fixtures are synchronous**: They run once per test session and must be sync functions
- **Modern libraries are async**: Database pools, task workers, and connections require `await`
- **Resource efficiency matters**: You want to reuse expensive resources (containers, pools) across tests

This creates an awkward boundary where you need async resources in sync fixture setup.

#### The Standard Solution: `asyncio.run()`

The accepted pattern in the Python testing community is using `asyncio.run()` in session fixtures. While it may feel inelegant, this is the recommended approach documented in pytest-asyncio's own examples.

```python
import pytest
import asyncio
from async_task_worker import AsyncTaskWorker

@pytest.fixture(scope="session")
def worker():
    """Session-scoped worker for test efficiency.
    
    This uses asyncio.run() to bridge the sync/async boundary.
    This is the standard pattern for async resources in session fixtures.
    """
    # Create the worker
    worker = AsyncTaskWorker(max_workers=5)
    
    # Start it using asyncio.run() - the standard workaround
    asyncio.run(worker.start())
    
    yield worker
    
    # Cleanup
    asyncio.run(worker.stop())

# Your async tests can then use the worker normally
@pytest.mark.asyncio
async def test_task_execution(worker):
    task_id = await worker.add_task(my_task, data)
    result = await worker.get_task_future(task_id)
    assert result == expected
```

### Complete Testing Pattern

Here's a comprehensive example showing best practices for testing async-task-worker:

```python
import pytest
import pytest_asyncio
import asyncio
from datetime import datetime
from async_task_worker import AsyncTaskWorker, task, TaskStatus

# ============================================================
# Session Fixtures (Synchronous with asyncio.run())
# ============================================================

@pytest.fixture(scope="session")
def event_loop_policy():
    """Use a specific event loop policy for tests."""
    return asyncio.get_event_loop_policy()

@pytest.fixture(scope="session")
def worker():
    """Create a session-scoped worker instance.
    
    Note: We use asyncio.run() here because session fixtures
    must be synchronous. This is the standard pattern.
    """
    worker = AsyncTaskWorker(
        max_workers=3,
        task_timeout=10,
        cache_enabled=True,
        cache_ttl=60
    )
    
    # Start the worker using asyncio.run()
    asyncio.run(worker.start())
    
    yield worker
    
    # Cleanup
    asyncio.run(worker.stop())

# ============================================================
# Function-Scoped Async Fixtures (Can use async/await)
# ============================================================

@pytest_asyncio.fixture
async def clean_worker(worker):
    """Provides a clean worker state for each test."""
    # Clear any existing tasks
    tasks = worker.get_all_tasks()
    # Avoid shadowing the imported `task` decorator with the loop variable
    for existing in tasks:
        if existing.status in ("pending", "running"):
            await worker.cancel_task(existing.id)
    
    # Clear cache if enabled
    if worker.cache:
        await worker.clear_cache()
    
    yield worker
    
    # Post-test cleanup if needed
    # ...

# ============================================================
# Test Task Definitions
# ============================================================

@task("test_computation")
async def test_computation(value, delay=0.1, progress_callback=None):
    """A test task that simulates computation."""
    steps = 5
    result = value
    
    for i in range(steps):
        await asyncio.sleep(delay)
        result = result * 2
        
        if progress_callback:
            progress_callback((i + 1) / steps)
    
    return result

@task("test_failing_task")
async def test_failing_task(should_fail=True):
    """A task that can be configured to fail."""
    if should_fail:
        raise ValueError("Task failed as expected")
    return "success"

# ============================================================
# Test Cases
# ============================================================

@pytest.mark.asyncio
async def test_basic_task_execution(clean_worker):
    """Test basic task execution flow."""
    # Add a task
    task_id = await clean_worker.add_task(
        computation_task,
        value=2,
        delay=0.01
    )
    
    # Wait for completion
    result = await clean_worker.get_task_future(task_id)
    
    # Verify result (2 * 2^5 = 64)
    assert result == 64
    
    # Check task info
    info = clean_worker.get_task_info(task_id)
    assert info.status == TaskStatus.COMPLETED
    assert info.progress == 1.0

@pytest.mark.asyncio
async def test_task_cancellation(clean_worker):
    """Test task cancellation."""
    # Start a long-running task
    task_id = await clean_worker.add_task(
        computation_task,
        value=1,
        delay=1.0  # Long delay
    )
    
    # Cancel it
    cancelled = await clean_worker.cancel_task(task_id)
    assert cancelled
    
    # Verify status
    info = clean_worker.get_task_info(task_id)
    assert info.status == TaskStatus.CANCELLED

@pytest.mark.asyncio
async def test_cache_functionality(clean_worker):
    """Test caching of task results."""
    # First execution
    task_id1 = await clean_worker.add_task(
        computation_task,
        value=3,
        delay=0.01,
        use_cache=True
    )
    result1 = await clean_worker.get_task_future(task_id1)
    
    # Second execution with same args (should hit cache)
    start_time = datetime.now()
    task_id2 = await clean_worker.add_task(
        computation_task,
        value=3,
        delay=0.01,  # This delay won't happen due to cache
        use_cache=True
    )
    result2 = await clean_worker.get_task_future(task_id2)
    elapsed = (datetime.now() - start_time).total_seconds()
    
    # Results should match
    assert result1 == result2
    
    # Second execution should be much faster (cache hit)
    assert elapsed < 0.05  # Much less than the task delay

@pytest.mark.asyncio
async def test_concurrent_tasks(clean_worker):
    """Test concurrent task execution."""
    # Add multiple tasks
    task_ids = []
    for i in range(5):
        task_id = await clean_worker.add_task(
            computation_task,
            value=i,
            delay=0.01
        )
        task_ids.append(task_id)
    
    # Wait for all to complete
    futures = clean_worker.get_task_futures(task_ids)
    results = await asyncio.gather(*futures)
    
    # Verify all completed
    assert len(results) == 5
    for i, result in enumerate(results):
        assert result == i * (2 ** 5)

# ============================================================
# Testing Utilities
# ============================================================

class TaskMonitor:
    """Utility class for monitoring task execution in tests."""
    
    def __init__(self, worker: AsyncTaskWorker):
        self.worker = worker
        self.events = []
    
    async def monitor_task(self, task_id: str, timeout: float = 5.0):
        """Monitor a task until completion or timeout."""
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        while loop.time() - start_time < timeout:
            info = self.worker.get_task_info(task_id)
            self.events.append({
                "time": loop.time() - start_time,
                "status": info.status,
                "progress": info.progress
            })
            
            if info.status not in ("pending", "running"):
                return info
            
            await asyncio.sleep(0.1)
        
        raise TimeoutError(f"Task {task_id} did not complete within {timeout}s")

@pytest.mark.asyncio
async def test_with_monitor(clean_worker):
    """Example of using the TaskMonitor utility."""
    monitor = TaskMonitor(clean_worker)
    
    task_id = await clean_worker.add_task(
        computation_task,
        value=5,
        delay=0.1
    )
    
    # Monitor the task
    final_info = await monitor.monitor_task(task_id)
    
    # Check the monitoring captured progress
    assert len(monitor.events) > 0
    assert final_info.status == TaskStatus.COMPLETED
    
    # Verify progress increased over time
    progresses = [e["progress"] for e in monitor.events]
    assert progresses[-1] == 1.0
```

### Testing with Database Libraries (e.g., psycopg-toolkit)

The same pattern applies when testing with async database libraries:

```python
import asyncio

import pytest
import pytest_asyncio

from psycopg_toolkit import Database, DatabaseSettings

@pytest.fixture(scope="session")
def db():
    """Session-scoped database connection.
    
    Uses asyncio.run() to handle async operations in sync fixture.
    This is the standard pattern for pytest with async resources.
    """
    settings = DatabaseSettings(
        host="localhost",
        port=5432,
        dbname="test",
        user="user",
        password="pass"
    )
    db = Database(settings)

    # Initialize the pool synchronously using asyncio.run()
    asyncio.run(db.init_db())
    
    yield db
    
    # Cleanup
    asyncio.run(db.cleanup())

@pytest.fixture(scope="function")
async def db_connection(db):
    """Function-scoped connection from the session pool."""
    async with db.connection() as conn:
        yield conn
        # Automatic cleanup via context manager
```
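
A common companion is a function-scoped fixture that creates and drops the tables a test needs, so tests stay independent while sharing the session pool. A sketch, reusing the `db_connection` fixture above and the `users` table from the Quick Start:

```python
@pytest_asyncio.fixture
async def users_table(db_connection):
    """Create a throwaway users table for one test, then drop it."""
    await db_connection.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id UUID PRIMARY KEY,
            email TEXT UNIQUE NOT NULL
        )
    """)
    yield db_connection
    await db_connection.execute("DROP TABLE IF EXISTS users")
```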

### Key Takeaways

1. **The `asyncio.run()` pattern is correct**: Don't feel bad about using it in session fixtures - it's the standard solution
2. **Session vs Function Scope**: Use session fixtures for expensive resources (pools, workers) and function fixtures for cleanup
3. **The awkwardness is pytest's limitation, not your design**: Your async library doesn't need sync wrappers
4. **Document the pattern**: Help users understand why `asyncio.run()` appears in test fixtures

### Additional Testing Resources

- **pytest-asyncio**: The standard pytest plugin for async testing
- **pytest-timeout**: Useful for preventing hanging async tests
- **asyncio-test**: Utilities for testing asyncio code
- **Testing async generators**: Use `async for` in tests or `aioitertools` for utilities
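
For the last point, testing an async generator usually just means consuming it with `async for` inside an async test. A minimal, self-contained example (the generator here is a toy, not part of any library):

```python
import pytest

async def count_up_to(n):
    """Toy async generator used only for this example."""
    for i in range(n):
        yield i

@pytest.mark.asyncio
async def test_count_up_to():
    values = [item async for item in count_up_to(3)]
    assert values == [0, 1, 2]
```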

### Common Pitfalls and Solutions

#### Pitfall 1: Event Loop Conflicts
```python
# Wrong - Creates new event loop
def test_something():
    loop = asyncio.new_event_loop()
    loop.run_until_complete(my_async_func())

# Right - Use pytest-asyncio
@pytest.mark.asyncio
async def test_something():
    await my_async_func()
```

#### Pitfall 2: Forgetting Cleanup
```python
# Wrong - No cleanup
@pytest.fixture
def worker():
    w = AsyncTaskWorker()
    asyncio.run(w.start())
    return w

# Right - Proper cleanup
@pytest.fixture
def worker():
    w = AsyncTaskWorker()
    asyncio.run(w.start())
    yield w
    asyncio.run(w.stop())
```

#### Pitfall 3: Mixing Sync and Async Incorrectly
```python
# Wrong - Can't await in sync function
def test_task(worker):
    result = await worker.add_task(...)  # SyntaxError

# Right - Mark as async test
@pytest.mark.asyncio
async def test_task(worker):
    result = await worker.add_task(...)
```
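
#### Pitfall 4: Declaring Async Fixtures with `@pytest.fixture`
In pytest-asyncio's default strict mode, an async fixture declared with the plain decorator is not awaited, so tests receive an async generator object instead of the yielded value. A sketch mirroring the fixtures above:

```python
# Wrong - plain pytest.fixture around an async fixture (strict mode)
@pytest.fixture
async def connection(db):
    async with db.connection() as conn:
        yield conn

# Right - use pytest_asyncio.fixture for async fixtures
@pytest_asyncio.fixture
async def connection(db):
    async with db.connection() as conn:
        yield conn
```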

By following these patterns, you can effectively test async-task-worker and similar async libraries while maintaining clean, efficient test suites.


## Documentation

- [Database Management](docs/database.md)
- [Transaction Management](docs/transaction_manager.md)
- [Base Repository](docs/base_repository.md)
- [JSONB Support](docs/jsonb_support.md)
- [PsycopgHelper](docs/psycopg_helper.md)

## Running Tests

```bash
# Install dependencies
uv sync --all-groups

# Run all tests except performance tests (default)
uv run pytest

# Run only performance tests
uv run pytest -m performance

# Run all tests including performance
uv run pytest -m ""

# Run specific test categories
uv run pytest tests/unit/  # Only unit tests
uv run pytest -m performance  # Only performance tests

# Run with coverage
uv run pytest --cov=src/psycopg_toolkit --cov-report=html
```

### Test Categories

The test suite is organized into three categories:

- **Unit tests**: Fast, isolated tests that don't require a database (in `tests/unit/`)
- **Integration tests**: Tests that require a real PostgreSQL database (in `tests/` root)
- **Performance tests**: Benchmarks and performance measurements (marked with `@pytest.mark.performance`)

Performance tests are excluded by default to keep the regular test runs fast. Use the `-m performance` flag to run them explicitly.
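
A performance test is an ordinary test carrying the marker. For example (the test body and the `db_connection` fixture are illustrative, borrowed from the testing section above):

```python
import pytest

@pytest.mark.performance
@pytest.mark.asyncio
async def test_bulk_insert_throughput(db_connection):
    """Benchmark-style test; excluded from default runs by the marker filter."""
    for i in range(1_000):
        await db_connection.execute(
            "INSERT INTO users (id, email) VALUES (gen_random_uuid(), %s)",
            (f"user{i}@example.com",),
        )
```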

## Contributing

1. Fork the repository
2. Create a feature branch
3. Add tests for new features
4. Ensure all tests pass
5. Submit a pull request

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

            
