commondao


Namecommondao JSON
Version 1.3.1 PyPI version JSON
download
home_pageNone
SummaryMysql toolkit
upload_time2025-10-27 10:17:06
maintainerNone
docs_urlNone
authorNone
requires_python>=3.10
licenseApache-2.0
keywords mysql
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # CommonDAO

A powerful, type-safe, and Pydantic-integrated async MySQL toolkit for Python.

![Python](https://img.shields.io/badge/Python-3.10%2B-blue?style=for-the-badge&logo=python&logoColor=white)
![MySQL](https://img.shields.io/badge/MySQL-4479A1?style=for-the-badge&logo=mysql&logoColor=white)
![Async](https://img.shields.io/badge/Async-FF5A00?style=for-the-badge&logo=python&logoColor=white)
![Type Safe](https://img.shields.io/badge/Type_Safe-3178C6?style=for-the-badge&logoColor=white)

CommonDAO is a lightweight, type-safe async MySQL toolkit designed to simplify database operations with a clean, intuitive API. It integrates seamlessly with Pydantic for robust data validation while providing a comprehensive set of tools for common database tasks.

## โœจ Features

- **Async/Await Support**: Built on aiomysql for non-blocking database operations
- **Type Safety**: Strong typing with Python's type hints and runtime type checking
- **Pydantic Integration**: Seamless validation and transformation of database records to Pydantic models
- **SQL Injection Protection**: Parameterized queries for secure database access
- **Comprehensive CRUD Operations**: Simple methods for common database tasks
- **Raw SQL Support**: Full control when you need it with parameterized raw SQL
- **Connection Pooling**: Efficient database connection management
- **Minimal Boilerplate**: Write less code while maintaining readability and control

## ๐Ÿš€ Installation

```bash
pip install commondao
```

## ๐Ÿ” Quick Start

```python
import asyncio
from commondao import connect
from commondao.annotation import TableId
from pydantic import BaseModel
from typing import Annotated

# Define your Pydantic models with TableId annotation
class User(BaseModel):
    id: Annotated[int, TableId('users')]  # First field with TableId is the primary key
    name: str
    email: str

class UserInsert(BaseModel):
    id: Annotated[Optional[int], TableId('users')] = None  # Optional for auto-increment
    name: str
    email: str

async def main():
    # Connect to database
    config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': 'password',
        'db': 'testdb',
        'autocommit': True,
    }

    async with connect(**config) as db:
        # Insert a new user using Pydantic model
        user = UserInsert(name='John Doe', email='john@example.com')
        await db.insert(user)

        # Query the user by key with Pydantic model validation
        result = await db.get_by_key(User, key={'email': 'john@example.com'})
        if result:
            print(f"User: {result.name} ({result.email})")  # Output => User: John Doe (john@example.com)

if __name__ == "__main__":
    asyncio.run(main())
```

## ๐Ÿ“Š Core Operations

### Connection

```python
from commondao import connect

async with connect(
    host='localhost', 
    port=3306, 
    user='root', 
    password='password', 
    db='testdb'
) as db:
    # Your database operations here
    pass
```

### Insert Data (with Pydantic Models)

```python
from pydantic import BaseModel
from commondao.annotation import TableId
from typing import Annotated, Optional

class UserInsert(BaseModel):
    id: Annotated[Optional[int], TableId('users')] = None  # Auto-increment primary key
    name: str
    email: str

# Insert using Pydantic model (id will be auto-generated)
user = UserInsert(name='John', email='john@example.com')
await db.insert(user)
print(f"New user id: {db.lastrowid()}")  # Get the auto-generated id

# Insert with ignore option (skips duplicate key errors)
user2 = UserInsert(name='Jane', email='jane@example.com')
await db.insert(user2, ignore=True)

# Insert with custom field handling
# exclude_unset=False: includes all fields, even those not explicitly set
# exclude_none=True: excludes fields with None values
user3 = UserInsert(name='Bob', email='bob@example.com')
await db.insert(user3, exclude_unset=False, exclude_none=True)
```

### Update Data (with Pydantic Models)

```python
class UserUpdate(BaseModel):
    id: Optional[int] = None
    name: Optional[str] = None
    email: Optional[str] = None

# Update by primary key (id must be provided)
user = UserUpdate(id=1, name='John Smith', email='john.smith@example.com')
await db.update_by_id(user)

# Update by custom key (partial update - only specified fields)
user_update = UserUpdate(name='Jane Doe', email='jane.doe@example.com')
await db.update_by_key(user_update, key={'email': 'john.smith@example.com'})

# Update with field handling options
# exclude_unset=True (default): only update explicitly set fields
# exclude_none=False (default): include None values (set column to NULL)
user = UserUpdate(id=2, name='Alice', email=None)
await db.update_by_id(user, exclude_unset=True, exclude_none=False)  # Sets email to NULL

# Update excluding None values
user = UserUpdate(id=3, name='Bob', email=None)
await db.update_by_id(user, exclude_unset=True, exclude_none=True)  # Won't update email column
```

### Delete Data

```python
# Delete by primary key
await db.delete_by_id(User, 1)

# Delete by custom key
await db.delete_by_key(User, key={'email': 'john@example.com'})
```

### Query Data

```python
# Get a single row by primary key
user = await db.get_by_id(User, 1)

# Get a row by primary key or raise NotFoundError if not found
user = await db.get_by_id_or_fail(User, 1)

# Get by custom key
user = await db.get_by_key(User, key={'email': 'john@example.com'})

# Get by key or raise NotFoundError if not found
user = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})

# Use with Pydantic models
from pydantic import BaseModel
from commondao.annotation import RawSql
from typing import Annotated

class UserModel(BaseModel):
    id: int
    name: str
    email: str
    full_name: Annotated[str, RawSql("CONCAT(first_name, ' ', last_name)")]

# Query with model validation
user = await db.select_one(
    "select * from users where id = :id",
    UserModel,
    {"id": 1}
)

# Query multiple rows
users = await db.select_all(
    "select * from users where status = :status",
    UserModel,
    {"status": "active"}
)

# Paginated queries
from commondao import Paged

result: Paged[UserModel] = await db.select_paged(
    "select * from users where status = :status",
    UserModel,
    {"status": "active"},
    size=10,
    offset=0
)

print(f"Total users: {result.total}")
print(f"Current page: {len(result.items)} users")
```

### Raw SQL Execution

CommonDAO supports parameterized SQL queries using named parameters with the `:parameter_name` format for secure and readable queries.

#### execute_query - For SELECT operations

```python
# Simple query without parameters
rows = await db.execute_query("SELECT * FROM users")

# Query with single parameter
user_rows = await db.execute_query(
    "SELECT * FROM users WHERE id = :user_id",
    {"user_id": 123}
)

# Query with multiple parameters
filtered_rows = await db.execute_query(
    "SELECT * FROM users WHERE name = :name AND age > :min_age",
    {"name": "John", "min_age": 18}
)

# Query with IN clause (using list parameter)
users_in_group = await db.execute_query(
    "SELECT * FROM users WHERE id IN :user_ids",
    {"user_ids": [1, 2, 3, 4]}
)

# Complex query with date filtering
recent_users = await db.execute_query(
    "SELECT * FROM users WHERE created_at > :date AND status = :status",
    {"date": "2023-01-01", "status": "active"}
)
```

#### execute_mutation - For INSERT, UPDATE, DELETE operations

```python
# INSERT statement
affected = await db.execute_mutation(
    "INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
    {"name": "John", "email": "john@example.com", "age": 25}
)
print(f"Inserted {affected} rows")

# UPDATE statement
affected = await db.execute_mutation(
    "UPDATE users SET email = :new_email WHERE id = :user_id",
    {"new_email": "newemail@example.com", "user_id": 123}
)
print(f"Updated {affected} rows")

# DELETE statement
affected = await db.execute_mutation(
    "DELETE FROM users WHERE age < :min_age",
    {"min_age": 18}
)
print(f"Deleted {affected} rows")

# Multiple parameter UPDATE
affected = await db.execute_mutation(
    "UPDATE users SET name = :name, age = :age WHERE id = :id",
    {"name": "Jane", "age": 30, "id": 456}
)

# Bulk operations with loop
user_list = [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
]

for user_data in user_list:
    affected = await db.execute_mutation(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        user_data
    )
```

#### Parameter Format Rules

- **Named Parameters**: Use `:parameter_name` format in SQL
- **Dictionary Keys**: Match parameter names without the colon prefix
- **Supported Types**: str, int, float, bytes, datetime, date, time, timedelta, Decimal
- **Lists/Tuples**: Supported for IN clauses in queries
- **None Values**: Properly handled as SQL NULL

```python
# Example with various data types
from datetime import datetime, date
from decimal import Decimal

result = await db.execute_query(
    """
    SELECT * FROM orders
    WHERE customer_id = :customer_id
    AND total >= :min_total
    AND created_date = :order_date
    AND status IN :valid_statuses
    """,
    {
        "customer_id": 123,
        "min_total": Decimal("99.99"),
        "order_date": date(2023, 12, 25),
        "valid_statuses": ["pending", "confirmed", "shipped"]
    }
)
```

### Transactions

```python
from commondao.annotation import TableId
from typing import Annotated, Optional

class OrderInsert(BaseModel):
    id: Annotated[Optional[int], TableId('orders')] = None
    customer_id: int
    total: float

class OrderItemInsert(BaseModel):
    id: Annotated[Optional[int], TableId('order_items')] = None
    order_id: int
    product_id: int

async with connect(host='localhost', user='root', db='testdb') as db:
    # Start transaction (autocommit=False by default)
    order = OrderInsert(customer_id=1, total=99.99)
    await db.insert(order)
    order_id = db.lastrowid()  # Get the auto-generated order id

    item = OrderItemInsert(order_id=order_id, product_id=42)
    await db.insert(item)

    # Commit the transaction
    await db.commit()
```

## ๐Ÿ” Type Safety

CommonDAO provides robust type checking to help prevent errors:

```python
from commondao import is_row_dict, is_query_dict
from typing import Dict, Any
from datetime import datetime

# Valid row dict (for updates/inserts)
valid_data: Dict[str, Any] = {
    "id": 1,
    "name": "John",
    "created_at": datetime.now(),
}

# Check type safety
assert is_row_dict(valid_data)  # Type check passes

# Valid query dict (can contain lists/tuples for IN clauses)
valid_query: Dict[str, Any] = {
    "id": 1,
    "status": "active",
    "tags": ["admin", "user"],  # Lists are valid for query dicts
    "codes": (200, 201)  # Tuples are also valid
}

assert is_query_dict(valid_query)  # Type check passes

# Invalid row dict (contains a list)
invalid_data: Dict[str, Any] = {
    "id": 1,
    "tags": ["admin", "user"]  # Lists are not valid row values
}

assert not is_row_dict(invalid_data)  # Type check fails
```

## ๐Ÿ“– API Documentation

For complete API documentation, please see the docstrings in the code or visit our documentation website.

## ๐Ÿงช Testing

CommonDAO comes with comprehensive tests to ensure reliability:

```bash
# Install test dependencies
pip install -e ".[test]"

# Run tests
pytest tests
```

## ๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## ๐Ÿ“„ License

This project is licensed under the Apache License 2.0.

# CommonDAO API Reference

This document provides comprehensive API reference for CommonDAO, a powerful async MySQL toolkit with Pydantic integration.

## Table of Contents

- [Connection Management](#connection-management)
- [Core CRUD Operations](#core-crud-operations)
- [Query Methods](#query-methods)
- [Raw SQL Execution](#raw-sql-execution)
- [Transaction Management](#transaction-management)
- [Type Safety Utilities](#type-safety-utilities)
- [Annotations](#annotations)
- [Error Classes](#error-classes)
- [Data Types](#data-types)

## Connection Management

### `connect(**config) -> AsyncContextManager[Commondao]`

Creates an async database connection context manager.

**Parameters:**
- `host` (str): Database host
- `port` (int): Database port (default: 3306)
- `user` (str): Database username
- `password` (str): Database password
- `db` (str): Database name
- `autocommit` (bool): Enable autocommit mode (default: True)
- Additional aiomysql connection parameters

**Returns:** AsyncContextManager yielding a Commondao instance

**Example:**
```python
async with connect(host='localhost', user='root', password='pwd', db='testdb') as db:
    # Database operations here
    pass
```

---

## Core CRUD Operations

### `insert(entity: BaseModel, *, ignore: bool = False, exclude_unset: bool = True, exclude_none: bool = False) -> int`

Insert a Pydantic model instance into the database.

**Parameters:**
- `entity` (BaseModel): Pydantic model instance to insert
- `ignore` (bool): Use INSERT IGNORE to skip duplicate key errors
- `exclude_unset` (bool): If True (default), excludes fields not explicitly set. Allows database defaults for unset fields
- `exclude_none` (bool): If False (default), includes None values. If True, excludes None values from INSERT

**Returns:** int - Number of affected rows (1 on success, 0 if ignored)

**Example:**
```python
user = UserInsert(name='John', email='john@example.com')
affected_rows = await db.insert(user)
```

### `update_by_id(entity: BaseModel, *, exclude_unset: bool = True, exclude_none: bool = False) -> int`

Update a record by its primary key using a Pydantic model.

**Parameters:**
- `entity` (BaseModel): Pydantic model with primary key value set
- `exclude_unset` (bool): If True (default), only updates explicitly set fields. Enables partial updates
- `exclude_none` (bool): If False (default), includes None values (sets to NULL). If True, excludes None values

**Returns:** int - Number of affected rows

**Raises:**
- `EmptyPrimaryKeyError`: If primary key is None or empty

**Example:**
```python
user = UserUpdate(id=1, name='John Updated', email='john.new@example.com')
affected_rows = await db.update_by_id(user)
```

### `update_by_key(entity: BaseModel, *, key: QueryDict, exclude_unset: bool = True, exclude_none: bool = False) -> int`

Update records matching the specified key conditions.

**Parameters:**
- `entity` (BaseModel): Pydantic model with update values
- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions
- `exclude_unset` (bool): If True (default), only updates explicitly set fields. Enables partial updates
- `exclude_none` (bool): If False (default), includes None values (sets to NULL). If True, excludes None values

**Returns:** int - Number of affected rows

**Example:**
```python
user_update = UserUpdate(name='Jane', email='jane@example.com')
affected_rows = await db.update_by_key(user_update, key={'id': 1})
```

### `delete_by_id(entity_class: Type[BaseModel], entity_id: Union[int, str]) -> int`

Delete a record by its primary key value.

**Parameters:**
- `entity_class` (Type[BaseModel]): Pydantic model class
- `entity_id` (Union[int, str]): The primary key value

**Returns:** int - Number of affected rows

**Raises:**
- `AssertionError`: If entity_id is None

**Example:**
```python
affected_rows = await db.delete_by_id(User, 1)
```
### `delete_by_key(entity_class: Type[BaseModel], *, key: QueryDict) -> int`

Delete records matching the specified key conditions.

**Parameters:**
- `entity_class` (Type[BaseModel]): Pydantic model class
- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions

**Returns:** int - Number of affected rows

**Example:**
```python
affected_rows = await db.delete_by_key(User, key={'id': 1})
```

---

## Query Methods

### `get_by_id(entity_class: Type[M], entity_id: Union[int, str]) -> Optional[M]`

Get a single record by its primary key value.

**Parameters:**
- `entity_class` (Type[M]): Pydantic model class
- `entity_id` (Union[int, str]): The primary key value

**Returns:** Optional[M] - Model instance or None if not found

**Raises:**
- `AssertionError`: If entity_id is None

**Example:**
```python
user = await db.get_by_id(User, 1)
if user:
    print(f"Found user: {user.name}")
```

### `get_by_id_or_fail(entity_class: Type[M], entity_id: Union[int, str]) -> M`

Get a single record by its primary key value or raise an error if not found.

**Parameters:**
- `entity_class` (Type[M]): Pydantic model class
- `entity_id` (Union[int, str]): The primary key value

**Returns:** M - Model instance

**Raises:**
- `NotFoundError`: If no record matches the primary key
- `AssertionError`: If entity_id is None

**Example:**
```python
try:
    user = await db.get_by_id_or_fail(User, 1)
    print(f"User: {user.name}")
except NotFoundError:
    print("User not found")
```

### `get_by_key(entity_class: Type[M], *, key: QueryDict) -> Optional[M]`

Get a single record matching the specified key conditions.

**Parameters:**
- `entity_class` (Type[M]): Pydantic model class
- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions

**Returns:** Optional[M] - Model instance or None if not found

**Example:**
```python
user = await db.get_by_key(User, key={'email': 'john@example.com'})
```

### `get_by_key_or_fail(entity_class: Type[M], *, key: QueryDict) -> M`

Get a single record matching the key conditions or raise an error if not found.

**Parameters:**
- `entity_class` (Type[M]): Pydantic model class
- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions

**Returns:** M - Model instance

**Raises:**
- `NotFoundError`: If no record matches the conditions

**Example:**
```python
user = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})
```

### `select_one(sql: str, select: Type[M], data: QueryDict = {}) -> Optional[M]`

Execute a SELECT query and return the first row as a validated model instance.

**Parameters:**
- `sql` (str): SQL query starting with 'select * from'
- `select` (Type[M]): Pydantic model class for result validation
- `data` (QueryDict): Parameters for the query

**Returns:** Optional[M] - Model instance or None if no results

**Example:**
```python
user = await db.select_one(
    "select * from users where age > :min_age",
    User,
    {"min_age": 18}
)
```

### `select_one_or_fail(sql: str, select: Type[M], data: QueryDict = {}) -> M`

Execute a SELECT query and return the first row or raise an error if not found.

**Parameters:**
- `sql` (str): SQL query starting with 'select * from'
- `select` (Type[M]): Pydantic model class for result validation
- `data` (QueryDict): Parameters for the query

**Returns:** M - Model instance

**Raises:**
- `NotFoundError`: If no results found

**Example:**
```python
user = await db.select_one_or_fail(
    "select * from users where email = :email",
    User,
    {"email": "john@example.com"}
)
```

### `select_all(sql: str, select: Type[M], data: QueryDict = {}) -> list[M]`

Execute a SELECT query and return all matching rows as validated model instances.

**Parameters:**
- `sql` (str): SQL query starting with 'select * from'
- `select` (Type[M]): Pydantic model class for result validation
- `data` (QueryDict): Parameters for the query

**Returns:** list[M] - List of model instances

**Example:**
```python
active_users = await db.select_all(
    "select * from users where status = :status",
    User,
    {"status": "active"}
)
```

### `select_paged(sql: str, select: Type[M], data: QueryDict = {}, *, size: int, offset: int = 0) -> Paged[M]`

Execute a paginated SELECT query with total count.

**Parameters:**
- `sql` (str): SQL query starting with 'select * from'
- `select` (Type[M]): Pydantic model class for result validation
- `data` (QueryDict): Parameters for the query
- `size` (int): Number of items per page
- `offset` (int): Number of items to skip

**Returns:** Paged[M] - Paginated result with items and total count

**Example:**
```python
result = await db.select_paged(
    "select * from users where status = :status",
    User,
    {"status": "active"},
    size=10,
    offset=20
)
print(f"Total: {result.total}, Page items: {len(result.items)}")
```

---

## Raw SQL Execution

### `execute_query(sql: str, data: Mapping[str, Any] = {}) -> list`

Execute a parameterized SQL query and return all results.

**Parameters:**
- `sql` (str): SQL query with named parameter placeholders (:param_name)
- `data` (Mapping[str, Any]): Dictionary mapping parameter names to values

**Returns:** list - List of result rows (dictionaries)

**Supported Parameter Types:**
- str, int, float, bytes, datetime, date, time, timedelta, Decimal
- Lists/tuples (for IN clauses)
- None (converted to SQL NULL)

**Examples:**
```python
# Simple query
rows = await db.execute_query("SELECT * FROM users")

# Parameterized query
rows = await db.execute_query(
    "SELECT * FROM users WHERE age > :min_age AND status = :status",
    {"min_age": 18, "status": "active"}
)

# IN clause with list
rows = await db.execute_query(
    "SELECT * FROM users WHERE id IN :user_ids",
    {"user_ids": [1, 2, 3, 4]}
)
```

### `execute_mutation(sql: str, data: Mapping[str, Any] = {}) -> int`

Execute a parameterized SQL mutation (INSERT, UPDATE, DELETE) and return affected row count.

**Parameters:**
- `sql` (str): SQL mutation statement with named parameter placeholders (:param_name)
- `data` (Mapping[str, Any]): Dictionary mapping parameter names to values

**Returns:** int - Number of affected rows

**Examples:**
```python
# INSERT
affected = await db.execute_mutation(
    "INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
    {"name": "John", "email": "john@example.com", "age": 25}
)

# UPDATE
affected = await db.execute_mutation(
    "UPDATE users SET email = :email WHERE id = :id",
    {"email": "newemail@example.com", "id": 123}
)

# DELETE
affected = await db.execute_mutation(
    "DELETE FROM users WHERE age < :min_age",
    {"min_age": 18}
)
```

---

## Transaction Management

### `commit() -> None`

Commit the current transaction.

**Example:**
```python
async with connect(host='localhost', autocommit=False) as db:
    await db.insert(user)
    await db.commit()  # Explicitly commit
```

### `rollback() -> None`

Rollback the current transaction.

**Example:**
```python
try:
    await db.insert(user)
    await db.insert(order)
    await db.commit()
except Exception:
    await db.rollback()
```

### `lastrowid() -> int`

Get the auto-generated ID of the last inserted row.

**Returns:** int - The last inserted row ID

**Example:**
```python
await db.insert(user)
user_id = db.lastrowid()
print(f"New user ID: {user_id}")
```

---

## Type Safety Utilities

### `is_row_dict(data: Mapping) -> TypeGuard[RowDict]`

Check if a mapping is valid for database row operations (INSERT/UPDATE).

**Parameters:**
- `data` (Mapping): The mapping to check

**Returns:** bool - True if valid for row operations

**Valid Types:** str, int, float, bytes, datetime, date, time, timedelta, Decimal, None

**Example:**
```python
from commondao import is_row_dict

data = {"id": 1, "name": "John", "age": 25}
assert is_row_dict(data)
# Now TypeScript knows data is a valid RowDict
```

### `is_query_dict(data: Mapping) -> TypeGuard[QueryDict]`

Check if a mapping is valid for query operations (WHERE clauses).

**Parameters:**
- `data` (Mapping): The mapping to check

**Returns:** bool - True if valid for query operations

**Valid Types:** All RowDict types plus lists and tuples (for IN clauses)

**Example:**
```python
from commondao import is_query_dict

query_data = {"id": 1, "status": "active", "tags": ["admin", "user"]}
assert is_query_dict(query_data)
# Now TypeScript knows query_data is a valid QueryDict
```

---

## Annotations

### `TableId(table_name: str)`

Annotation to mark a field as the primary key and specify the table name.

**Parameters:**
- `table_name` (str): Name of the database table

**Usage:**
```python
from commondao.annotation import TableId
from typing import Annotated, Optional
from pydantic import BaseModel

class User(BaseModel):
    id: Annotated[Optional[int], TableId('users')] = None
    name: str
    email: str
```

### `RawSql(expression: str)`

Annotation to include raw SQL expressions in SELECT queries.

**Parameters:**
- `expression` (str): SQL expression to include

**Usage:**
```python
from commondao.annotation import RawSql
from typing import Annotated

class UserWithFullName(BaseModel):
    id: int
    first_name: str
    last_name: str
    full_name: Annotated[str, RawSql("CONCAT(first_name, ' ', last_name)")]
```

---

## Error Classes

### `NotFoundError(ValueError)`

Raised when a record is not found in operations that expect a result.

**Example:**
```python
try:
    user = await db.get_by_id_or_fail(User, 999)
except NotFoundError as e:
    print(f"User not found: {e}")
```

### `EmptyPrimaryKeyError(ValueError)`

Raised when attempting operations with empty or None primary key values.

**Example:**
```python
try:
    user = UserUpdate(id=None, name="John")
    await db.update_by_id(user)
except EmptyPrimaryKeyError as e:
    print(f"Primary key error: {e}")
```

### `NotTableError(ValueError)`

Raised when a Pydantic model doesn't have proper table annotation.

### `MissingParamError(ValueError)`

Raised when required parameters are missing from SQL queries.

### `TooManyResultError(ValueError)`

Raised when operations expecting single results return multiple rows.

---

## Data Types

### `RowDict`

Type alias for mappings valid in database row operations.

```python
RowDict = Mapping[str, Union[str, int, float, bytes, datetime, date, time, timedelta, Decimal, None]]
```

### `QueryDict`

Type alias for mappings valid in query operations (extends RowDict with list/tuple support).

```python
QueryDict = Mapping[str, Union[RowValueType, list, tuple]]
```

### `Paged[T]`

Generic container for paginated query results.

**Attributes:**
- `items` (list[T]): List of result items
- `total` (int): Total count of all matching records

**Example:**
```python
result: Paged[User] = await db.select_paged(
    "select * from users",
    User,
    size=10
)
print(f"Page has {len(result.items)} items out of {result.total} total")
```

---

## Type Safety with TypeGuard

CommonDAO provides TypeGuard functions `is_row_dict()` and `is_query_dict()` for runtime type checking. **Important**: These functions should only be used with `assert` statements for proper type narrowing.

### Correct Usage Pattern

```python
from commondao import is_row_dict, is_query_dict

# โœ… Correct: Use with assert
def process_database_row(data: dict):
    assert is_row_dict(data)
    # Type checker now knows data is RowDict
    # Safe to use for database operations
    user = UserInsert(**data)
    await db.insert(user)

def process_query_parameters(params: dict):
    assert is_query_dict(params)
    # Type checker now knows params is QueryDict
    # Safe to use in queries
    result = await db.execute_query(
        "SELECT * FROM users WHERE status = :status",
        params
    )

# โœ… Correct: In test validation
async def test_query_result():
    rows = await db.execute_query("SELECT * FROM users")
    assert len(rows) > 0
    row = rows[0]
    assert is_row_dict(row)  # Validates row format
    # Now safely access row data
    user_id = row['id']
```

### What NOT to do

```python
# โŒ Wrong: Don't use in if statements
if is_row_dict(data):
    # Type narrowing won't work properly
    pass

# โŒ Wrong: Don't use in boolean expressions
valid = is_row_dict(data) and data['id'] > 0

# โŒ Wrong: Don't use with or/and logic
assert is_row_dict(data) or True  # Defeats the purpose
```

### Why Use Assert

TypeGuard functions with `assert` provide both runtime validation and compile-time type narrowing:

```python
def example_function(unknown_data: dict):
    # Before assert: unknown_data is just dict
    assert is_row_dict(unknown_data)
    # After assert: type checker knows unknown_data is RowDict

    # This will have proper type hints and validation
    await db.execute_mutation(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        unknown_data  # Type checker knows this is safe
    )
```

## Best Practices

### Entity Class Naming Conventions

When designing Pydantic models for database operations, follow these naming conventions to improve code clarity and maintainability:

- **Naming Convention**: Name entity classes used for `dao.insert()` with an `Insert` suffix; name entity classes used for `dao.update()` with an `Update` suffix; keep query entity class names unchanged.

- **Field Optionality Rules**:
  - For query entity classes: If a field's DDL is `NOT NULL`, the field should not be optional
  - For insert entity classes: If a field's DDL is `nullable` or has a default value, the field can be optional
  - For update entity classes: All fields should be optional

- **Field Inclusion**:
  - Insert entity classes should only include fields that may need to be inserted
  - Update entity classes should only include fields that may need to be modified

- **TableId Annotation**: Entity classes used for insert/update operations must always include the `TableId` annotation, even if the primary key field is optional

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "commondao",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "mysql",
    "author": null,
    "author_email": "qorzj <goodhorsezxj@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/4a/da/50d0bd6e5b33efde87cd77ee8414537debe079563da10c2a73a982389cbb/commondao-1.3.1.tar.gz",
    "platform": null,
    "description": "# CommonDAO\n\nA powerful, type-safe, and Pydantic-integrated async MySQL toolkit for Python.\n\n![Python](https://img.shields.io/badge/Python-3.10%2B-blue?style=for-the-badge&logo=python&logoColor=white)\n![MySQL](https://img.shields.io/badge/MySQL-4479A1?style=for-the-badge&logo=mysql&logoColor=white)\n![Async](https://img.shields.io/badge/Async-FF5A00?style=for-the-badge&logo=python&logoColor=white)\n![Type Safe](https://img.shields.io/badge/Type_Safe-3178C6?style=for-the-badge&logoColor=white)\n\nCommonDAO is a lightweight, type-safe async MySQL toolkit designed to simplify database operations with a clean, intuitive API. It integrates seamlessly with Pydantic for robust data validation while providing a comprehensive set of tools for common database tasks.\n\n## \u2728 Features\n\n- **Async/Await Support**: Built on aiomysql for non-blocking database operations\n- **Type Safety**: Strong typing with Python's type hints and runtime type checking\n- **Pydantic Integration**: Seamless validation and transformation of database records to Pydantic models\n- **SQL Injection Protection**: Parameterized queries for secure database access\n- **Comprehensive CRUD Operations**: Simple methods for common database tasks\n- **Raw SQL Support**: Full control when you need it with parameterized raw SQL\n- **Connection Pooling**: Efficient database connection management\n- **Minimal Boilerplate**: Write less code while maintaining readability and control\n\n## \ud83d\ude80 Installation\n\n```bash\npip install commondao\n```\n\n## \ud83d\udd0d Quick Start\n\n```python\nimport asyncio\nfrom commondao import connect\nfrom commondao.annotation import TableId\nfrom pydantic import BaseModel\nfrom typing import Annotated\n\n# Define your Pydantic models with TableId annotation\nclass User(BaseModel):\n    id: Annotated[int, TableId('users')]  # First field with TableId is the primary key\n    name: str\n    email: str\n\nclass UserInsert(BaseModel):\n    id: Annotated[Optional[int], TableId('users')] = None  # Optional for auto-increment\n    name: str\n    email: str\n\nasync def main():\n    # Connect to database\n    config = {\n        'host': 'localhost',\n        'port': 3306,\n        'user': 'root',\n        'password': 'password',\n        'db': 'testdb',\n        'autocommit': True,\n    }\n\n    async with connect(**config) as db:\n        # Insert a new user using Pydantic model\n        user = UserInsert(name='John Doe', email='john@example.com')\n        await db.insert(user)\n\n        # Query the user by key with Pydantic model validation\n        result = await db.get_by_key(User, key={'email': 'john@example.com'})\n        if result:\n            print(f\"User: {result.name} ({result.email})\")  # Output => User: John Doe (john@example.com)\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n## \ud83d\udcca Core Operations\n\n### Connection\n\n```python\nfrom commondao import connect\n\nasync with connect(\n    host='localhost', \n    port=3306, \n    user='root', \n    password='password', \n    db='testdb'\n) as db:\n    # Your database operations here\n    pass\n```\n\n### Insert Data (with Pydantic Models)\n\n```python\nfrom pydantic import BaseModel\nfrom commondao.annotation import TableId\nfrom typing import Annotated, Optional\n\nclass UserInsert(BaseModel):\n    id: Annotated[Optional[int], TableId('users')] = None  # Auto-increment primary key\n    name: str\n    email: str\n\n# Insert using Pydantic model (id will be auto-generated)\nuser = UserInsert(name='John', email='john@example.com')\nawait db.insert(user)\nprint(f\"New user id: {db.lastrowid()}\")  # Get the auto-generated id\n\n# Insert with ignore option (skips duplicate key errors)\nuser2 = UserInsert(name='Jane', email='jane@example.com')\nawait db.insert(user2, ignore=True)\n\n# Insert with custom field handling\n# exclude_unset=False: includes all fields, even those not explicitly set\n# exclude_none=True: excludes fields with None values\nuser3 = UserInsert(name='Bob', email='bob@example.com')\nawait db.insert(user3, exclude_unset=False, exclude_none=True)\n```\n\n### Update Data (with Pydantic Models)\n\n```python\nclass UserUpdate(BaseModel):\n    id: Optional[int] = None\n    name: Optional[str] = None\n    email: Optional[str] = None\n\n# Update by primary key (id must be provided)\nuser = UserUpdate(id=1, name='John Smith', email='john.smith@example.com')\nawait db.update_by_id(user)\n\n# Update by custom key (partial update - only specified fields)\nuser_update = UserUpdate(name='Jane Doe', email='jane.doe@example.com')\nawait db.update_by_key(user_update, key={'email': 'john.smith@example.com'})\n\n# Update with field handling options\n# exclude_unset=True (default): only update explicitly set fields\n# exclude_none=False (default): include None values (set column to NULL)\nuser = UserUpdate(id=2, name='Alice', email=None)\nawait db.update_by_id(user, exclude_unset=True, exclude_none=False)  # Sets email to NULL\n\n# Update excluding None values\nuser = UserUpdate(id=3, name='Bob', email=None)\nawait db.update_by_id(user, exclude_unset=True, exclude_none=True)  # Won't update email column\n```\n\n### Delete Data\n\n```python\n# Delete by primary key\nawait db.delete_by_id(User, 1)\n\n# Delete by custom key\nawait db.delete_by_key(User, key={'email': 'john@example.com'})\n```\n\n### Query Data\n\n```python\n# Get a single row by primary key\nuser = await db.get_by_id(User, 1)\n\n# Get a row by primary key or raise NotFoundError if not found\nuser = await db.get_by_id_or_fail(User, 1)\n\n# Get by custom key\nuser = await db.get_by_key(User, key={'email': 'john@example.com'})\n\n# Get by key or raise NotFoundError if not found\nuser = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})\n\n# Use with Pydantic models\nfrom pydantic import BaseModel\nfrom commondao.annotation import RawSql\nfrom typing import Annotated\n\nclass UserModel(BaseModel):\n    id: int\n    name: str\n    email: str\n    full_name: Annotated[str, RawSql(\"CONCAT(first_name, ' ', last_name)\")]\n\n# Query with model validation\nuser = await db.select_one(\n    \"select * from users where id = :id\",\n    UserModel,\n    {\"id\": 1}\n)\n\n# Query multiple rows\nusers = await db.select_all(\n    \"select * from users where status = :status\",\n    UserModel,\n    {\"status\": \"active\"}\n)\n\n# Paginated queries\nfrom commondao import Paged\n\nresult: Paged[UserModel] = await db.select_paged(\n    \"select * from users where status = :status\",\n    UserModel,\n    {\"status\": \"active\"},\n    size=10,\n    offset=0\n)\n\nprint(f\"Total users: {result.total}\")\nprint(f\"Current page: {len(result.items)} users\")\n```\n\n### Raw SQL Execution\n\nCommonDAO supports parameterized SQL queries using named parameters with the `:parameter_name` format for secure and readable queries.\n\n#### execute_query - For SELECT operations\n\n```python\n# Simple query without parameters\nrows = await db.execute_query(\"SELECT * FROM users\")\n\n# Query with single parameter\nuser_rows = await db.execute_query(\n    \"SELECT * FROM users WHERE id = :user_id\",\n    {\"user_id\": 123}\n)\n\n# Query with multiple parameters\nfiltered_rows = await db.execute_query(\n    \"SELECT * FROM users WHERE name = :name AND age > :min_age\",\n    {\"name\": \"John\", \"min_age\": 18}\n)\n\n# Query with IN clause (using list parameter)\nusers_in_group = await db.execute_query(\n    \"SELECT * FROM users WHERE id IN :user_ids\",\n    {\"user_ids\": [1, 2, 3, 4]}\n)\n\n# Complex query with date filtering\nrecent_users = await db.execute_query(\n    \"SELECT * FROM users WHERE created_at > :date AND status = :status\",\n    {\"date\": \"2023-01-01\", \"status\": \"active\"}\n)\n```\n\n#### execute_mutation - For INSERT, UPDATE, DELETE operations\n\n```python\n# INSERT statement\naffected = await db.execute_mutation(\n    \"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)\",\n    {\"name\": \"John\", \"email\": \"john@example.com\", \"age\": 25}\n)\nprint(f\"Inserted {affected} rows\")\n\n# UPDATE statement\naffected = await db.execute_mutation(\n    \"UPDATE users SET email = :new_email WHERE id = :user_id\",\n    {\"new_email\": \"newemail@example.com\", \"user_id\": 123}\n)\nprint(f\"Updated {affected} rows\")\n\n# DELETE statement\naffected = await db.execute_mutation(\n    \"DELETE FROM users WHERE age < :min_age\",\n    {\"min_age\": 18}\n)\nprint(f\"Deleted {affected} rows\")\n\n# Multiple parameter UPDATE\naffected = await db.execute_mutation(\n    \"UPDATE users SET name = :name, age = :age WHERE id = :id\",\n    {\"name\": \"Jane\", \"age\": 30, \"id\": 456}\n)\n\n# Bulk operations with loop\nuser_list = [\n    {\"name\": \"Alice\", \"email\": \"alice@example.com\"},\n    {\"name\": \"Bob\", \"email\": \"bob@example.com\"},\n]\n\nfor user_data in user_list:\n    affected = await db.execute_mutation(\n        \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n        user_data\n    )\n```\n\n#### Parameter Format Rules\n\n- **Named Parameters**: Use `:parameter_name` format in SQL\n- **Dictionary Keys**: Match parameter names without the colon prefix\n- **Supported Types**: str, int, float, bytes, datetime, date, time, timedelta, Decimal\n- **Lists/Tuples**: Supported for IN clauses in queries\n- **None Values**: Properly handled as SQL NULL\n\n```python\n# Example with various data types\nfrom datetime import datetime, date\nfrom decimal import Decimal\n\nresult = await db.execute_query(\n    \"\"\"\n    SELECT * FROM orders\n    WHERE customer_id = :customer_id\n    AND total >= :min_total\n    AND created_date = :order_date\n    AND status IN :valid_statuses\n    \"\"\",\n    {\n        \"customer_id\": 123,\n        \"min_total\": Decimal(\"99.99\"),\n        \"order_date\": date(2023, 12, 25),\n        \"valid_statuses\": [\"pending\", \"confirmed\", \"shipped\"]\n    }\n)\n```\n\n### Transactions\n\n```python\nfrom commondao.annotation import TableId\nfrom typing import Annotated, Optional\n\nclass OrderInsert(BaseModel):\n    id: Annotated[Optional[int], TableId('orders')] = None\n    customer_id: int\n    total: float\n\nclass OrderItemInsert(BaseModel):\n    id: Annotated[Optional[int], TableId('order_items')] = None\n    order_id: int\n    product_id: int\n\nasync with connect(host='localhost', user='root', db='testdb') as db:\n    # Start transaction (autocommit=False by default)\n    order = OrderInsert(customer_id=1, total=99.99)\n    await db.insert(order)\n    order_id = db.lastrowid()  # Get the auto-generated order id\n\n    item = OrderItemInsert(order_id=order_id, product_id=42)\n    await db.insert(item)\n\n    # Commit the transaction\n    await db.commit()\n```\n\n## \ud83d\udd10 Type Safety\n\nCommonDAO provides robust type checking to help prevent errors:\n\n```python\nfrom commondao import is_row_dict, is_query_dict\nfrom typing import Dict, Any\nfrom datetime import datetime\n\n# Valid row dict (for updates/inserts)\nvalid_data: Dict[str, Any] = {\n    \"id\": 1,\n    \"name\": \"John\",\n    \"created_at\": datetime.now(),\n}\n\n# Check type safety\nassert is_row_dict(valid_data)  # Type check passes\n\n# Valid query dict (can contain lists/tuples for IN clauses)\nvalid_query: Dict[str, Any] = {\n    \"id\": 1,\n    \"status\": \"active\",\n    \"tags\": [\"admin\", \"user\"],  # Lists are valid for query dicts\n    \"codes\": (200, 201)  # Tuples are also valid\n}\n\nassert is_query_dict(valid_query)  # Type check passes\n\n# Invalid row dict (contains a list)\ninvalid_data: Dict[str, Any] = {\n    \"id\": 1,\n    \"tags\": [\"admin\", \"user\"]  # Lists are not valid row values\n}\n\nassert not is_row_dict(invalid_data)  # Type check fails\n```\n\n## \ud83d\udcd6 API Documentation\n\nFor complete API documentation, please see the docstrings in the code or visit our documentation website.\n\n## \ud83e\uddea Testing\n\nCommonDAO comes with comprehensive tests to ensure reliability:\n\n```bash\n# Install test dependencies\npip install -e \".[test]\"\n\n# Run tests\npytest tests\n```\n\n## \ud83e\udd1d Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n\n## \ud83d\udcc4 License\n\nThis project is licensed under the Apache License 2.0.\n\n# CommonDAO API Reference\n\nThis document provides comprehensive API reference for CommonDAO, a powerful async MySQL toolkit with Pydantic integration.\n\n## Table of Contents\n\n- [Connection Management](#connection-management)\n- [Core CRUD Operations](#core-crud-operations)\n- [Query Methods](#query-methods)\n- [Raw SQL Execution](#raw-sql-execution)\n- [Transaction Management](#transaction-management)\n- [Type Safety Utilities](#type-safety-utilities)\n- [Annotations](#annotations)\n- [Error Classes](#error-classes)\n- [Data Types](#data-types)\n\n## Connection Management\n\n### `connect(**config) -> AsyncContextManager[Commondao]`\n\nCreates an async database connection context manager.\n\n**Parameters:**\n- `host` (str): Database host\n- `port` (int): Database port (default: 3306)\n- `user` (str): Database username\n- `password` (str): Database password\n- `db` (str): Database name\n- `autocommit` (bool): Enable autocommit mode (default: True)\n- Additional aiomysql connection parameters\n\n**Returns:** AsyncContextManager yielding a Commondao instance\n\n**Example:**\n```python\nasync with connect(host='localhost', user='root', password='pwd', db='testdb') as db:\n    # Database operations here\n    pass\n```\n\n---\n\n## Core CRUD Operations\n\n### `insert(entity: BaseModel, *, ignore: bool = False, exclude_unset: bool = True, exclude_none: bool = False) -> int`\n\nInsert a Pydantic model instance into the database.\n\n**Parameters:**\n- `entity` (BaseModel): Pydantic model instance to insert\n- `ignore` (bool): Use INSERT IGNORE to skip duplicate key errors\n- `exclude_unset` (bool): If True (default), excludes fields not explicitly set. Allows database defaults for unset fields\n- `exclude_none` (bool): If False (default), includes None values. If True, excludes None values from INSERT\n\n**Returns:** int - Number of affected rows (1 on success, 0 if ignored)\n\n**Example:**\n```python\nuser = UserInsert(name='John', email='john@example.com')\naffected_rows = await db.insert(user)\n```\n\n### `update_by_id(entity: BaseModel, *, exclude_unset: bool = True, exclude_none: bool = False) -> int`\n\nUpdate a record by its primary key using a Pydantic model.\n\n**Parameters:**\n- `entity` (BaseModel): Pydantic model with primary key value set\n- `exclude_unset` (bool): If True (default), only updates explicitly set fields. Enables partial updates\n- `exclude_none` (bool): If False (default), includes None values (sets to NULL). If True, excludes None values\n\n**Returns:** int - Number of affected rows\n\n**Raises:**\n- `EmptyPrimaryKeyError`: If primary key is None or empty\n\n**Example:**\n```python\nuser = UserUpdate(id=1, name='John Updated', email='john.new@example.com')\naffected_rows = await db.update_by_id(user)\n```\n\n### `update_by_key(entity: BaseModel, *, key: QueryDict, exclude_unset: bool = True, exclude_none: bool = False) -> int`\n\nUpdate records matching the specified key conditions.\n\n**Parameters:**\n- `entity` (BaseModel): Pydantic model with update values\n- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions\n- `exclude_unset` (bool): If True (default), only updates explicitly set fields. Enables partial updates\n- `exclude_none` (bool): If False (default), includes None values (sets to NULL). If True, excludes None values\n\n**Returns:** int - Number of affected rows\n\n**Example:**\n```python\nuser_update = UserUpdate(name='Jane', email='jane@example.com')\naffected_rows = await db.update_by_key(user_update, key={'id': 1})\n```\n\n### `delete_by_id(entity_class: Type[BaseModel], entity_id: Union[int, str]) -> int`\n\nDelete a record by its primary key value.\n\n**Parameters:**\n- `entity_class` (Type[BaseModel]): Pydantic model class\n- `entity_id` (Union[int, str]): The primary key value\n\n**Returns:** int - Number of affected rows\n\n**Raises:**\n- `AssertionError`: If entity_id is None\n\n**Example:**\n```python\naffected_rows = await db.delete_by_id(User, 1)\n```\n### `delete_by_key(entity_class: Type[BaseModel], *, key: QueryDict) -> int`\n\nDelete records matching the specified key conditions.\n\n**Parameters:**\n- `entity_class` (Type[BaseModel]): Pydantic model class\n- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions\n\n**Returns:** int - Number of affected rows\n\n**Example:**\n```python\naffected_rows = await db.delete_by_key(User, key={'id': 1})\n```\n\n---\n\n## Query Methods\n\n### `get_by_id(entity_class: Type[M], entity_id: Union[int, str]) -> Optional[M]`\n\nGet a single record by its primary key value.\n\n**Parameters:**\n- `entity_class` (Type[M]): Pydantic model class\n- `entity_id` (Union[int, str]): The primary key value\n\n**Returns:** Optional[M] - Model instance or None if not found\n\n**Raises:**\n- `AssertionError`: If entity_id is None\n\n**Example:**\n```python\nuser = await db.get_by_id(User, 1)\nif user:\n    print(f\"Found user: {user.name}\")\n```\n\n### `get_by_id_or_fail(entity_class: Type[M], entity_id: Union[int, str]) -> M`\n\nGet a single record by its primary key value or raise an error if not found.\n\n**Parameters:**\n- `entity_class` (Type[M]): Pydantic model class\n- `entity_id` (Union[int, str]): The primary key value\n\n**Returns:** M - Model instance\n\n**Raises:**\n- `NotFoundError`: If no record matches the primary key\n- `AssertionError`: If entity_id is None\n\n**Example:**\n```python\ntry:\n    user = await db.get_by_id_or_fail(User, 1)\n    print(f\"User: {user.name}\")\nexcept NotFoundError:\n    print(\"User not found\")\n```\n\n### `get_by_key(entity_class: Type[M], *, key: QueryDict) -> Optional[M]`\n\nGet a single record matching the specified key conditions.\n\n**Parameters:**\n- `entity_class` (Type[M]): Pydantic model class\n- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions\n\n**Returns:** Optional[M] - Model instance or None if not found\n\n**Example:**\n```python\nuser = await db.get_by_key(User, key={'email': 'john@example.com'})\n```\n\n### `get_by_key_or_fail(entity_class: Type[M], *, key: QueryDict) -> M`\n\nGet a single record matching the key conditions or raise an error if not found.\n\n**Parameters:**\n- `entity_class` (Type[M]): Pydantic model class\n- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions\n\n**Returns:** M - Model instance\n\n**Raises:**\n- `NotFoundError`: If no record matches the conditions\n\n**Example:**\n```python\nuser = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})\n```\n\n### `select_one(sql: str, select: Type[M], data: QueryDict = {}) -> Optional[M]`\n\nExecute a SELECT query and return the first row as a validated model instance.\n\n**Parameters:**\n- `sql` (str): SQL query starting with 'select * from'\n- `select` (Type[M]): Pydantic model class for result validation\n- `data` (QueryDict): Parameters for the query\n\n**Returns:** Optional[M] - Model instance or None if no results\n\n**Example:**\n```python\nuser = await db.select_one(\n    \"select * from users where age > :min_age\",\n    User,\n    {\"min_age\": 18}\n)\n```\n\n### `select_one_or_fail(sql: str, select: Type[M], data: QueryDict = {}) -> M`\n\nExecute a SELECT query and return the first row or raise an error if not found.\n\n**Parameters:**\n- `sql` (str): SQL query starting with 'select * from'\n- `select` (Type[M]): Pydantic model class for result validation\n- `data` (QueryDict): Parameters for the query\n\n**Returns:** M - Model instance\n\n**Raises:**\n- `NotFoundError`: If no results found\n\n**Example:**\n```python\nuser = await db.select_one_or_fail(\n    \"select * from users where email = :email\",\n    User,\n    {\"email\": \"john@example.com\"}\n)\n```\n\n### `select_all(sql: str, select: Type[M], data: QueryDict = {}) -> list[M]`\n\nExecute a SELECT query and return all matching rows as validated model instances.\n\n**Parameters:**\n- `sql` (str): SQL query starting with 'select * from'\n- `select` (Type[M]): Pydantic model class for result validation\n- `data` (QueryDict): Parameters for the query\n\n**Returns:** list[M] - List of model instances\n\n**Example:**\n```python\nactive_users = await db.select_all(\n    \"select * from users where status = :status\",\n    User,\n    {\"status\": \"active\"}\n)\n```\n\n### `select_paged(sql: str, select: Type[M], data: QueryDict = {}, *, size: int, offset: int = 0) -> Paged[M]`\n\nExecute a paginated SELECT query with total count.\n\n**Parameters:**\n- `sql` (str): SQL query starting with 'select * from'\n- `select` (Type[M]): Pydantic model class for result validation\n- `data` (QueryDict): Parameters for the query\n- `size` (int): Number of items per page\n- `offset` (int): Number of items to skip\n\n**Returns:** Paged[M] - Paginated result with items and total count\n\n**Example:**\n```python\nresult = await db.select_paged(\n    \"select * from users where status = :status\",\n    User,\n    {\"status\": \"active\"},\n    size=10,\n    offset=20\n)\nprint(f\"Total: {result.total}, Page items: {len(result.items)}\")\n```\n\n---\n\n## Raw SQL Execution\n\n### `execute_query(sql: str, data: Mapping[str, Any] = {}) -> list`\n\nExecute a parameterized SQL query and return all results.\n\n**Parameters:**\n- `sql` (str): SQL query with named parameter placeholders (:param_name)\n- `data` (Mapping[str, Any]): Dictionary mapping parameter names to values\n\n**Returns:** list - List of result rows (dictionaries)\n\n**Supported Parameter Types:**\n- str, int, float, bytes, datetime, date, time, timedelta, Decimal\n- Lists/tuples (for IN clauses)\n- None (converted to SQL NULL)\n\n**Examples:**\n```python\n# Simple query\nrows = await db.execute_query(\"SELECT * FROM users\")\n\n# Parameterized query\nrows = await db.execute_query(\n    \"SELECT * FROM users WHERE age > :min_age AND status = :status\",\n    {\"min_age\": 18, \"status\": \"active\"}\n)\n\n# IN clause with list\nrows = await db.execute_query(\n    \"SELECT * FROM users WHERE id IN :user_ids\",\n    {\"user_ids\": [1, 2, 3, 4]}\n)\n```\n\n### `execute_mutation(sql: str, data: Mapping[str, Any] = {}) -> int`\n\nExecute a parameterized SQL mutation (INSERT, UPDATE, DELETE) and return affected row count.\n\n**Parameters:**\n- `sql` (str): SQL mutation statement with named parameter placeholders (:param_name)\n- `data` (Mapping[str, Any]): Dictionary mapping parameter names to values\n\n**Returns:** int - Number of affected rows\n\n**Examples:**\n```python\n# INSERT\naffected = await db.execute_mutation(\n    \"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)\",\n    {\"name\": \"John\", \"email\": \"john@example.com\", \"age\": 25}\n)\n\n# UPDATE\naffected = await db.execute_mutation(\n    \"UPDATE users SET email = :email WHERE id = :id\",\n    {\"email\": \"newemail@example.com\", \"id\": 123}\n)\n\n# DELETE\naffected = await db.execute_mutation(\n    \"DELETE FROM users WHERE age < :min_age\",\n    {\"min_age\": 18}\n)\n```\n\n---\n\n## Transaction Management\n\n### `commit() -> None`\n\nCommit the current transaction.\n\n**Example:**\n```python\nasync with connect(host='localhost', autocommit=False) as db:\n    await db.insert(user)\n    await db.commit()  # Explicitly commit\n```\n\n### `rollback() -> None`\n\nRollback the current transaction.\n\n**Example:**\n```python\ntry:\n    await db.insert(user)\n    await db.insert(order)\n    await db.commit()\nexcept Exception:\n    await db.rollback()\n```\n\n### `lastrowid() -> int`\n\nGet the auto-generated ID of the last inserted row.\n\n**Returns:** int - The last inserted row ID\n\n**Example:**\n```python\nawait db.insert(user)\nuser_id = db.lastrowid()\nprint(f\"New user ID: {user_id}\")\n```\n\n---\n\n## Type Safety Utilities\n\n### `is_row_dict(data: Mapping) -> TypeGuard[RowDict]`\n\nCheck if a mapping is valid for database row operations (INSERT/UPDATE).\n\n**Parameters:**\n- `data` (Mapping): The mapping to check\n\n**Returns:** bool - True if valid for row operations\n\n**Valid Types:** str, int, float, bytes, datetime, date, time, timedelta, Decimal, None\n\n**Example:**\n```python\nfrom commondao import is_row_dict\n\ndata = {\"id\": 1, \"name\": \"John\", \"age\": 25}\nassert is_row_dict(data)\n# Now TypeScript knows data is a valid RowDict\n```\n\n### `is_query_dict(data: Mapping) -> TypeGuard[QueryDict]`\n\nCheck if a mapping is valid for query operations (WHERE clauses).\n\n**Parameters:**\n- `data` (Mapping): The mapping to check\n\n**Returns:** bool - True if valid for query operations\n\n**Valid Types:** All RowDict types plus lists and tuples (for IN clauses)\n\n**Example:**\n```python\nfrom commondao import is_query_dict\n\nquery_data = {\"id\": 1, \"status\": \"active\", \"tags\": [\"admin\", \"user\"]}\nassert is_query_dict(query_data)\n# Now TypeScript knows query_data is a valid QueryDict\n```\n\n---\n\n## Annotations\n\n### `TableId(table_name: str)`\n\nAnnotation to mark a field as the primary key and specify the table name.\n\n**Parameters:**\n- `table_name` (str): Name of the database table\n\n**Usage:**\n```python\nfrom commondao.annotation import TableId\nfrom typing import Annotated, Optional\nfrom pydantic import BaseModel\n\nclass User(BaseModel):\n    id: Annotated[Optional[int], TableId('users')] = None\n    name: str\n    email: str\n```\n\n### `RawSql(expression: str)`\n\nAnnotation to include raw SQL expressions in SELECT queries.\n\n**Parameters:**\n- `expression` (str): SQL expression to include\n\n**Usage:**\n```python\nfrom commondao.annotation import RawSql\nfrom typing import Annotated\n\nclass UserWithFullName(BaseModel):\n    id: int\n    first_name: str\n    last_name: str\n    full_name: Annotated[str, RawSql(\"CONCAT(first_name, ' ', last_name)\")]\n```\n\n---\n\n## Error Classes\n\n### `NotFoundError(ValueError)`\n\nRaised when a record is not found in operations that expect a result.\n\n**Example:**\n```python\ntry:\n    user = await db.get_by_id_or_fail(User, 999)\nexcept NotFoundError as e:\n    print(f\"User not found: {e}\")\n```\n\n### `EmptyPrimaryKeyError(ValueError)`\n\nRaised when attempting operations with empty or None primary key values.\n\n**Example:**\n```python\ntry:\n    user = UserUpdate(id=None, name=\"John\")\n    await db.update_by_id(user)\nexcept EmptyPrimaryKeyError as e:\n    print(f\"Primary key error: {e}\")\n```\n\n### `NotTableError(ValueError)`\n\nRaised when a Pydantic model doesn't have proper table annotation.\n\n### `MissingParamError(ValueError)`\n\nRaised when required parameters are missing from SQL queries.\n\n### `TooManyResultError(ValueError)`\n\nRaised when operations expecting single results return multiple rows.\n\n---\n\n## Data Types\n\n### `RowDict`\n\nType alias for mappings valid in database row operations.\n\n```python\nRowDict = Mapping[str, Union[str, int, float, bytes, datetime, date, time, timedelta, Decimal, None]]\n```\n\n### `QueryDict`\n\nType alias for mappings valid in query operations (extends RowDict with list/tuple support).\n\n```python\nQueryDict = Mapping[str, Union[RowValueType, list, tuple]]\n```\n\n### `Paged[T]`\n\nGeneric container for paginated query results.\n\n**Attributes:**\n- `items` (list[T]): List of result items\n- `total` (int): Total count of all matching records\n\n**Example:**\n```python\nresult: Paged[User] = await db.select_paged(\n    \"select * from users\",\n    User,\n    size=10\n)\nprint(f\"Page has {len(result.items)} items out of {result.total} total\")\n```\n\n---\n\n## Type Safety with TypeGuard\n\nCommonDAO provides TypeGuard functions `is_row_dict()` and `is_query_dict()` for runtime type checking. **Important**: These functions should only be used with `assert` statements for proper type narrowing.\n\n### Correct Usage Pattern\n\n```python\nfrom commondao import is_row_dict, is_query_dict\n\n# \u2705 Correct: Use with assert\ndef process_database_row(data: dict):\n    assert is_row_dict(data)\n    # Type checker now knows data is RowDict\n    # Safe to use for database operations\n    user = UserInsert(**data)\n    await db.insert(user)\n\ndef process_query_parameters(params: dict):\n    assert is_query_dict(params)\n    # Type checker now knows params is QueryDict\n    # Safe to use in queries\n    result = await db.execute_query(\n        \"SELECT * FROM users WHERE status = :status\",\n        params\n    )\n\n# \u2705 Correct: In test validation\nasync def test_query_result():\n    rows = await db.execute_query(\"SELECT * FROM users\")\n    assert len(rows) > 0\n    row = rows[0]\n    assert is_row_dict(row)  # Validates row format\n    # Now safely access row data\n    user_id = row['id']\n```\n\n### What NOT to do\n\n```python\n# \u274c Wrong: Don't use in if statements\nif is_row_dict(data):\n    # Type narrowing won't work properly\n    pass\n\n# \u274c Wrong: Don't use in boolean expressions\nvalid = is_row_dict(data) and data['id'] > 0\n\n# \u274c Wrong: Don't use with or/and logic\nassert is_row_dict(data) or True  # Defeats the purpose\n```\n\n### Why Use Assert\n\nTypeGuard functions with `assert` provide both runtime validation and compile-time type narrowing:\n\n```python\ndef example_function(unknown_data: dict):\n    # Before assert: unknown_data is just dict\n    assert is_row_dict(unknown_data)\n    # After assert: type checker knows unknown_data is RowDict\n\n    # This will have proper type hints and validation\n    await db.execute_mutation(\n        \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n        unknown_data  # Type checker knows this is safe\n    )\n```\n\n## Best Practices\n\n### Entity Class Naming Conventions\n\nWhen designing Pydantic models for database operations, follow these naming conventions to improve code clarity and maintainability:\n\n- **Naming Convention**: Name entity classes used for `dao.insert()` with an `Insert` suffix; name entity classes used for `dao.update()` with an `Update` suffix; keep query entity class names unchanged.\n\n- **Field Optionality Rules**:\n  - For query entity classes: If a field's DDL is `NOT NULL`, the field should not be optional\n  - For insert entity classes: If a field's DDL is `nullable` or has a default value, the field can be optional\n  - For update entity classes: All fields should be optional\n\n- **Field Inclusion**:\n  - Insert entity classes should only include fields that may need to be inserted\n  - Update entity classes should only include fields that may need to be modified\n\n- **TableId Annotation**: Entity classes used for insert/update operations must always include the `TableId` annotation, even if the primary key field is optional\n",
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "Mysql toolkit",
    "version": "1.3.1",
    "project_urls": {
        "homepage": "https://github.com/lessweb/commondao"
    },
    "split_keywords": [
        "mysql"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "540d4657516166cbe4711810c62989cc458efdb8e6bcf14ce41827a58b415935",
                "md5": "7c5fac92d48607f6c4a2df298c2dc407",
                "sha256": "4bd4955e586587504b53c55016c4380a60fc6d2df61cc2daf11c5e8a829fbe57"
            },
            "downloads": -1,
            "filename": "commondao-1.3.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "7c5fac92d48607f6c4a2df298c2dc407",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 22571,
            "upload_time": "2025-10-27T10:17:04",
            "upload_time_iso_8601": "2025-10-27T10:17:04.901254Z",
            "url": "https://files.pythonhosted.org/packages/54/0d/4657516166cbe4711810c62989cc458efdb8e6bcf14ce41827a58b415935/commondao-1.3.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "4ada50d0bd6e5b33efde87cd77ee8414537debe079563da10c2a73a982389cbb",
                "md5": "1091684351b72546df6c02aa05d2d82d",
                "sha256": "5e60404a6807cd82c3b047310d6e53847564a35961e7387aac6f7525e928a64f"
            },
            "downloads": -1,
            "filename": "commondao-1.3.1.tar.gz",
            "has_sig": false,
            "md5_digest": "1091684351b72546df6c02aa05d2d82d",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 43552,
            "upload_time": "2025-10-27T10:17:06",
            "upload_time_iso_8601": "2025-10-27T10:17:06.294478Z",
            "url": "https://files.pythonhosted.org/packages/4a/da/50d0bd6e5b33efde87cd77ee8414537debe079563da10c2a73a982389cbb/commondao-1.3.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-27 10:17:06",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "lessweb",
    "github_project": "commondao",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "commondao"
}
        
Elapsed time: 1.12457s