# 🔌 daplug-ddb
> **Schema-Driven DynamoDB Normalization & Event Publishing for Python**
[CircleCI](https://circleci.com/gh/dual/daplug-ddb) ·
[SonarCloud](https://sonarcloud.io/summary/new_code?id=dual_daplug-ddb) ·
[Python 3.9+](https://www.python.org/downloads/) ·
[PyPI](https://pypi.org/project/daplug-ddb/) ·
[License](LICENSE) ·
[Issues](https://github.com/paulcruse3/daplug-ddb/issues)
`daplug-ddb` is a lightweight package that provides schema-aware CRUD helpers, batch utilities, and optional SNS publishing so you can treat DynamoDB as a structured datastore without rewriting boilerplate for every project.
## ✨ Key Features
- **Schema Mapping** – Convert inbound payloads into strongly typed DynamoDB
  items driven by your OpenAPI (or JSON Schema) definitions.
- **Idempotent CRUD** – Consistent `create`, `overwrite`, `update`, `delete`,
  and `read` operations with optional optimistic locking via an
  `idempotence_key`.
- **Batch Helpers** – Simplified batch insert/delete flows that validate data
  and handle chunking for you.
- **SNS Integration** – Optional event publishing for every write operation so
  downstream systems stay in sync.
## 🚀 Quick Start
### Installation
```bash
pip install daplug-ddb
# pipenv install daplug-ddb
# poetry add daplug-ddb
# uv pip install daplug-ddb
```
### Basic Usage
```python
import daplug_ddb
adapter = daplug_ddb.adapter(
table="example-table",
endpoint="https://dynamodb.us-east-2.amazonaws.com", # optional, will use AWS conventional env vars if using on lambda
schema_file="openapi.yml",
identifier="record_id",
idempotence_key="modified",
)
item = adapter.create(
data={
"record_id": "abc123",
"object_key": {"string_key": "value"},
"array_number": [1, 2, 3],
"modified": "2024-01-01",
},
schema="ExampleModel",
)
print(item)
```
Because the adapter is configured with a `schema_file`, every call can opt into
mapping by supplying `schema`. Skip the schema argument when you want to write
the data exactly as provided.
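When the schema argument is omitted, the payload is stored exactly as given; a
minimal sketch using the adapter configured above:

```python
# no schema mapping: the dict below is written verbatim
adapter.create(
    data={
        "record_id": "raw123",
        "modified": "2024-01-02",
    },
)
```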
## 🔧 Advanced Configuration
### Selective Updates
```python
# Merge partial updates while preserving existing attributes
adapter.update(
operation="get", # fetch original item via get; use "query" for indexes
query={
"Key": {"record_id": "abc123", "sort_key": "v1"}
},
data={
"record_id": "abc123",
"sort_key": "v1",
"array_number": [1, 2, 3, 4],
},
update_list_operation="replace",
)
```
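For illustration, the merge behaves roughly like the dict update below.
`status` is a hypothetical attribute showing that untouched fields survive,
and `update_list_operation="replace"` swaps the list wholesale instead of
appending:

```python
stored = {  # item currently in DynamoDB (hypothetical)
    "record_id": "abc123",
    "sort_key": "v1",
    "status": "active",
    "array_number": [1, 2, 3],
}
partial = {  # the data= argument above
    "record_id": "abc123",
    "sort_key": "v1",
    "array_number": [1, 2, 3, 4],
}

merged = {**stored, **partial}
assert merged["status"] == "active"            # preserved attribute
assert merged["array_number"] == [1, 2, 3, 4]  # list replaced, not appended
```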
### Hash/Range Prefixing
```python
adapter = daplug_ddb.adapter(
table="tenant-config",
endpoint="https://dynamodb.us-east-2.amazonaws.com",
schema_file="openapi.yml",
identifier="tenant_id",
)
prefix_args = {
"hash_key": "tenant_id",
"hash_prefix": "tenant#",
"range_key": "sort_key",
"range_prefix": "config#",
}
item = adapter.create(
data={
"tenant_id": "abc",
"sort_key": "default",
"modified": "2024-01-01",
},
schema="TenantModel",
**prefix_args,
)
# DynamoDB stores tenant_id as "tenant#abc", but the adapter returns "abc"
```
When prefixes are provided, the adapter automatically applies them on the way
into DynamoDB (including batch operations and deletes) and removes them before
returning data or publishing SNS events. Pass the same `prefix_args` to reads
(`get`, `query`, `scan`) so query keys are expanded and responses are cleaned.
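For example, a prefixed read against the adapter above (a sketch; the keys are
expanded to `tenant#abc` / `config#default` before the `GetItem` call):

```python
item = adapter.get(
    query={"Key": {"tenant_id": "abc", "sort_key": "default"}},
    schema="TenantModel",
    **prefix_args,
)
assert item["tenant_id"] == "abc"  # prefix stripped from the response
```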
### Batched Writes
```python
adapter.batch_insert(
data=[
{"record_id": str(idx), "sort_key": str(idx)}
for idx in range(100)
],
batch_size=25,
)
adapter.batch_delete(
data=[
{"record_id": str(idx), "sort_key": str(idx)}
for idx in range(100)
]
)
```
### Idempotent Operations
```python
adapter = daplug_ddb.adapter(
table="orders",
endpoint="https://dynamodb.us-east-2.amazonaws.com",
schema_file="openapi.yml",
identifier="order_id",
idempotence_key="modified",
)
updated = adapter.update(
data={"order_id": "abc123", "modified": "2024-02-01"},
operation="get",
query={"Key": {"order_id": "abc123"}},
schema="OrderModel",
)
```
Supplying an `idempotence_key` enables optimistic concurrency for updates and
overwrites: the adapter fetches the current item, captures the key’s value,
merges the update, and issues a conditional `PutItem` asserting the stored
`modified` value still matches what was read. If another writer changes the
record first, the operation fails with a conditional check error rather than
silently overwriting the data.
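If you let the conditional failure propagate, it should surface as boto3's
standard client error; a sketch, assuming the adapter does not translate the
exception before it reaches your code:

```python
import botocore.exceptions

try:
    adapter.update(
        data={"order_id": "abc123", "modified": "2024-02-01"},
        operation="get",
        query={"Key": {"order_id": "abc123"}},
        schema="OrderModel",
    )
except botocore.exceptions.ClientError as error:
    # another writer changed "modified" between the read and the PutItem
    if error.response["Error"]["Code"] != "ConditionalCheckFailedException":
        raise
    ...  # re-read the item and retry, or surface the conflict
```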
Set `raise_idempotence_error=True` if you prefer the adapter to raise a
`ValueError` instead of relying on DynamoDB's conditional failure. Leaving it
at the default (`False`) allows you to detect conflicts without breaking the
update flow.
```python
adapter = daplug_ddb.adapter(
table="orders",
schema_file="openapi.yml",
identifier="order_id",
idempotence_key="modified",
raise_idempotence_error=True,
)
```
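With the flag enabled, a conflict surfaces as an ordinary `ValueError`; a
sketch:

```python
try:
    adapter.update(
        data={"order_id": "abc123", "modified": "2024-02-01"},
        operation="get",
        query={"Key": {"order_id": "abc123"}},
        schema="OrderModel",
    )
except ValueError:
    # the stored item changed since it was read; re-fetch before retrying
    ...
```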
Enable `idempotence_use_latest=True` when you want the adapter to keep the
most recent copy based on the timestamp stored in the idempotence key. Stale
updates are ignored automatically.
```python
adapter = daplug_ddb.adapter(
table="orders",
schema_file="openapi.yml",
identifier="order_id",
idempotence_key="modified",
idempotence_use_latest=True,
)
```
Stale updates are short-circuited before DynamoDB writes occur.
```txt
Client Update Request
          │
          ▼
   [Adapter.fetch]
          │  (reads original item)
          ▼
┌──────────────────────────┐
│ Original Item            │
│ modified = "2024-01-01"  │
└──────────────────────────┘
          │ merge + map
          ▼
PutItem rejected → original returned
```
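Concretely, an update carrying an older `modified` value than the stored item
is dropped and the stored item comes back unchanged; a sketch against the
adapter configured above:

```python
# stored item has modified="2024-01-01"; this older write is ignored
result = adapter.update(
    data={"order_id": "abc123", "modified": "2023-12-31"},
    operation="get",
    query={"Key": {"order_id": "abc123"}},
    schema="OrderModel",
)
# result still reflects modified="2024-01-01"
```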
```txt
Client Update Request
          │
          ▼
   [Adapter.fetch]
          │  (reads original item)
          ▼
┌──────────────────────────┐
│ Original Item            │
│ idempotence_key = "v1"   │
└──────────────────────────┘
          │ merge + map
          ▼
PutItem(Item=…, ConditionExpression=Attr(idempotence_key).eq("v1"))
          │
     ┌────┴───────┐
     │            │
     ▼            ▼
  Success   ConditionalCheckFailed
            (another writer changed key)
```

- **Optional:** Omit `idempotence_key` to mirror DynamoDB’s default “last
  write wins” behavior while still benefiting from schema normalization (see
  the sketch after this list).
- **Safety:** When the key is configured but missing on the fetched item, the
  adapter raises `ValueError`, surfacing misconfigurations early.
- **Events:** SNS notifications include the idempotence metadata so downstream
  services can reason about version changes.
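For the first bullet, last-write-wins is simply an adapter configured without
the key; a sketch:

```python
# no idempotence_key: concurrent writers race and the final PutItem wins
adapter = daplug_ddb.adapter(
    table="orders",
    schema_file="openapi.yml",
    identifier="order_id",
)
```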
### SNS Publishing

Configure `sns_arn` (and an optional `sns_endpoint`) on the adapter to publish
a formatted event for every write operation.

### Per-call SNS Attributes

You can supply request-scoped SNS message attributes by passing `sns_attributes`
into any adapter operation (e.g. `create`, `update`, `delete`). These merge
with adapter defaults and schema-derived metadata.
```python
adapter = daplug_ddb.adapter(
table="audit-table",
schema_file="openapi.yml",
identifier="audit_id",
idempotence_key="version",
sns_arn="arn:aws:sns:us-east-2:123456789012:audit-events",
sns_endpoint="https://sns.us-east-2.amazonaws.com",
sns_attributes={"source": "daplug"},
)
item = {"audit_id": "a1", "version": "1"}  # hypothetical payload for illustration

adapter.create(
data=item,
schema="AuditModel",
sns_attributes={"source": "billing", "priority": "high"},
)
# => publishes a formatted SNS event with schema metadata
```
## 📚 Method Reference
Each adapter instance holds shared configuration such as `schema_file`, SNS
defaults, and optional key prefixes. Pass the schema name (and any
operation-specific overrides) when you invoke a method.
```python
adapter = daplug_ddb.adapter(
table="orders",
schema_file="openapi.yml",
identifier="order_id",
idempotence_key="modified",
)
```
### `create` (wrapper around `insert`/`overwrite`)
```python
# default: behaves like insert with idempotence protection
adapter.create(data=payload, schema="OrderModel")
# explicit overwrite (upsert semantics)
adapter.create(
operation="overwrite",
data=payload,
schema="OrderModel",
)
```
### `insert`
```python
adapter.insert(data=payload, schema="OrderModel")
```
### `overwrite`
```python
adapter.overwrite(data=payload, schema="OrderModel")
```
### `get`
```python
adapter.get(
query={"Key": {"order_id": "abc123"}},
schema="OrderModel",
)
```
### `query`
```python
adapter.query(
query={
"IndexName": "test_query_id",
"KeyConditionExpression": "test_query_id = :id",
"ExpressionAttributeValues": {":id": "def345"},
},
schema="OrderModel",
)
```
### `scan`
```python
adapter.scan(schema="OrderModel")
# raw DynamoDB response
adapter.scan(raw_scan=True)
```
### `read`
`read` delegates to `get`, `query`, or `scan` based on the
`operation` kwarg.
```python
# single item
adapter.read(operation="get", query={"Key": {"order_id": "abc123"}}, schema="OrderModel")
# query
adapter.read(
operation="query",
query={
"KeyConditionExpression": "test_query_id = :id",
"ExpressionAttributeValues": {":id": "def345"},
},
schema="OrderModel",
)
```
### `update`
```python
adapter.update(
data={"order_id": "abc123", "modified": "2024-03-02"},
operation="get",
query={"Key": {"order_id": "abc123"}},
schema="OrderModel",
)
```
### `delete`
```python
adapter.delete(query={"Key": {"order_id": "abc123"}})
```
### `batch_insert`
```python
adapter.batch_insert(
    data=[{"order_id": str(i)} for i in range(10)],
    schema="OrderModel",
    batch_size=25,
)
```
### `batch_delete`
```python
adapter.batch_delete(
    data=[{"order_id": str(i)} for i in range(10)],
    batch_size=25,
)
```
### Prefixing Helpers
Include per-call prefix overrides whenever you need to scope keys.
```python
adapter.insert(
data=payload,
schema="OrderModel",
hash_key="order_id",
hash_prefix="tenant#",
)
```
## 🧪 Local Development
### Prerequisites
- Python **3.9+**
- [Pipenv](https://pipenv.pypa.io/)
- Docker (for running DynamoDB Local during tests; see the command below)
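If you want to run DynamoDB Local yourself (the integration task can also
spin it up), the standard image works; this assumes Docker is available:

```bash
# exposes DynamoDB Local on http://localhost:8000
docker run --rm -p 8000:8000 amazon/dynamodb-local
```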
### Environment Setup
```bash
git clone https://github.com/paulcruse3/daplug-ddb.git
cd daplug-ddb
pipenv install --dev
```
### Run Tests
```bash
# unit tests (no DynamoDB required)
pipenv run test
# integration tests (spins up local DynamoDB when available)
pipenv run integrations
```
### Coverage & Linting
```bash
# generates HTML, XML, and JUnit reports under ./coverage/
pipenv run coverage
# pylint configuration aligned with the legacy project
pipenv run lint
```
## 📦 Project Structure
```txt
daplug-ddb/
├── daplug_ddb/
│   ├── adapter.py      # DynamoDB adapter implementation
│   ├── prefixer.py     # DynamoDB prefixer implementation
│   ├── common/         # Shared helpers (merging, schema loading, logging)
│   └── __init__.py     # Public adapter factory & exports
├── tests/
│   ├── integration/    # Integration suite against DynamoDB Local
│   ├── unit/           # Isolated unit tests using mocks
│   └── openapi.yml     # Sample schema used for mapping tests
├── Pipfile             # Runtime and dev dependencies
├── setup.py            # Packaging metadata
└── README.md
```
## 🤝 Contributing
Contributions are welcome! Open an issue or submit a pull request if you’d like
to add new features, improve documentation, or expand test coverage.
```bash
git checkout -b feature/amazing-improvement
# make your changes
pipenv run lint
pipenv run test
pipenv run integrations
git commit -am "feat: amazing improvement"
git push origin feature/amazing-improvement
```
## 📄 License
Apache License 2.0 – see [LICENSE](LICENSE) for full text.
---
> Built to keep DynamoDB integrations DRY, predictable, and schema-driven.