records2


Name: records2 (JSON)
Version: 1.0.0 (PyPI version, JSON)
download
home_page: None
Summary: A comprehensive, type-safe database solution for Python
upload_time: 2025-07-26 11:09:38
maintainer: None
docs_url: None
author: None
requires_python: >=3.9
license: None
keywords: async, database, nexios, python, records, sqlite, type-safe
VCS:
bugtrack_url:
requirements: No requirements were recorded.
Travis-CI: No Travis.
coveralls test coverage: No coveralls.
# Records2 🚀

**The Modern Async Python Database Toolkit with Pydantic Integration**

_A complete reimagining of database interaction for modern Python applications_

---

## Why Records2?

Records2 is a **fully async**, **Pydantic-integrated** mini-ORM that makes database operations simple, type-safe, and developer-friendly. Built from the ground up for modern Python applications.

### ✨ Key Features

- 🔥 **100% Async/Await** - No sync fallbacks, fully async architecture
- 🛡️ **Type Safety** - Full Pydantic v2 integration with automatic validation
- ⚡ **Developer Friendly** - Intuitive API that just works
- 🔄 **Transaction Support** - ACID compliance with context managers
- 🎯 **Zero Configuration** - Works out of the box with any SQLAlchemy-supported database once its async driver is installed
- 📊 **Production Ready** - Connection pooling and error handling built-in

---

## Quick Start

### Installation

```bash
pip install records2 pydantic aiosqlite  # For SQLite
# or
pip install records2 pydantic asyncpg    # For PostgreSQL
# The User examples below import pydantic's EmailStr, which additionally needs
# the email-validator package: pip install "pydantic[email]"
```

### Your First Query

```python
import asyncio
from records2 import Database

async def main():
    """Run one query and print the single value it returns.

    The connection and the database are released in ``finally`` blocks so
    they are closed even when connecting or querying raises, matching the
    other examples in this document.
    """
    # Connect to database
    db = Database("sqlite+aiosqlite:///example.db")
    try:
        conn = await db.connect()
        try:
            # Execute a simple query
            result = await conn.query("SELECT 'Hello, Records2!' as message")
            row = result.fetchone()
            print(row[0])  # Hello, Records2!
        finally:
            # Release the connection even if the query fails.
            await conn.close()
    finally:
        await db.close()

asyncio.run(main())
```

---

## Pydantic Integration

Define your data models with Pydantic for automatic validation and serialization:

```python
from typing import Optional
from datetime import datetime
from pydantic import Field, EmailStr
from records2 import Database, BaseRecord

class User(BaseRecord):
    """User model with validation."""
    # Fields without an explicit default are required; the rest fall back to
    # the default given here when the caller omits them.
    id: Optional[int] = Field(default=None, description="User ID")
    name: str = Field(min_length=1, max_length=100)
    email: EmailStr = Field(description="Valid email address")
    age: Optional[int] = Field(default=None, ge=0, le=150)
    is_active: bool = Field(default=True, description="Account status")
    created_at: Optional[datetime] = Field(default=None)

async def user_example():
    """Create the users table, insert one validated user, then print every user."""
    db = Database("sqlite+aiosqlite:///users.db")

    # Schema setup runs in its own transaction.
    async with db.transaction() as tx:
        await tx.query("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                age INTEGER,
                is_active BOOLEAN DEFAULT TRUE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

    # The model is built (and its field constraints applied) before the INSERT.
    async with db.transaction() as tx:
        new_user = User(name="Alice Johnson", email="alice@example.com", age=28)
        # id and created_at are left out so the table defaults fill them in.
        insert_params = new_user.model_dump(exclude={'id', 'created_at'})
        await tx.query("""
            INSERT INTO users (name, email, age, is_active)
            VALUES (:name, :email, :age, :is_active)
        """, **insert_params)

    # Read the rows back as User instances.
    reader = await db.connect()
    try:
        for record in await reader.fetch_all("SELECT * FROM users", model=User):
            print(f"User: {record.name} ({record.email})")
            print(f"Valid: {record.model_dump()}")  # Automatic serialization
    finally:
        await reader.close()
        await db.close()
```

---

## Database Operations

### Basic Queries

```python
async def basic_queries():
    """Show raw, parameterized, and model-backed reads on a single connection."""
    db = Database("sqlite+aiosqlite:///example.db")
    conn = await db.connect()

    try:
        # Scalar result: first column of the first row.
        count_result = await conn.query("SELECT COUNT(*) FROM users")
        total = count_result.fetchone()[0]
        print(f"Total users: {total}")

        # Bind parameters are passed as keyword arguments, never interpolated
        # into the SQL text.
        age_result = await conn.query(
            "SELECT * FROM users WHERE age > :min_age",
            min_age=25,
        )
        rows = age_result.fetchall()

        # Every matching row, requested with model=User.
        active_users = await conn.fetch_all(
            "SELECT * FROM users WHERE is_active = :active",
            model=User,
            active=True,
        )

        # One record looked up by e-mail address.
        alice = await conn.fetch_one(
            "SELECT * FROM users WHERE email = :email",
            model=User,
            email="alice@example.com",
        )

    finally:
        await conn.close()
        await db.close()
```

### Transactions

```python
async def transaction_example():
    """Insert a user and update it inside one transaction block."""
    db = Database("sqlite+aiosqlite:///example.db")

    # Both statements below belong to the same transaction: a failure in
    # either rolls both back, and a clean exit from the block commits them.
    async with db.transaction() as tx:
        await tx.query(
            "INSERT INTO users (name, email) VALUES (:name, :email)",
            name="Bob Smith",
            email="bob@example.com",
        )

        await tx.query(
            "UPDATE users SET is_active = :active WHERE email = :email",
            active=True,
            email="bob@example.com",
        )

    await db.close()
```

### Error Handling with Rollback

```python
async def error_handling_example():
    """Trigger a failing statement inside a transaction and report the rollback."""
    db = Database("sqlite+aiosqlite:///example.db")

    try:
        async with db.transaction() as tx:
            # First insert: expected to succeed.
            await tx.query(
                "INSERT INTO users (name, email) VALUES (:name, :email)",
                name="Charlie Brown",
                email="charlie@example.com",
            )

            # Second insert reuses the same e-mail, so it is expected to fail
            # and take the first insert down with it.
            await tx.query(
                "INSERT INTO users (name, email) VALUES (:name, :email)",
                name="Charlie Duplicate",
                email="charlie@example.com",  # Same email - will fail
            )

    except Exception as exc:
        print(f"Transaction failed and was rolled back: {exc}")
        # Charlie Brown was NOT inserted due to rollback

    await db.close()
```

---

## Advanced Features

### Multiple Models

```python
class Product(BaseRecord):
    """Product model."""
    # id defaults to None and in_stock to True; every other field is required.
    id: Optional[int] = Field(default=None)
    name: str = Field(min_length=1)
    price: float = Field(gt=0)
    category: str
    in_stock: bool = Field(default=True)

class Order(BaseRecord):
    """Order model."""
    # id and order_date default to None; quantity and total_price must be > 0.
    id: Optional[int] = Field(default=None)
    user_id: int
    product_id: int
    quantity: int = Field(gt=0)
    total_price: float = Field(gt=0)
    order_date: Optional[datetime] = Field(default=None)

async def multi_model_example():
    """Create the shop tables, insert three products, and query them by category."""
    db = Database("sqlite+aiosqlite:///shop.db")

    # Both tables are created in a single transaction.
    async with db.transaction() as tx:
        await tx.query("""
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                price REAL NOT NULL,
                category TEXT NOT NULL,
                in_stock BOOLEAN DEFAULT TRUE
            )
        """)

        await tx.query("""
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                product_id INTEGER NOT NULL,
                quantity INTEGER NOT NULL,
                total_price REAL NOT NULL,
                order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

    # All models are built first, then inserted one row at a time.
    async with db.transaction() as tx:
        catalog = [
            Product(name="Laptop", price=999.99, category="Electronics"),
            Product(name="Mouse", price=29.99, category="Electronics"),
            Product(name="Book", price=19.99, category="Books"),
        ]

        for item in catalog:
            # id is excluded so the table's AUTOINCREMENT assigns it.
            row_values = item.model_dump(exclude={'id'})
            await tx.query("""
                INSERT INTO products (name, price, category, in_stock)
                VALUES (:name, :price, :category, :in_stock)
            """, **row_values)

    # Read the data back through the Product model.
    reader = await db.connect()
    try:
        all_products = await reader.fetch_all("SELECT * FROM products", model=Product)
        print(f"Found {len(all_products)} products")

        # Same model, narrowed with a bind parameter.
        electronics = await reader.fetch_all(
            "SELECT * FROM products WHERE category = :category",
            model=Product,
            category="Electronics",
        )

        for gadget in electronics:
            print(f"- {gadget.name}: ${gadget.price}")

    finally:
        await reader.close()
        await db.close()
```

### Bulk Operations

```python
async def bulk_operations():
    """Insert 1000 validated users in one transaction, then query them back."""
    db = Database("sqlite+aiosqlite:///bulk.db")

    # Setup
    async with db.transaction() as tx:
        await tx.query("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT NOT NULL,
                age INTEGER
            )
        """)

    # Bulk insert: each row is validated through User just before its INSERT.
    async with db.transaction() as tx:
        for i in range(1000):
            candidate = User(
                name=f"User{i}",
                email=f"user{i}@example.com",
                age=20 + (i % 50),
            )
            # Only the three columns this table defines are sent.
            row_values = candidate.model_dump(exclude={'id', 'is_active', 'created_at'})
            await tx.query("""
                INSERT INTO users (name, email, age)
                VALUES (:name, :email, :age)
            """, **row_values)

    # Bulk query
    reader = await db.connect()
    try:
        # Every row, ordered by id, requested with model=User.
        everyone = await reader.fetch_all("SELECT * FROM users ORDER BY id", model=User)
        print(f"Inserted and retrieved {len(everyone)} users")

        # Filter young users
        under_thirty = await reader.fetch_all(
            "SELECT * FROM users WHERE age < :max_age",
            model=User,
            max_age=30,
        )
        print(f"Found {len(under_thirty)} young users")

    finally:
        await reader.close()
        await db.close()
```

---

## Database Support

Records2 works with any SQLAlchemy-supported database:

### SQLite (Development)

```python
db = Database("sqlite+aiosqlite:///app.db")
# or in-memory
db = Database("sqlite+aiosqlite:///:memory:")
```

### PostgreSQL (Production)

```python
db = Database("postgresql+asyncpg://user:password@localhost/dbname")
```

### MySQL

```python
db = Database("mysql+aiomysql://user:password@localhost/dbname")
```

---

## Real-World Example: Blog API

```python
from typing import List, Optional
from datetime import datetime
from pydantic import Field
from records2 import Database, BaseRecord

class Author(BaseRecord):
    # Blog author. id and created_at default to None; the INSERT in
    # BlogService.create_author excludes both.
    id: Optional[int] = None
    name: str = Field(..., min_length=1, max_length=100)
    # Pydantic v2 spells the regular-expression constraint `pattern`; the v1
    # keyword `regex` was removed and raises an error when the class is defined.
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    bio: Optional[str] = None
    created_at: Optional[datetime] = None

class Post(BaseRecord):
    # Blog post. title, content and author_id are required; posts start
    # unpublished, and id and both timestamps default to None.
    id: Optional[int] = Field(default=None)
    title: str = Field(min_length=1, max_length=200)
    content: str = Field(min_length=1)
    author_id: int
    published: bool = False
    created_at: Optional[datetime] = Field(default=None)
    updated_at: Optional[datetime] = Field(default=None)

class BlogService:
    """Data-access helper for the blog's ``authors`` and ``posts`` tables.

    Writes run inside ``self.db.transaction()``. Reads open a connection with
    ``self.db.connect()`` and close it in a ``finally`` block.
    """

    def __init__(self, database_url: str):
        """Create the underlying ``Database`` for *database_url*."""
        self.db = Database(database_url)

    async def setup_database(self):
        """Initialize database schema."""
        async with self.db.transaction() as conn:
            await conn.query("""
                CREATE TABLE IF NOT EXISTS authors (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    bio TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            await conn.query("""
                CREATE TABLE IF NOT EXISTS posts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    content TEXT NOT NULL,
                    author_id INTEGER NOT NULL,
                    published BOOLEAN DEFAULT FALSE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (author_id) REFERENCES authors (id)
                )
            """)

    async def create_author(self, author: Author) -> Author:
        """Insert *author* and return the stored row re-read by its new id.

        ``id`` and ``created_at`` are excluded from the INSERT so the table
        defaults populate them.
        """
        async with self.db.transaction() as conn:
            result = await conn.query("""
                INSERT INTO authors (name, email, bio)
                VALUES (:name, :email, :bio)
                RETURNING id
            """, **author.model_dump(exclude={'id', 'created_at'}))

            author_id = result.fetchone()[0]

        # Re-read only after the transaction block has exited (and therefore
        # committed): get_author() opens a separate connection, which
        # generally cannot see rows written by a still-open transaction.
        return await self.get_author(author_id)

    async def get_author(self, author_id: int) -> Optional[Author]:
        """Get author by ID."""
        conn = await self.db.connect()
        try:
            return await conn.fetch_one(
                "SELECT * FROM authors WHERE id = :id",
                model=Author,
                id=author_id
            )
        finally:
            await conn.close()

    async def create_post(self, post: Post) -> Post:
        """Insert *post* and return the stored row re-read by its new id.

        ``id``, ``created_at`` and ``updated_at`` are excluded from the INSERT
        so the table defaults populate them.
        """
        async with self.db.transaction() as conn:
            result = await conn.query("""
                INSERT INTO posts (title, content, author_id, published)
                VALUES (:title, :content, :author_id, :published)
                RETURNING id
            """, **post.model_dump(exclude={'id', 'created_at', 'updated_at'}))

            post_id = result.fetchone()[0]

        # As in create_author(): read back through a new connection only
        # after the inserting transaction has been committed.
        return await self.get_post(post_id)

    async def get_post(self, post_id: int) -> Optional[Post]:
        """Get post by ID."""
        conn = await self.db.connect()
        try:
            return await conn.fetch_one(
                "SELECT * FROM posts WHERE id = :id",
                model=Post,
                id=post_id
            )
        finally:
            await conn.close()

    async def get_published_posts(self) -> List[Post]:
        """Get all published posts, newest first."""
        conn = await self.db.connect()
        try:
            return await conn.fetch_all(
                "SELECT * FROM posts WHERE published = TRUE ORDER BY created_at DESC",
                model=Post
            )
        finally:
            await conn.close()

    async def get_posts_by_author(self, author_id: int) -> List[Post]:
        """Get all posts by an author, newest first."""
        conn = await self.db.connect()
        try:
            return await conn.fetch_all(
                "SELECT * FROM posts WHERE author_id = :author_id ORDER BY created_at DESC",
                model=Post,
                author_id=author_id
            )
        finally:
            await conn.close()

    async def close(self):
        """Close database connection."""
        await self.db.close()

# Usage example
async def blog_example():
    """Walk through BlogService: create an author and two posts, then list them."""
    service = BlogService("sqlite+aiosqlite:///blog.db")

    try:
        # Create the tables if they do not exist yet.
        await service.setup_database()

        # Store an author and pick up the generated id.
        saved_author = await service.create_author(
            Author(
                name="Jane Doe",
                email="jane@example.com",
                bio="Tech writer and Python enthusiast",
            )
        )
        print(f"Created author: {saved_author.name} (ID: {saved_author.id})")

        # Both posts are built before either is stored.
        intro_post = Post(
            title="Getting Started with Records2",
            content="Records2 makes database operations simple and type-safe...",
            author_id=saved_author.id,
            published=True,
        )
        draft_post = Post(
            title="Advanced Records2 Patterns",
            content="Learn advanced patterns for using Records2 in production...",
            author_id=saved_author.id,
            published=False,  # Draft
        )

        saved_intro = await service.create_post(intro_post)
        saved_draft = await service.create_post(draft_post)

        print(f"Created posts: {saved_intro.title}, {saved_draft.title}")

        # Query posts
        live_posts = await service.get_published_posts()
        print(f"Published posts: {len(live_posts)}")

        posts_by_author = await service.get_posts_by_author(saved_author.id)
        print(f"Author's total posts: {len(posts_by_author)}")

        # Display published posts
        for entry in live_posts:
            print(f"📝 {entry.title}")
            print(f"   By author ID: {entry.author_id}")
            print(f"   Published: {entry.published}")
            print()

    finally:
        await service.close()

# Run the example
if __name__ == "__main__":
    import asyncio
    asyncio.run(blog_example())
```

---

## Testing

Records2 includes comprehensive test coverage. Run tests with:

```bash
# Install test dependencies (pytest-cov provides the --cov option used below)
pip install pytest pytest-asyncio pytest-cov

# Run tests
pytest

# Run with coverage
pytest --cov=records2
```

---

## Migration from Original Records

If you're migrating from the original Records library:

### Before (Original Records)

```python
import records

db = records.Database('sqlite:///example.db')
rows = db.query('SELECT * FROM users')
for row in rows:
    print(row.name)
```

### After (Records2)

```python
import asyncio
from records2 import Database, BaseRecord

class User(BaseRecord):
    # Minimal model for the migration example: both fields are required
    # because neither declares a default.
    id: int
    name: str

async def main():
    """Print the name of every row in ``users``, loaded as ``User`` models.

    The connection and the database are released in separate ``finally``
    blocks so the database is still closed when ``connect()`` or
    ``conn.close()`` raises.
    """
    db = Database('sqlite+aiosqlite:///example.db')
    try:
        conn = await db.connect()
        try:
            users = await conn.fetch_all('SELECT * FROM users', model=User)
            for user in users:
                print(user.name)  # Type-safe access
        finally:
            await conn.close()
    finally:
        # Runs even if connecting or closing the connection failed.
        await db.close()

# asyncio.run() creates an event loop and runs main() to completion.
asyncio.run(main())
```

---

## Contributing

We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.

## License

Records2 is released under the MIT License. See [LICENSE](LICENSE) for details.

---

**Records2** - Making database operations simple, safe, and modern. 🚀

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "records2",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": null,
    "keywords": "async, database, nexios, python, records, sqlite, type-safe",
    "author": null,
    "author_email": "Chidebele Dunamis <techwithdunamix@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/d1/c0/1dbedc050dd4ea15cac7e7cc0f416553f8a5b46e8b1d8170104e32442b0c/records2-1.0.0.tar.gz",
    "platform": null,
    "description": "# Records2 \ud83d\ude80\n\n**The Modern Async Python Database Toolkit with Pydantic Integration**\n\n_A complete reimagining of database interaction for modern Python applications_\n\n---\n\n## Why Records2?\n\nRecords2 is a **fully async**, **Pydantic-integrated** mini-ORM that makes database operations simple, type-safe, and developer-friendly. Built from the ground up for modern Python applications.\n\n### \u2728 Key Features\n\n- \ud83d\udd25 **100% Async/Await** - No sync fallbacks, fully async architecture\n- \ud83d\udee1\ufe0f **Type Safety** - Full Pydantic v2 integration with automatic validation\n- \u26a1 **Developer Friendly** - Intuitive API that just works\n- \ud83d\udd04 **Transaction Support** - ACID compliance with context managers\n- \ud83c\udfaf **Zero Configuration** - Works out of the box with any SQL database\n- \ud83d\udcca **Production Ready** - Connection pooling and error handling built-in\n\n---\n\n## Quick Start\n\n### Installation\n\n```bash\npip install records2 pydantic aiosqlite  # For SQLite\n# or\npip install records2 pydantic asyncpg    # For PostgreSQL\n```\n\n### Your First Query\n\n```python\nimport asyncio\nfrom records2 import Database\n\nasync def main():\n    # Connect to database\n    db = Database(\"sqlite+aiosqlite:///example.db\")\n\n    # Execute a simple query\n    conn = await db.connect()\n    result = await conn.query(\"SELECT 'Hello, Records2!' 
as message\")\n    row = result.fetchone()\n    print(row[0])  # Hello, Records2!\n\n    await conn.close()\n    await db.close()\n\nasyncio.run(main())\n```\n\n---\n\n## Pydantic Integration\n\nDefine your data models with Pydantic for automatic validation and serialization:\n\n```python\nfrom typing import Optional\nfrom datetime import datetime\nfrom pydantic import Field, EmailStr\nfrom records2 import Database, BaseRecord\n\nclass User(BaseRecord):\n    \"\"\"User model with validation.\"\"\"\n    id: Optional[int] = Field(None, description=\"User ID\")\n    name: str = Field(..., min_length=1, max_length=100)\n    email: EmailStr = Field(..., description=\"Valid email address\")\n    age: Optional[int] = Field(None, ge=0, le=150)\n    is_active: bool = Field(True, description=\"Account status\")\n    created_at: Optional[datetime] = Field(None)\n\nasync def user_example():\n    db = Database(\"sqlite+aiosqlite:///users.db\")\n\n    # Setup table\n    async with db.transaction() as conn:\n        await conn.query(\"\"\"\n            CREATE TABLE IF NOT EXISTS users (\n                id INTEGER PRIMARY KEY AUTOINCREMENT,\n                name TEXT NOT NULL,\n                email TEXT UNIQUE NOT NULL,\n                age INTEGER,\n                is_active BOOLEAN DEFAULT TRUE,\n                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n            )\n        \"\"\")\n\n    # Insert with validation\n    async with db.transaction() as conn:\n        user = User(\n            name=\"Alice Johnson\",\n            email=\"alice@example.com\",\n            age=28\n        )\n        await conn.query(\"\"\"\n            INSERT INTO users (name, email, age, is_active)\n            VALUES (:name, :email, :age, :is_active)\n        \"\"\", **user.model_dump(exclude={'id', 'created_at'}))\n\n    # Query with automatic model creation\n    conn = await db.connect()\n    try:\n        users = await conn.fetch_all(\"SELECT * FROM users\", model=User)\n        for user 
in users:\n            print(f\"User: {user.name} ({user.email})\")\n            print(f\"Valid: {user.model_dump()}\")  # Automatic serialization\n    finally:\n        await conn.close()\n        await db.close()\n```\n\n---\n\n## Database Operations\n\n### Basic Queries\n\n```python\nasync def basic_queries():\n    db = Database(\"sqlite+aiosqlite:///example.db\")\n    conn = await db.connect()\n\n    try:\n        # Simple query\n        result = await conn.query(\"SELECT COUNT(*) FROM users\")\n        count = result.fetchone()[0]\n        print(f\"Total users: {count}\")\n\n        # Parameterized query (safe from SQL injection)\n        result = await conn.query(\n            \"SELECT * FROM users WHERE age > :min_age\",\n            min_age=25\n        )\n        rows = result.fetchall()\n\n        # Fetch with Pydantic model\n        users = await conn.fetch_all(\n            \"SELECT * FROM users WHERE is_active = :active\",\n            model=User,\n            active=True\n        )\n\n        # Fetch single record\n        user = await conn.fetch_one(\n            \"SELECT * FROM users WHERE email = :email\",\n            model=User,\n            email=\"alice@example.com\"\n        )\n\n    finally:\n        await conn.close()\n        await db.close()\n```\n\n### Transactions\n\n```python\nasync def transaction_example():\n    db = Database(\"sqlite+aiosqlite:///example.db\")\n\n    # Automatic transaction management\n    async with db.transaction() as conn:\n        # All operations in this block are part of one transaction\n        await conn.query(\n            \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n            name=\"Bob Smith\",\n            email=\"bob@example.com\"\n        )\n\n        await conn.query(\n            \"UPDATE users SET is_active = :active WHERE email = :email\",\n            active=True,\n            email=\"bob@example.com\"\n        )\n\n        # If any operation fails, entire transaction is rolled 
back\n        # If block completes successfully, transaction is committed\n\n    await db.close()\n```\n\n### Error Handling with Rollback\n\n```python\nasync def error_handling_example():\n    db = Database(\"sqlite+aiosqlite:///example.db\")\n\n    try:\n        async with db.transaction() as conn:\n            # This will succeed\n            await conn.query(\n                \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n                name=\"Charlie Brown\",\n                email=\"charlie@example.com\"\n            )\n\n            # This will fail (duplicate email) and rollback entire transaction\n            await conn.query(\n                \"INSERT INTO users (name, email) VALUES (:name, :email)\",\n                name=\"Charlie Duplicate\",\n                email=\"charlie@example.com\"  # Same email - will fail\n            )\n\n    except Exception as e:\n        print(f\"Transaction failed and was rolled back: {e}\")\n        # Charlie Brown was NOT inserted due to rollback\n\n    await db.close()\n```\n\n---\n\n## Advanced Features\n\n### Multiple Models\n\n```python\nclass Product(BaseRecord):\n    \"\"\"Product model.\"\"\"\n    id: Optional[int] = None\n    name: str = Field(..., min_length=1)\n    price: float = Field(..., gt=0)\n    category: str\n    in_stock: bool = True\n\nclass Order(BaseRecord):\n    \"\"\"Order model.\"\"\"\n    id: Optional[int] = None\n    user_id: int\n    product_id: int\n    quantity: int = Field(..., gt=0)\n    total_price: float = Field(..., gt=0)\n    order_date: Optional[datetime] = None\n\nasync def multi_model_example():\n    db = Database(\"sqlite+aiosqlite:///shop.db\")\n\n    # Setup tables\n    async with db.transaction() as conn:\n        await conn.query(\"\"\"\n            CREATE TABLE IF NOT EXISTS products (\n                id INTEGER PRIMARY KEY AUTOINCREMENT,\n                name TEXT NOT NULL,\n                price REAL NOT NULL,\n                category TEXT NOT NULL,\n        
        in_stock BOOLEAN DEFAULT TRUE\n            )\n        \"\"\")\n\n        await conn.query(\"\"\"\n            CREATE TABLE IF NOT EXISTS orders (\n                id INTEGER PRIMARY KEY AUTOINCREMENT,\n                user_id INTEGER NOT NULL,\n                product_id INTEGER NOT NULL,\n                quantity INTEGER NOT NULL,\n                total_price REAL NOT NULL,\n                order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n            )\n        \"\"\")\n\n    # Insert products\n    async with db.transaction() as conn:\n        products = [\n            Product(name=\"Laptop\", price=999.99, category=\"Electronics\"),\n            Product(name=\"Mouse\", price=29.99, category=\"Electronics\"),\n            Product(name=\"Book\", price=19.99, category=\"Books\"),\n        ]\n\n        for product in products:\n            await conn.query(\"\"\"\n                INSERT INTO products (name, price, category, in_stock)\n                VALUES (:name, :price, :category, :in_stock)\n            \"\"\", **product.model_dump(exclude={'id'}))\n\n    # Query different models\n    conn = await db.connect()\n    try:\n        # Get all products\n        products = await conn.fetch_all(\"SELECT * FROM products\", model=Product)\n        print(f\"Found {len(products)} products\")\n\n        # Get electronics only\n        electronics = await conn.fetch_all(\n            \"SELECT * FROM products WHERE category = :category\",\n            model=Product,\n            category=\"Electronics\"\n        )\n\n        for product in electronics:\n            print(f\"- {product.name}: ${product.price}\")\n\n    finally:\n        await conn.close()\n        await db.close()\n```\n\n### Bulk Operations\n\n```python\nasync def bulk_operations():\n    db = Database(\"sqlite+aiosqlite:///bulk.db\")\n\n    # Setup\n    async with db.transaction() as conn:\n        await conn.query(\"\"\"\n            CREATE TABLE IF NOT EXISTS users (\n                id INTEGER 
PRIMARY KEY AUTOINCREMENT,\n                name TEXT NOT NULL,\n                email TEXT NOT NULL,\n                age INTEGER\n            )\n        \"\"\")\n\n    # Bulk insert\n    async with db.transaction() as conn:\n        users_data = [\n            {\"name\": f\"User{i}\", \"email\": f\"user{i}@example.com\", \"age\": 20 + (i % 50)}\n            for i in range(1000)\n        ]\n\n        for user_data in users_data:\n            # Validate with Pydantic\n            user = User(**user_data)\n            await conn.query(\"\"\"\n                INSERT INTO users (name, email, age)\n                VALUES (:name, :email, :age)\n            \"\"\", **user.model_dump(exclude={'id', 'is_active', 'created_at'}))\n\n    # Bulk query\n    conn = await db.connect()\n    try:\n        # Get all users (automatically converted to User models)\n        users = await conn.fetch_all(\"SELECT * FROM users ORDER BY id\", model=User)\n        print(f\"Inserted and retrieved {len(users)} users\")\n\n        # Filter young users\n        young_users = await conn.fetch_all(\n            \"SELECT * FROM users WHERE age < :max_age\",\n            model=User,\n            max_age=30\n        )\n        print(f\"Found {len(young_users)} young users\")\n\n    finally:\n        await conn.close()\n        await db.close()\n```\n\n---\n\n## Database Support\n\nRecords2 works with any SQLAlchemy-supported database:\n\n### SQLite (Development)\n\n```python\ndb = Database(\"sqlite+aiosqlite:///app.db\")\n# or in-memory\ndb = Database(\"sqlite+aiosqlite:///:memory:\")\n```\n\n### PostgreSQL (Production)\n\n```python\ndb = Database(\"postgresql+asyncpg://user:password@localhost/dbname\")\n```\n\n### MySQL\n\n```python\ndb = Database(\"mysql+aiomysql://user:password@localhost/dbname\")\n```\n\n---\n\n## Real-World Example: Blog API\n\n```python\nfrom typing import List, Optional\nfrom datetime import datetime\nfrom pydantic import Field\nfrom records2 import Database, 
BaseRecord\n\nclass Author(BaseRecord):\n    id: Optional[int] = None\n    name: str = Field(..., min_length=1, max_length=100)\n    email: str = Field(..., regex=r'^[\\w\\.-]+@[\\w\\.-]+\\.\\w+$')\n    bio: Optional[str] = None\n    created_at: Optional[datetime] = None\n\nclass Post(BaseRecord):\n    id: Optional[int] = None\n    title: str = Field(..., min_length=1, max_length=200)\n    content: str = Field(..., min_length=1)\n    author_id: int\n    published: bool = Field(False)\n    created_at: Optional[datetime] = None\n    updated_at: Optional[datetime] = None\n\nclass BlogService:\n    def __init__(self, database_url: str):\n        self.db = Database(database_url)\n\n    async def setup_database(self):\n        \"\"\"Initialize database schema.\"\"\"\n        async with self.db.transaction() as conn:\n            await conn.query(\"\"\"\n                CREATE TABLE IF NOT EXISTS authors (\n                    id INTEGER PRIMARY KEY AUTOINCREMENT,\n                    name TEXT NOT NULL,\n                    email TEXT UNIQUE NOT NULL,\n                    bio TEXT,\n                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n                )\n            \"\"\")\n\n            await conn.query(\"\"\"\n                CREATE TABLE IF NOT EXISTS posts (\n                    id INTEGER PRIMARY KEY AUTOINCREMENT,\n                    title TEXT NOT NULL,\n                    content TEXT NOT NULL,\n                    author_id INTEGER NOT NULL,\n                    published BOOLEAN DEFAULT FALSE,\n                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n                    FOREIGN KEY (author_id) REFERENCES authors (id)\n                )\n            \"\"\")\n\n    async def create_author(self, author: Author) -> Author:\n        \"\"\"Create a new author.\"\"\"\n        async with self.db.transaction() as conn:\n            result = await conn.query(\"\"\"\n         
       INSERT INTO authors (name, email, bio)\n                VALUES (:name, :email, :bio)\n                RETURNING id\n            \"\"\", **author.model_dump(exclude={'id', 'created_at'}))\n\n            author_id = result.fetchone()[0]\n            return await self.get_author(author_id)\n\n    async def get_author(self, author_id: int) -> Optional[Author]:\n        \"\"\"Get author by ID.\"\"\"\n        conn = await self.db.connect()\n        try:\n            return await conn.fetch_one(\n                \"SELECT * FROM authors WHERE id = :id\",\n                model=Author,\n                id=author_id\n            )\n        finally:\n            await conn.close()\n\n    async def create_post(self, post: Post) -> Post:\n        \"\"\"Create a new blog post.\"\"\"\n        async with self.db.transaction() as conn:\n            result = await conn.query(\"\"\"\n                INSERT INTO posts (title, content, author_id, published)\n                VALUES (:title, :content, :author_id, :published)\n                RETURNING id\n            \"\"\", **post.model_dump(exclude={'id', 'created_at', 'updated_at'}))\n\n            post_id = result.fetchone()[0]\n            return await self.get_post(post_id)\n\n    async def get_post(self, post_id: int) -> Optional[Post]:\n        \"\"\"Get post by ID.\"\"\"\n        conn = await self.db.connect()\n        try:\n            return await conn.fetch_one(\n                \"SELECT * FROM posts WHERE id = :id\",\n                model=Post,\n                id=post_id\n            )\n        finally:\n            await conn.close()\n\n    async def get_published_posts(self) -> List[Post]:\n        \"\"\"Get all published posts.\"\"\"\n        conn = await self.db.connect()\n        try:\n            return await conn.fetch_all(\n                \"SELECT * FROM posts WHERE published = TRUE ORDER BY created_at DESC\",\n                model=Post\n            )\n        finally:\n            await conn.close()\n\n   
 async def get_posts_by_author(self, author_id: int) -> List[Post]:\n        \"\"\"Get all posts by an author.\"\"\"\n        conn = await self.db.connect()\n        try:\n            return await conn.fetch_all(\n                \"SELECT * FROM posts WHERE author_id = :author_id ORDER BY created_at DESC\",\n                model=Post,\n                author_id=author_id\n            )\n        finally:\n            await conn.close()\n\n    async def close(self):\n        \"\"\"Close database connection.\"\"\"\n        await self.db.close()\n\n# Usage example\nasync def blog_example():\n    blog = BlogService(\"sqlite+aiosqlite:///blog.db\")\n\n    try:\n        # Setup database\n        await blog.setup_database()\n\n        # Create an author\n        author = Author(\n            name=\"Jane Doe\",\n            email=\"jane@example.com\",\n            bio=\"Tech writer and Python enthusiast\"\n        )\n        created_author = await blog.create_author(author)\n        print(f\"Created author: {created_author.name} (ID: {created_author.id})\")\n\n        # Create posts\n        post1 = Post(\n            title=\"Getting Started with Records2\",\n            content=\"Records2 makes database operations simple and type-safe...\",\n            author_id=created_author.id,\n            published=True\n        )\n\n        post2 = Post(\n            title=\"Advanced Records2 Patterns\",\n            content=\"Learn advanced patterns for using Records2 in production...\",\n            author_id=created_author.id,\n            published=False  # Draft\n        )\n\n        created_post1 = await blog.create_post(post1)\n        created_post2 = await blog.create_post(post2)\n\n        print(f\"Created posts: {created_post1.title}, {created_post2.title}\")\n\n        # Query posts\n        published_posts = await blog.get_published_posts()\n        print(f\"Published posts: {len(published_posts)}\")\n\n        author_posts = await 
blog.get_posts_by_author(created_author.id)\n        print(f\"Author's total posts: {len(author_posts)}\")\n\n        # Display published posts\n        for post in published_posts:\n            print(f\"\ud83d\udcdd {post.title}\")\n            print(f\"   By author ID: {post.author_id}\")\n            print(f\"   Published: {post.published}\")\n            print()\n\n    finally:\n        await blog.close()\n\n# Run the example\nif __name__ == \"__main__\":\n    import asyncio\n    asyncio.run(blog_example())\n```\n\n---\n\n## Testing\n\nRecords2 includes comprehensive test coverage. Run tests with:\n\n```bash\n# Install test dependencies\npip install pytest pytest-asyncio\n\n# Run tests\npytest\n\n# Run with coverage\npytest --cov=records2\n```\n\n---\n\n## Migration from Original Records\n\nIf you're migrating from the original Records library:\n\n### Before (Original Records)\n\n```python\nimport records\n\ndb = records.Database('sqlite:///example.db')\nrows = db.query('SELECT * FROM users')\nfor row in rows:\n    print(row.name)\n```\n\n### After (Records2)\n\n```python\nimport asyncio\nfrom records2 import Database, BaseRecord\n\nclass User(BaseRecord):\n    id: int\n    name: str\n\nasync def main():\n    db = Database('sqlite+aiosqlite:///example.db')\n    conn = await db.connect()\n    try:\n        users = await conn.fetch_all('SELECT * FROM users', model=User)\n        for user in users:\n            print(user.name)  # Type-safe access\n    finally:\n        await conn.close()\n        await db.close()\n\nasyncio.run(main())\n```\n\n---\n\n## Contributing\n\nWe welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.\n\n## License\n\nRecords2 is released under the MIT License. See [LICENSE](LICENSE) for details.\n\n---\n\n**Records2** - Making database operations simple, safe, and modern. \ud83d\ude80\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "A comprehensive, type-safe database solution for Python",
    "version": "1.0.0",
    "project_urls": null,
    "split_keywords": [
        "async",
        " database",
        " nexios",
        " python",
        " records",
        " sqlite",
        " type-safe"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "1d0bdd22e52be50e7d7518c15df3a5915b0390ebc1c6975eea63d44aee872dd0",
                "md5": "d7c906b980ffac1fa2c290a5f0c28bb5",
                "sha256": "99a988779bdf911981a5b60134e94434858871cd936f128acd16f063a68fbc9c"
            },
            "downloads": -1,
            "filename": "records2-1.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "d7c906b980ffac1fa2c290a5f0c28bb5",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 9097,
            "upload_time": "2025-07-26T11:09:37",
            "upload_time_iso_8601": "2025-07-26T11:09:37.723569Z",
            "url": "https://files.pythonhosted.org/packages/1d/0b/dd22e52be50e7d7518c15df3a5915b0390ebc1c6975eea63d44aee872dd0/records2-1.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "d1c01dbedc050dd4ea15cac7e7cc0f416553f8a5b46e8b1d8170104e32442b0c",
                "md5": "d7ff4e855028b54b1fb38d1f5e2134ab",
                "sha256": "e6f8dbafdd6691e7bcef96c040b643cd52b290e313fc543f3036f53a0a9c6c27"
            },
            "downloads": -1,
            "filename": "records2-1.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "d7ff4e855028b54b1fb38d1f5e2134ab",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 41158,
            "upload_time": "2025-07-26T11:09:38",
            "upload_time_iso_8601": "2025-07-26T11:09:38.988005Z",
            "url": "https://files.pythonhosted.org/packages/d1/c0/1dbedc050dd4ea15cac7e7cc0f416553f8a5b46e8b1d8170104e32442b0c/records2-1.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-07-26 11:09:38",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "records2"
}
        
Elapsed time: 2.71837s