# Records2 🚀
**The Modern Async Python Database Toolkit with Pydantic Integration**
_A complete reimagining of database interaction for modern Python applications_
---
## Why Records2?
Records2 is a **fully async**, **Pydantic-integrated** mini-ORM that makes database operations simple, type-safe, and developer-friendly. Built from the ground up for modern Python applications.
### ✨ Key Features
- 🔥 **100% Async/Await** - No sync fallbacks, fully async architecture
- 🛡️ **Type Safety** - Full Pydantic v2 integration with automatic validation
- ⚡ **Developer Friendly** - Intuitive API that just works
- 🔄 **Transaction Support** - ACID compliance with context managers
- 🎯 **Zero Configuration** - Works out of the box with any SQL database
- 📊 **Production Ready** - Connection pooling and error handling built-in
---
## Quick Start
### Installation
```bash
pip install records2 pydantic aiosqlite # For SQLite
# or
pip install records2 pydantic asyncpg # For PostgreSQL
```
### Your First Query
```python
import asyncio
from records2 import Database
async def main():
# Connect to database
db = Database("sqlite+aiosqlite:///example.db")
# Execute a simple query
conn = await db.connect()
result = await conn.query("SELECT 'Hello, Records2!' as message")
row = result.fetchone()
print(row[0]) # Hello, Records2!
await conn.close()
await db.close()
asyncio.run(main())
```
---
## Pydantic Integration
Define your data models with Pydantic for automatic validation and serialization:
```python
from typing import Optional
from datetime import datetime
from pydantic import Field, EmailStr
from records2 import Database, BaseRecord
class User(BaseRecord):
"""User model with validation."""
id: Optional[int] = Field(None, description="User ID")
name: str = Field(..., min_length=1, max_length=100)
email: EmailStr = Field(..., description="Valid email address")
age: Optional[int] = Field(None, ge=0, le=150)
is_active: bool = Field(True, description="Account status")
created_at: Optional[datetime] = Field(None)
async def user_example():
db = Database("sqlite+aiosqlite:///users.db")
# Setup table
async with db.transaction() as conn:
await conn.query("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
age INTEGER,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Insert with validation
async with db.transaction() as conn:
user = User(
name="Alice Johnson",
email="alice@example.com",
age=28
)
await conn.query("""
INSERT INTO users (name, email, age, is_active)
VALUES (:name, :email, :age, :is_active)
""", **user.model_dump(exclude={'id', 'created_at'}))
# Query with automatic model creation
conn = await db.connect()
try:
users = await conn.fetch_all("SELECT * FROM users", model=User)
for user in users:
print(f"User: {user.name} ({user.email})")
print(f"Valid: {user.model_dump()}") # Automatic serialization
finally:
await conn.close()
await db.close()
```
---
## Database Operations
### Basic Queries
```python
async def basic_queries():
db = Database("sqlite+aiosqlite:///example.db")
conn = await db.connect()
try:
# Simple query
result = await conn.query("SELECT COUNT(*) FROM users")
count = result.fetchone()[0]
print(f"Total users: {count}")
# Parameterized query (safe from SQL injection)
result = await conn.query(
"SELECT * FROM users WHERE age > :min_age",
min_age=25
)
rows = result.fetchall()
# Fetch with Pydantic model
users = await conn.fetch_all(
"SELECT * FROM users WHERE is_active = :active",
model=User,
active=True
)
# Fetch single record
user = await conn.fetch_one(
"SELECT * FROM users WHERE email = :email",
model=User,
email="alice@example.com"
)
finally:
await conn.close()
await db.close()
```
### Transactions
```python
async def transaction_example():
db = Database("sqlite+aiosqlite:///example.db")
# Automatic transaction management
async with db.transaction() as conn:
# All operations in this block are part of one transaction
await conn.query(
"INSERT INTO users (name, email) VALUES (:name, :email)",
name="Bob Smith",
email="bob@example.com"
)
await conn.query(
"UPDATE users SET is_active = :active WHERE email = :email",
active=True,
email="bob@example.com"
)
# If any operation fails, entire transaction is rolled back
# If block completes successfully, transaction is committed
await db.close()
```
### Error Handling with Rollback
```python
async def error_handling_example():
db = Database("sqlite+aiosqlite:///example.db")
try:
async with db.transaction() as conn:
# This will succeed
await conn.query(
"INSERT INTO users (name, email) VALUES (:name, :email)",
name="Charlie Brown",
email="charlie@example.com"
)
# This will fail (duplicate email) and rollback entire transaction
await conn.query(
"INSERT INTO users (name, email) VALUES (:name, :email)",
name="Charlie Duplicate",
email="charlie@example.com" # Same email - will fail
)
except Exception as e:
print(f"Transaction failed and was rolled back: {e}")
# Charlie Brown was NOT inserted due to rollback
await db.close()
```
---
## Advanced Features
### Multiple Models
```python
class Product(BaseRecord):
"""Product model."""
id: Optional[int] = None
name: str = Field(..., min_length=1)
price: float = Field(..., gt=0)
category: str
in_stock: bool = True
class Order(BaseRecord):
"""Order model."""
id: Optional[int] = None
user_id: int
product_id: int
quantity: int = Field(..., gt=0)
total_price: float = Field(..., gt=0)
order_date: Optional[datetime] = None
async def multi_model_example():
db = Database("sqlite+aiosqlite:///shop.db")
# Setup tables
async with db.transaction() as conn:
await conn.query("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
price REAL NOT NULL,
category TEXT NOT NULL,
in_stock BOOLEAN DEFAULT TRUE
)
""")
await conn.query("""
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL,
total_price REAL NOT NULL,
order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Insert products
async with db.transaction() as conn:
products = [
Product(name="Laptop", price=999.99, category="Electronics"),
Product(name="Mouse", price=29.99, category="Electronics"),
Product(name="Book", price=19.99, category="Books"),
]
for product in products:
await conn.query("""
INSERT INTO products (name, price, category, in_stock)
VALUES (:name, :price, :category, :in_stock)
""", **product.model_dump(exclude={'id'}))
# Query different models
conn = await db.connect()
try:
# Get all products
products = await conn.fetch_all("SELECT * FROM products", model=Product)
print(f"Found {len(products)} products")
# Get electronics only
electronics = await conn.fetch_all(
"SELECT * FROM products WHERE category = :category",
model=Product,
category="Electronics"
)
for product in electronics:
print(f"- {product.name}: ${product.price}")
finally:
await conn.close()
await db.close()
```
### Bulk Operations
```python
async def bulk_operations():
db = Database("sqlite+aiosqlite:///bulk.db")
# Setup
async with db.transaction() as conn:
await conn.query("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL,
age INTEGER
)
""")
# Bulk insert
async with db.transaction() as conn:
users_data = [
{"name": f"User{i}", "email": f"user{i}@example.com", "age": 20 + (i % 50)}
for i in range(1000)
]
for user_data in users_data:
# Validate with Pydantic
user = User(**user_data)
await conn.query("""
INSERT INTO users (name, email, age)
VALUES (:name, :email, :age)
""", **user.model_dump(exclude={'id', 'is_active', 'created_at'}))
# Bulk query
conn = await db.connect()
try:
# Get all users (automatically converted to User models)
users = await conn.fetch_all("SELECT * FROM users ORDER BY id", model=User)
print(f"Inserted and retrieved {len(users)} users")
# Filter young users
young_users = await conn.fetch_all(
"SELECT * FROM users WHERE age < :max_age",
model=User,
max_age=30
)
print(f"Found {len(young_users)} young users")
finally:
await conn.close()
await db.close()
```
---
## Database Support
Records2 works with any SQLAlchemy-supported database:
### SQLite (Development)
```python
db = Database("sqlite+aiosqlite:///app.db")
# or in-memory
db = Database("sqlite+aiosqlite:///:memory:")
```
### PostgreSQL (Production)
```python
db = Database("postgresql+asyncpg://user:password@localhost/dbname")
```
### MySQL
```python
db = Database("mysql+aiomysql://user:password@localhost/dbname")
```
---
## Real-World Example: Blog API
```python
from typing import List, Optional
from datetime import datetime
from pydantic import Field
from records2 import Database, BaseRecord
class Author(BaseRecord):
id: Optional[int] = None
name: str = Field(..., min_length=1, max_length=100)
email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
bio: Optional[str] = None
created_at: Optional[datetime] = None
class Post(BaseRecord):
id: Optional[int] = None
title: str = Field(..., min_length=1, max_length=200)
content: str = Field(..., min_length=1)
author_id: int
published: bool = Field(False)
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class BlogService:
def __init__(self, database_url: str):
self.db = Database(database_url)
async def setup_database(self):
"""Initialize database schema."""
async with self.db.transaction() as conn:
await conn.query("""
CREATE TABLE IF NOT EXISTS authors (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
bio TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
await conn.query("""
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
author_id INTEGER NOT NULL,
published BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (author_id) REFERENCES authors (id)
)
""")
async def create_author(self, author: Author) -> Author:
"""Create a new author."""
async with self.db.transaction() as conn:
result = await conn.query("""
INSERT INTO authors (name, email, bio)
VALUES (:name, :email, :bio)
RETURNING id
""", **author.model_dump(exclude={'id', 'created_at'}))
author_id = result.fetchone()[0]
return await self.get_author(author_id)
async def get_author(self, author_id: int) -> Optional[Author]:
"""Get author by ID."""
conn = await self.db.connect()
try:
return await conn.fetch_one(
"SELECT * FROM authors WHERE id = :id",
model=Author,
id=author_id
)
finally:
await conn.close()
async def create_post(self, post: Post) -> Post:
"""Create a new blog post."""
async with self.db.transaction() as conn:
result = await conn.query("""
INSERT INTO posts (title, content, author_id, published)
VALUES (:title, :content, :author_id, :published)
RETURNING id
""", **post.model_dump(exclude={'id', 'created_at', 'updated_at'}))
post_id = result.fetchone()[0]
return await self.get_post(post_id)
async def get_post(self, post_id: int) -> Optional[Post]:
"""Get post by ID."""
conn = await self.db.connect()
try:
return await conn.fetch_one(
"SELECT * FROM posts WHERE id = :id",
model=Post,
id=post_id
)
finally:
await conn.close()
async def get_published_posts(self) -> List[Post]:
"""Get all published posts."""
conn = await self.db.connect()
try:
return await conn.fetch_all(
"SELECT * FROM posts WHERE published = TRUE ORDER BY created_at DESC",
model=Post
)
finally:
await conn.close()
async def get_posts_by_author(self, author_id: int) -> List[Post]:
"""Get all posts by an author."""
conn = await self.db.connect()
try:
return await conn.fetch_all(
"SELECT * FROM posts WHERE author_id = :author_id ORDER BY created_at DESC",
model=Post,
author_id=author_id
)
finally:
await conn.close()
async def close(self):
"""Close database connection."""
await self.db.close()
# Usage example
async def blog_example():
blog = BlogService("sqlite+aiosqlite:///blog.db")
try:
# Setup database
await blog.setup_database()
# Create an author
author = Author(
name="Jane Doe",
email="jane@example.com",
bio="Tech writer and Python enthusiast"
)
created_author = await blog.create_author(author)
print(f"Created author: {created_author.name} (ID: {created_author.id})")
# Create posts
post1 = Post(
title="Getting Started with Records2",
content="Records2 makes database operations simple and type-safe...",
author_id=created_author.id,
published=True
)
post2 = Post(
title="Advanced Records2 Patterns",
content="Learn advanced patterns for using Records2 in production...",
author_id=created_author.id,
published=False # Draft
)
created_post1 = await blog.create_post(post1)
created_post2 = await blog.create_post(post2)
print(f"Created posts: {created_post1.title}, {created_post2.title}")
# Query posts
published_posts = await blog.get_published_posts()
print(f"Published posts: {len(published_posts)}")
author_posts = await blog.get_posts_by_author(created_author.id)
print(f"Author's total posts: {len(author_posts)}")
# Display published posts
for post in published_posts:
print(f"📝 {post.title}")
print(f" By author ID: {post.author_id}")
print(f" Published: {post.published}")
print()
finally:
await blog.close()
# Run the example
if __name__ == "__main__":
import asyncio
asyncio.run(blog_example())
```
---
## Testing
Records2 includes comprehensive test coverage. Run tests with:
```bash
# Install test dependencies
pip install pytest pytest-asyncio
# Run tests
pytest
# Run with coverage
pytest --cov=records2
```
---
## Migration from Original Records
If you're migrating from the original Records library:
### Before (Original Records)
```python
import records
db = records.Database('sqlite:///example.db')
rows = db.query('SELECT * FROM users')
for row in rows:
print(row.name)
```
### After (Records2)
```python
import asyncio
from records2 import Database, BaseRecord
class User(BaseRecord):
id: int
name: str
async def main():
db = Database('sqlite+aiosqlite:///example.db')
conn = await db.connect()
try:
users = await conn.fetch_all('SELECT * FROM users', model=User)
for user in users:
print(user.name) # Type-safe access
finally:
await conn.close()
await db.close()
asyncio.run(main())
```
---
## Contributing
We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.
## License
Records2 is released under the MIT License. See [LICENSE](LICENSE) for details.
---
**Records2** - Making database operations simple, safe, and modern. 🚀
Raw data
{
"_id": null,
"home_page": null,
"name": "records2",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "async, database, nexios, python, records, sqlite, type-safe",
"author": null,
"author_email": "Chidebele Dunamis <techwithdunamix@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/d1/c0/1dbedc050dd4ea15cac7e7cc0f416553f8a5b46e8b1d8170104e32442b0c/records2-1.0.0.tar.gz",
"platform": null,
"description": "# Records2 \ud83d\ude80\n\n**The Modern Async Python Database Toolkit with Pydantic Integration**\n\n_A complete reimagining of database interaction for modern Python applications_\n\n---\n\n## Why Records2?\n\nRecords2 is a **fully async**, **Pydantic-integrated** mini-ORM that makes database operations simple, type-safe, and developer-friendly. Built from the ground up for modern Python applications.\n\n### \u2728 Key Features\n\n- \ud83d\udd25 **100% Async/Await** - No sync fallbacks, fully async architecture\n- \ud83d\udee1\ufe0f **Type Safety** - Full Pydantic v2 integration with automatic validation\n- \u26a1 **Developer Friendly** - Intuitive API that just works\n- \ud83d\udd04 **Transaction Support** - ACID compliance with context managers\n- \ud83c\udfaf **Zero Configuration** - Works out of the box with any SQL database\n- \ud83d\udcca **Production Ready** - Connection pooling and error handling built-in\n\n---\n\n## Quick Start\n\n### Installation\n\n```bash\npip install records2 pydantic aiosqlite # For SQLite\n# or\npip install records2 pydantic asyncpg # For PostgreSQL\n```\n\n### Your First Query\n\n```python\nimport asyncio\nfrom records2 import Database\n\nasync def main():\n # Connect to database\n db = Database(\"sqlite+aiosqlite:///example.db\")\n\n # Execute a simple query\n conn = await db.connect()\n result = await conn.query(\"SELECT 'Hello, Records2!' as message\")\n row = result.fetchone()\n print(row[0]) # Hello, Records2!\n\n await conn.close()\n await db.close()\n\nasyncio.run(main())\n```\n\n---\n\n## Pydantic Integration\n\nDefine your data models with Pydantic for automatic validation and serialization:\n\n```python\nfrom typing import Optional\nfrom datetime import datetime\nfrom pydantic import Field, EmailStr\nfrom records2 import Database, BaseRecord\n\nclass User(BaseRecord):\n \"\"\"User model with validation.\"\"\"\n id: Optional[int] = Field(None, description=\"User ID\")\n name: str = Field(..., min_length=1, max_length=100)\n email: EmailStr = Field(..., description=\"Valid email address\")\n age: Optional[int] = Field(None, ge=0, le=150)\n is_active: bool = Field(True, description=\"Account status\")\n created_at: Optional[datetime] = Field(None)\n\nasync def user_example():\n db = Database(\"sqlite+aiosqlite:///users.db\")\n\n # Setup table\n async with db.transaction() as conn:\n await conn.query(\"\"\"\n CREATE TABLE IF NOT EXISTS users (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n name TEXT NOT NULL,\n email TEXT UNIQUE NOT NULL,\n age INTEGER,\n is_active BOOLEAN DEFAULT TRUE,\n created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n )\n \"\"\")\n\n # Insert with validation\n async with db.transaction() as conn:\n user = User(\n name=\"Alice Johnson\",\n email=\"alice@example.com\",\n age=28\n )\n await conn.query(\"\"\"\n INSERT INTO users (name, email, age, is_active)\n VALUES (:name, :email, :age, :is_active)\n \"\"\", **user.model_dump(exclude={'id', 'created_at'}))\n\n # Query with automatic model creation\n conn = await db.connect()\n try:\n users = await conn.fetch_all(\"SELECT * FROM users\", model=User)\n for user in users:\n print(f\"User: {user.name} ({user.email})\")\n print(f\"Valid: {user.model_dump()}\") # Automatic serialization\n finally:\n await conn.close()\n await db.close()\n```\n\n---\n\n## Database Operations\n\n### Basic Queries\n\n```python\nasync def basic_queries():\n db = Database(\"sqlite+aiosqlite:///example.db\")\n conn = await db.connect()\n\n try:\n # Simple query\n result = await conn.query(\"SELECT COUNT(*) FROM users\")\n count = result.fetchone()[0]\n print(f\"Total users: {count}\")\n\n # Parameterized query (safe from SQL injection)\n result = await conn.query(\n \"SELECT * FROM users WHERE age > :min_age\",\n min_age=25\n )\n rows = result.fetchall()\n\n # Fetch with Pydantic model\n users = await conn.fetch_all(\n \"SELECT * FROM users WHERE is_active = :active\",\n model=User,\n active=True\n )\n\n # Fetch single record\n user = await conn.fetch_one(\n \"SELECT * FROM users WHERE email = :email\",\n model=User,\n email=\"alice@example.com\"\n )\n\n finally:\n await conn.close()\n await db.close()\n```\n\n### Transactions\n\n```python\nasync def transaction_example():\n db = Database(\"sqlite+aiosqlite:///example.db\")\n\n # Automatic transaction management\n async with db.transaction() as conn:\n # All operations in this block are part of one transaction\n await conn.query(\n \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n name=\"Bob Smith\",\n email=\"bob@example.com\"\n )\n\n await conn.query(\n \"UPDATE users SET is_active = :active WHERE email = :email\",\n active=True,\n email=\"bob@example.com\"\n )\n\n # If any operation fails, entire transaction is rolled back\n # If block completes successfully, transaction is committed\n\n await db.close()\n```\n\n### Error Handling with Rollback\n\n```python\nasync def error_handling_example():\n db = Database(\"sqlite+aiosqlite:///example.db\")\n\n try:\n async with db.transaction() as conn:\n # This will succeed\n await conn.query(\n \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n name=\"Charlie Brown\",\n email=\"charlie@example.com\"\n )\n\n # This will fail (duplicate email) and rollback entire transaction\n await conn.query(\n \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n name=\"Charlie Duplicate\",\n email=\"charlie@example.com\" # Same email - will fail\n )\n\n except Exception as e:\n print(f\"Transaction failed and was rolled back: {e}\")\n # Charlie Brown was NOT inserted due to rollback\n\n await db.close()\n```\n\n---\n\n## Advanced Features\n\n### Multiple Models\n\n```python\nclass Product(BaseRecord):\n \"\"\"Product model.\"\"\"\n id: Optional[int] = None\n name: str = Field(..., min_length=1)\n price: float = Field(..., gt=0)\n category: str\n in_stock: bool = True\n\nclass Order(BaseRecord):\n \"\"\"Order model.\"\"\"\n id: Optional[int] = None\n user_id: int\n product_id: int\n quantity: int = Field(..., gt=0)\n total_price: float = Field(..., gt=0)\n order_date: Optional[datetime] = None\n\nasync def multi_model_example():\n db = Database(\"sqlite+aiosqlite:///shop.db\")\n\n # Setup tables\n async with db.transaction() as conn:\n await conn.query(\"\"\"\n CREATE TABLE IF NOT EXISTS products (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n name TEXT NOT NULL,\n price REAL NOT NULL,\n category TEXT NOT NULL,\n in_stock BOOLEAN DEFAULT TRUE\n )\n \"\"\")\n\n await conn.query(\"\"\"\n CREATE TABLE IF NOT EXISTS orders (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n user_id INTEGER NOT NULL,\n product_id INTEGER NOT NULL,\n quantity INTEGER NOT NULL,\n total_price REAL NOT NULL,\n order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n )\n \"\"\")\n\n # Insert products\n async with db.transaction() as conn:\n products = [\n Product(name=\"Laptop\", price=999.99, category=\"Electronics\"),\n Product(name=\"Mouse\", price=29.99, category=\"Electronics\"),\n Product(name=\"Book\", price=19.99, category=\"Books\"),\n ]\n\n for product in products:\n await conn.query(\"\"\"\n INSERT INTO products (name, price, category, in_stock)\n VALUES (:name, :price, :category, :in_stock)\n \"\"\", **product.model_dump(exclude={'id'}))\n\n # Query different models\n conn = await db.connect()\n try:\n # Get all products\n products = await conn.fetch_all(\"SELECT * FROM products\", model=Product)\n print(f\"Found {len(products)} products\")\n\n # Get electronics only\n electronics = await conn.fetch_all(\n \"SELECT * FROM products WHERE category = :category\",\n model=Product,\n category=\"Electronics\"\n )\n\n for product in electronics:\n print(f\"- {product.name}: ${product.price}\")\n\n finally:\n await conn.close()\n await db.close()\n```\n\n### Bulk Operations\n\n```python\nasync def bulk_operations():\n db = Database(\"sqlite+aiosqlite:///bulk.db\")\n\n # Setup\n async with db.transaction() as conn:\n await conn.query(\"\"\"\n CREATE TABLE IF NOT EXISTS users (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n name TEXT NOT NULL,\n email TEXT NOT NULL,\n age INTEGER\n )\n \"\"\")\n\n # Bulk insert\n async with db.transaction() as conn:\n users_data = [\n {\"name\": f\"User{i}\", \"email\": f\"user{i}@example.com\", \"age\": 20 + (i % 50)}\n for i in range(1000)\n ]\n\n for user_data in users_data:\n # Validate with Pydantic\n user = User(**user_data)\n await conn.query(\"\"\"\n INSERT INTO users (name, email, age)\n VALUES (:name, :email, :age)\n \"\"\", **user.model_dump(exclude={'id', 'is_active', 'created_at'}))\n\n # Bulk query\n conn = await db.connect()\n try:\n # Get all users (automatically converted to User models)\n users = await conn.fetch_all(\"SELECT * FROM users ORDER BY id\", model=User)\n print(f\"Inserted and retrieved {len(users)} users\")\n\n # Filter young users\n young_users = await conn.fetch_all(\n \"SELECT * FROM users WHERE age < :max_age\",\n model=User,\n max_age=30\n )\n print(f\"Found {len(young_users)} young users\")\n\n finally:\n await conn.close()\n await db.close()\n```\n\n---\n\n## Database Support\n\nRecords2 works with any SQLAlchemy-supported database:\n\n### SQLite (Development)\n\n```python\ndb = Database(\"sqlite+aiosqlite:///app.db\")\n# or in-memory\ndb = Database(\"sqlite+aiosqlite:///:memory:\")\n```\n\n### PostgreSQL (Production)\n\n```python\ndb = Database(\"postgresql+asyncpg://user:password@localhost/dbname\")\n```\n\n### MySQL\n\n```python\ndb = Database(\"mysql+aiomysql://user:password@localhost/dbname\")\n```\n\n---\n\n## Real-World Example: Blog API\n\n```python\nfrom typing import List, Optional\nfrom datetime import datetime\nfrom pydantic import Field\nfrom records2 import Database, BaseRecord\n\nclass Author(BaseRecord):\n id: Optional[int] = None\n name: str = Field(..., min_length=1, max_length=100)\n email: str = Field(..., regex=r'^[\\w\\.-]+@[\\w\\.-]+\\.\\w+$')\n bio: Optional[str] = None\n created_at: Optional[datetime] = None\n\nclass Post(BaseRecord):\n id: Optional[int] = None\n title: str = Field(..., min_length=1, max_length=200)\n content: str = Field(..., min_length=1)\n author_id: int\n published: bool = Field(False)\n created_at: Optional[datetime] = None\n updated_at: Optional[datetime] = None\n\nclass BlogService:\n def __init__(self, database_url: str):\n self.db = Database(database_url)\n\n async def setup_database(self):\n \"\"\"Initialize database schema.\"\"\"\n async with self.db.transaction() as conn:\n await conn.query(\"\"\"\n CREATE TABLE IF NOT EXISTS authors (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n name TEXT NOT NULL,\n email TEXT UNIQUE NOT NULL,\n bio TEXT,\n created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n )\n \"\"\")\n\n await conn.query(\"\"\"\n CREATE TABLE IF NOT EXISTS posts (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n title TEXT NOT NULL,\n content TEXT NOT NULL,\n author_id INTEGER NOT NULL,\n published BOOLEAN DEFAULT FALSE,\n created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n FOREIGN KEY (author_id) REFERENCES authors (id)\n )\n \"\"\")\n\n async def create_author(self, author: Author) -> Author:\n \"\"\"Create a new author.\"\"\"\n async with self.db.transaction() as conn:\n result = await conn.query(\"\"\"\n INSERT INTO authors (name, email, bio)\n VALUES (:name, :email, :bio)\n RETURNING id\n \"\"\", **author.model_dump(exclude={'id', 'created_at'}))\n\n author_id = result.fetchone()[0]\n return await self.get_author(author_id)\n\n async def get_author(self, author_id: int) -> Optional[Author]:\n \"\"\"Get author by ID.\"\"\"\n conn = await self.db.connect()\n try:\n return await conn.fetch_one(\n \"SELECT * FROM authors WHERE id = :id\",\n model=Author,\n id=author_id\n )\n finally:\n await conn.close()\n\n async def create_post(self, post: Post) -> Post:\n \"\"\"Create a new blog post.\"\"\"\n async with self.db.transaction() as conn:\n result = await conn.query(\"\"\"\n INSERT INTO posts (title, content, author_id, published)\n VALUES (:title, :content, :author_id, :published)\n RETURNING id\n \"\"\", **post.model_dump(exclude={'id', 'created_at', 'updated_at'}))\n\n post_id = result.fetchone()[0]\n return await self.get_post(post_id)\n\n async def get_post(self, post_id: int) -> Optional[Post]:\n \"\"\"Get post by ID.\"\"\"\n conn = await self.db.connect()\n try:\n return await conn.fetch_one(\n \"SELECT * FROM posts WHERE id = :id\",\n model=Post,\n id=post_id\n )\n finally:\n await conn.close()\n\n async def get_published_posts(self) -> List[Post]:\n \"\"\"Get all published posts.\"\"\"\n conn = await self.db.connect()\n try:\n return await conn.fetch_all(\n \"SELECT * FROM posts WHERE published = TRUE ORDER BY created_at DESC\",\n model=Post\n )\n finally:\n await conn.close()\n\n async def get_posts_by_author(self, author_id: int) -> List[Post]:\n \"\"\"Get all posts by an author.\"\"\"\n conn = await self.db.connect()\n try:\n return await conn.fetch_all(\n \"SELECT * FROM posts WHERE author_id = :author_id ORDER BY created_at DESC\",\n model=Post,\n author_id=author_id\n )\n finally:\n await conn.close()\n\n async def close(self):\n \"\"\"Close database connection.\"\"\"\n await self.db.close()\n\n# Usage example\nasync def blog_example():\n blog = BlogService(\"sqlite+aiosqlite:///blog.db\")\n\n try:\n # Setup database\n await blog.setup_database()\n\n # Create an author\n author = Author(\n name=\"Jane Doe\",\n email=\"jane@example.com\",\n bio=\"Tech writer and Python enthusiast\"\n )\n created_author = await blog.create_author(author)\n print(f\"Created author: {created_author.name} (ID: {created_author.id})\")\n\n # Create posts\n post1 = Post(\n title=\"Getting Started with Records2\",\n content=\"Records2 makes database operations simple and type-safe...\",\n author_id=created_author.id,\n published=True\n )\n\n post2 = Post(\n title=\"Advanced Records2 Patterns\",\n content=\"Learn advanced patterns for using Records2 in production...\",\n author_id=created_author.id,\n published=False # Draft\n )\n\n created_post1 = await blog.create_post(post1)\n created_post2 = await blog.create_post(post2)\n\n print(f\"Created posts: {created_post1.title}, {created_post2.title}\")\n\n # Query posts\n published_posts = await blog.get_published_posts()\n print(f\"Published posts: {len(published_posts)}\")\n\n author_posts = await blog.get_posts_by_author(created_author.id)\n print(f\"Author's total posts: {len(author_posts)}\")\n\n # Display published posts\n for post in published_posts:\n print(f\"\ud83d\udcdd {post.title}\")\n print(f\" By author ID: {post.author_id}\")\n print(f\" Published: {post.published}\")\n print()\n\n finally:\n await blog.close()\n\n# Run the example\nif __name__ == \"__main__\":\n import asyncio\n asyncio.run(blog_example())\n```\n\n---\n\n## Testing\n\nRecords2 includes comprehensive test coverage. Run tests with:\n\n```bash\n# Install test dependencies\npip install pytest pytest-asyncio\n\n# Run tests\npytest\n\n# Run with coverage\npytest --cov=records2\n```\n\n---\n\n## Migration from Original Records\n\nIf you're migrating from the original Records library:\n\n### Before (Original Records)\n\n```python\nimport records\n\ndb = records.Database('sqlite:///example.db')\nrows = db.query('SELECT * FROM users')\nfor row in rows:\n print(row.name)\n```\n\n### After (Records2)\n\n```python\nimport asyncio\nfrom records2 import Database, BaseRecord\n\nclass User(BaseRecord):\n id: int\n name: str\n\nasync def main():\n db = Database('sqlite+aiosqlite:///example.db')\n conn = await db.connect()\n try:\n users = await conn.fetch_all('SELECT * FROM users', model=User)\n for user in users:\n print(user.name) # Type-safe access\n finally:\n await conn.close()\n await db.close()\n\nasyncio.run(main())\n```\n\n---\n\n## Contributing\n\nWe welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.\n\n## License\n\nRecords2 is released under the MIT License. See [LICENSE](LICENSE) for details.\n\n---\n\n**Records2** - Making database operations simple, safe, and modern. \ud83d\ude80\n",
"bugtrack_url": null,
"license": null,
"summary": "A comprehensive, type-safe database solution for Python",
"version": "1.0.0",
"project_urls": null,
"split_keywords": [
"async",
" database",
" nexios",
" python",
" records",
" sqlite",
" type-safe"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "1d0bdd22e52be50e7d7518c15df3a5915b0390ebc1c6975eea63d44aee872dd0",
"md5": "d7c906b980ffac1fa2c290a5f0c28bb5",
"sha256": "99a988779bdf911981a5b60134e94434858871cd936f128acd16f063a68fbc9c"
},
"downloads": -1,
"filename": "records2-1.0.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "d7c906b980ffac1fa2c290a5f0c28bb5",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 9097,
"upload_time": "2025-07-26T11:09:37",
"upload_time_iso_8601": "2025-07-26T11:09:37.723569Z",
"url": "https://files.pythonhosted.org/packages/1d/0b/dd22e52be50e7d7518c15df3a5915b0390ebc1c6975eea63d44aee872dd0/records2-1.0.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "d1c01dbedc050dd4ea15cac7e7cc0f416553f8a5b46e8b1d8170104e32442b0c",
"md5": "d7ff4e855028b54b1fb38d1f5e2134ab",
"sha256": "e6f8dbafdd6691e7bcef96c040b643cd52b290e313fc543f3036f53a0a9c6c27"
},
"downloads": -1,
"filename": "records2-1.0.0.tar.gz",
"has_sig": false,
"md5_digest": "d7ff4e855028b54b1fb38d1f5e2134ab",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 41158,
"upload_time": "2025-07-26T11:09:38",
"upload_time_iso_8601": "2025-07-26T11:09:38.988005Z",
"url": "https://files.pythonhosted.org/packages/d1/c0/1dbedc050dd4ea15cac7e7cc0f416553f8a5b46e8b1d8170104e32442b0c/records2-1.0.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-26 11:09:38",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "records2"
}