ff-storage


Nameff-storage JSON
Version 1.0.0 PyPI version JSON
download
home_pageNone
SummaryFenixflow storage package for database and file operations with async connection pools
upload_time2025-10-06 19:41:11
maintainerNone
docs_urlNone
authorNone
requires_python>=3.10
licenseMIT
keywords storage database postgresql mysql sqlserver s3 object-storage file-storage migrations fenixflow
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # ff-storage

[![PyPI version](https://badge.fury.io/py/ff-storage.svg)](https://badge.fury.io/py/ff-storage)
[![Python Support](https://img.shields.io/pypi/pyversions/ff-storage.svg)](https://pypi.org/project/ff-storage/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

A comprehensive storage package for Fenixflow applications, providing **async connection pools** for modern Python applications, database connections, object storage abstractions, migration management, and model utilities. Supports PostgreSQL, MySQL, Microsoft SQL Server, local filesystem storage, and S3-compatible services.

Created by **Ben Moag** at **[Fenixflow](https://fenixflow.com)**

## 🚨 Version 1.0.0 - Async Pools

**Breaking Change**: All connection pools are now async for better performance and scalability. Use direct connections for synchronous code.

## Quick Start

### Installation

#### From PyPI
```bash
pip install ff-storage
```

#### From GitLab
```bash
pip install git+https://gitlab.com/fenixflow/fenix-packages.git#subdirectory=ff-storage
```

### Async Pool (FastAPI, Production)

```python
from ff_storage.db import PostgresPool

# Create async connection pool
pool = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432,
    min_size=10,
    max_size=20
)

# Connect once at startup
await pool.connect()

# Use many times - pool handles connections internally
# Returns dictionaries by default for easy access
results = await pool.fetch_all("SELECT id, title, status FROM documents WHERE status = $1", "active")
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]

print(results[0]['title'])  # Access by column name - intuitive!

# Fetch single row
user = await pool.fetch_one("SELECT id, name, email FROM users WHERE id = $1", 123)
# user = {'id': 123, 'name': 'Alice', 'email': 'alice@example.com'}

# Disconnect once at shutdown
await pool.disconnect()
```

### Sync Connection (Scripts, Simple Apps)

```python
from ff_storage.db import Postgres

# Create direct connection
db = Postgres(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432
)

# Connect and query - returns dicts by default
db.connect()
results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]

print(results[0]['title'])  # Easy access by column name

db.close_connection()
```

### FastAPI Integration

```python
from fastapi import FastAPI
from ff_storage.db import PostgresPool

app = FastAPI()

# Create pool once
app.state.db = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    min_size=10,
    max_size=20
)

@app.on_event("startup")
async def startup():
    await app.state.db.connect()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db.disconnect()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Pool handles connection automatically
    user = await app.state.db.fetch_one(
        "SELECT * FROM users WHERE id = $1", user_id
    )
    return user
```

## Migration Guide (v0.3.0 → v1.0.0)

### Breaking Changes

**Pools are now async** - all `*Pool` classes require `await`:

| v0.3.0 (Sync) | v1.0.0 (Async) |
|---------------|----------------|
| `pool.connect()` | `await pool.connect()` |
| `pool.read_query()` | `await pool.fetch_all()` |
| `pool.execute()` | `await pool.execute()` |
| `pool.close_connection()` | `await pool.disconnect()` |

**For sync code**, use direct connections (no breaking changes):
- `Postgres` (sync) - unchanged
- `MySQL` (sync) - unchanged
- `SQLServer` (sync) - unchanged

## Features

### Database Operations
- **Async Connection Pools**: High-performance async pools for PostgreSQL, MySQL, and SQL Server
- **Sync Direct Connections**: Simple sync connections for scripts and non-async code
- **Multi-Database Support**: Uniform interface across PostgreSQL, MySQL, and Microsoft SQL Server
- **Transaction Management**: Built-in support for transactions with rollback
- **Batch Operations**: Execute many queries efficiently
- **Query Builder**: SQL query construction utilities

### Object Storage
- **Multiple Backends**: Local filesystem and S3/S3-compatible services
- **Async Operations**: Non-blocking I/O for better performance
- **Streaming Support**: Handle large files without memory overhead
- **Atomic Writes**: Safe file operations with temp file + rename
- **Metadata Management**: Store and retrieve metadata with objects

### Migration System
- **SQL File-Based**: Simple, version-controlled migrations
- **Automatic Tracking**: Keeps track of applied migrations
- **Rollback Support**: Undo migrations when needed

## Core Components

### Database Connections

#### PostgreSQL with Connection Pooling
```python
from ff_storage import PostgresPool

# Initialize pool
db = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432,
    pool_size=20
)

# Use connection from pool - returns dicts by default
db.connect()
try:
    # Execute queries - returns list of dicts
    results = db.read_query("SELECT id, title, status FROM documents WHERE status = %s", {"status": "active"})
    # results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
    print(results[0]['title'])  # Easy access by column name

    # Execute with RETURNING
    new_id = db.execute_query(
        "INSERT INTO documents (title) VALUES (%s) RETURNING id",
        {"title": "New Document"}
    )
    # new_id = [{'id': 123}]

    # Transaction example
    db.begin_transaction()
    try:
        db.execute("UPDATE documents SET status = %s WHERE id = %s", {"status": "archived", "id": 123})
        db.execute("INSERT INTO audit_log (action) VALUES (%s)", {"action": "archive"})
        db.commit_transaction()
    except Exception:
        db.rollback_transaction()
        raise
finally:
    # Return connection to pool
    db.close_connection()
```

#### MySQL with Connection Pooling
```python
from ff_storage import MySQLPool

# Initialize pool
db = MySQLPool(
    dbname="fenix_db",
    user="root",
    password="password",
    host="localhost",
    port=3306,
    pool_size=10
)

# Similar usage pattern as PostgreSQL - returns dicts by default
db.connect()
results = db.read_query("SELECT id, title, status FROM documents WHERE status = %s", {"status": "active"})
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
print(results[0]['title'])  # Easy access by column name
db.close_connection()
```

#### Microsoft SQL Server with Connection Pooling
```python
from ff_storage import SQLServerPool

# Initialize pool
db = SQLServerPool(
    dbname="fenix_db",
    user="sa",
    password="YourPassword123",
    host="localhost",
    port=1433,
    driver="ODBC Driver 18 for SQL Server",
    pool_size=10
)

# Connect and execute queries - returns dicts by default
db.connect()
try:
    # Read query - returns list of dicts
    results = db.read_query("SELECT id, title, status FROM documents WHERE status = ?", {"status": "active"})
    # results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
    print(results[0]['title'])  # Easy access by column name

    # Execute with OUTPUT clause
    new_id = db.execute_query(
        "INSERT INTO documents (title) OUTPUT INSERTED.id VALUES (?)",
        {"title": "New Document"}
    )
    # new_id = [{'id': 123}]

    # Check table existence
    if db.table_exists("users", schema="dbo"):
        columns = db.get_table_columns("users", schema="dbo")
finally:
    db.close_connection()
```

### Object Storage

#### Local Filesystem Storage
```python
from ff_storage import LocalObjectStorage
import asyncio

async def main():
    # Initialize local storage
    storage = LocalObjectStorage("/var/data/documents")

    # Write file with metadata
    await storage.write(
        "reports/2025/quarterly.pdf",
        pdf_bytes,
        metadata={"content-type": "application/pdf", "author": "system"}
    )

    # Read file
    data = await storage.read("reports/2025/quarterly.pdf")

    # Check existence
    exists = await storage.exists("reports/2025/quarterly.pdf")

    # List files with prefix
    files = await storage.list_keys(prefix="reports/2025/")

    # Delete file
    await storage.delete("reports/2025/quarterly.pdf")

asyncio.run(main())
```

#### S3-Compatible Storage
```python
from ff_storage import S3ObjectStorage
import asyncio

async def main():
    # AWS S3
    s3 = S3ObjectStorage(
        bucket="fenix-documents",
        region="us-east-1"
    )

    # Or MinIO/other S3-compatible
    s3 = S3ObjectStorage(
        bucket="fenix-documents",
        endpoint_url="http://localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin"
    )

    # Write file
    await s3.write("docs/report.pdf", pdf_bytes)

    # Stream large files
    async for chunk in s3.read_stream("large_file.bin", chunk_size=8192):
        await process_chunk(chunk)

    # Multipart upload for large files (automatic)
    await s3.write("huge_file.bin", huge_data)  # Automatically uses multipart if > 5MB

asyncio.run(main())
```

### Migration Management

```python
from ff_storage.db.migrations import MigrationManager

# Setup migration manager
manager = MigrationManager(db_connection, "./migrations")

# Run all pending migrations
manager.migrate()

# Create new migration
manager.create_migration("add_user_roles")

# Check migration status
pending = manager.get_pending_migrations()
applied = manager.get_applied_migrations()
```

Migration files follow the naming pattern: `001_initial_schema.sql`, `002_add_indexes.sql`, etc.

### Base Models

```python
from ff_storage.db.models import BaseModel, BaseModelWithDates
from dataclasses import dataclass
from typing import Optional
import uuid

@dataclass
class Document(BaseModelWithDates):
    title: str
    content: str
    status: str = "draft"
    author_id: Optional[uuid.UUID] = None

# Automatic UUID and timestamp handling
doc = Document(
    title="Quarterly Report",
    content="...",
    status="published"
)
# doc.id = UUID automatically generated
# doc.created_at = current timestamp
# doc.updated_at = current timestamp
```

## Advanced Features

### Transaction Management
```python
# Context manager for automatic transaction handling
async def transfer_ownership(db, doc_id, new_owner_id):
    db.begin_transaction()
    try:
        # Multiple operations in single transaction
        db.execute("UPDATE documents SET owner_id = %s WHERE id = %s",
                  {"owner_id": new_owner_id, "id": doc_id})
        db.execute("INSERT INTO audit_log (action, doc_id, user_id) VALUES (%s, %s, %s)",
                  {"action": "transfer", "doc_id": doc_id, "user_id": new_owner_id})
        db.commit_transaction()
    except Exception as e:
        db.rollback_transaction()
        raise
```

### Connection Pool Monitoring
```python
# Check pool statistics
pool = PostgresPool(...)
open_connections = pool.get_open_connections()
print(f"Open connections: {open_connections}")

# Graceful shutdown
pool.close_all_connections()
```

### Query Builder Utilities
```python
from ff_storage.db.sql import build_insert, build_update, build_select

# Build INSERT query
query, params = build_insert("documents", {
    "title": "New Doc",
    "status": "draft"
})

# Build UPDATE query
query, params = build_update("documents",
    {"status": "published"},
    {"id": doc_id}
)

# Build SELECT with conditions
query, params = build_select("documents",
    columns=["id", "title"],
    where={"status": "published", "author_id": user_id}
)
```

## Error Handling

```python
from ff_storage.exceptions import StorageError, DatabaseError

try:
    db.connect()
    results = db.read_query("SELECT * FROM documents")
except DatabaseError as e:
    print(f"Database error: {e}")
except StorageError as e:
    print(f"Storage error: {e}")
finally:
    db.close_connection()
```

## Testing

```bash
# Run tests
pytest tests/

# With coverage
pytest --cov=ff_storage tests/

# Run specific test file
pytest tests/test_postgres.py

# Run with verbose output
pytest -v tests/
```

## Configuration

### Environment Variables
```bash
# Database
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=fenix_db
export DB_USER=fenix
export DB_PASSWORD=secret

# S3 Storage
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret
export AWS_DEFAULT_REGION=us-east-1

# Local Storage
export STORAGE_PATH=/var/data/documents
```

### Configuration File
```python
# config.py
from ff_storage import PostgresPool, S3ObjectStorage

# Database configuration
DATABASE = {
    "dbname": os.getenv("DB_NAME", "fenix_db"),
    "user": os.getenv("DB_USER", "fenix"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", 5432)),
    "pool_size": 20
}

# Storage configuration
STORAGE = {
    "bucket": os.getenv("S3_BUCKET", "fenix-documents"),
    "region": os.getenv("AWS_DEFAULT_REGION", "us-east-1")
}

# Initialize
db = PostgresPool(**DATABASE)
storage = S3ObjectStorage(**STORAGE)
```

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

MIT License - See [LICENSE](LICENSE) file for details.

## Author

Created and maintained by **Ben Moag** at **[Fenixflow](https://fenixflow.com)**

For more information, visit the [GitLab repository](https://gitlab.com/fenixflow/fenix-packages).

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "ff-storage",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": "Fenixflow Team <dev@fenixflow.com>",
    "keywords": "storage, database, postgresql, mysql, sqlserver, s3, object-storage, file-storage, migrations, fenixflow",
    "author": null,
    "author_email": "Ben Moag <dev@fenixflow.com>",
    "download_url": "https://files.pythonhosted.org/packages/b3/4c/ca50a138fd78aad4fe44bea45a5f56d685f01bf8a19da848dd80c3619112/ff_storage-1.0.0.tar.gz",
    "platform": null,
    "description": "# ff-storage\n\n[![PyPI version](https://badge.fury.io/py/ff-storage.svg)](https://badge.fury.io/py/ff-storage)\n[![Python Support](https://img.shields.io/pypi/pyversions/ff-storage.svg)](https://pypi.org/project/ff-storage/)\n[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)\n\nA comprehensive storage package for Fenixflow applications, providing **async connection pools** for modern Python applications, database connections, object storage abstractions, migration management, and model utilities. Supports PostgreSQL, MySQL, Microsoft SQL Server, local filesystem storage, and S3-compatible services.\n\nCreated by **Ben Moag** at **[Fenixflow](https://fenixflow.com)**\n\n## \ud83d\udea8 Version 1.0.0 - Async Pools\n\n**Breaking Change**: All connection pools are now async for better performance and scalability. Use direct connections for synchronous code.\n\n## Quick Start\n\n### Installation\n\n#### From PyPI\n```bash\npip install ff-storage\n```\n\n#### From GitLab\n```bash\npip install git+https://gitlab.com/fenixflow/fenix-packages.git#subdirectory=ff-storage\n```\n\n### Async Pool (FastAPI, Production)\n\n```python\nfrom ff_storage.db import PostgresPool\n\n# Create async connection pool\npool = PostgresPool(\n    dbname=\"fenix_db\",\n    user=\"fenix\",\n    password=\"password\",\n    host=\"localhost\",\n    port=5432,\n    min_size=10,\n    max_size=20\n)\n\n# Connect once at startup\nawait pool.connect()\n\n# Use many times - pool handles connections internally\n# Returns dictionaries by default for easy access\nresults = await pool.fetch_all(\"SELECT id, title, status FROM documents WHERE status = $1\", \"active\")\n# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]\n\nprint(results[0]['title'])  # Access by column name - intuitive!\n\n# Fetch single row\nuser = await pool.fetch_one(\"SELECT id, name, email FROM users WHERE id = $1\", 123)\n# user = {'id': 123, 'name': 'Alice', 'email': 'alice@example.com'}\n\n# Disconnect once at shutdown\nawait pool.disconnect()\n```\n\n### Sync Connection (Scripts, Simple Apps)\n\n```python\nfrom ff_storage.db import Postgres\n\n# Create direct connection\ndb = Postgres(\n    dbname=\"fenix_db\",\n    user=\"fenix\",\n    password=\"password\",\n    host=\"localhost\",\n    port=5432\n)\n\n# Connect and query - returns dicts by default\ndb.connect()\nresults = db.read_query(\"SELECT id, title, status FROM documents WHERE status = %(status)s\", {\"status\": \"active\"})\n# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]\n\nprint(results[0]['title'])  # Easy access by column name\n\ndb.close_connection()\n```\n\n### FastAPI Integration\n\n```python\nfrom fastapi import FastAPI\nfrom ff_storage.db import PostgresPool\n\napp = FastAPI()\n\n# Create pool once\napp.state.db = PostgresPool(\n    dbname=\"fenix_db\",\n    user=\"fenix\",\n    password=\"password\",\n    host=\"localhost\",\n    min_size=10,\n    max_size=20\n)\n\n@app.on_event(\"startup\")\nasync def startup():\n    await app.state.db.connect()\n\n@app.on_event(\"shutdown\")\nasync def shutdown():\n    await app.state.db.disconnect()\n\n@app.get(\"/users/{user_id}\")\nasync def get_user(user_id: int):\n    # Pool handles connection automatically\n    user = await app.state.db.fetch_one(\n        \"SELECT * FROM users WHERE id = $1\", user_id\n    )\n    return user\n```\n\n## Migration Guide (v0.3.0 \u2192 v1.0.0)\n\n### Breaking Changes\n\n**Pools are now async** - all `*Pool` classes require `await`:\n\n| v0.3.0 (Sync) | v1.0.0 (Async) |\n|---------------|----------------|\n| `pool.connect()` | `await pool.connect()` |\n| `pool.read_query()` | `await pool.fetch_all()` |\n| `pool.execute()` | `await pool.execute()` |\n| `pool.close_connection()` | `await pool.disconnect()` |\n\n**For sync code**, use direct connections (no breaking changes):\n- `Postgres` (sync) - unchanged\n- `MySQL` (sync) - unchanged\n- `SQLServer` (sync) - unchanged\n\n## Features\n\n### Database Operations\n- **Async Connection Pools**: High-performance async pools for PostgreSQL, MySQL, and SQL Server\n- **Sync Direct Connections**: Simple sync connections for scripts and non-async code\n- **Multi-Database Support**: Uniform interface across PostgreSQL, MySQL, and Microsoft SQL Server\n- **Transaction Management**: Built-in support for transactions with rollback\n- **Batch Operations**: Execute many queries efficiently\n- **Query Builder**: SQL query construction utilities\n\n### Object Storage\n- **Multiple Backends**: Local filesystem and S3/S3-compatible services\n- **Async Operations**: Non-blocking I/O for better performance\n- **Streaming Support**: Handle large files without memory overhead\n- **Atomic Writes**: Safe file operations with temp file + rename\n- **Metadata Management**: Store and retrieve metadata with objects\n\n### Migration System\n- **SQL File-Based**: Simple, version-controlled migrations\n- **Automatic Tracking**: Keeps track of applied migrations\n- **Rollback Support**: Undo migrations when needed\n\n## Core Components\n\n### Database Connections\n\n#### PostgreSQL with Connection Pooling\n```python\nfrom ff_storage import PostgresPool\n\n# Initialize pool\ndb = PostgresPool(\n    dbname=\"fenix_db\",\n    user=\"fenix\",\n    password=\"password\",\n    host=\"localhost\",\n    port=5432,\n    pool_size=20\n)\n\n# Use connection from pool - returns dicts by default\ndb.connect()\ntry:\n    # Execute queries - returns list of dicts\n    results = db.read_query(\"SELECT id, title, status FROM documents WHERE status = %s\", {\"status\": \"active\"})\n    # results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]\n    print(results[0]['title'])  # Easy access by column name\n\n    # Execute with RETURNING\n    new_id = db.execute_query(\n        \"INSERT INTO documents (title) VALUES (%s) RETURNING id\",\n        {\"title\": \"New Document\"}\n    )\n    # new_id = [{'id': 123}]\n\n    # Transaction example\n    db.begin_transaction()\n    try:\n        db.execute(\"UPDATE documents SET status = %s WHERE id = %s\", {\"status\": \"archived\", \"id\": 123})\n        db.execute(\"INSERT INTO audit_log (action) VALUES (%s)\", {\"action\": \"archive\"})\n        db.commit_transaction()\n    except Exception:\n        db.rollback_transaction()\n        raise\nfinally:\n    # Return connection to pool\n    db.close_connection()\n```\n\n#### MySQL with Connection Pooling\n```python\nfrom ff_storage import MySQLPool\n\n# Initialize pool\ndb = MySQLPool(\n    dbname=\"fenix_db\",\n    user=\"root\",\n    password=\"password\",\n    host=\"localhost\",\n    port=3306,\n    pool_size=10\n)\n\n# Similar usage pattern as PostgreSQL - returns dicts by default\ndb.connect()\nresults = db.read_query(\"SELECT id, title, status FROM documents WHERE status = %s\", {\"status\": \"active\"})\n# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]\nprint(results[0]['title'])  # Easy access by column name\ndb.close_connection()\n```\n\n#### Microsoft SQL Server with Connection Pooling\n```python\nfrom ff_storage import SQLServerPool\n\n# Initialize pool\ndb = SQLServerPool(\n    dbname=\"fenix_db\",\n    user=\"sa\",\n    password=\"YourPassword123\",\n    host=\"localhost\",\n    port=1433,\n    driver=\"ODBC Driver 18 for SQL Server\",\n    pool_size=10\n)\n\n# Connect and execute queries - returns dicts by default\ndb.connect()\ntry:\n    # Read query - returns list of dicts\n    results = db.read_query(\"SELECT id, title, status FROM documents WHERE status = ?\", {\"status\": \"active\"})\n    # results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]\n    print(results[0]['title'])  # Easy access by column name\n\n    # Execute with OUTPUT clause\n    new_id = db.execute_query(\n        \"INSERT INTO documents (title) OUTPUT INSERTED.id VALUES (?)\",\n        {\"title\": \"New Document\"}\n    )\n    # new_id = [{'id': 123}]\n\n    # Check table existence\n    if db.table_exists(\"users\", schema=\"dbo\"):\n        columns = db.get_table_columns(\"users\", schema=\"dbo\")\nfinally:\n    db.close_connection()\n```\n\n### Object Storage\n\n#### Local Filesystem Storage\n```python\nfrom ff_storage import LocalObjectStorage\nimport asyncio\n\nasync def main():\n    # Initialize local storage\n    storage = LocalObjectStorage(\"/var/data/documents\")\n\n    # Write file with metadata\n    await storage.write(\n        \"reports/2025/quarterly.pdf\",\n        pdf_bytes,\n        metadata={\"content-type\": \"application/pdf\", \"author\": \"system\"}\n    )\n\n    # Read file\n    data = await storage.read(\"reports/2025/quarterly.pdf\")\n\n    # Check existence\n    exists = await storage.exists(\"reports/2025/quarterly.pdf\")\n\n    # List files with prefix\n    files = await storage.list_keys(prefix=\"reports/2025/\")\n\n    # Delete file\n    await storage.delete(\"reports/2025/quarterly.pdf\")\n\nasyncio.run(main())\n```\n\n#### S3-Compatible Storage\n```python\nfrom ff_storage import S3ObjectStorage\nimport asyncio\n\nasync def main():\n    # AWS S3\n    s3 = S3ObjectStorage(\n        bucket=\"fenix-documents\",\n        region=\"us-east-1\"\n    )\n\n    # Or MinIO/other S3-compatible\n    s3 = S3ObjectStorage(\n        bucket=\"fenix-documents\",\n        endpoint_url=\"http://localhost:9000\",\n        access_key=\"minioadmin\",\n        secret_key=\"minioadmin\"\n    )\n\n    # Write file\n    await s3.write(\"docs/report.pdf\", pdf_bytes)\n\n    # Stream large files\n    async for chunk in s3.read_stream(\"large_file.bin\", chunk_size=8192):\n        await process_chunk(chunk)\n\n    # Multipart upload for large files (automatic)\n    await s3.write(\"huge_file.bin\", huge_data)  # Automatically uses multipart if > 5MB\n\nasyncio.run(main())\n```\n\n### Migration Management\n\n```python\nfrom ff_storage.db.migrations import MigrationManager\n\n# Setup migration manager\nmanager = MigrationManager(db_connection, \"./migrations\")\n\n# Run all pending migrations\nmanager.migrate()\n\n# Create new migration\nmanager.create_migration(\"add_user_roles\")\n\n# Check migration status\npending = manager.get_pending_migrations()\napplied = manager.get_applied_migrations()\n```\n\nMigration files follow the naming pattern: `001_initial_schema.sql`, `002_add_indexes.sql`, etc.\n\n### Base Models\n\n```python\nfrom ff_storage.db.models import BaseModel, BaseModelWithDates\nfrom dataclasses import dataclass\nfrom typing import Optional\nimport uuid\n\n@dataclass\nclass Document(BaseModelWithDates):\n    title: str\n    content: str\n    status: str = \"draft\"\n    author_id: Optional[uuid.UUID] = None\n\n# Automatic UUID and timestamp handling\ndoc = Document(\n    title=\"Quarterly Report\",\n    content=\"...\",\n    status=\"published\"\n)\n# doc.id = UUID automatically generated\n# doc.created_at = current timestamp\n# doc.updated_at = current timestamp\n```\n\n## Advanced Features\n\n### Transaction Management\n```python\n# Context manager for automatic transaction handling\nasync def transfer_ownership(db, doc_id, new_owner_id):\n    db.begin_transaction()\n    try:\n        # Multiple operations in single transaction\n        db.execute(\"UPDATE documents SET owner_id = %s WHERE id = %s\",\n                  {\"owner_id\": new_owner_id, \"id\": doc_id})\n        db.execute(\"INSERT INTO audit_log (action, doc_id, user_id) VALUES (%s, %s, %s)\",\n                  {\"action\": \"transfer\", \"doc_id\": doc_id, \"user_id\": new_owner_id})\n        db.commit_transaction()\n    except Exception as e:\n        db.rollback_transaction()\n        raise\n```\n\n### Connection Pool Monitoring\n```python\n# Check pool statistics\npool = PostgresPool(...)\nopen_connections = pool.get_open_connections()\nprint(f\"Open connections: {open_connections}\")\n\n# Graceful shutdown\npool.close_all_connections()\n```\n\n### Query Builder Utilities\n```python\nfrom ff_storage.db.sql import build_insert, build_update, build_select\n\n# Build INSERT query\nquery, params = build_insert(\"documents\", {\n    \"title\": \"New Doc\",\n    \"status\": \"draft\"\n})\n\n# Build UPDATE query\nquery, params = build_update(\"documents\",\n    {\"status\": \"published\"},\n    {\"id\": doc_id}\n)\n\n# Build SELECT with conditions\nquery, params = build_select(\"documents\",\n    columns=[\"id\", \"title\"],\n    where={\"status\": \"published\", \"author_id\": user_id}\n)\n```\n\n## Error Handling\n\n```python\nfrom ff_storage.exceptions import StorageError, DatabaseError\n\ntry:\n    db.connect()\n    results = db.read_query(\"SELECT * FROM documents\")\nexcept DatabaseError as e:\n    print(f\"Database error: {e}\")\nexcept StorageError as e:\n    print(f\"Storage error: {e}\")\nfinally:\n    db.close_connection()\n```\n\n## Testing\n\n```bash\n# Run tests\npytest tests/\n\n# With coverage\npytest --cov=ff_storage tests/\n\n# Run specific test file\npytest tests/test_postgres.py\n\n# Run with verbose output\npytest -v tests/\n```\n\n## Configuration\n\n### Environment Variables\n```bash\n# Database\nexport DB_HOST=localhost\nexport DB_PORT=5432\nexport DB_NAME=fenix_db\nexport DB_USER=fenix\nexport DB_PASSWORD=secret\n\n# S3 Storage\nexport AWS_ACCESS_KEY_ID=your-key\nexport AWS_SECRET_ACCESS_KEY=your-secret\nexport AWS_DEFAULT_REGION=us-east-1\n\n# Local Storage\nexport STORAGE_PATH=/var/data/documents\n```\n\n### Configuration File\n```python\n# config.py\nfrom ff_storage import PostgresPool, S3ObjectStorage\n\n# Database configuration\nDATABASE = {\n    \"dbname\": os.getenv(\"DB_NAME\", \"fenix_db\"),\n    \"user\": os.getenv(\"DB_USER\", \"fenix\"),\n    \"password\": os.getenv(\"DB_PASSWORD\"),\n    \"host\": os.getenv(\"DB_HOST\", \"localhost\"),\n    \"port\": int(os.getenv(\"DB_PORT\", 5432)),\n    \"pool_size\": 20\n}\n\n# Storage configuration\nSTORAGE = {\n    \"bucket\": os.getenv(\"S3_BUCKET\", \"fenix-documents\"),\n    \"region\": os.getenv(\"AWS_DEFAULT_REGION\", \"us-east-1\")\n}\n\n# Initialize\ndb = PostgresPool(**DATABASE)\nstorage = S3ObjectStorage(**STORAGE)\n```\n\n## Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n\n## License\n\nMIT License - See [LICENSE](LICENSE) file for details.\n\n## Author\n\nCreated and maintained by **Ben Moag** at **[Fenixflow](https://fenixflow.com)**\n\nFor more information, visit the [GitLab repository](https://gitlab.com/fenixflow/fenix-packages).\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Fenixflow storage package for database and file operations with async connection pools",
    "version": "1.0.0",
    "project_urls": {
        "Bug Tracker": "https://gitlab.com/fenixflow/fenix-packages/-/issues",
        "Changelog": "https://gitlab.com/fenixflow/fenix-packages/-/blob/main/ff-storage/CHANGELOG.md",
        "Documentation": "https://gitlab.com/fenixflow/fenix-packages/-/tree/main/ff-storage",
        "Homepage": "https://fenixflow.com",
        "Repository": "https://gitlab.com/fenixflow/fenix-packages"
    },
    "split_keywords": [
        "storage",
        " database",
        " postgresql",
        " mysql",
        " sqlserver",
        " s3",
        " object-storage",
        " file-storage",
        " migrations",
        " fenixflow"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "a615096dfad9b6d82a563d4e6e646a2cdaad1ccc0759a1d93cb3a5b99849c8a8",
                "md5": "8875bcbffb5d2f70f3ddc5a65ba40855",
                "sha256": "7af7b17253541859360bc0ef19ba45224bc04764a3f7cf5739312f1a229f5eac"
            },
            "downloads": -1,
            "filename": "ff_storage-1.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "8875bcbffb5d2f70f3ddc5a65ba40855",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 34743,
            "upload_time": "2025-10-06T19:41:10",
            "upload_time_iso_8601": "2025-10-06T19:41:10.127229Z",
            "url": "https://files.pythonhosted.org/packages/a6/15/096dfad9b6d82a563d4e6e646a2cdaad1ccc0759a1d93cb3a5b99849c8a8/ff_storage-1.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "b34cca50a138fd78aad4fe44bea45a5f56d685f01bf8a19da848dd80c3619112",
                "md5": "91c6c6200a8bca1fb9e0a79747995b55",
                "sha256": "bb115a0922a8220b4e2dd2981241b0900af53ad47b54a052332581b35d120465"
            },
            "downloads": -1,
            "filename": "ff_storage-1.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "91c6c6200a8bca1fb9e0a79747995b55",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 38132,
            "upload_time": "2025-10-06T19:41:11",
            "upload_time_iso_8601": "2025-10-06T19:41:11.099409Z",
            "url": "https://files.pythonhosted.org/packages/b3/4c/ca50a138fd78aad4fe44bea45a5f56d685f01bf8a19da848dd80c3619112/ff_storage-1.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-06 19:41:11",
    "github": false,
    "gitlab": true,
    "bitbucket": false,
    "codeberg": false,
    "gitlab_user": "fenixflow",
    "gitlab_project": "fenix-packages",
    "lcname": "ff-storage"
}
        
Elapsed time: 1.72683s