dataknobs-data


Namedataknobs-data JSON
Version 0.1.0 PyPI version JSON
download
home_pageNone
SummaryUnified data abstraction layer for consistent database operations across multiple storage technologies
upload_time2025-08-18 03:33:48
maintainerNone
docs_urlNone
authorNone
requires_python>=3.10
licenseNone
keywords abstraction data database records storage
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # DataKnobs Data Package

A unified data abstraction layer that provides consistent database operations across multiple storage technologies.

## Overview

The `dataknobs-data` package enables seamless data management regardless of the underlying storage mechanism, from in-memory structures to cloud storage and databases. It provides a simple, consistent API for CRUD operations, searching, and data manipulation across diverse backends.

## Features

- **Unified Interface**: Same API regardless of storage backend
- **Multiple Backends**: Memory, File (JSON/CSV/Parquet), PostgreSQL, Elasticsearch, S3
- **Record-Based**: Data represented as structured records with metadata and first-class ID support
- **Pandas Integration**: Seamless bidirectional conversion to/from DataFrames with type preservation
- **Migration Utilities**: Backend-to-backend migration, schema evolution, and data transformation
- **Schema Validation**: Comprehensive validation system with constraints and type coercion
- **Streaming Support**: Efficient streaming APIs for large datasets
- **Type Safety**: Strong typing with field validation and automatic type conversion
- **Async Support**: Both synchronous and asynchronous APIs
- **Query System**: Powerful, backend-agnostic query capabilities
- **Configuration Support**: Full integration with DataKnobs configuration system
- **Batch Operations**: Efficient bulk insert, update, and upsert operations
- **Connection Management**: Automatic connection lifecycle management
- **Extensible**: Easy to add custom storage backends, validators, and transformers

## Installation

```bash
# Basic installation
pip install dataknobs-data

# With specific backend support
pip install dataknobs-data[postgres]     # PostgreSQL support
pip install dataknobs-data[s3]          # AWS S3 support
pip install dataknobs-data[elasticsearch] # Elasticsearch support
pip install dataknobs-data[all]         # All backends
```

## Quick Start

```python
from dataknobs_data import AsyncDatabase, Record, Query, Operator

# Async usage
async def main():
    # Create and auto-connect to database
    db = await AsyncDatabase.create("memory")
    
    # Create a record
    record = Record({
        "name": "John Doe",
        "age": 30,
        "email": "john@example.com",
        "active": True
    })
    
    # CRUD operations
    id = await db.create(record)
    retrieved = await db.read(id)
    record.set_value("age", 31)
    await db.update(id, record)
    await db.delete(id)
    
    # Search with queries
    query = (Query()
        .filter("age", Operator.GTE, 25)
        .filter("active", Operator.EQ, True)
        .sort("name")
        .limit(10))
    
    results = await db.search(query)
    for record in results:
        print(f"{record.get_value('name')}: {record.get_value('age')}")
    
    await db.close()

# Synchronous usage
from dataknobs_data import SyncDatabase

db = SyncDatabase.create("memory")
record = Record({"name": "Jane Doe", "age": 28})
id = db.create(record)
retrieved = db.read(id)
db.close()
```

## Backend Configuration

### File Backend
```python
db = await Database.create("file", {
    "path": "/data/records.json",
    "pretty": True,
    "backup": True
})
```

### PostgreSQL Backend
```python
db = await Database.create("postgres", {
    "host": "localhost",
    "database": "mydb",
    "user": "user",
    "password": "pass",
    "table": "records",
    "schema": "public"
})
```

### S3 Backend
```python
db = await Database.create("s3", {
    "bucket": "my-bucket",
    "prefix": "records/",
    "region": "us-west-2",
    "aws_access_key_id": "key",
    "aws_secret_access_key": "secret"
})
```

### Elasticsearch Backend
```python
db = await Database.create("elasticsearch", {
    "host": "localhost",
    "port": 9200,
    "index": "records",
    "refresh": True
})
```

## Configuration Support

The data package fully integrates with the DataKnobs configuration system. All backends inherit from `ConfigurableBase` and can be instantiated from configuration files.

### Using Configuration Files

```yaml
# config.yaml
databases:
  - name: primary
    class: dataknobs_data.backends.postgres.PostgresDatabase
    host: ${DB_HOST:localhost}  # Environment variable with default
    port: ${DB_PORT:5432}
    database: myapp
    user: ${DB_USER:postgres}
    password: ${DB_PASSWORD}
    table: records
    
  - name: cache
    class: dataknobs_data.backends.memory.MemoryDatabase
    
  - name: archive
    class: dataknobs_data.backends.file.SyncFileDatabase
    path: /data/archive.json
    format: json
    compression: gzip
    
  - name: cloud_storage
    class: dataknobs_data.backends.s3.S3Database
    bucket: ${S3_BUCKET:my-data-bucket}
    prefix: ${S3_PREFIX:records/}
    region: ${AWS_REGION:us-east-1}
    endpoint_url: ${S3_ENDPOINT}  # Optional, for LocalStack/MinIO
```

### Loading from Configuration

```python
from dataknobs_config import Config
from dataknobs_data import Record, Query

# Load configuration
config = Config("config.yaml")

# Create database instances from config
primary_db = config.get_instance("databases", "primary")
cache_db = config.get_instance("databases", "cache")
archive_db = config.get_instance("databases", "archive")

# Use the databases normally
record = Record({"name": "test", "value": 42})
record_id = primary_db.create(record)

# Cache frequently accessed data
cache_db.create(record)

# Archive old records
archive_db.create(record)
```

### Direct Configuration

```python
from dataknobs_data.backends.postgres import PostgresDatabase

# All backends support from_config classmethod
db = PostgresDatabase.from_config({
    "host": "localhost",
    "database": "myapp",
    "user": "postgres",
    "password": "secret"
})
```

## Backend Factory

The data package provides a factory pattern for dynamic backend selection:

### Using the Factory Directly

```python
from dataknobs_data import DatabaseFactory

factory = DatabaseFactory()

# Create different backends
memory_db = factory.create(backend="memory")
file_db = factory.create(backend="file", path="data.json", format="json")
s3_db = factory.create(backend="s3", bucket="my-bucket", prefix="data/")
```

### Factory with Configuration

```python
from dataknobs_config import Config
from dataknobs_data import database_factory

# Register factory for cleaner configs
config = Config()
config.register_factory("database", database_factory)

# Use registered factory in configuration
config.load({
    "databases": [{
        "name": "main",
        "factory": "database",  # Uses registered factory
        "backend": "postgres",
        "host": "localhost",
        "database": "myapp"
    }]
})

db = config.get_instance("databases", "main")
```

### Factory Configuration Examples

```yaml
# Using registered factory (cleaner)
databases:
  - name: main
    factory: database
    backend: ${DB_BACKEND:postgres}
    host: ${DB_HOST:localhost}
    
# Using module path (no registration needed)
databases:
  - name: main
    factory: dataknobs_data.factory.database_factory
    backend: postgres
    host: localhost
```

## Pandas Integration

The data package provides comprehensive pandas integration for data analysis workflows:

```python
import pandas as pd
from dataknobs_data.pandas import DataFrameConverter, BatchOperations

# Convert records to DataFrame with type preservation
converter = DataFrameConverter()
df = converter.records_to_dataframe(records, preserve_types=True)

# Perform pandas operations
df_filtered = df[df['age'] > 25]
df_aggregated = df.groupby('category').agg({'price': 'mean'})

# Convert back to records
new_records = converter.dataframe_to_records(df_filtered)

# Bulk operations with DataFrames
batch_ops = BatchOperations(database)
result = batch_ops.bulk_insert_dataframe(df, batch_size=1000)
print(f"Inserted {result.successful} records")

# Upsert from DataFrame
result = batch_ops.bulk_upsert_dataframe(
    df, 
    id_column="user_id",
    merge_strategy="update"
)
```

## Schema Validation

Define and enforce data schemas with comprehensive validation:

```python
from dataknobs_data.validation import Schema, FieldType
from dataknobs_data.validation.constraints import *

# Define schema with constraints
user_schema = Schema("UserSchema")
user_schema.field("email", FieldType.STRING, 
    required=True,
    constraints=[Pattern(r"^.+@.+\..+$"), Unique()])
user_schema.field("age", FieldType.INTEGER,
    constraints=[Range(min=0, max=150)])
user_schema.field("status", FieldType.STRING,
    default="active",
    constraints=[Enum(["active", "inactive", "suspended"])])

# Validate records
result = user_schema.validate(record)
if not result.valid:
    for error in result.errors:
        print(error)

# Automatic type coercion
record = Record({"age": "30"})  # String value
result = user_schema.validate(record, coerce=True)  # Converts to int
if result.valid:
    print(record.get_value("age"))  # 30 (as integer)
```

## Data Migration

Migrate data between backends with transformation support:

```python
from dataknobs_data.migration import Migration, Migrator
from dataknobs_data.migration.operations import *

# Define migration
migration = Migration("upgrade_schema", "2.0.0")
migration.add_operation(AddField("created_at", default=datetime.now()))
migration.add_operation(RenameField("user_name", "username"))
migration.add_operation(TransformField("email", lambda x: x.lower()))

# Migrate between backends
async def migrate_data():
    source_db = await Database.create("postgres", postgres_config)
    target_db = await Database.create("s3", s3_config)
    
    migrator = Migrator(source_db, target_db)
    
    # Run migration with progress tracking
    progress = await migrator.migrate(
        migration=migration,
        batch_size=1000,
        on_progress=lambda p: print(f"Progress: {p.percentage:.1f}%")
    )
    
    print(f"Migrated: {progress.successful} records")
    print(f"Failed: {progress.failed} records")
    print(f"Duration: {progress.duration}s")
    
    await source_db.close()
    await target_db.close()
```

## Advanced Queries

```python
# Complex query with multiple filters
query = (Query()
    .filter("status", Operator.IN, ["active", "pending"])
    .filter("created_at", Operator.GTE, "2024-01-01")
    .filter("name", Operator.LIKE, "John%")
    .sort("priority", SortOrder.DESC)
    .sort("created_at", SortOrder.ASC)
    .offset(20)
    .limit(10)
    .select(["name", "email", "status"]))  # Select specific fields

results = await db.search(query)
```

## Streaming Support

```python
from dataknobs_data import StreamConfig

# Stream large datasets efficiently
config = StreamConfig(
    batch_size=100,
    buffer_size=1000
)

# Stream read
async for record in db.stream_read(query, config):
    # Process each record without loading all into memory
    process_record(record)

# Stream write
result = await db.stream_write(record_generator(), config)
print(f"Streamed {result.total_processed} records")
```


## Documentation

For complete API documentation, see [API Reference](docs/API_REFERENCE.md).

## Custom Backend

```python
from dataknobs_data import AsyncDatabase, DatabaseBackend

class CustomBackend(DatabaseBackend):
    def create(self, record):
        # Implementation
        pass
    
    def read(self, record_id):
        # Implementation
        pass
    
    # ... other methods

# Register custom backend
AsyncDatabase.register_backend("custom", CustomBackend)

# Use custom backend
db = AsyncDatabase.create("custom", config)
```

## Development

```bash
# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=dataknobs_data

# Type checking
mypy src/dataknobs_data

# Linting
ruff check src/dataknobs_data

# Format code
black src/dataknobs_data
```

## Architecture

The package follows a modular architecture:

- **Records**: Data representation with fields and metadata
- **Database Interface**: Abstract base classes (AsyncDatabase/SyncDatabase) for all backends
- **Query System**: Backend-agnostic query building
- **Backends**: Implementations for different storage technologies
- **Serializers**: Type conversion and format handling
- **Utils**: Pandas integration and migration tools

## Performance

The package is designed for optimal performance:

- Connection pooling for database backends
- Batch operations for efficiency
- Lazy loading and pagination
- Caching for frequently accessed data
- Async support for concurrent operations

## Contributing

Contributions are welcome! Please see our [Contributing Guide](../../CONTRIBUTING.md) for details.

## License

This project is licensed under the MIT License - see the [LICENSE](../../LICENSE) file for details.
            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "dataknobs-data",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "abstraction, data, database, records, storage",
    "author": null,
    "author_email": "DataKnobs Team <team@dataknobs.com>",
    "download_url": "https://files.pythonhosted.org/packages/0b/71/72d8c02b58ddb1cda5752ea6d06c02c11414c9f269843606891464902bb2/dataknobs_data-0.1.0.tar.gz",
    "platform": null,
    "description": "# DataKnobs Data Package\n\nA unified data abstraction layer that provides consistent database operations across multiple storage technologies.\n\n## Overview\n\nThe `dataknobs-data` package enables seamless data management regardless of the underlying storage mechanism, from in-memory structures to cloud storage and databases. It provides a simple, consistent API for CRUD operations, searching, and data manipulation across diverse backends.\n\n## Features\n\n- **Unified Interface**: Same API regardless of storage backend\n- **Multiple Backends**: Memory, File (JSON/CSV/Parquet), PostgreSQL, Elasticsearch, S3\n- **Record-Based**: Data represented as structured records with metadata and first-class ID support\n- **Pandas Integration**: Seamless bidirectional conversion to/from DataFrames with type preservation\n- **Migration Utilities**: Backend-to-backend migration, schema evolution, and data transformation\n- **Schema Validation**: Comprehensive validation system with constraints and type coercion\n- **Streaming Support**: Efficient streaming APIs for large datasets\n- **Type Safety**: Strong typing with field validation and automatic type conversion\n- **Async Support**: Both synchronous and asynchronous APIs\n- **Query System**: Powerful, backend-agnostic query capabilities\n- **Configuration Support**: Full integration with DataKnobs configuration system\n- **Batch Operations**: Efficient bulk insert, update, and upsert operations\n- **Connection Management**: Automatic connection lifecycle management\n- **Extensible**: Easy to add custom storage backends, validators, and transformers\n\n## Installation\n\n```bash\n# Basic installation\npip install dataknobs-data\n\n# With specific backend support\npip install dataknobs-data[postgres]     # PostgreSQL support\npip install dataknobs-data[s3]          # AWS S3 support\npip install dataknobs-data[elasticsearch] # Elasticsearch support\npip install dataknobs-data[all]         # All backends\n```\n\n## Quick Start\n\n```python\nfrom dataknobs_data import AsyncDatabase, Record, Query, Operator\n\n# Async usage\nasync def main():\n    # Create and auto-connect to database\n    db = await AsyncDatabase.create(\"memory\")\n    \n    # Create a record\n    record = Record({\n        \"name\": \"John Doe\",\n        \"age\": 30,\n        \"email\": \"john@example.com\",\n        \"active\": True\n    })\n    \n    # CRUD operations\n    id = await db.create(record)\n    retrieved = await db.read(id)\n    record.set_value(\"age\", 31)\n    await db.update(id, record)\n    await db.delete(id)\n    \n    # Search with queries\n    query = (Query()\n        .filter(\"age\", Operator.GTE, 25)\n        .filter(\"active\", Operator.EQ, True)\n        .sort(\"name\")\n        .limit(10))\n    \n    results = await db.search(query)\n    for record in results:\n        print(f\"{record.get_value('name')}: {record.get_value('age')}\")\n    \n    await db.close()\n\n# Synchronous usage\nfrom dataknobs_data import SyncDatabase\n\ndb = SyncDatabase.create(\"memory\")\nrecord = Record({\"name\": \"Jane Doe\", \"age\": 28})\nid = db.create(record)\nretrieved = db.read(id)\ndb.close()\n```\n\n## Backend Configuration\n\n### File Backend\n```python\ndb = await Database.create(\"file\", {\n    \"path\": \"/data/records.json\",\n    \"pretty\": True,\n    \"backup\": True\n})\n```\n\n### PostgreSQL Backend\n```python\ndb = await Database.create(\"postgres\", {\n    \"host\": \"localhost\",\n    \"database\": \"mydb\",\n    \"user\": \"user\",\n    \"password\": \"pass\",\n    \"table\": \"records\",\n    \"schema\": \"public\"\n})\n```\n\n### S3 Backend\n```python\ndb = await Database.create(\"s3\", {\n    \"bucket\": \"my-bucket\",\n    \"prefix\": \"records/\",\n    \"region\": \"us-west-2\",\n    \"aws_access_key_id\": \"key\",\n    \"aws_secret_access_key\": \"secret\"\n})\n```\n\n### Elasticsearch Backend\n```python\ndb = await Database.create(\"elasticsearch\", {\n    \"host\": \"localhost\",\n    \"port\": 9200,\n    \"index\": \"records\",\n    \"refresh\": True\n})\n```\n\n## Configuration Support\n\nThe data package fully integrates with the DataKnobs configuration system. All backends inherit from `ConfigurableBase` and can be instantiated from configuration files.\n\n### Using Configuration Files\n\n```yaml\n# config.yaml\ndatabases:\n  - name: primary\n    class: dataknobs_data.backends.postgres.PostgresDatabase\n    host: ${DB_HOST:localhost}  # Environment variable with default\n    port: ${DB_PORT:5432}\n    database: myapp\n    user: ${DB_USER:postgres}\n    password: ${DB_PASSWORD}\n    table: records\n    \n  - name: cache\n    class: dataknobs_data.backends.memory.MemoryDatabase\n    \n  - name: archive\n    class: dataknobs_data.backends.file.SyncFileDatabase\n    path: /data/archive.json\n    format: json\n    compression: gzip\n    \n  - name: cloud_storage\n    class: dataknobs_data.backends.s3.S3Database\n    bucket: ${S3_BUCKET:my-data-bucket}\n    prefix: ${S3_PREFIX:records/}\n    region: ${AWS_REGION:us-east-1}\n    endpoint_url: ${S3_ENDPOINT}  # Optional, for LocalStack/MinIO\n```\n\n### Loading from Configuration\n\n```python\nfrom dataknobs_config import Config\nfrom dataknobs_data import Record, Query\n\n# Load configuration\nconfig = Config(\"config.yaml\")\n\n# Create database instances from config\nprimary_db = config.get_instance(\"databases\", \"primary\")\ncache_db = config.get_instance(\"databases\", \"cache\")\narchive_db = config.get_instance(\"databases\", \"archive\")\n\n# Use the databases normally\nrecord = Record({\"name\": \"test\", \"value\": 42})\nrecord_id = primary_db.create(record)\n\n# Cache frequently accessed data\ncache_db.create(record)\n\n# Archive old records\narchive_db.create(record)\n```\n\n### Direct Configuration\n\n```python\nfrom dataknobs_data.backends.postgres import PostgresDatabase\n\n# All backends support from_config classmethod\ndb = PostgresDatabase.from_config({\n    \"host\": \"localhost\",\n    \"database\": \"myapp\",\n    \"user\": \"postgres\",\n    \"password\": \"secret\"\n})\n```\n\n## Backend Factory\n\nThe data package provides a factory pattern for dynamic backend selection:\n\n### Using the Factory Directly\n\n```python\nfrom dataknobs_data import DatabaseFactory\n\nfactory = DatabaseFactory()\n\n# Create different backends\nmemory_db = factory.create(backend=\"memory\")\nfile_db = factory.create(backend=\"file\", path=\"data.json\", format=\"json\")\ns3_db = factory.create(backend=\"s3\", bucket=\"my-bucket\", prefix=\"data/\")\n```\n\n### Factory with Configuration\n\n```python\nfrom dataknobs_config import Config\nfrom dataknobs_data import database_factory\n\n# Register factory for cleaner configs\nconfig = Config()\nconfig.register_factory(\"database\", database_factory)\n\n# Use registered factory in configuration\nconfig.load({\n    \"databases\": [{\n        \"name\": \"main\",\n        \"factory\": \"database\",  # Uses registered factory\n        \"backend\": \"postgres\",\n        \"host\": \"localhost\",\n        \"database\": \"myapp\"\n    }]\n})\n\ndb = config.get_instance(\"databases\", \"main\")\n```\n\n### Factory Configuration Examples\n\n```yaml\n# Using registered factory (cleaner)\ndatabases:\n  - name: main\n    factory: database\n    backend: ${DB_BACKEND:postgres}\n    host: ${DB_HOST:localhost}\n    \n# Using module path (no registration needed)\ndatabases:\n  - name: main\n    factory: dataknobs_data.factory.database_factory\n    backend: postgres\n    host: localhost\n```\n\n## Pandas Integration\n\nThe data package provides comprehensive pandas integration for data analysis workflows:\n\n```python\nimport pandas as pd\nfrom dataknobs_data.pandas import DataFrameConverter, BatchOperations\n\n# Convert records to DataFrame with type preservation\nconverter = DataFrameConverter()\ndf = converter.records_to_dataframe(records, preserve_types=True)\n\n# Perform pandas operations\ndf_filtered = df[df['age'] > 25]\ndf_aggregated = df.groupby('category').agg({'price': 'mean'})\n\n# Convert back to records\nnew_records = converter.dataframe_to_records(df_filtered)\n\n# Bulk operations with DataFrames\nbatch_ops = BatchOperations(database)\nresult = batch_ops.bulk_insert_dataframe(df, batch_size=1000)\nprint(f\"Inserted {result.successful} records\")\n\n# Upsert from DataFrame\nresult = batch_ops.bulk_upsert_dataframe(\n    df, \n    id_column=\"user_id\",\n    merge_strategy=\"update\"\n)\n```\n\n## Schema Validation\n\nDefine and enforce data schemas with comprehensive validation:\n\n```python\nfrom dataknobs_data.validation import Schema, FieldType\nfrom dataknobs_data.validation.constraints import *\n\n# Define schema with constraints\nuser_schema = Schema(\"UserSchema\")\nuser_schema.field(\"email\", FieldType.STRING, \n    required=True,\n    constraints=[Pattern(r\"^.+@.+\\..+$\"), Unique()])\nuser_schema.field(\"age\", FieldType.INTEGER,\n    constraints=[Range(min=0, max=150)])\nuser_schema.field(\"status\", FieldType.STRING,\n    default=\"active\",\n    constraints=[Enum([\"active\", \"inactive\", \"suspended\"])])\n\n# Validate records\nresult = user_schema.validate(record)\nif not result.valid:\n    for error in result.errors:\n        print(error)\n\n# Automatic type coercion\nrecord = Record({\"age\": \"30\"})  # String value\nresult = user_schema.validate(record, coerce=True)  # Converts to int\nif result.valid:\n    print(record.get_value(\"age\"))  # 30 (as integer)\n```\n\n## Data Migration\n\nMigrate data between backends with transformation support:\n\n```python\nfrom dataknobs_data.migration import Migration, Migrator\nfrom dataknobs_data.migration.operations import *\n\n# Define migration\nmigration = Migration(\"upgrade_schema\", \"2.0.0\")\nmigration.add_operation(AddField(\"created_at\", default=datetime.now()))\nmigration.add_operation(RenameField(\"user_name\", \"username\"))\nmigration.add_operation(TransformField(\"email\", lambda x: x.lower()))\n\n# Migrate between backends\nasync def migrate_data():\n    source_db = await Database.create(\"postgres\", postgres_config)\n    target_db = await Database.create(\"s3\", s3_config)\n    \n    migrator = Migrator(source_db, target_db)\n    \n    # Run migration with progress tracking\n    progress = await migrator.migrate(\n        migration=migration,\n        batch_size=1000,\n        on_progress=lambda p: print(f\"Progress: {p.percentage:.1f}%\")\n    )\n    \n    print(f\"Migrated: {progress.successful} records\")\n    print(f\"Failed: {progress.failed} records\")\n    print(f\"Duration: {progress.duration}s\")\n    \n    await source_db.close()\n    await target_db.close()\n```\n\n## Advanced Queries\n\n```python\n# Complex query with multiple filters\nquery = (Query()\n    .filter(\"status\", Operator.IN, [\"active\", \"pending\"])\n    .filter(\"created_at\", Operator.GTE, \"2024-01-01\")\n    .filter(\"name\", Operator.LIKE, \"John%\")\n    .sort(\"priority\", SortOrder.DESC)\n    .sort(\"created_at\", SortOrder.ASC)\n    .offset(20)\n    .limit(10)\n    .select([\"name\", \"email\", \"status\"]))  # Select specific fields\n\nresults = await db.search(query)\n```\n\n## Streaming Support\n\n```python\nfrom dataknobs_data import StreamConfig\n\n# Stream large datasets efficiently\nconfig = StreamConfig(\n    batch_size=100,\n    buffer_size=1000\n)\n\n# Stream read\nasync for record in db.stream_read(query, config):\n    # Process each record without loading all into memory\n    process_record(record)\n\n# Stream write\nresult = await db.stream_write(record_generator(), config)\nprint(f\"Streamed {result.total_processed} records\")\n```\n\n\n## Documentation\n\nFor complete API documentation, see [API Reference](docs/API_REFERENCE.md).\n\n## Custom Backend\n\n```python\nfrom dataknobs_data import AsyncDatabase, DatabaseBackend\n\nclass CustomBackend(DatabaseBackend):\n    def create(self, record):\n        # Implementation\n        pass\n    \n    def read(self, record_id):\n        # Implementation\n        pass\n    \n    # ... other methods\n\n# Register custom backend\nAsyncDatabase.register_backend(\"custom\", CustomBackend)\n\n# Use custom backend\ndb = AsyncDatabase.create(\"custom\", config)\n```\n\n## Development\n\n```bash\n# Install development dependencies\npip install -e \".[dev]\"\n\n# Run tests\npytest\n\n# Run tests with coverage\npytest --cov=dataknobs_data\n\n# Type checking\nmypy src/dataknobs_data\n\n# Linting\nruff check src/dataknobs_data\n\n# Format code\nblack src/dataknobs_data\n```\n\n## Architecture\n\nThe package follows a modular architecture:\n\n- **Records**: Data representation with fields and metadata\n- **Database Interface**: Abstract base classes (AsyncDatabase/SyncDatabase) for all backends\n- **Query System**: Backend-agnostic query building\n- **Backends**: Implementations for different storage technologies\n- **Serializers**: Type conversion and format handling\n- **Utils**: Pandas integration and migration tools\n\n## Performance\n\nThe package is designed for optimal performance:\n\n- Connection pooling for database backends\n- Batch operations for efficiency\n- Lazy loading and pagination\n- Caching for frequently accessed data\n- Async support for concurrent operations\n\n## Contributing\n\nContributions are welcome! Please see our [Contributing Guide](../../CONTRIBUTING.md) for details.\n\n## License\n\nThis project is licensed under the MIT License - see the [LICENSE](../../LICENSE) file for details.",
    "bugtrack_url": null,
    "license": null,
    "summary": "Unified data abstraction layer for consistent database operations across multiple storage technologies",
    "version": "0.1.0",
    "project_urls": {
        "Bug Tracker": "https://github.com/dataknobs/dataknobs/issues",
        "Documentation": "https://dataknobs.readthedocs.io",
        "Homepage": "https://github.com/dataknobs/dataknobs"
    },
    "split_keywords": [
        "abstraction",
        " data",
        " database",
        " records",
        " storage"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "fa2698bbf553fad8b4873f41689c48f7300fedf88dfe6841126947dee827602b",
                "md5": "181630515a8dbbd83b6b20fb125e743c",
                "sha256": "9d546b4e1f8120133f84b175a3da80d4a7cf099ca99c1d00128eee6b1e99f0aa"
            },
            "downloads": -1,
            "filename": "dataknobs_data-0.1.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "181630515a8dbbd83b6b20fb125e743c",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 102714,
            "upload_time": "2025-08-18T03:33:46",
            "upload_time_iso_8601": "2025-08-18T03:33:46.723945Z",
            "url": "https://files.pythonhosted.org/packages/fa/26/98bbf553fad8b4873f41689c48f7300fedf88dfe6841126947dee827602b/dataknobs_data-0.1.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "0b7172d8c02b58ddb1cda5752ea6d06c02c11414c9f269843606891464902bb2",
                "md5": "6040fa4b1e4fd08edee2bbb05aace7a7",
                "sha256": "59efe7998a10e941f2d4d6ec21b88c240bd75363c0d9996231e9af56041f6635"
            },
            "downloads": -1,
            "filename": "dataknobs_data-0.1.0.tar.gz",
            "has_sig": false,
            "md5_digest": "6040fa4b1e4fd08edee2bbb05aace7a7",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 752395,
            "upload_time": "2025-08-18T03:33:48",
            "upload_time_iso_8601": "2025-08-18T03:33:48.308609Z",
            "url": "https://files.pythonhosted.org/packages/0b/71/72d8c02b58ddb1cda5752ea6d06c02c11414c9f269843606891464902bb2/dataknobs_data-0.1.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-18 03:33:48",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "dataknobs",
    "github_project": "dataknobs",
    "github_not_found": true,
    "lcname": "dataknobs-data"
}
        
Elapsed time: 1.95497s