# Datacompose
[PyPI](https://pypi.org/project/datacompose/)
[Python](https://www.python.org/downloads/)
[GitHub](https://github.com/your-username/datacompose)
[License: MIT](https://opensource.org/licenses/MIT)
A powerful data transformation framework for building reusable, composable data cleaning pipelines in PySpark.
## Overview
Datacompose provides a declarative way to build data transformation pipelines using composable primitives. It generates optimized, standalone PySpark code that can be deployed without runtime dependencies.
## Key Features
- **Composable Primitives**: Build complex transformations from simple, reusable functions
- **Smart Partial Application**: Configure transformations with parameters for reuse
- **Pipeline Compilation**: Convert declarative pipeline definitions into optimized Spark operations
- **Code Generation**: Generate standalone PySpark code with embedded dependencies
- **Comprehensive Libraries**: Pre-built primitives for emails, addresses, and phone numbers
- **Conditional Logic**: Support for if/else branching in pipelines
- **Type-Safe Operations**: All transformations maintain Spark column type safety
## Installation
```bash
pip install datacompose
```
## Quick Start
### 1. Initialize a Project
```bash
datacompose init
```
This creates a `datacompose.json` configuration file with default settings.
### 2. Generate Transformation Code
```bash
# Generate email cleaning primitives
datacompose add clean_emails --target pyspark
# Generate address standardization primitives
datacompose add clean_addresses --target pyspark
# Generate phone number validation primitives
datacompose add clean_phone_numbers --target pyspark
```
### 3. Use the Generated Code
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Import the generated primitives
from build.pyspark.clean_emails.email_primitives import emails
# Create Spark session
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
# Load your data
df = spark.read.csv("data.csv", header=True)
# Apply email transformations
cleaned_df = df.withColumn(
    "email_clean",
    emails.standardize_email(F.col("email"))
).withColumn(
    "email_domain",
    emails.extract_domain(F.col("email_clean"))
).withColumn(
    "is_valid",
    emails.is_valid_email(F.col("email_clean"))
)
# Filter to valid emails only
valid_emails = cleaned_df.filter(F.col("is_valid"))
```
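From here the cleaned DataFrame behaves like any other. A minimal sketch of persisting the validated records (the output path and column selection are illustrative):
```python
# Keep only the cleaned columns and write the validated rows out
(valid_emails
    .select("email_clean", "email_domain")
    .write.mode("overwrite")
    .parquet("output/cleaned_emails"))
```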
## Core Concepts
### PrimitiveRegistry
A container for organizing related transformation functions:
```python
from pyspark.sql import functions as F

from datacompose.operators.primitives import PrimitiveRegistry

# Create a registry for text operations
text = PrimitiveRegistry("text")

# Register transformation functions
@text.register()
def lowercase(col):
    return F.lower(col)

@text.register()
def remove_spaces(col):
    return F.regexp_replace(col, r'\s+', '')

# Use the transformations
df = df.withColumn("clean_text", text.lowercase(F.col("input")))
```
### SmartPrimitive
Enables partial application of transformations:
```python
@text.register()
def trim(col, chars=' '):
    # F.trim only strips whitespace, so use a regex to support configurable characters
    return F.regexp_replace(col, rf'^[{chars}]+|[{chars}]+$', '')

# Direct usage
df = df.withColumn("trimmed", text.trim(F.col("input")))

# Pre-configured usage
trim_tabs = text.trim(chars='\t')
df = df.withColumn("no_tabs", trim_tabs(F.col("input")))
```
### Pipeline Composition
Build complex pipelines from simple primitives:
```python
@text.compose(text=text)
def clean_pipeline():
    text.trim()
    text.lowercase()
    text.remove_spaces()

# Apply the entire pipeline
df = df.withColumn("cleaned", clean_pipeline(F.col("input")))
```
### Conditional Pipelines
Add conditional logic to your transformations:
```python
@text.register(is_conditional=True)
def is_valid_length(col):
    return F.length(col) > 5

@text.register()
def truncate(col):
    return F.substring(col, 1, 5)

@text.compose(text=text)
def smart_truncate():
    if text.is_valid_length():
        text.truncate()
```
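A composed conditional pipeline is applied just like any other primitive; for example, using `smart_truncate` from above:
```python
# The truncate step only runs when the length check passes
df = df.withColumn("maybe_truncated", smart_truncate(F.col("input")))
```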
## Available Primitives
### Email Primitives
```python
from build.pyspark.clean_emails.email_primitives import emails
# Validation
emails.is_valid_email(col)
emails.is_business_email(col)
emails.is_disposable_email(col)
# Extraction
emails.extract_domain(col)
emails.extract_username(col)
emails.extract_tld(col)
# Standardization
emails.standardize_email(col)
emails.normalize_gmail(col)
emails.fix_common_typos(col)
# Filtering
emails.filter_valid_emails(col)
emails.filter_business_emails(col)
```
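These primitives are ordinary column expressions, so they combine with standard DataFrame operations. A minimal sketch, assuming a raw `email` column:
```python
df = (
    df
    # Repair common typos before validating
    .withColumn("email_fixed", emails.fix_common_typos(F.col("email")))
    # Flag disposable addresses so they can be excluded downstream
    .withColumn("is_disposable", emails.is_disposable_email(F.col("email_fixed")))
    .withColumn("is_valid", emails.is_valid_email(F.col("email_fixed")))
)
```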
### Address Primitives
```python
from build.pyspark.clean_addresses.address_primitives import addresses
# Extraction
addresses.extract_street_number(col)
addresses.extract_street_name(col)
addresses.extract_city(col)
addresses.extract_state(col)
addresses.extract_zip_code(col)
# Standardization
addresses.standardize_state(col)
addresses.standardize_street_suffix(col)
addresses.standardize_direction(col)
# Validation
addresses.is_valid_zip_code(col)
addresses.is_valid_state(col)
addresses.is_po_box(col)
```
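As with the email primitives, these compose directly in `withColumn`. A minimal sketch, assuming a free-text `address` column:
```python
df = (
    df
    # Pull structured fields out of the raw address string
    .withColumn("street_number", addresses.extract_street_number(F.col("address")))
    .withColumn("state", addresses.standardize_state(addresses.extract_state(F.col("address"))))
    .withColumn("zip", addresses.extract_zip_code(F.col("address")))
    # Validate the extracted ZIP code
    .withColumn("zip_ok", addresses.is_valid_zip_code(F.col("zip")))
)
```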
### Phone Number Primitives
```python
from build.pyspark.clean_phone_numbers.phone_primitives import phones
# Validation
phones.is_valid_nanp(col)
phones.is_valid_international(col)
phones.is_toll_free(col)
# Extraction
phones.extract_country_code(col)
phones.extract_area_code(col)
phones.extract_exchange(col)
phones.extract_subscriber(col)
# Formatting
phones.format_nanp(col)
phones.format_e164(col)
phones.format_international(col)
# Standardization
phones.standardize_phone(col)
phones.clean_phone(col)
```
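A minimal sketch of chaining these, assuming a raw `phone` column:
```python
df = (
    df
    # Strip punctuation and whitespace before validating
    .withColumn("phone_clean", phones.clean_phone(F.col("phone")))
    .withColumn("nanp_ok", phones.is_valid_nanp(F.col("phone_clean")))
    # Store a canonical E.164 representation
    .withColumn("phone_e164", phones.format_e164(F.col("phone_clean")))
)
```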
## Advanced Usage
### Creating Custom Primitives
```python
from pyspark.sql import functions as F

from datacompose.operators.primitives import PrimitiveRegistry

# Create your own registry
custom = PrimitiveRegistry("custom")

@custom.register()
def remove_special_chars(col):
    return F.regexp_replace(col, r'[^a-zA-Z0-9\s]', '')

@custom.register()
def capitalize_words(col):
    return F.initcap(col)

@custom.register(is_conditional=True)
def contains_numbers(col):
    return col.rlike(r'\d+')

# Create a pipeline with your custom primitives
@custom.compose(custom=custom)
def clean_text():
    custom.remove_special_chars()
    if custom.contains_numbers():
        custom.capitalize_words()
```
### Working with Parameters
```python
@custom.register()
def pad_string(col, length=10, fill_char='0'):
    return F.lpad(col, length, fill_char)
# Use with different parameters
df = df.withColumn("padded_10", custom.pad_string(F.col("id")))
df = df.withColumn("padded_5", custom.pad_string(length=5)(F.col("id")))
df = df.withColumn("padded_x", custom.pad_string(length=8, fill_char='X')(F.col("id")))
```
### Combining Multiple Registries
```python
from datacompose.operators.primitives import PrimitiveRegistry

from build.pyspark.clean_emails.email_primitives import emails
from build.pyspark.clean_phone_numbers.phone_primitives import phones

# Create a combined validation pipeline
validation = PrimitiveRegistry("validation")

@validation.compose(emails=emails, phones=phones)
def validate_contact_info():
    # Check email
    if emails.is_valid_email():
        emails.standardize_email()

    # Check phone
    if phones.is_valid_nanp():
        phones.standardize_phone()
```
## CLI Commands
### Initialize a Project
```bash
datacompose init [--yes]
```
### Add Transformers
```bash
datacompose add <transformer> [--target TARGET] [--output OUTPUT] [--verbose]
# Examples
datacompose add clean_emails --target pyspark
datacompose add clean_addresses --target pyspark --output ./custom/path
datacompose add clean_phone_numbers --target pyspark --verbose
```
### List Available Transformers
```bash
datacompose list transformers
datacompose list generators
```
## Project Structure
After running `datacompose add`, your project will have the following structure:
```
project/
├── datacompose.json                      # Configuration file
└── build/
    └── pyspark/
        ├── clean_emails/
        │   ├── email_primitives.py       # Generated email primitives
        │   └── utils/
        │       └── primitives.py         # Core framework (embedded)
        ├── clean_addresses/
        │   ├── address_primitives.py
        │   └── utils/
        │       └── primitives.py
        └── clean_phone_numbers/
            ├── phone_primitives.py
            └── utils/
                └── primitives.py
```
## Configuration
The `datacompose.json` file configures default settings:
```json
{
  "version": "1.0.0",
  "targets": {
    "pyspark": {
      "output": "./build/pyspark",
      "generator": "SparkPandasUDFGenerator"
    }
  },
  "templates": {
    "directory": "src/transformers/templates"
  }
}
```
## Performance Considerations
- Primitives are designed to be efficient Spark operations
- Pipelines are compiled to minimize intermediate columns
- Conditional logic uses Spark's `when/otherwise` for vectorized operations (see the sketch after this list)
- Generated code has no runtime dependencies beyond PySpark
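Conceptually, a conditional step like `smart_truncate` above compiles down to a single column expression of roughly this shape (an illustrative sketch, not the literal generated code):
```python
from pyspark.sql import functions as F

# Hand-written equivalent of the smart_truncate pipeline:
# truncate only when the length check passes, otherwise keep the original value
maybe_truncated = (
    F.when(F.length(F.col("input")) > 5, F.substring(F.col("input"), 1, 5))
    .otherwise(F.col("input"))
)
df = df.withColumn("maybe_truncated", maybe_truncated)
```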
## Philosophy & Inspiration
Datacompose is inspired by [shadcn-svelte](https://www.shadcn-svelte.com/) and [huntabyte](https://github.com/huntabyte)'s approach to component libraries. Just as shadcn-svelte provides "copy and paste" components rather than npm packages, Datacompose generates data transformation code that becomes part of YOUR codebase.
**Why we believe in this approach:**
- **You Own Your Code**: No external dependencies to manage, and no breaking changes to worry about
- **Full Transparency**: Every transformation is readable, debuggable PySpark code you can understand
- **Customization First**: Need to adjust a transformation? Just edit the code
- **Learn by Reading**: The generated code serves as documentation and learning material
This is NOT a traditional library - it's a code generator that gives you production-ready data transformation primitives that you can modify to fit your exact needs.
## Test Coverage
**Critical components are thoroughly tested:**
| Component | Coverage | Tests |
|-----------|----------|-------|
| **Phone Number Primitives** | 95% | ✅ All formats validated |
| **Address Primitives** | 94% | ✅ Full parsing tested |
| **Email Primitives** | 89% | ✅ RFC compliant |
| **Code Generation** | 87-91% | ✅ All targets verified |
**335 tests passing** • **76% overall coverage**
## License
MIT License - see LICENSE file for details