| Name | commondao JSON |
| Version |
1.3.1
JSON |
| download |
| home_page | None |
| Summary | Mysql toolkit |
| upload_time | 2025-10-27 10:17:06 |
| maintainer | None |
| docs_url | None |
| author | None |
| requires_python | >=3.10 |
| license | Apache-2.0 |
| keywords |
mysql
|
| VCS |
 |
| bugtrack_url |
|
| requirements |
No requirements were recorded.
|
| Travis-CI |
No Travis.
|
| coveralls test coverage |
No coveralls.
|
# CommonDAO
A powerful, type-safe, and Pydantic-integrated async MySQL toolkit for Python.




CommonDAO is a lightweight, type-safe async MySQL toolkit designed to simplify database operations with a clean, intuitive API. It integrates seamlessly with Pydantic for robust data validation while providing a comprehensive set of tools for common database tasks.
## ✨ Features
- **Async/Await Support**: Built on aiomysql for non-blocking database operations
- **Type Safety**: Strong typing with Python's type hints and runtime type checking
- **Pydantic Integration**: Seamless validation and transformation of database records to Pydantic models
- **SQL Injection Protection**: Parameterized queries for secure database access
- **Comprehensive CRUD Operations**: Simple methods for common database tasks
- **Raw SQL Support**: Full control when you need it with parameterized raw SQL
- **Connection Pooling**: Efficient database connection management
- **Minimal Boilerplate**: Write less code while maintaining readability and control
## 🚀 Installation
```bash
pip install commondao
```
## 🔍 Quick Start
```python
import asyncio
from commondao import connect
from commondao.annotation import TableId
from pydantic import BaseModel
from typing import Annotated, Optional
# Define your Pydantic models with TableId annotation
class User(BaseModel):
id: Annotated[int, TableId('users')] # First field with TableId is the primary key
name: str
email: str
class UserInsert(BaseModel):
id: Annotated[Optional[int], TableId('users')] = None # Optional for auto-increment
name: str
email: str
async def main():
# Connect to database
config = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': 'password',
'db': 'testdb',
'autocommit': True,
}
async with connect(**config) as db:
# Insert a new user using Pydantic model
user = UserInsert(name='John Doe', email='john@example.com')
await db.insert(user)
# Query the user by key with Pydantic model validation
result = await db.get_by_key(User, key={'email': 'john@example.com'})
if result:
print(f"User: {result.name} ({result.email})") # Output => User: John Doe (john@example.com)
if __name__ == "__main__":
asyncio.run(main())
```
## Core Operations
### Connection
```python
from commondao import connect
async with connect(
host='localhost',
port=3306,
user='root',
password='password',
db='testdb'
) as db:
# Your database operations here
pass
```
### Insert Data (with Pydantic Models)
```python
from pydantic import BaseModel
from commondao.annotation import TableId
from typing import Annotated, Optional
class UserInsert(BaseModel):
id: Annotated[Optional[int], TableId('users')] = None # Auto-increment primary key
name: str
email: str
# Insert using Pydantic model (id will be auto-generated)
user = UserInsert(name='John', email='john@example.com')
await db.insert(user)
print(f"New user id: {db.lastrowid()}") # Get the auto-generated id
# Insert with ignore option (skips duplicate key errors)
user2 = UserInsert(name='Jane', email='jane@example.com')
await db.insert(user2, ignore=True)
# Insert with custom field handling
# exclude_unset=False: includes all fields, even those not explicitly set
# exclude_none=True: excludes fields with None values
user3 = UserInsert(name='Bob', email='bob@example.com')
await db.insert(user3, exclude_unset=False, exclude_none=True)
```
### Update Data (with Pydantic Models)
```python
class UserUpdate(BaseModel):
id: Annotated[Optional[int], TableId('users')] = None # Primary key; TableId names the target table
name: Optional[str] = None
email: Optional[str] = None
# Update by primary key (id must be provided)
user = UserUpdate(id=1, name='John Smith', email='john.smith@example.com')
await db.update_by_id(user)
# Update by custom key (partial update - only specified fields)
user_update = UserUpdate(name='Jane Doe', email='jane.doe@example.com')
await db.update_by_key(user_update, key={'email': 'john.smith@example.com'})
# Update with field handling options
# exclude_unset=True (default): only update explicitly set fields
# exclude_none=False (default): include None values (set column to NULL)
user = UserUpdate(id=2, name='Alice', email=None)
await db.update_by_id(user, exclude_unset=True, exclude_none=False) # Sets email to NULL
# Update excluding None values
user = UserUpdate(id=3, name='Bob', email=None)
await db.update_by_id(user, exclude_unset=True, exclude_none=True) # Won't update email column
```
### Delete Data
```python
# Delete by primary key
await db.delete_by_id(User, 1)
# Delete by custom key
await db.delete_by_key(User, key={'email': 'john@example.com'})
```
### Query Data
```python
# Get a single row by primary key
user = await db.get_by_id(User, 1)
# Get a row by primary key or raise NotFoundError if not found
user = await db.get_by_id_or_fail(User, 1)
# Get by custom key
user = await db.get_by_key(User, key={'email': 'john@example.com'})
# Get by key or raise NotFoundError if not found
user = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})
# Use with Pydantic models
from pydantic import BaseModel
from commondao.annotation import RawSql
from typing import Annotated
class UserModel(BaseModel):
id: int
name: str
email: str
full_name: Annotated[str, RawSql("CONCAT(first_name, ' ', last_name)")]
# Query with model validation
user = await db.select_one(
"select * from users where id = :id",
UserModel,
{"id": 1}
)
# Query multiple rows
users = await db.select_all(
"select * from users where status = :status",
UserModel,
{"status": "active"}
)
# Paginated queries
from commondao import Paged
result: Paged[UserModel] = await db.select_paged(
"select * from users where status = :status",
UserModel,
{"status": "active"},
size=10,
offset=0
)
print(f"Total users: {result.total}")
print(f"Current page: {len(result.items)} users")
```
### Raw SQL Execution
CommonDAO supports parameterized SQL queries using named parameters with the `:parameter_name` format for secure and readable queries.
#### execute_query - For SELECT operations
```python
# Simple query without parameters
rows = await db.execute_query("SELECT * FROM users")
# Query with single parameter
user_rows = await db.execute_query(
"SELECT * FROM users WHERE id = :user_id",
{"user_id": 123}
)
# Query with multiple parameters
filtered_rows = await db.execute_query(
"SELECT * FROM users WHERE name = :name AND age > :min_age",
{"name": "John", "min_age": 18}
)
# Query with IN clause (using list parameter)
users_in_group = await db.execute_query(
"SELECT * FROM users WHERE id IN :user_ids",
{"user_ids": [1, 2, 3, 4]}
)
# Complex query with date filtering
recent_users = await db.execute_query(
"SELECT * FROM users WHERE created_at > :date AND status = :status",
{"date": "2023-01-01", "status": "active"}
)
```
#### execute_mutation - For INSERT, UPDATE, DELETE operations
```python
# INSERT statement
affected = await db.execute_mutation(
"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
{"name": "John", "email": "john@example.com", "age": 25}
)
print(f"Inserted {affected} rows")
# UPDATE statement
affected = await db.execute_mutation(
"UPDATE users SET email = :new_email WHERE id = :user_id",
{"new_email": "newemail@example.com", "user_id": 123}
)
print(f"Updated {affected} rows")
# DELETE statement
affected = await db.execute_mutation(
"DELETE FROM users WHERE age < :min_age",
{"min_age": 18}
)
print(f"Deleted {affected} rows")
# Multiple parameter UPDATE
affected = await db.execute_mutation(
"UPDATE users SET name = :name, age = :age WHERE id = :id",
{"name": "Jane", "age": 30, "id": 456}
)
# Bulk operations with loop
user_list = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
]
for user_data in user_list:
affected = await db.execute_mutation(
"INSERT INTO users (name, email) VALUES (:name, :email)",
user_data
)
```
#### Parameter Format Rules
- **Named Parameters**: Use `:parameter_name` format in SQL
- **Dictionary Keys**: Match parameter names without the colon prefix
- **Supported Types**: str, int, float, bytes, datetime, date, time, timedelta, Decimal
- **Lists/Tuples**: Supported for IN clauses in queries
- **None Values**: Properly handled as SQL NULL
```python
# Example with various data types
from datetime import datetime, date
from decimal import Decimal
result = await db.execute_query(
"""
SELECT * FROM orders
WHERE customer_id = :customer_id
AND total >= :min_total
AND created_date = :order_date
AND status IN :valid_statuses
""",
{
"customer_id": 123,
"min_total": Decimal("99.99"),
"order_date": date(2023, 12, 25),
"valid_statuses": ["pending", "confirmed", "shipped"]
}
)
```
### Transactions
```python
from commondao.annotation import TableId
from typing import Annotated, Optional
class OrderInsert(BaseModel):
id: Annotated[Optional[int], TableId('orders')] = None
customer_id: int
total: float
class OrderItemInsert(BaseModel):
id: Annotated[Optional[int], TableId('order_items')] = None
order_id: int
product_id: int
async with connect(host='localhost', user='root', db='testdb', autocommit=False) as db:
# Autocommit is disabled explicitly, so the statements below form one transaction
order = OrderInsert(customer_id=1, total=99.99)
await db.insert(order)
order_id = db.lastrowid() # Get the auto-generated order id
item = OrderItemInsert(order_id=order_id, product_id=42)
await db.insert(item)
# Commit the transaction
await db.commit()
```
## Type Safety
CommonDAO provides robust type checking to help prevent errors:
```python
from commondao import is_row_dict, is_query_dict
from typing import Dict, Any
from datetime import datetime
# Valid row dict (for updates/inserts)
valid_data: Dict[str, Any] = {
"id": 1,
"name": "John",
"created_at": datetime.now(),
}
# Check type safety
assert is_row_dict(valid_data) # Type check passes
# Valid query dict (can contain lists/tuples for IN clauses)
valid_query: Dict[str, Any] = {
"id": 1,
"status": "active",
"tags": ["admin", "user"], # Lists are valid for query dicts
"codes": (200, 201) # Tuples are also valid
}
assert is_query_dict(valid_query) # Type check passes
# Invalid row dict (contains a list)
invalid_data: Dict[str, Any] = {
"id": 1,
"tags": ["admin", "user"] # Lists are not valid row values
}
assert not is_row_dict(invalid_data) # Type check fails
```
## API Documentation
For complete API documentation, please see the docstrings in the code or visit our documentation website.
## 🧪 Testing
CommonDAO comes with comprehensive tests to ensure reliability:
```bash
# Install test dependencies
pip install -e ".[test]"
# Run tests
pytest tests
```
## 🤝 Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## License
This project is licensed under the Apache License 2.0.
# CommonDAO API Reference
This document provides comprehensive API reference for CommonDAO, a powerful async MySQL toolkit with Pydantic integration.
## Table of Contents
- [Connection Management](#connection-management)
- [Core CRUD Operations](#core-crud-operations)
- [Query Methods](#query-methods)
- [Raw SQL Execution](#raw-sql-execution)
- [Transaction Management](#transaction-management)
- [Type Safety Utilities](#type-safety-utilities)
- [Annotations](#annotations)
- [Error Classes](#error-classes)
- [Data Types](#data-types)
## Connection Management
### `connect(**config) -> AsyncContextManager[Commondao]`
Creates an async database connection context manager.
**Parameters:**
- `host` (str): Database host
- `port` (int): Database port (default: 3306)
- `user` (str): Database username
- `password` (str): Database password
- `db` (str): Database name
- `autocommit` (bool): Enable autocommit mode (default: True)
- Additional aiomysql connection parameters
**Returns:** AsyncContextManager yielding a Commondao instance
**Example:**
```python
async with connect(host='localhost', user='root', password='pwd', db='testdb') as db:
# Database operations here
pass
```
---
## Core CRUD Operations
### `insert(entity: BaseModel, *, ignore: bool = False, exclude_unset: bool = True, exclude_none: bool = False) -> int`
Insert a Pydantic model instance into the database.
**Parameters:**
- `entity` (BaseModel): Pydantic model instance to insert
- `ignore` (bool): Use INSERT IGNORE to skip duplicate key errors
- `exclude_unset` (bool): If True (default), excludes fields not explicitly set. Allows database defaults for unset fields
- `exclude_none` (bool): If False (default), includes None values. If True, excludes None values from INSERT
**Returns:** int - Number of affected rows (1 on success, 0 if ignored)
**Example:**
```python
user = UserInsert(name='John', email='john@example.com')
affected_rows = await db.insert(user)
```
### `update_by_id(entity: BaseModel, *, exclude_unset: bool = True, exclude_none: bool = False) -> int`
Update a record by its primary key using a Pydantic model.
**Parameters:**
- `entity` (BaseModel): Pydantic model with primary key value set
- `exclude_unset` (bool): If True (default), only updates explicitly set fields. Enables partial updates
- `exclude_none` (bool): If False (default), includes None values (sets to NULL). If True, excludes None values
**Returns:** int - Number of affected rows
**Raises:**
- `EmptyPrimaryKeyError`: If primary key is None or empty
**Example:**
```python
user = UserUpdate(id=1, name='John Updated', email='john.new@example.com')
affected_rows = await db.update_by_id(user)
```
### `update_by_key(entity: BaseModel, *, key: QueryDict, exclude_unset: bool = True, exclude_none: bool = False) -> int`
Update records matching the specified key conditions.
**Parameters:**
- `entity` (BaseModel): Pydantic model with update values
- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions
- `exclude_unset` (bool): If True (default), only updates explicitly set fields. Enables partial updates
- `exclude_none` (bool): If False (default), includes None values (sets to NULL). If True, excludes None values
**Returns:** int - Number of affected rows
**Example:**
```python
user_update = UserUpdate(name='Jane', email='jane@example.com')
affected_rows = await db.update_by_key(user_update, key={'id': 1})
```
### `delete_by_id(entity_class: Type[BaseModel], entity_id: Union[int, str]) -> int`
Delete a record by its primary key value.
**Parameters:**
- `entity_class` (Type[BaseModel]): Pydantic model class
- `entity_id` (Union[int, str]): The primary key value
**Returns:** int - Number of affected rows
**Raises:**
- `AssertionError`: If entity_id is None
**Example:**
```python
affected_rows = await db.delete_by_id(User, 1)
```
### `delete_by_key(entity_class: Type[BaseModel], *, key: QueryDict) -> int`
Delete records matching the specified key conditions.
**Parameters:**
- `entity_class` (Type[BaseModel]): Pydantic model class
- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions
**Returns:** int - Number of affected rows
**Example:**
```python
affected_rows = await db.delete_by_key(User, key={'id': 1})
```
---
## Query Methods
### `get_by_id(entity_class: Type[M], entity_id: Union[int, str]) -> Optional[M]`
Get a single record by its primary key value.
**Parameters:**
- `entity_class` (Type[M]): Pydantic model class
- `entity_id` (Union[int, str]): The primary key value
**Returns:** Optional[M] - Model instance or None if not found
**Raises:**
- `AssertionError`: If entity_id is None
**Example:**
```python
user = await db.get_by_id(User, 1)
if user:
print(f"Found user: {user.name}")
```
### `get_by_id_or_fail(entity_class: Type[M], entity_id: Union[int, str]) -> M`
Get a single record by its primary key value or raise an error if not found.
**Parameters:**
- `entity_class` (Type[M]): Pydantic model class
- `entity_id` (Union[int, str]): The primary key value
**Returns:** M - Model instance
**Raises:**
- `NotFoundError`: If no record matches the primary key
- `AssertionError`: If entity_id is None
**Example:**
```python
try:
user = await db.get_by_id_or_fail(User, 1)
print(f"User: {user.name}")
except NotFoundError:
print("User not found")
```
### `get_by_key(entity_class: Type[M], *, key: QueryDict) -> Optional[M]`
Get a single record matching the specified key conditions.
**Parameters:**
- `entity_class` (Type[M]): Pydantic model class
- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions
**Returns:** Optional[M] - Model instance or None if not found
**Example:**
```python
user = await db.get_by_key(User, key={'email': 'john@example.com'})
```
### `get_by_key_or_fail(entity_class: Type[M], *, key: QueryDict) -> M`
Get a single record matching the key conditions or raise an error if not found.
**Parameters:**
- `entity_class` (Type[M]): Pydantic model class
- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions
**Returns:** M - Model instance
**Raises:**
- `NotFoundError`: If no record matches the conditions
**Example:**
```python
user = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})
```
### `select_one(sql: str, select: Type[M], data: QueryDict = {}) -> Optional[M]`
Execute a SELECT query and return the first row as a validated model instance.
**Parameters:**
- `sql` (str): SQL query starting with 'select * from'
- `select` (Type[M]): Pydantic model class for result validation
- `data` (QueryDict): Parameters for the query
**Returns:** Optional[M] - Model instance or None if no results
**Example:**
```python
user = await db.select_one(
"select * from users where age > :min_age",
User,
{"min_age": 18}
)
```
### `select_one_or_fail(sql: str, select: Type[M], data: QueryDict = {}) -> M`
Execute a SELECT query and return the first row or raise an error if not found.
**Parameters:**
- `sql` (str): SQL query starting with 'select * from'
- `select` (Type[M]): Pydantic model class for result validation
- `data` (QueryDict): Parameters for the query
**Returns:** M - Model instance
**Raises:**
- `NotFoundError`: If no results found
**Example:**
```python
user = await db.select_one_or_fail(
"select * from users where email = :email",
User,
{"email": "john@example.com"}
)
```
### `select_all(sql: str, select: Type[M], data: QueryDict = {}) -> list[M]`
Execute a SELECT query and return all matching rows as validated model instances.
**Parameters:**
- `sql` (str): SQL query starting with 'select * from'
- `select` (Type[M]): Pydantic model class for result validation
- `data` (QueryDict): Parameters for the query
**Returns:** list[M] - List of model instances
**Example:**
```python
active_users = await db.select_all(
"select * from users where status = :status",
User,
{"status": "active"}
)
```
### `select_paged(sql: str, select: Type[M], data: QueryDict = {}, *, size: int, offset: int = 0) -> Paged[M]`
Execute a paginated SELECT query with total count.
**Parameters:**
- `sql` (str): SQL query starting with 'select * from'
- `select` (Type[M]): Pydantic model class for result validation
- `data` (QueryDict): Parameters for the query
- `size` (int): Number of items per page
- `offset` (int): Number of items to skip
**Returns:** Paged[M] - Paginated result with items and total count
**Example:**
```python
result = await db.select_paged(
"select * from users where status = :status",
User,
{"status": "active"},
size=10,
offset=20
)
print(f"Total: {result.total}, Page items: {len(result.items)}")
```
---
## Raw SQL Execution
### `execute_query(sql: str, data: Mapping[str, Any] = {}) -> list`
Execute a parameterized SQL query and return all results.
**Parameters:**
- `sql` (str): SQL query with named parameter placeholders (:param_name)
- `data` (Mapping[str, Any]): Dictionary mapping parameter names to values
**Returns:** list - List of result rows (dictionaries)
**Supported Parameter Types:**
- str, int, float, bytes, datetime, date, time, timedelta, Decimal
- Lists/tuples (for IN clauses)
- None (converted to SQL NULL)
**Examples:**
```python
# Simple query
rows = await db.execute_query("SELECT * FROM users")
# Parameterized query
rows = await db.execute_query(
"SELECT * FROM users WHERE age > :min_age AND status = :status",
{"min_age": 18, "status": "active"}
)
# IN clause with list
rows = await db.execute_query(
"SELECT * FROM users WHERE id IN :user_ids",
{"user_ids": [1, 2, 3, 4]}
)
```
### `execute_mutation(sql: str, data: Mapping[str, Any] = {}) -> int`
Execute a parameterized SQL mutation (INSERT, UPDATE, DELETE) and return affected row count.
**Parameters:**
- `sql` (str): SQL mutation statement with named parameter placeholders (:param_name)
- `data` (Mapping[str, Any]): Dictionary mapping parameter names to values
**Returns:** int - Number of affected rows
**Examples:**
```python
# INSERT
affected = await db.execute_mutation(
"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
{"name": "John", "email": "john@example.com", "age": 25}
)
# UPDATE
affected = await db.execute_mutation(
"UPDATE users SET email = :email WHERE id = :id",
{"email": "newemail@example.com", "id": 123}
)
# DELETE
affected = await db.execute_mutation(
"DELETE FROM users WHERE age < :min_age",
{"min_age": 18}
)
```
---
## Transaction Management
### `commit() -> None`
Commit the current transaction.
**Example:**
```python
async with connect(host='localhost', autocommit=False) as db:
await db.insert(user)
await db.commit() # Explicitly commit
```
### `rollback() -> None`
Rollback the current transaction.
**Example:**
```python
try:
await db.insert(user)
await db.insert(order)
await db.commit()
except Exception:
await db.rollback()
```
### `lastrowid() -> int`
Get the auto-generated ID of the last inserted row.
**Returns:** int - The last inserted row ID
**Example:**
```python
await db.insert(user)
user_id = db.lastrowid()
print(f"New user ID: {user_id}")
```
---
## Type Safety Utilities
### `is_row_dict(data: Mapping) -> TypeGuard[RowDict]`
Check if a mapping is valid for database row operations (INSERT/UPDATE).
**Parameters:**
- `data` (Mapping): The mapping to check
**Returns:** bool - True if valid for row operations
**Valid Types:** str, int, float, bytes, datetime, date, time, timedelta, Decimal, None
**Example:**
```python
from commondao import is_row_dict
data = {"id": 1, "name": "John", "age": 25}
assert is_row_dict(data)
# Now the type checker knows data is a valid RowDict
```
### `is_query_dict(data: Mapping) -> TypeGuard[QueryDict]`
Check if a mapping is valid for query operations (WHERE clauses).
**Parameters:**
- `data` (Mapping): The mapping to check
**Returns:** bool - True if valid for query operations
**Valid Types:** All RowDict types plus lists and tuples (for IN clauses)
**Example:**
```python
from commondao import is_query_dict
query_data = {"id": 1, "status": "active", "tags": ["admin", "user"]}
assert is_query_dict(query_data)
# Now the type checker knows query_data is a valid QueryDict
```
---
## Annotations
### `TableId(table_name: str)`
Annotation to mark a field as the primary key and specify the table name.
**Parameters:**
- `table_name` (str): Name of the database table
**Usage:**
```python
from commondao.annotation import TableId
from typing import Annotated, Optional
from pydantic import BaseModel
class User(BaseModel):
id: Annotated[Optional[int], TableId('users')] = None
name: str
email: str
```
### `RawSql(expression: str)`
Annotation to include raw SQL expressions in SELECT queries.
**Parameters:**
- `expression` (str): SQL expression to include
**Usage:**
```python
from commondao.annotation import RawSql
from typing import Annotated
class UserWithFullName(BaseModel):
id: int
first_name: str
last_name: str
full_name: Annotated[str, RawSql("CONCAT(first_name, ' ', last_name)")]
```
---
## Error Classes
### `NotFoundError(ValueError)`
Raised when a record is not found in operations that expect a result.
**Example:**
```python
try:
user = await db.get_by_id_or_fail(User, 999)
except NotFoundError as e:
print(f"User not found: {e}")
```
### `EmptyPrimaryKeyError(ValueError)`
Raised when attempting operations with empty or None primary key values.
**Example:**
```python
try:
user = UserUpdate(id=None, name="John")
await db.update_by_id(user)
except EmptyPrimaryKeyError as e:
print(f"Primary key error: {e}")
```
### `NotTableError(ValueError)`
Raised when a Pydantic model doesn't have proper table annotation.
### `MissingParamError(ValueError)`
Raised when required parameters are missing from SQL queries.
### `TooManyResultError(ValueError)`
Raised when operations expecting single results return multiple rows.
---
## Data Types
### `RowDict`
Type alias for mappings valid in database row operations.
```python
RowDict = Mapping[str, Union[str, int, float, bytes, datetime, date, time, timedelta, Decimal, None]]
```
### `QueryDict`
Type alias for mappings valid in query operations (extends RowDict with list/tuple support).
```python
QueryDict = Mapping[str, Union[RowValueType, list, tuple]]
```
### `Paged[T]`
Generic container for paginated query results.
**Attributes:**
- `items` (list[T]): List of result items
- `total` (int): Total count of all matching records
**Example:**
```python
result: Paged[User] = await db.select_paged(
"select * from users",
User,
size=10
)
print(f"Page has {len(result.items)} items out of {result.total} total")
```
---
## Type Safety with TypeGuard
CommonDAO provides TypeGuard functions `is_row_dict()` and `is_query_dict()` for runtime type checking. **Important**: These functions should only be used with `assert` statements for proper type narrowing.
### Correct Usage Pattern
```python
from commondao import is_row_dict, is_query_dict
# ✅ Correct: Use with assert
async def process_database_row(data: dict):
assert is_row_dict(data)
# Type checker now knows data is RowDict
# Safe to use for database operations
user = UserInsert(**data)
await db.insert(user)
async def process_query_parameters(params: dict):
assert is_query_dict(params)
# Type checker now knows params is QueryDict
# Safe to use in queries
result = await db.execute_query(
"SELECT * FROM users WHERE status = :status",
params
)
# ✅ Correct: In test validation
async def test_query_result():
rows = await db.execute_query("SELECT * FROM users")
assert len(rows) > 0
row = rows[0]
assert is_row_dict(row) # Validates row format
# Now safely access row data
user_id = row['id']
```
### What NOT to do
```python
# ❌ Wrong: Don't use in if statements
if is_row_dict(data):
# Type narrowing won't work properly
pass
# ❌ Wrong: Don't use in boolean expressions
valid = is_row_dict(data) and data['id'] > 0
# ❌ Wrong: Don't use with or/and logic
assert is_row_dict(data) or True # Defeats the purpose
```
### Why Use Assert
TypeGuard functions with `assert` provide both runtime validation and static type narrowing for the type checker:
```python
async def example_function(unknown_data: dict):
# Before assert: unknown_data is just dict
assert is_row_dict(unknown_data)
# After assert: type checker knows unknown_data is RowDict
# This will have proper type hints and validation
await db.execute_mutation(
"INSERT INTO users (name, email) VALUES (:name, :email)",
unknown_data # Type checker knows this is safe
)
```
## Best Practices
### Entity Class Naming Conventions
When designing Pydantic models for database operations, follow these naming conventions to improve code clarity and maintainability:
- **Naming Convention**: Name entity classes used for `dao.insert()` with an `Insert` suffix; name entity classes used for `dao.update()` with an `Update` suffix; keep query entity class names unchanged.
- **Field Optionality Rules**:
- For query entity classes: If a field's DDL is `NOT NULL`, the field should not be optional
- For insert entity classes: If a field's DDL is `nullable` or has a default value, the field can be optional
- For update entity classes: All fields should be optional
- **Field Inclusion**:
- Insert entity classes should only include fields that may need to be inserted
- Update entity classes should only include fields that may need to be modified
- **TableId Annotation**: Entity classes used for insert/update operations must always include the `TableId` annotation, even if the primary key field is optional
Raw data
{
"_id": null,
"home_page": null,
"name": "commondao",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": null,
"keywords": "mysql",
"author": null,
"author_email": "qorzj <goodhorsezxj@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/4a/da/50d0bd6e5b33efde87cd77ee8414537debe079563da10c2a73a982389cbb/commondao-1.3.1.tar.gz",
"platform": null,
"description": "# CommonDAO\n\nA powerful, type-safe, and Pydantic-integrated async MySQL toolkit for Python.\n\n\n\n\n\n\nCommonDAO is a lightweight, type-safe async MySQL toolkit designed to simplify database operations with a clean, intuitive API. It integrates seamlessly with Pydantic for robust data validation while providing a comprehensive set of tools for common database tasks.\n\n## \u2728 Features\n\n- **Async/Await Support**: Built on aiomysql for non-blocking database operations\n- **Type Safety**: Strong typing with Python's type hints and runtime type checking\n- **Pydantic Integration**: Seamless validation and transformation of database records to Pydantic models\n- **SQL Injection Protection**: Parameterized queries for secure database access\n- **Comprehensive CRUD Operations**: Simple methods for common database tasks\n- **Raw SQL Support**: Full control when you need it with parameterized raw SQL\n- **Connection Pooling**: Efficient database connection management\n- **Minimal Boilerplate**: Write less code while maintaining readability and control\n\n## \ud83d\ude80 Installation\n\n```bash\npip install commondao\n```\n\n## \ud83d\udd0d Quick Start\n\n```python\nimport asyncio\nfrom commondao import connect\nfrom commondao.annotation import TableId\nfrom pydantic import BaseModel\nfrom typing import Annotated\n\n# Define your Pydantic models with TableId annotation\nclass User(BaseModel):\n id: Annotated[int, TableId('users')] # First field with TableId is the primary key\n name: str\n email: str\n\nclass UserInsert(BaseModel):\n id: Annotated[Optional[int], TableId('users')] = None # Optional for auto-increment\n name: str\n email: str\n\nasync def main():\n # Connect to database\n config = {\n 'host': 'localhost',\n 'port': 3306,\n 'user': 'root',\n 'password': 'password',\n 'db': 'testdb',\n 'autocommit': True,\n }\n\n async with connect(**config) as db:\n # Insert a new user using Pydantic model\n user = UserInsert(name='John Doe', 
email='john@example.com')\n await db.insert(user)\n\n # Query the user by key with Pydantic model validation\n result = await db.get_by_key(User, key={'email': 'john@example.com'})\n if result:\n print(f\"User: {result.name} ({result.email})\") # Output => User: John Doe (john@example.com)\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n## \ud83d\udcca Core Operations\n\n### Connection\n\n```python\nfrom commondao import connect\n\nasync with connect(\n host='localhost', \n port=3306, \n user='root', \n password='password', \n db='testdb'\n) as db:\n # Your database operations here\n pass\n```\n\n### Insert Data (with Pydantic Models)\n\n```python\nfrom pydantic import BaseModel\nfrom commondao.annotation import TableId\nfrom typing import Annotated, Optional\n\nclass UserInsert(BaseModel):\n id: Annotated[Optional[int], TableId('users')] = None # Auto-increment primary key\n name: str\n email: str\n\n# Insert using Pydantic model (id will be auto-generated)\nuser = UserInsert(name='John', email='john@example.com')\nawait db.insert(user)\nprint(f\"New user id: {db.lastrowid()}\") # Get the auto-generated id\n\n# Insert with ignore option (skips duplicate key errors)\nuser2 = UserInsert(name='Jane', email='jane@example.com')\nawait db.insert(user2, ignore=True)\n\n# Insert with custom field handling\n# exclude_unset=False: includes all fields, even those not explicitly set\n# exclude_none=True: excludes fields with None values\nuser3 = UserInsert(name='Bob', email='bob@example.com')\nawait db.insert(user3, exclude_unset=False, exclude_none=True)\n```\n\n### Update Data (with Pydantic Models)\n\n```python\nclass UserUpdate(BaseModel):\n id: Optional[int] = None\n name: Optional[str] = None\n email: Optional[str] = None\n\n# Update by primary key (id must be provided)\nuser = UserUpdate(id=1, name='John Smith', email='john.smith@example.com')\nawait db.update_by_id(user)\n\n# Update by custom key (partial update - only specified fields)\nuser_update = 
UserUpdate(name='Jane Doe', email='jane.doe@example.com')\nawait db.update_by_key(user_update, key={'email': 'john.smith@example.com'})\n\n# Update with field handling options\n# exclude_unset=True (default): only update explicitly set fields\n# exclude_none=False (default): include None values (set column to NULL)\nuser = UserUpdate(id=2, name='Alice', email=None)\nawait db.update_by_id(user, exclude_unset=True, exclude_none=False) # Sets email to NULL\n\n# Update excluding None values\nuser = UserUpdate(id=3, name='Bob', email=None)\nawait db.update_by_id(user, exclude_unset=True, exclude_none=True) # Won't update email column\n```\n\n### Delete Data\n\n```python\n# Delete by primary key\nawait db.delete_by_id(User, 1)\n\n# Delete by custom key\nawait db.delete_by_key(User, key={'email': 'john@example.com'})\n```\n\n### Query Data\n\n```python\n# Get a single row by primary key\nuser = await db.get_by_id(User, 1)\n\n# Get a row by primary key or raise NotFoundError if not found\nuser = await db.get_by_id_or_fail(User, 1)\n\n# Get by custom key\nuser = await db.get_by_key(User, key={'email': 'john@example.com'})\n\n# Get by key or raise NotFoundError if not found\nuser = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})\n\n# Use with Pydantic models\nfrom pydantic import BaseModel\nfrom commondao.annotation import RawSql\nfrom typing import Annotated\n\nclass UserModel(BaseModel):\n id: int\n name: str\n email: str\n full_name: Annotated[str, RawSql(\"CONCAT(first_name, ' ', last_name)\")]\n\n# Query with model validation\nuser = await db.select_one(\n \"select * from users where id = :id\",\n UserModel,\n {\"id\": 1}\n)\n\n# Query multiple rows\nusers = await db.select_all(\n \"select * from users where status = :status\",\n UserModel,\n {\"status\": \"active\"}\n)\n\n# Paginated queries\nfrom commondao import Paged\n\nresult: Paged[UserModel] = await db.select_paged(\n \"select * from users where status = :status\",\n UserModel,\n {\"status\": 
\"active\"},\n size=10,\n offset=0\n)\n\nprint(f\"Total users: {result.total}\")\nprint(f\"Current page: {len(result.items)} users\")\n```\n\n### Raw SQL Execution\n\nCommonDAO supports parameterized SQL queries using named parameters with the `:parameter_name` format for secure and readable queries.\n\n#### execute_query - For SELECT operations\n\n```python\n# Simple query without parameters\nrows = await db.execute_query(\"SELECT * FROM users\")\n\n# Query with single parameter\nuser_rows = await db.execute_query(\n \"SELECT * FROM users WHERE id = :user_id\",\n {\"user_id\": 123}\n)\n\n# Query with multiple parameters\nfiltered_rows = await db.execute_query(\n \"SELECT * FROM users WHERE name = :name AND age > :min_age\",\n {\"name\": \"John\", \"min_age\": 18}\n)\n\n# Query with IN clause (using list parameter)\nusers_in_group = await db.execute_query(\n \"SELECT * FROM users WHERE id IN :user_ids\",\n {\"user_ids\": [1, 2, 3, 4]}\n)\n\n# Complex query with date filtering\nrecent_users = await db.execute_query(\n \"SELECT * FROM users WHERE created_at > :date AND status = :status\",\n {\"date\": \"2023-01-01\", \"status\": \"active\"}\n)\n```\n\n#### execute_mutation - For INSERT, UPDATE, DELETE operations\n\n```python\n# INSERT statement\naffected = await db.execute_mutation(\n \"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)\",\n {\"name\": \"John\", \"email\": \"john@example.com\", \"age\": 25}\n)\nprint(f\"Inserted {affected} rows\")\n\n# UPDATE statement\naffected = await db.execute_mutation(\n \"UPDATE users SET email = :new_email WHERE id = :user_id\",\n {\"new_email\": \"newemail@example.com\", \"user_id\": 123}\n)\nprint(f\"Updated {affected} rows\")\n\n# DELETE statement\naffected = await db.execute_mutation(\n \"DELETE FROM users WHERE age < :min_age\",\n {\"min_age\": 18}\n)\nprint(f\"Deleted {affected} rows\")\n\n# Multiple parameter UPDATE\naffected = await db.execute_mutation(\n \"UPDATE users SET name = :name, age = :age WHERE 
id = :id\",\n {\"name\": \"Jane\", \"age\": 30, \"id\": 456}\n)\n\n# Bulk operations with loop\nuser_list = [\n {\"name\": \"Alice\", \"email\": \"alice@example.com\"},\n {\"name\": \"Bob\", \"email\": \"bob@example.com\"},\n]\n\nfor user_data in user_list:\n affected = await db.execute_mutation(\n \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n user_data\n )\n```\n\n#### Parameter Format Rules\n\n- **Named Parameters**: Use `:parameter_name` format in SQL\n- **Dictionary Keys**: Match parameter names without the colon prefix\n- **Supported Types**: str, int, float, bytes, datetime, date, time, timedelta, Decimal\n- **Lists/Tuples**: Supported for IN clauses in queries\n- **None Values**: Properly handled as SQL NULL\n\n```python\n# Example with various data types\nfrom datetime import datetime, date\nfrom decimal import Decimal\n\nresult = await db.execute_query(\n \"\"\"\n SELECT * FROM orders\n WHERE customer_id = :customer_id\n AND total >= :min_total\n AND created_date = :order_date\n AND status IN :valid_statuses\n \"\"\",\n {\n \"customer_id\": 123,\n \"min_total\": Decimal(\"99.99\"),\n \"order_date\": date(2023, 12, 25),\n \"valid_statuses\": [\"pending\", \"confirmed\", \"shipped\"]\n }\n)\n```\n\n### Transactions\n\n```python\nfrom commondao.annotation import TableId\nfrom typing import Annotated, Optional\n\nclass OrderInsert(BaseModel):\n id: Annotated[Optional[int], TableId('orders')] = None\n customer_id: int\n total: float\n\nclass OrderItemInsert(BaseModel):\n id: Annotated[Optional[int], TableId('order_items')] = None\n order_id: int\n product_id: int\n\nasync with connect(host='localhost', user='root', db='testdb') as db:\n # Start transaction (autocommit=False by default)\n order = OrderInsert(customer_id=1, total=99.99)\n await db.insert(order)\n order_id = db.lastrowid() # Get the auto-generated order id\n\n item = OrderItemInsert(order_id=order_id, product_id=42)\n await db.insert(item)\n\n # Commit the transaction\n await 
db.commit()\n```\n\n## \ud83d\udd10 Type Safety\n\nCommonDAO provides robust type checking to help prevent errors:\n\n```python\nfrom commondao import is_row_dict, is_query_dict\nfrom typing import Dict, Any\nfrom datetime import datetime\n\n# Valid row dict (for updates/inserts)\nvalid_data: Dict[str, Any] = {\n \"id\": 1,\n \"name\": \"John\",\n \"created_at\": datetime.now(),\n}\n\n# Check type safety\nassert is_row_dict(valid_data) # Type check passes\n\n# Valid query dict (can contain lists/tuples for IN clauses)\nvalid_query: Dict[str, Any] = {\n \"id\": 1,\n \"status\": \"active\",\n \"tags\": [\"admin\", \"user\"], # Lists are valid for query dicts\n \"codes\": (200, 201) # Tuples are also valid\n}\n\nassert is_query_dict(valid_query) # Type check passes\n\n# Invalid row dict (contains a list)\ninvalid_data: Dict[str, Any] = {\n \"id\": 1,\n \"tags\": [\"admin\", \"user\"] # Lists are not valid row values\n}\n\nassert not is_row_dict(invalid_data) # Type check fails\n```\n\n## \ud83d\udcd6 API Documentation\n\nFor complete API documentation, please see the docstrings in the code or visit our documentation website.\n\n## \ud83e\uddea Testing\n\nCommonDAO comes with comprehensive tests to ensure reliability:\n\n```bash\n# Install test dependencies\npip install -e \".[test]\"\n\n# Run tests\npytest tests\n```\n\n## \ud83e\udd1d Contributing\n\nContributions are welcome! 
Please feel free to submit a Pull Request.\n\n## \ud83d\udcc4 License\n\nThis project is licensed under the Apache License 2.0.\n\n# CommonDAO API Reference\n\nThis document provides comprehensive API reference for CommonDAO, a powerful async MySQL toolkit with Pydantic integration.\n\n## Table of Contents\n\n- [Connection Management](#connection-management)\n- [Core CRUD Operations](#core-crud-operations)\n- [Query Methods](#query-methods)\n- [Raw SQL Execution](#raw-sql-execution)\n- [Transaction Management](#transaction-management)\n- [Type Safety Utilities](#type-safety-utilities)\n- [Annotations](#annotations)\n- [Error Classes](#error-classes)\n- [Data Types](#data-types)\n\n## Connection Management\n\n### `connect(**config) -> AsyncContextManager[Commondao]`\n\nCreates an async database connection context manager.\n\n**Parameters:**\n- `host` (str): Database host\n- `port` (int): Database port (default: 3306)\n- `user` (str): Database username\n- `password` (str): Database password\n- `db` (str): Database name\n- `autocommit` (bool): Enable autocommit mode (default: True)\n- Additional aiomysql connection parameters\n\n**Returns:** AsyncContextManager yielding a Commondao instance\n\n**Example:**\n```python\nasync with connect(host='localhost', user='root', password='pwd', db='testdb') as db:\n # Database operations here\n pass\n```\n\n---\n\n## Core CRUD Operations\n\n### `insert(entity: BaseModel, *, ignore: bool = False, exclude_unset: bool = True, exclude_none: bool = False) -> int`\n\nInsert a Pydantic model instance into the database.\n\n**Parameters:**\n- `entity` (BaseModel): Pydantic model instance to insert\n- `ignore` (bool): Use INSERT IGNORE to skip duplicate key errors\n- `exclude_unset` (bool): If True (default), excludes fields not explicitly set. Allows database defaults for unset fields\n- `exclude_none` (bool): If False (default), includes None values. 
If True, excludes None values from INSERT\n\n**Returns:** int - Number of affected rows (1 on success, 0 if ignored)\n\n**Example:**\n```python\nuser = UserInsert(name='John', email='john@example.com')\naffected_rows = await db.insert(user)\n```\n\n### `update_by_id(entity: BaseModel, *, exclude_unset: bool = True, exclude_none: bool = False) -> int`\n\nUpdate a record by its primary key using a Pydantic model.\n\n**Parameters:**\n- `entity` (BaseModel): Pydantic model with primary key value set\n- `exclude_unset` (bool): If True (default), only updates explicitly set fields. Enables partial updates\n- `exclude_none` (bool): If False (default), includes None values (sets to NULL). If True, excludes None values\n\n**Returns:** int - Number of affected rows\n\n**Raises:**\n- `EmptyPrimaryKeyError`: If primary key is None or empty\n\n**Example:**\n```python\nuser = UserUpdate(id=1, name='John Updated', email='john.new@example.com')\naffected_rows = await db.update_by_id(user)\n```\n\n### `update_by_key(entity: BaseModel, *, key: QueryDict, exclude_unset: bool = True, exclude_none: bool = False) -> int`\n\nUpdate records matching the specified key conditions.\n\n**Parameters:**\n- `entity` (BaseModel): Pydantic model with update values\n- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions\n- `exclude_unset` (bool): If True (default), only updates explicitly set fields. Enables partial updates\n- `exclude_none` (bool): If False (default), includes None values (sets to NULL). 
If True, excludes None values\n\n**Returns:** int - Number of affected rows\n\n**Example:**\n```python\nuser_update = UserUpdate(name='Jane', email='jane@example.com')\naffected_rows = await db.update_by_key(user_update, key={'id': 1})\n```\n\n### `delete_by_id(entity_class: Type[BaseModel], entity_id: Union[int, str]) -> int`\n\nDelete a record by its primary key value.\n\n**Parameters:**\n- `entity_class` (Type[BaseModel]): Pydantic model class\n- `entity_id` (Union[int, str]): The primary key value\n\n**Returns:** int - Number of affected rows\n\n**Raises:**\n- `AssertionError`: If entity_id is None\n\n**Example:**\n```python\naffected_rows = await db.delete_by_id(User, 1)\n```\n### `delete_by_key(entity_class: Type[BaseModel], *, key: QueryDict) -> int`\n\nDelete records matching the specified key conditions.\n\n**Parameters:**\n- `entity_class` (Type[BaseModel]): Pydantic model class\n- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions\n\n**Returns:** int - Number of affected rows\n\n**Example:**\n```python\naffected_rows = await db.delete_by_key(User, key={'id': 1})\n```\n\n---\n\n## Query Methods\n\n### `get_by_id(entity_class: Type[M], entity_id: Union[int, str]) -> Optional[M]`\n\nGet a single record by its primary key value.\n\n**Parameters:**\n- `entity_class` (Type[M]): Pydantic model class\n- `entity_id` (Union[int, str]): The primary key value\n\n**Returns:** Optional[M] - Model instance or None if not found\n\n**Raises:**\n- `AssertionError`: If entity_id is None\n\n**Example:**\n```python\nuser = await db.get_by_id(User, 1)\nif user:\n print(f\"Found user: {user.name}\")\n```\n\n### `get_by_id_or_fail(entity_class: Type[M], entity_id: Union[int, str]) -> M`\n\nGet a single record by its primary key value or raise an error if not found.\n\n**Parameters:**\n- `entity_class` (Type[M]): Pydantic model class\n- `entity_id` (Union[int, str]): The primary key value\n\n**Returns:** M - Model instance\n\n**Raises:**\n- `NotFoundError`: If 
no record matches the primary key\n- `AssertionError`: If entity_id is None\n\n**Example:**\n```python\ntry:\n user = await db.get_by_id_or_fail(User, 1)\n print(f\"User: {user.name}\")\nexcept NotFoundError:\n print(\"User not found\")\n```\n\n### `get_by_key(entity_class: Type[M], *, key: QueryDict) -> Optional[M]`\n\nGet a single record matching the specified key conditions.\n\n**Parameters:**\n- `entity_class` (Type[M]): Pydantic model class\n- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions\n\n**Returns:** Optional[M] - Model instance or None if not found\n\n**Example:**\n```python\nuser = await db.get_by_key(User, key={'email': 'john@example.com'})\n```\n\n### `get_by_key_or_fail(entity_class: Type[M], *, key: QueryDict) -> M`\n\nGet a single record matching the key conditions or raise an error if not found.\n\n**Parameters:**\n- `entity_class` (Type[M]): Pydantic model class\n- `key` (QueryDict): Dictionary of key-value pairs for WHERE conditions\n\n**Returns:** M - Model instance\n\n**Raises:**\n- `NotFoundError`: If no record matches the conditions\n\n**Example:**\n```python\nuser = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})\n```\n\n### `select_one(sql: str, select: Type[M], data: QueryDict = {}) -> Optional[M]`\n\nExecute a SELECT query and return the first row as a validated model instance.\n\n**Parameters:**\n- `sql` (str): SQL query starting with 'select * from'\n- `select` (Type[M]): Pydantic model class for result validation\n- `data` (QueryDict): Parameters for the query\n\n**Returns:** Optional[M] - Model instance or None if no results\n\n**Example:**\n```python\nuser = await db.select_one(\n \"select * from users where age > :min_age\",\n User,\n {\"min_age\": 18}\n)\n```\n\n### `select_one_or_fail(sql: str, select: Type[M], data: QueryDict = {}) -> M`\n\nExecute a SELECT query and return the first row or raise an error if not found.\n\n**Parameters:**\n- `sql` (str): SQL query starting with 'select 
* from'\n- `select` (Type[M]): Pydantic model class for result validation\n- `data` (QueryDict): Parameters for the query\n\n**Returns:** M - Model instance\n\n**Raises:**\n- `NotFoundError`: If no results found\n\n**Example:**\n```python\nuser = await db.select_one_or_fail(\n \"select * from users where email = :email\",\n User,\n {\"email\": \"john@example.com\"}\n)\n```\n\n### `select_all(sql: str, select: Type[M], data: QueryDict = {}) -> list[M]`\n\nExecute a SELECT query and return all matching rows as validated model instances.\n\n**Parameters:**\n- `sql` (str): SQL query starting with 'select * from'\n- `select` (Type[M]): Pydantic model class for result validation\n- `data` (QueryDict): Parameters for the query\n\n**Returns:** list[M] - List of model instances\n\n**Example:**\n```python\nactive_users = await db.select_all(\n \"select * from users where status = :status\",\n User,\n {\"status\": \"active\"}\n)\n```\n\n### `select_paged(sql: str, select: Type[M], data: QueryDict = {}, *, size: int, offset: int = 0) -> Paged[M]`\n\nExecute a paginated SELECT query with total count.\n\n**Parameters:**\n- `sql` (str): SQL query starting with 'select * from'\n- `select` (Type[M]): Pydantic model class for result validation\n- `data` (QueryDict): Parameters for the query\n- `size` (int): Number of items per page\n- `offset` (int): Number of items to skip\n\n**Returns:** Paged[M] - Paginated result with items and total count\n\n**Example:**\n```python\nresult = await db.select_paged(\n \"select * from users where status = :status\",\n User,\n {\"status\": \"active\"},\n size=10,\n offset=20\n)\nprint(f\"Total: {result.total}, Page items: {len(result.items)}\")\n```\n\n---\n\n## Raw SQL Execution\n\n### `execute_query(sql: str, data: Mapping[str, Any] = {}) -> list`\n\nExecute a parameterized SQL query and return all results.\n\n**Parameters:**\n- `sql` (str): SQL query with named parameter placeholders (:param_name)\n- `data` (Mapping[str, Any]): Dictionary 
mapping parameter names to values\n\n**Returns:** list - List of result rows (dictionaries)\n\n**Supported Parameter Types:**\n- str, int, float, bytes, datetime, date, time, timedelta, Decimal\n- Lists/tuples (for IN clauses)\n- None (converted to SQL NULL)\n\n**Examples:**\n```python\n# Simple query\nrows = await db.execute_query(\"SELECT * FROM users\")\n\n# Parameterized query\nrows = await db.execute_query(\n \"SELECT * FROM users WHERE age > :min_age AND status = :status\",\n {\"min_age\": 18, \"status\": \"active\"}\n)\n\n# IN clause with list\nrows = await db.execute_query(\n \"SELECT * FROM users WHERE id IN :user_ids\",\n {\"user_ids\": [1, 2, 3, 4]}\n)\n```\n\n### `execute_mutation(sql: str, data: Mapping[str, Any] = {}) -> int`\n\nExecute a parameterized SQL mutation (INSERT, UPDATE, DELETE) and return affected row count.\n\n**Parameters:**\n- `sql` (str): SQL mutation statement with named parameter placeholders (:param_name)\n- `data` (Mapping[str, Any]): Dictionary mapping parameter names to values\n\n**Returns:** int - Number of affected rows\n\n**Examples:**\n```python\n# INSERT\naffected = await db.execute_mutation(\n \"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)\",\n {\"name\": \"John\", \"email\": \"john@example.com\", \"age\": 25}\n)\n\n# UPDATE\naffected = await db.execute_mutation(\n \"UPDATE users SET email = :email WHERE id = :id\",\n {\"email\": \"newemail@example.com\", \"id\": 123}\n)\n\n# DELETE\naffected = await db.execute_mutation(\n \"DELETE FROM users WHERE age < :min_age\",\n {\"min_age\": 18}\n)\n```\n\n---\n\n## Transaction Management\n\n### `commit() -> None`\n\nCommit the current transaction.\n\n**Example:**\n```python\nasync with connect(host='localhost', autocommit=False) as db:\n await db.insert(user)\n await db.commit() # Explicitly commit\n```\n\n### `rollback() -> None`\n\nRollback the current transaction.\n\n**Example:**\n```python\ntry:\n await db.insert(user)\n await db.insert(order)\n await 
db.commit()\nexcept Exception:\n await db.rollback()\n```\n\n### `lastrowid() -> int`\n\nGet the auto-generated ID of the last inserted row.\n\n**Returns:** int - The last inserted row ID\n\n**Example:**\n```python\nawait db.insert(user)\nuser_id = db.lastrowid()\nprint(f\"New user ID: {user_id}\")\n```\n\n---\n\n## Type Safety Utilities\n\n### `is_row_dict(data: Mapping) -> TypeGuard[RowDict]`\n\nCheck if a mapping is valid for database row operations (INSERT/UPDATE).\n\n**Parameters:**\n- `data` (Mapping): The mapping to check\n\n**Returns:** bool - True if valid for row operations\n\n**Valid Types:** str, int, float, bytes, datetime, date, time, timedelta, Decimal, None\n\n**Example:**\n```python\nfrom commondao import is_row_dict\n\ndata = {\"id\": 1, \"name\": \"John\", \"age\": 25}\nassert is_row_dict(data)\n# Now TypeScript knows data is a valid RowDict\n```\n\n### `is_query_dict(data: Mapping) -> TypeGuard[QueryDict]`\n\nCheck if a mapping is valid for query operations (WHERE clauses).\n\n**Parameters:**\n- `data` (Mapping): The mapping to check\n\n**Returns:** bool - True if valid for query operations\n\n**Valid Types:** All RowDict types plus lists and tuples (for IN clauses)\n\n**Example:**\n```python\nfrom commondao import is_query_dict\n\nquery_data = {\"id\": 1, \"status\": \"active\", \"tags\": [\"admin\", \"user\"]}\nassert is_query_dict(query_data)\n# Now TypeScript knows query_data is a valid QueryDict\n```\n\n---\n\n## Annotations\n\n### `TableId(table_name: str)`\n\nAnnotation to mark a field as the primary key and specify the table name.\n\n**Parameters:**\n- `table_name` (str): Name of the database table\n\n**Usage:**\n```python\nfrom commondao.annotation import TableId\nfrom typing import Annotated, Optional\nfrom pydantic import BaseModel\n\nclass User(BaseModel):\n id: Annotated[Optional[int], TableId('users')] = None\n name: str\n email: str\n```\n\n### `RawSql(expression: str)`\n\nAnnotation to include raw SQL expressions in SELECT 
queries.\n\n**Parameters:**\n- `expression` (str): SQL expression to include\n\n**Usage:**\n```python\nfrom commondao.annotation import RawSql\nfrom typing import Annotated\n\nclass UserWithFullName(BaseModel):\n id: int\n first_name: str\n last_name: str\n full_name: Annotated[str, RawSql(\"CONCAT(first_name, ' ', last_name)\")]\n```\n\n---\n\n## Error Classes\n\n### `NotFoundError(ValueError)`\n\nRaised when a record is not found in operations that expect a result.\n\n**Example:**\n```python\ntry:\n user = await db.get_by_id_or_fail(User, 999)\nexcept NotFoundError as e:\n print(f\"User not found: {e}\")\n```\n\n### `EmptyPrimaryKeyError(ValueError)`\n\nRaised when attempting operations with empty or None primary key values.\n\n**Example:**\n```python\ntry:\n user = UserUpdate(id=None, name=\"John\")\n await db.update_by_id(user)\nexcept EmptyPrimaryKeyError as e:\n print(f\"Primary key error: {e}\")\n```\n\n### `NotTableError(ValueError)`\n\nRaised when a Pydantic model doesn't have proper table annotation.\n\n### `MissingParamError(ValueError)`\n\nRaised when required parameters are missing from SQL queries.\n\n### `TooManyResultError(ValueError)`\n\nRaised when operations expecting single results return multiple rows.\n\n---\n\n## Data Types\n\n### `RowDict`\n\nType alias for mappings valid in database row operations.\n\n```python\nRowDict = Mapping[str, Union[str, int, float, bytes, datetime, date, time, timedelta, Decimal, None]]\n```\n\n### `QueryDict`\n\nType alias for mappings valid in query operations (extends RowDict with list/tuple support).\n\n```python\nQueryDict = Mapping[str, Union[RowValueType, list, tuple]]\n```\n\n### `Paged[T]`\n\nGeneric container for paginated query results.\n\n**Attributes:**\n- `items` (list[T]): List of result items\n- `total` (int): Total count of all matching records\n\n**Example:**\n```python\nresult: Paged[User] = await db.select_paged(\n \"select * from users\",\n User,\n size=10\n)\nprint(f\"Page has 
{len(result.items)} items out of {result.total} total\")\n```\n\n---\n\n## Type Safety with TypeGuard\n\nCommonDAO provides TypeGuard functions `is_row_dict()` and `is_query_dict()` for runtime type checking. **Important**: These functions should only be used with `assert` statements for proper type narrowing.\n\n### Correct Usage Pattern\n\n```python\nfrom commondao import is_row_dict, is_query_dict\n\n# \u2705 Correct: Use with assert\ndef process_database_row(data: dict):\n assert is_row_dict(data)\n # Type checker now knows data is RowDict\n # Safe to use for database operations\n user = UserInsert(**data)\n await db.insert(user)\n\ndef process_query_parameters(params: dict):\n assert is_query_dict(params)\n # Type checker now knows params is QueryDict\n # Safe to use in queries\n result = await db.execute_query(\n \"SELECT * FROM users WHERE status = :status\",\n params\n )\n\n# \u2705 Correct: In test validation\nasync def test_query_result():\n rows = await db.execute_query(\"SELECT * FROM users\")\n assert len(rows) > 0\n row = rows[0]\n assert is_row_dict(row) # Validates row format\n # Now safely access row data\n user_id = row['id']\n```\n\n### What NOT to do\n\n```python\n# \u274c Wrong: Don't use in if statements\nif is_row_dict(data):\n # Type narrowing won't work properly\n pass\n\n# \u274c Wrong: Don't use in boolean expressions\nvalid = is_row_dict(data) and data['id'] > 0\n\n# \u274c Wrong: Don't use with or/and logic\nassert is_row_dict(data) or True # Defeats the purpose\n```\n\n### Why Use Assert\n\nTypeGuard functions with `assert` provide both runtime validation and compile-time type narrowing:\n\n```python\ndef example_function(unknown_data: dict):\n # Before assert: unknown_data is just dict\n assert is_row_dict(unknown_data)\n # After assert: type checker knows unknown_data is RowDict\n\n # This will have proper type hints and validation\n await db.execute_mutation(\n \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n 
unknown_data # Type checker knows this is safe\n )\n```\n\n## Best Practices\n\n### Entity Class Naming Conventions\n\nWhen designing Pydantic models for database operations, follow these naming conventions to improve code clarity and maintainability:\n\n- **Naming Convention**: Name entity classes used for `dao.insert()` with an `Insert` suffix; name entity classes used for `dao.update()` with an `Update` suffix; keep query entity class names unchanged.\n\n- **Field Optionality Rules**:\n - For query entity classes: If a field's DDL is `NOT NULL`, the field should not be optional\n - For insert entity classes: If a field's DDL is `nullable` or has a default value, the field can be optional\n - For update entity classes: All fields should be optional\n\n- **Field Inclusion**:\n - Insert entity classes should only include fields that may need to be inserted\n - Update entity classes should only include fields that may need to be modified\n\n- **TableId Annotation**: Entity classes used for insert/update operations must always include the `TableId` annotation, even if the primary key field is optional\n",
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "Mysql toolkit",
"version": "1.3.1",
"project_urls": {
"homepage": "https://github.com/lessweb/commondao"
},
"split_keywords": [
"mysql"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "540d4657516166cbe4711810c62989cc458efdb8e6bcf14ce41827a58b415935",
"md5": "7c5fac92d48607f6c4a2df298c2dc407",
"sha256": "4bd4955e586587504b53c55016c4380a60fc6d2df61cc2daf11c5e8a829fbe57"
},
"downloads": -1,
"filename": "commondao-1.3.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "7c5fac92d48607f6c4a2df298c2dc407",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10",
"size": 22571,
"upload_time": "2025-10-27T10:17:04",
"upload_time_iso_8601": "2025-10-27T10:17:04.901254Z",
"url": "https://files.pythonhosted.org/packages/54/0d/4657516166cbe4711810c62989cc458efdb8e6bcf14ce41827a58b415935/commondao-1.3.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "4ada50d0bd6e5b33efde87cd77ee8414537debe079563da10c2a73a982389cbb",
"md5": "1091684351b72546df6c02aa05d2d82d",
"sha256": "5e60404a6807cd82c3b047310d6e53847564a35961e7387aac6f7525e928a64f"
},
"downloads": -1,
"filename": "commondao-1.3.1.tar.gz",
"has_sig": false,
"md5_digest": "1091684351b72546df6c02aa05d2d82d",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 43552,
"upload_time": "2025-10-27T10:17:06",
"upload_time_iso_8601": "2025-10-27T10:17:06.294478Z",
"url": "https://files.pythonhosted.org/packages/4a/da/50d0bd6e5b33efde87cd77ee8414537debe079563da10c2a73a982389cbb/commondao-1.3.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-27 10:17:06",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "lessweb",
"github_project": "commondao",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "commondao"
}