# ff-storage
[PyPI version](https://badge.fury.io/py/ff-storage) · [PyPI project](https://pypi.org/project/ff-storage/) · [License: MIT](https://opensource.org/licenses/MIT)
A comprehensive storage package for Fenixflow applications, providing **async connection pools**, synchronous database connections, object storage abstractions, migration management, and model utilities. Supports PostgreSQL, MySQL, Microsoft SQL Server, local filesystem storage, and S3-compatible services.
Created by **Ben Moag** at **[Fenixflow](https://fenixflow.com)**
## 🚨 Version 1.0.0 - Async Pools
**Breaking Change**: All connection pools are now async for better performance and scalability. Use direct connections for synchronous code.
## Quick Start
### Installation
#### From PyPI
```bash
pip install ff-storage
```
#### From GitLab
```bash
pip install git+https://gitlab.com/fenixflow/fenix-packages.git#subdirectory=ff-storage
```
### Async Pool (FastAPI, Production)
```python
from ff_storage.db import PostgresPool

# Create async connection pool
pool = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432,
    min_size=10,
    max_size=20
)

# Connect once at startup
await pool.connect()

# Use many times - the pool handles connections internally.
# Results are dictionaries by default for easy access.
results = await pool.fetch_all("SELECT id, title, status FROM documents WHERE status = $1", "active")
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]

print(results[0]['title'])  # Access by column name

# Fetch single row
user = await pool.fetch_one("SELECT id, name, email FROM users WHERE id = $1", 123)
# user = {'id': 123, 'name': 'Alice', 'email': 'alice@example.com'}

# Disconnect once at shutdown
await pool.disconnect()
```
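Writes go through `pool.execute` (see the migration table below). A minimal sketch, assuming `execute` binds the same `$1`-style positional parameters as `fetch_all`:

```python
# Assumption: execute() takes $1-style positional parameters,
# matching fetch_all/fetch_one above.
await pool.execute(
    "UPDATE documents SET status = $1 WHERE id = $2",
    "archived", 42,
)
```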
### Sync Connection (Scripts, Simple Apps)
```python
from ff_storage.db import Postgres

# Create direct connection
db = Postgres(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432
)

# Connect and query - returns dicts by default
db.connect()
results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]

print(results[0]['title'])  # Easy access by column name

db.close_connection()
```
### FastAPI Integration
```python
from fastapi import FastAPI
from ff_storage.db import PostgresPool

app = FastAPI()

# Create pool once
app.state.db = PostgresPool(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    min_size=10,
    max_size=20
)

@app.on_event("startup")
async def startup():
    await app.state.db.connect()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db.disconnect()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Pool handles connection automatically
    user = await app.state.db.fetch_one(
        "SELECT * FROM users WHERE id = $1", user_id
    )
    return user
```
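Recent FastAPI releases deprecate `@app.on_event` in favor of a lifespan handler; the same pool wiring looks like this:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI
from ff_storage.db import PostgresPool

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Connect the pool once at startup, disconnect at shutdown
    app.state.db = PostgresPool(
        dbname="fenix_db",
        user="fenix",
        password="password",
        host="localhost",
        min_size=10,
        max_size=20,
    )
    await app.state.db.connect()
    yield
    await app.state.db.disconnect()

app = FastAPI(lifespan=lifespan)
```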
## Migration Guide (v0.3.0 → v1.0.0)
### Breaking Changes
**Pools are now async** - all `*Pool` classes require `await`:
| v0.3.0 (Sync) | v1.0.0 (Async) |
|---------------|----------------|
| `pool.connect()` | `await pool.connect()` |
| `pool.read_query()` | `await pool.fetch_all()` |
| `pool.execute()` | `await pool.execute()` |
| `pool.close_connection()` | `await pool.disconnect()` |
**For sync code**, use direct connections (no breaking changes):
- `Postgres` (sync) - unchanged
- `MySQL` (sync) - unchanged
- `SQLServer` (sync) - unchanged
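In practice the change is mechanical. A before/after sketch based on the table above:

```python
# v0.3.0 - sync pool
pool.connect()
rows = pool.read_query("SELECT id, title FROM documents")
pool.close_connection()

# v1.0.0 - async pool
await pool.connect()
rows = await pool.fetch_all("SELECT id, title FROM documents")
await pool.disconnect()
```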
## Features
### Database Operations
- **Async Connection Pools**: High-performance async pools for PostgreSQL, MySQL, and SQL Server
- **Sync Direct Connections**: Simple sync connections for scripts and non-async code
- **Multi-Database Support**: Uniform interface across PostgreSQL, MySQL, and Microsoft SQL Server
- **Transaction Management**: Built-in support for transactions with rollback
- **Batch Operations**: Execute many queries efficiently
- **Query Builder**: SQL query construction utilities
### Object Storage
- **Multiple Backends**: Local filesystem and S3/S3-compatible services
- **Async Operations**: Non-blocking I/O for better performance
- **Streaming Support**: Handle large files without memory overhead
- **Atomic Writes**: Safe file operations with temp file + rename
- **Metadata Management**: Store and retrieve metadata with objects
### Migration System
- **SQL File-Based**: Simple, version-controlled migrations
- **Automatic Tracking**: Keeps track of applied migrations
- **Rollback Support**: Undo migrations when needed
## Core Components
### Database Connections
The async pools are covered in the Quick Start above. The examples below use the synchronous direct-connection classes, whose interface is unchanged from v0.3.0.

#### PostgreSQL (Sync)
```python
from ff_storage.db import Postgres

# Create direct connection
db = Postgres(
    dbname="fenix_db",
    user="fenix",
    password="password",
    host="localhost",
    port=5432
)

# Connect and query - returns dicts by default
db.connect()
try:
    # Execute queries - returns list of dicts
    results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
    # results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
    print(results[0]['title'])  # Easy access by column name

    # Execute with RETURNING
    new_id = db.execute_query(
        "INSERT INTO documents (title) VALUES (%(title)s) RETURNING id",
        {"title": "New Document"}
    )
    # new_id = [{'id': 123}]

    # Transaction example
    db.begin_transaction()
    try:
        db.execute("UPDATE documents SET status = %(status)s WHERE id = %(id)s", {"status": "archived", "id": 123})
        db.execute("INSERT INTO audit_log (action) VALUES (%(action)s)", {"action": "archive"})
        db.commit_transaction()
    except Exception:
        db.rollback_transaction()
        raise
finally:
    # Close the connection
    db.close_connection()
```
#### MySQL (Sync)
```python
from ff_storage.db import MySQL

# Create direct connection
db = MySQL(
    dbname="fenix_db",
    user="root",
    password="password",
    host="localhost",
    port=3306
)

# Same usage pattern as PostgreSQL - returns dicts by default
db.connect()
results = db.read_query("SELECT id, title, status FROM documents WHERE status = %(status)s", {"status": "active"})
# results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
print(results[0]['title'])  # Easy access by column name
db.close_connection()
```
#### Microsoft SQL Server (Sync)
```python
from ff_storage.db import SQLServer

# Create direct connection
db = SQLServer(
    dbname="fenix_db",
    user="sa",
    password="YourPassword123",
    host="localhost",
    port=1433,
    driver="ODBC Driver 18 for SQL Server"
)

# Connect and execute queries - returns dicts by default
db.connect()
try:
    # Read query - returns list of dicts (note the positional ? placeholders)
    results = db.read_query("SELECT id, title, status FROM documents WHERE status = ?", ("active",))
    # results = [{'id': 1, 'title': 'Doc 1', 'status': 'active'}, ...]
    print(results[0]['title'])  # Easy access by column name

    # Execute with OUTPUT clause
    new_id = db.execute_query(
        "INSERT INTO documents (title) OUTPUT INSERTED.id VALUES (?)",
        ("New Document",)
    )
    # new_id = [{'id': 123}]

    # Check table existence
    if db.table_exists("users", schema="dbo"):
        columns = db.get_table_columns("users", schema="dbo")
finally:
    db.close_connection()
```
### Object Storage
#### Local Filesystem Storage
```python
from ff_storage import LocalObjectStorage
import asyncio

async def main():
    # Initialize local storage
    storage = LocalObjectStorage("/var/data/documents")

    pdf_bytes = b"..."  # your file contents

    # Write file with metadata
    await storage.write(
        "reports/2025/quarterly.pdf",
        pdf_bytes,
        metadata={"content-type": "application/pdf", "author": "system"}
    )

    # Read file
    data = await storage.read("reports/2025/quarterly.pdf")

    # Check existence
    exists = await storage.exists("reports/2025/quarterly.pdf")

    # List files with prefix
    files = await storage.list_keys(prefix="reports/2025/")

    # Delete file
    await storage.delete("reports/2025/quarterly.pdf")

asyncio.run(main())
```
#### S3-Compatible Storage
```python
from ff_storage import S3ObjectStorage
import asyncio

async def main():
    # AWS S3
    s3 = S3ObjectStorage(
        bucket="fenix-documents",
        region="us-east-1"
    )

    # Or MinIO/other S3-compatible
    s3 = S3ObjectStorage(
        bucket="fenix-documents",
        endpoint_url="http://localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin"
    )

    # Write file
    pdf_bytes = b"..."  # your file contents
    await s3.write("docs/report.pdf", pdf_bytes)

    # Stream large files
    async for chunk in s3.read_stream("large_file.bin", chunk_size=8192):
        await process_chunk(chunk)  # your async chunk handler

    # Multipart upload is automatic for large files (> 5MB)
    await s3.write("huge_file.bin", huge_data)

asyncio.run(main())
```
### Migration Management
```python
from ff_storage.db.migrations import MigrationManager
# Setup migration manager
manager = MigrationManager(db_connection, "./migrations")
# Run all pending migrations
manager.migrate()
# Create new migration
manager.create_migration("add_user_roles")
# Check migration status
pending = manager.get_pending_migrations()
applied = manager.get_applied_migrations()
```
Migration files follow the naming pattern: `001_initial_schema.sql`, `002_add_indexes.sql`, etc.
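A typical wiring at application startup, assuming the sync `Postgres` connection shown earlier can be passed as the manager's connection:

```python
from ff_storage.db import Postgres
from ff_storage.db.migrations import MigrationManager

db = Postgres(dbname="fenix_db", user="fenix", password="password",
              host="localhost", port=5432)
db.connect()
try:
    manager = MigrationManager(db, "./migrations")
    # Apply anything not yet recorded as applied
    if manager.get_pending_migrations():
        manager.migrate()
finally:
    db.close_connection()
```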
### Base Models
```python
from ff_storage.db.models import BaseModel, BaseModelWithDates
from dataclasses import dataclass
from typing import Optional
import uuid

@dataclass
class Document(BaseModelWithDates):
    title: str
    content: str
    status: str = "draft"
    author_id: Optional[uuid.UUID] = None

# Automatic UUID and timestamp handling
doc = Document(
    title="Quarterly Report",
    content="...",
    status="published"
)
# doc.id = UUID automatically generated
# doc.created_at = current timestamp
# doc.updated_at = current timestamp
```
## Advanced Features
### Transaction Management
```python
# Manual transaction handling with rollback on error
def transfer_ownership(db, doc_id, new_owner_id):
    db.begin_transaction()
    try:
        # Multiple operations in a single transaction
        db.execute("UPDATE documents SET owner_id = %(owner_id)s WHERE id = %(id)s",
                   {"owner_id": new_owner_id, "id": doc_id})
        db.execute("INSERT INTO audit_log (action, doc_id, user_id) VALUES (%(action)s, %(doc_id)s, %(user_id)s)",
                   {"action": "transfer", "doc_id": doc_id, "user_id": new_owner_id})
        db.commit_transaction()
    except Exception:
        db.rollback_transaction()
        raise
```
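If you prefer context-manager style, a small wrapper over the begin/commit/rollback methods is easy to add (hypothetical helper, not part of ff-storage):

```python
from contextlib import contextmanager

@contextmanager
def transaction(db):
    # Hypothetical wrapper around ff-storage's manual
    # begin/commit/rollback transaction methods
    db.begin_transaction()
    try:
        yield db
        db.commit_transaction()
    except Exception:
        db.rollback_transaction()
        raise

# Usage:
with transaction(db):
    db.execute("UPDATE documents SET status = %(status)s WHERE id = %(id)s",
               {"status": "archived", "id": 123})
```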
### Connection Pool Monitoring
```python
# Check pool statistics
pool = PostgresPool(...)
await pool.connect()

open_connections = pool.get_open_connections()
print(f"Open connections: {open_connections}")

# Graceful shutdown
await pool.disconnect()
```
### Query Builder Utilities
```python
from ff_storage.db.sql import build_insert, build_update, build_select

# Build INSERT query
query, params = build_insert("documents", {
    "title": "New Doc",
    "status": "draft"
})

# Build UPDATE query
query, params = build_update(
    "documents",
    {"status": "published"},  # SET values
    {"id": doc_id}            # WHERE conditions
)

# Build SELECT with conditions
query, params = build_select(
    "documents",
    columns=["id", "title"],
    where={"status": "published", "author_id": user_id}
)
```
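The builders pair naturally with the connection classes and the dataclass models. A sketch, assuming the returned `query` and `params` match the placeholder style `execute_query` and `read_query` expect:

```python
from dataclasses import asdict

# Persist a Document model via the INSERT builder (assumes the
# builder's params align with execute_query's placeholder style)
query, params = build_insert("documents", asdict(doc))
db.execute_query(query, params)

# Fetch it back with the SELECT builder
query, params = build_select("documents", columns=["id", "title"],
                             where={"id": doc.id})
rows = db.read_query(query, params)
```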
## Error Handling
```python
from ff_storage.exceptions import StorageError, DatabaseError

try:
    db.connect()
    results = db.read_query("SELECT * FROM documents")
except DatabaseError as e:
    print(f"Database error: {e}")
except StorageError as e:
    print(f"Storage error: {e}")
finally:
    db.close_connection()
```
## Testing
```bash
# Run tests
pytest tests/
# With coverage
pytest --cov=ff_storage tests/
# Run specific test file
pytest tests/test_postgres.py
# Run with verbose output
pytest -v tests/
```
## Configuration
### Environment Variables
```bash
# Database
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=fenix_db
export DB_USER=fenix
export DB_PASSWORD=secret
# S3 Storage
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret
export AWS_DEFAULT_REGION=us-east-1
# Local Storage
export STORAGE_PATH=/var/data/documents
```
### Configuration File
```python
# config.py
import os

from ff_storage import PostgresPool, S3ObjectStorage

# Database configuration
DATABASE = {
    "dbname": os.getenv("DB_NAME", "fenix_db"),
    "user": os.getenv("DB_USER", "fenix"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", 5432)),
    "min_size": 10,
    "max_size": 20
}

# Storage configuration
STORAGE = {
    "bucket": os.getenv("S3_BUCKET", "fenix-documents"),
    "region": os.getenv("AWS_DEFAULT_REGION", "us-east-1")
}

# Initialize
db = PostgresPool(**DATABASE)
storage = S3ObjectStorage(**STORAGE)
```
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## License
MIT License - See [LICENSE](LICENSE) file for details.
## Author
Created and maintained by **Ben Moag** at **[Fenixflow](https://fenixflow.com)**
For more information, visit the [GitLab repository](https://gitlab.com/fenixflow/fenix-packages).