# SparkForge β‘
> **The modern data pipeline framework for Apache Spark & Delta Lake**
[](https://pypi.org/project/sparkforge/)
[](https://www.python.org/downloads/)
[](https://opensource.org/licenses/MIT)
[](https://sparkforge.readthedocs.io/)
[](https://github.com/eddiethedean/sparkforge)
[](https://github.com/eddiethedean/sparkforge)
[](https://github.com/eddiethedean/sparkforge)
[](https://github.com/eddiethedean/sparkforge/actions)
**SparkForge** is a production-ready data pipeline framework that transforms complex Spark + Delta Lake development into clean, maintainable code. Built on the proven Medallion Architecture (Bronze β Silver β Gold), it eliminates boilerplate while providing enterprise-grade features.
## β¨ Why SparkForge?
| **Before SparkForge** | **With SparkForge** |
|----------------------|-------------------|
| 200+ lines of complex Spark code | 20 lines of clean, readable code |
| Manual dependency management | Automatic inference & validation |
| Scattered validation logic | Centralized, configurable rules |
| Hard-to-debug pipelines | Step-by-step execution & debugging |
| No built-in error handling | Comprehensive error management |
| Manual schema management | Multi-schema support out-of-the-box |
## π Quick Start
### Installation
**With PySpark (for production):**
```bash
pip install sparkforge[pyspark]
```
**With mock-spark (for testing/development):**
```bash
pip install sparkforge[mock]
```
**For PySpark compatibility testing:**
```bash
pip install sparkforge[compat-test]
```
**Note:** SparkForge now supports both PySpark and mock-spark. Choose the installation method that best fits your use case. The framework automatically detects which engine is available.
### Quick Start Example
```python
from sparkforge.pipeline.builder import PipelineBuilder
from pyspark.sql import SparkSession, functions as F
# Initialize Spark
spark = SparkSession.builder.appName("QuickStart").getOrCreate()
# Sample data
data = [
("user1", "prod1", 2, 29.99),
("user2", "prod2", 1, 49.99),
("user3", "prod1", 3, 29.99),
]
df = spark.createDataFrame(data, ["user_id", "product_id", "quantity", "price"])
# Build and execute pipeline
pipeline = PipelineBuilder() \
.add_bronze_step("raw_orders", df) \
.add_silver_step("clean_orders", "raw_orders",
validation_rules={"quantity": ["positive"]}) \
.add_gold_step("summary", "clean_orders",
transform_func=lambda df: df.groupBy("product_id")
.agg(F.sum("quantity").alias("total"))) \
.build()
results = pipeline.execute()
results["summary"].show()
```
### Your First Pipeline
```python
from sparkforge import PipelineBuilder
from pyspark.sql import SparkSession, functions as F
# Initialize Spark
spark = SparkSession.builder.appName("EcommerceAnalytics").getOrCreate()
# Sample e-commerce data
events_data = [
("user_123", "purchase", "2024-01-15 10:30:00", 99.99, "electronics"),
("user_456", "view", "2024-01-15 11:15:00", 49.99, "clothing"),
("user_123", "add_to_cart", "2024-01-15 12:00:00", 29.99, "books"),
("user_789", "purchase", "2024-01-15 14:30:00", 199.99, "electronics"),
]
source_df = spark.createDataFrame(events_data, ["user_id", "action", "timestamp", "price", "category"])
# Build the pipeline
builder = PipelineBuilder(spark=spark, schema="analytics")
# Bronze: Raw event ingestion with validation (using string rules)
builder.with_bronze_rules(
name="events",
rules={
"user_id": ["not_null"],
"action": ["not_null"],
"price": ["gt", 0] # Greater than 0
},
incremental_col="timestamp"
)
# Silver: Clean and enrich the data
builder.add_silver_transform(
name="enriched_events",
source_bronze="events",
transform=lambda spark, df, silvers: (
df.withColumn("event_date", F.to_date("timestamp"))
.withColumn("hour", F.hour("timestamp"))
.withColumn("is_purchase", F.col("action") == "purchase")
.filter(F.col("user_id").isNotNull())
),
rules={
"user_id": ["not_null"],
"event_date": ["not_null"]
},
table_name="enriched_events"
)
# Gold: Business analytics
builder.add_gold_transform(
name="daily_revenue",
source_silvers=["enriched_events"],
transform=lambda spark, silvers: (
silvers["enriched_events"]
.filter(F.col("is_purchase"))
.groupBy("event_date")
.agg(
F.count("*").alias("total_purchases"),
F.sum("price").alias("total_revenue"),
F.countDistinct("user_id").alias("unique_customers")
)
.orderBy("event_date")
),
rules={
"event_date": ["not_null"],
"total_revenue": ["gte", 0] # Greater than or equal to 0
},
table_name="daily_revenue"
)
# Execute the pipeline
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={"events": source_df})
print(f"β
Pipeline completed: {result.status}")
print(f"π Processed {result.metrics.total_rows_written} rows")
```
## β‘ Smart Parallel Execution
SparkForge automatically analyzes your pipeline dependencies and executes independent steps in parallel for maximum performance. **No configuration needed** - it just works!
### How It Works
```python
# Your pipeline with multiple independent steps
builder = PipelineBuilder(spark=spark, schema="analytics")
# These 3 bronze steps will run in parallel
builder.with_bronze_rules(name="events_a", rules={"id": ["not_null"]})
builder.with_bronze_rules(name="events_b", rules={"id": ["not_null"]})
builder.with_bronze_rules(name="events_c", rules={"id": ["not_null"]})
# These 3 silver steps will also run in parallel (after bronze completes)
builder.add_silver_transform(name="clean_a", source_bronze="events_a", ...)
builder.add_silver_transform(name="clean_b", source_bronze="events_b", ...)
builder.add_silver_transform(name="clean_c", source_bronze="events_c", ...)
# Gold step runs after all silver steps complete
builder.add_gold_transform(name="analytics", source_silvers=["clean_a", "clean_b", "clean_c"], ...)
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={...})
# Check parallel execution metrics
print(f"β‘ Parallel efficiency: {result.parallel_efficiency:.1f}%")
print(f"π Execution groups: {result.execution_groups_count}")
print(f"π Max parallelism: {result.max_group_size} concurrent steps")
```
### Execution Flow
```
Timeline (with 3 independent steps, 2s each):
Sequential Execution (old): Parallel Execution (new):
ββββββββ ββββββββ
βStep Aβ 2s βStep Aββββ
ββββββββ€ ββββββββ€ β
βStep Bβ 2s βStep Bββββ€ All 3 run
ββββββββ€ ββββββββ€ β in parallel!
βStep Cβ 2s βStep Cββββ
ββββββββ ββββββββ
Total: 6s Total: 2s β‘
3x faster!
```
### Performance Configuration
```python
from sparkforge.models import PipelineConfig
# Default: Parallel enabled with 4 workers (recommended)
builder = PipelineBuilder(spark=spark, schema="analytics")
# High-performance: 16 workers for maximum throughput
config = PipelineConfig.create_high_performance(schema="analytics")
# Conservative: Sequential execution (1 worker)
config = PipelineConfig.create_conservative(schema="analytics")
# Custom configuration
from sparkforge.models import ParallelConfig, ValidationThresholds
config = PipelineConfig(
schema="analytics",
thresholds=ValidationThresholds.create_default(),
parallel=ParallelConfig(enabled=True, max_workers=8, timeout_secs=600),
verbose=True
)
```
### Key Benefits
- π **Automatic optimization** - No manual configuration needed
- β‘ **3-5x faster** for pipelines with independent steps
- π§ **Dependency-aware** - Automatically respects step dependencies
- π **Observable** - Detailed metrics show parallelization effectiveness
- π **Thread-safe** - Built-in protection against race conditions
- π― **Zero code changes** - Works with existing pipelines
## π¨ String Rules - Human-Readable Validation
SparkForge supports both PySpark expressions and human-readable string rules:
```python
# String rules (automatically converted to PySpark expressions)
rules = {
"user_id": ["not_null"], # F.col("user_id").isNotNull()
"age": ["gt", 0], # F.col("age") > 0
"status": ["in", ["active", "inactive"]], # F.col("status").isin(["active", "inactive"])
"score": ["between", 0, 100], # F.col("score").between(0, 100)
"email": ["like", "%@%.%"] # F.col("email").like("%@%.%")
}
# Or use PySpark expressions directly
rules = {
"user_id": [F.col("user_id").isNotNull()],
"age": [F.col("age") > 0],
"status": [F.col("status").isin(["active", "inactive"])]
}
```
**Supported String Rules:**
- `"not_null"` β `F.col("column").isNotNull()`
- `"gt", value` β `F.col("column") > value`
- `"gte", value` β `F.col("column") >= value`
- `"lt", value` β `F.col("column") < value`
- `"lte", value` β `F.col("column") <= value`
- `"eq", value` β `F.col("column") == value`
- `"in", [values]` β `F.col("column").isin(values)`
- `"between", min, max` β `F.col("column").between(min, max)`
- `"like", pattern` β `F.col("column").like(pattern)`
## π― Core Features
### ποΈ **Medallion Architecture Made Simple**
- **Bronze Layer**: Raw data ingestion with validation
- **Silver Layer**: Cleaned, enriched, and transformed data
- **Gold Layer**: Business-ready analytics and metrics
- **Automatic dependency management** between layers
### β‘ **Developer Experience**
- **70% less boilerplate** compared to raw Spark
- **Auto-inference** of data dependencies
- **Step-by-step debugging** for complex pipelines
- **Preset configurations** for dev/prod/test environments
- **Comprehensive error handling** with actionable messages
### π‘οΈ **Production Ready**
- **Robust validation system** with early error detection
- **Configurable validation thresholds** (Bronze: 90%, Silver: 95%, Gold: 98%)
- **Delta Lake integration** with ACID transactions
- **Multi-schema support** for enterprise environments
- **Performance monitoring** and optimization
- **Comprehensive logging** and audit trails
- **83% test coverage** with 1,441 comprehensive tests
- **100% type safety** with mypy compliance
- **Security hardened** with zero security vulnerabilities
### π§ **Advanced Capabilities**
- **Smart parallel execution** - Automatic dependency analysis and concurrent step execution (enabled by default)
- **String rules support** - Human-readable validation rules (`"not_null"`, `"gt", 0`, `"in", ["active", "inactive"]`)
- **Column filtering control** - choose what gets preserved
- **Incremental processing** with watermarking
- **Schema evolution** support
- **Time travel** and data versioning
- **Concurrent write handling**
## π Examples & Use Cases
### π― **Core Examples**
- **[Hello World](https://github.com/eddiethedean/sparkforge/blob/main/examples/core/hello_world.py)** - 3-line pipeline introduction
- **[Basic Pipeline](https://github.com/eddiethedean/sparkforge/blob/main/examples/core/basic_pipeline.py)** - Complete Bronze β Silver β Gold flow
- **[Step-by-Step Debugging](https://github.com/eddiethedean/sparkforge/blob/main/examples/core/step_by_step_execution.py)** - Debug individual steps
### π **Advanced Features**
- **[Auto-Inference](https://github.com/eddiethedean/sparkforge/blob/main/examples/advanced/auto_infer_source_bronze_simple.py)** - Automatic dependency detection
- **[Multi-Schema Support](https://github.com/eddiethedean/sparkforge/blob/main/examples/advanced/multi_schema_pipeline.py)** - Cross-schema data flows
- **[Column Filtering](https://github.com/eddiethedean/sparkforge/blob/main/examples/specialized/column_filtering_behavior.py)** - Control data preservation
### π’ **Real-World Use Cases**
- **[E-commerce Analytics](https://github.com/eddiethedean/sparkforge/blob/main/examples/usecases/ecommerce_analytics.py)** - Order processing, customer insights
- **[IoT Sensor Data](https://github.com/eddiethedean/sparkforge/blob/main/examples/usecases/iot_sensor_pipeline.py)** - Real-time sensor processing
- **[Business Intelligence](https://github.com/eddiethedean/sparkforge/blob/main/examples/usecases/step_by_step_debugging.py)** - KPI dashboards, reporting
## π LogWriter - Pipeline Execution Tracking
Track and analyze your pipeline executions with the simplified LogWriter API:
### Quick Example
```python
from sparkforge import PipelineBuilder, LogWriter
# Build and run your pipeline
builder = PipelineBuilder(spark, schema="analytics")
# ... add steps ...
pipeline = builder.to_pipeline()
report = pipeline.run_initial_load(bronze_sources={"events": df})
# Initialize LogWriter (simple API - just schema and table name!)
writer = LogWriter(spark, schema="logs", table_name="pipeline_execution")
# Create log table from first report
writer.create_table(report)
# Append subsequent runs
report2 = pipeline.run_incremental(bronze_sources={"events": df2})
writer.append(report2)
# Query your logs
logs = spark.table("logs.pipeline_execution")
logs.show()
```
### Key Features
- β
**Simple initialization** - Just provide `schema` and `table_name`
- β
**Works with PipelineReport** - Direct integration with pipeline results
- β
**Easy methods** - `create_table()` and `append()` for intuitive workflow
- β
**Comprehensive metrics** - Tracks rows processed, durations, success rates
- β
**Detailed metadata** - Layer durations, parallel efficiency, warnings, recommendations
- β
**Emoji-rich output** - Visual feedback during execution (πβ
β)
### What Gets Logged
Each pipeline execution is logged with:
- **Run information**: run_id, mode (initial/incremental), timestamps
- **Execution metrics**: total steps, successful/failed counts, durations by layer
- **Data metrics**: rows processed, rows written, validation rates
- **Performance**: parallel efficiency, execution groups, max parallelism
- **Status**: success/failure, error messages, warnings, recommendations
### Example Log Query
```python
# Get recent pipeline runs
recent_runs = spark.sql("""
SELECT run_id, run_mode, success, rows_written, duration_secs
FROM logs.pipeline_execution
WHERE run_started_at >= current_date() - 7
ORDER BY run_started_at DESC
""")
# Analyze performance trends
performance = spark.sql("""
SELECT
DATE(run_started_at) as date,
COUNT(*) as runs,
AVG(duration_secs) as avg_duration,
SUM(rows_written) as total_rows
FROM logs.pipeline_execution
WHERE success = true
GROUP BY DATE(run_started_at)
ORDER BY date DESC
""")
```
See **[examples/specialized/logwriter_simple_example.py](https://github.com/eddiethedean/sparkforge/blob/main/examples/specialized/logwriter_simple_example.py)** for a complete working example.
## π οΈ Installation & Setup
### Prerequisites
- **Python 3.8+** (tested with 3.8, 3.9, 3.10, 3.11)
- **Java 8+** (for PySpark)
- **PySpark 3.2.4+**
- **Delta Lake 1.2.0+**
### Quick Install
```bash
# Install from PyPI
pip install sparkforge
# Verify installation
python -c "import sparkforge; print(f'SparkForge {sparkforge.__version__} installed!')"
```
### Development Install
```bash
# Clone the repository
git clone https://github.com/eddiethedean/sparkforge.git
cd sparkforge
# Setup Python 3.8 environment with PySpark 3.2
python3.8 -m venv venv38
source venv38/bin/activate
pip install --upgrade pip
# Install with all dependencies
pip install -e ".[dev,test,docs]"
# Verify installation
python test_environment.py
```
**Quick Setup Script** (Recommended):
```bash
bash setup.sh # Automated setup for development environment
```
See [QUICKSTART.md](https://github.com/eddiethedean/sparkforge/blob/main/QUICKSTART.md) for detailed setup instructions.
## π Documentation
### π **Complete Documentation**
- **[π Full Documentation](https://sparkforge.readthedocs.io/)** - Comprehensive guides and API reference
- **[β‘ 5-Minute Quick Start](https://sparkforge.readthedocs.io/en/latest/quick_start_5_min.html)** - Get running fast
- **[π― User Guide](https://sparkforge.readthedocs.io/en/latest/user_guide.html)** - Complete feature walkthrough
- **[π§ API Reference](https://sparkforge.readthedocs.io/en/latest/api_reference.html)** - Detailed API documentation
### π― **Use Case Guides**
- **[π E-commerce Analytics](https://sparkforge.readthedocs.io/en/latest/usecase_ecommerce.html)** - Order processing, customer analytics
- **[π‘ IoT Data Processing](https://sparkforge.readthedocs.io/en/latest/usecase_iot.html)** - Sensor data, anomaly detection
- **[π Business Intelligence](https://sparkforge.readthedocs.io/en/latest/usecase_bi.html)** - Dashboards, KPIs, reporting
## π§ͺ Testing & Quality
SparkForge includes a comprehensive test suite with **1,441 tests** covering all functionality:
```bash
# Run all tests with coverage and type checking (recommended)
make test
# Run all tests (standard)
pytest tests/ -v
# Run by category
pytest tests/unit/ -v # Unit tests
pytest tests/integration/ -v # Integration tests
pytest tests/system/ -v # System tests
# Run with coverage
pytest tests/ --cov=sparkforge --cov-report=html
# Activate environment
source activate_env.sh # Loads Python 3.8 + PySpark 3.2
# Verify environment
python scripts/test_python38_environment.py # Comprehensive environment check
# Code quality checks
make format # Format code with Black and isort
make lint # Run ruff and pylint
make type-check # Type checking with mypy
make security # Security scan with bandit
```
**Quality Metrics**:
- β
**1,441 tests passed** (100% pass rate)
- β
**83% test coverage** across all modules
- β
**100% type safety** with mypy compliance (43 source files)
- β
**Zero security vulnerabilities** (bandit clean)
- β
**Code formatting** compliant (Black + isort + ruff)
- β
**Python 3.8-3.11 compatible**
## π€ Contributing
We welcome contributions! Here's how to get started:
### Quick Start for Contributors
1. **Fork the repository**
2. **Clone your fork**: `git clone https://github.com/yourusername/sparkforge.git`
3. **Setup environment**: `bash setup.sh` or see [QUICKSTART.md](https://github.com/eddiethedean/sparkforge/blob/main/QUICKSTART.md)
4. **Activate environment**: `source activate_env.sh`
5. **Run tests**: `make test` (1,441 tests, 100% pass rate)
6. **Create a feature branch**: `git checkout -b feature/amazing-feature`
7. **Make your changes and add tests**
8. **Format code**: `make format`
9. **Submit a pull request**
### Development Guidelines
- Follow the existing code style (Black formatting + isort + ruff)
- Add tests for new features (aim for 90%+ coverage)
- Ensure type safety with mypy compliance
- Run security scan with bandit
- Update documentation as needed
- Ensure all tests pass: `make test`
- Python 3.8 required for development (as per project standards)
## π Performance & Benchmarks
| Metric | SparkForge | Raw Spark | Improvement |
|--------|------------|-----------|-------------|
| **Lines of Code** | 20 lines | 200+ lines | **90% reduction** |
| **Development Time** | 30 minutes | 4+ hours | **87% faster** |
| **Execution Speed** | Parallel (3-5x faster) | Sequential | **3-5x faster** |
| **Test Coverage** | 83% (1,400 tests) | Manual | **Comprehensive** |
| **Type Safety** | 100% mypy compliant | None | **Production-ready** |
| **Security** | Zero vulnerabilities | Manual | **Enterprise-grade** |
| **Error Handling** | Built-in + Early Validation | Manual | **Production-ready** |
| **Debugging** | Step-by-step | Complex | **Developer-friendly** |
| **Validation** | Automatic + Configurable | Manual | **Enterprise-grade** |
### Real-World Performance Example
```
Pipeline: 3 independent data sources β 3 transformations β 1 aggregation
Sequential Execution: Parallel Execution (SparkForge):
βββββββββββββββββββββ βββββββββββββββββββββββββββββββββ
Source A: 2s Group 1 (parallel): 2s
Source B: 2s ββ Source A: 2s β
Source C: 2s ββ Source B: 2s ββ All concurrent
Transform A: 3s ββ Source C: 2s β
Transform B: 3s Group 2 (parallel): 3s
Transform C: 3s ββ Transform A: 3s β
Aggregate: 1s ββ Transform B: 3s ββ All concurrent
βββββββββββββββββββββ ββ Transform C: 3s β
Total: 16s Group 3: 1s
ββ Aggregate: 1s
βββββββββββββββββββββββββββββββββ
Total: 6s (2.7x faster!)
```
## π What's New in v1.2.0
### π **NEW: Enhanced Logging with Rich Metrics**
- β
**Unified logging format** - Consistent timestamps, emojis, and formatting
- β
**Detailed metrics** - Rows processed, rows written, invalid counts, validation rates
- β
**Visual indicators** - π Starting, β
Completed, β Failed with clear status
- β
**Smart formatting** - Bronze shows "processed", Silver/Gold show "written"
- β
**Execution insights** - Duration tracking, parallel efficiency, group information
```
13:08:09 - PipelineRunner - INFO - π Starting BRONZE step: bronze_events
13:08:09 - PipelineRunner - INFO - β
Completed BRONZE step: bronze_events (0.51s, 1,000 rows processed, validation: 100.0%)
13:08:12 - PipelineRunner - INFO - π Starting SILVER step: silver_purchases
13:08:13 - PipelineRunner - INFO - β
Completed SILVER step: silver_purchases (0.81s, 350 rows processed, 4 invalid, validation: 98.9%)
```
### β‘ **Smart Parallel Execution (Enhanced)**
- β
**Automatic parallel execution** - Independent steps run concurrently (3-5x faster!)
- β
**Dependency-aware scheduling** - Automatically respects step dependencies
- β
**Thread-safe execution** - Built-in protection against race conditions
- β
**Real-time parallel logging** - See concurrent step execution in action
- β
**Performance metrics** - Track parallel efficiency and throughput
- β
**Zero configuration** - Enabled by default with sensible defaults (4 workers)
- β
**Highly configurable** - Adjust workers from 1 (sequential) to 16+ (high-performance)
### π― **Quality & Reliability**
- β
**100% type safety** - Complete mypy compliance across all 43 source files
- β
**Security hardened** - Zero vulnerabilities (bandit clean)
- β
**83% test coverage** - Comprehensive test suite with 1,441 tests
- β
**Code quality** - Black formatting + isort + ruff linting
- β
**Production ready** - All quality gates passed
### π§ **Enhanced Features**
- β
**Robust validation system** - Early error detection with clear messages
- β
**String rules support** - Human-readable validation rules
- β
**Comprehensive error handling** - Detailed error context and suggestions
- β
**Improved documentation** - Updated docstrings with examples
- β
**Mock Functions compatibility** - Enhanced mock-spark support for testing
- β
**Better test alignment** - Tests now reflect actual intended behavior
- β
**Optimized test runner** - Type checking only on source code, not tests
## π What Makes SparkForge Different?
### β
**Built for Production**
- **Enterprise-grade error handling** with detailed context
- **Configurable validation thresholds** for data quality
- **Multi-schema support** for complex environments
- **Performance monitoring** and optimization
- **100% type safety** with comprehensive mypy compliance
- **Security hardened** with zero vulnerabilities
- **83% test coverage** with 1,284 comprehensive tests
### β
**Developer-First Design**
- **Clean, readable API** that's easy to understand
- **Comprehensive documentation** with real-world examples
- **Step-by-step debugging** for complex pipelines
- **Auto-inference** reduces boilerplate by 70%
### β
**Modern Architecture**
- **Delta Lake integration** with ACID transactions
- **Medallion Architecture** best practices built-in
- **Schema evolution** and time travel support
- **Incremental processing** with watermarking
## π License
This project is licensed under the MIT License - see the [LICENSE](https://github.com/eddiethedean/sparkforge/blob/main/LICENSE) file for details.
## π Acknowledgments
- Built on top of [Apache Spark](https://spark.apache.org/) - the industry standard for big data processing
- Powered by [Delta Lake](https://delta.io/) - reliable data lakehouse storage
- Inspired by the Medallion Architecture pattern for data lakehouse design
- Thanks to the PySpark and Delta Lake communities for their excellent work
---
<div align="center">
**Made with β€οΈ for the data engineering community**
[β Star us on GitHub](https://github.com/eddiethedean/sparkforge) β’ [π Read the docs](https://sparkforge.readthedocs.io/) β’ [π Report issues](https://github.com/eddiethedean/sparkforge/issues) β’ [π¬ Join discussions](https://github.com/eddiethedean/sparkforge/discussions)
</div>
Raw data
{
"_id": null,
"home_page": null,
"name": "sparkforge",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": "Odos Matthews <odosmatthews@gmail.com>",
"keywords": "spark, databricks, pipeline, etl, data-engineering, data-lakehouse, bronze-silver-gold, delta-lake, big-data",
"author": null,
"author_email": "Odos Matthews <odosmatthews@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/77/7e/04e54951ba11b3a1c3cadc2f159ef471b22507879c564982b1dc7d04cd9f/sparkforge-1.3.3.tar.gz",
"platform": null,
"description": "# SparkForge \u26a1\n\n> **The modern data pipeline framework for Apache Spark & Delta Lake**\n\n[](https://pypi.org/project/sparkforge/)\n[](https://www.python.org/downloads/)\n[](https://opensource.org/licenses/MIT)\n[](https://sparkforge.readthedocs.io/)\n[](https://github.com/eddiethedean/sparkforge)\n[](https://github.com/eddiethedean/sparkforge)\n[](https://github.com/eddiethedean/sparkforge)\n[](https://github.com/eddiethedean/sparkforge/actions)\n\n**SparkForge** is a production-ready data pipeline framework that transforms complex Spark + Delta Lake development into clean, maintainable code. Built on the proven Medallion Architecture (Bronze \u2192 Silver \u2192 Gold), it eliminates boilerplate while providing enterprise-grade features.\n\n## \u2728 Why SparkForge?\n\n| **Before SparkForge** | **With SparkForge** |\n|----------------------|-------------------|\n| 200+ lines of complex Spark code | 20 lines of clean, readable code |\n| Manual dependency management | Automatic inference & validation |\n| Scattered validation logic | Centralized, configurable rules |\n| Hard-to-debug pipelines | Step-by-step execution & debugging |\n| No built-in error handling | Comprehensive error management |\n| Manual schema management | Multi-schema support out-of-the-box |\n\n## \ud83d\ude80 Quick Start\n\n### Installation\n\n**With PySpark (for production):**\n```bash\npip install sparkforge[pyspark]\n```\n\n**With mock-spark (for testing/development):**\n```bash\npip install sparkforge[mock]\n```\n\n**For PySpark compatibility testing:**\n```bash\npip install sparkforge[compat-test]\n```\n\n**Note:** SparkForge now supports both PySpark and mock-spark. Choose the installation method that best fits your use case. The framework automatically detects which engine is available.\n\n### Quick Start Example\n```python\nfrom sparkforge.pipeline.builder import PipelineBuilder\nfrom pyspark.sql import SparkSession, functions as F\n\n# Initialize Spark\nspark = SparkSession.builder.appName(\"QuickStart\").getOrCreate()\n\n# Sample data\ndata = [\n (\"user1\", \"prod1\", 2, 29.99),\n (\"user2\", \"prod2\", 1, 49.99),\n (\"user3\", \"prod1\", 3, 29.99),\n]\ndf = spark.createDataFrame(data, [\"user_id\", \"product_id\", \"quantity\", \"price\"])\n\n# Build and execute pipeline\npipeline = PipelineBuilder() \\\n .add_bronze_step(\"raw_orders\", df) \\\n .add_silver_step(\"clean_orders\", \"raw_orders\",\n validation_rules={\"quantity\": [\"positive\"]}) \\\n .add_gold_step(\"summary\", \"clean_orders\",\n transform_func=lambda df: df.groupBy(\"product_id\")\n .agg(F.sum(\"quantity\").alias(\"total\"))) \\\n .build()\n\nresults = pipeline.execute()\nresults[\"summary\"].show()\n```\n\n### Your First Pipeline\n```python\nfrom sparkforge import PipelineBuilder\nfrom pyspark.sql import SparkSession, functions as F\n\n# Initialize Spark\nspark = SparkSession.builder.appName(\"EcommerceAnalytics\").getOrCreate()\n\n# Sample e-commerce data\nevents_data = [\n (\"user_123\", \"purchase\", \"2024-01-15 10:30:00\", 99.99, \"electronics\"),\n (\"user_456\", \"view\", \"2024-01-15 11:15:00\", 49.99, \"clothing\"),\n (\"user_123\", \"add_to_cart\", \"2024-01-15 12:00:00\", 29.99, \"books\"),\n (\"user_789\", \"purchase\", \"2024-01-15 14:30:00\", 199.99, \"electronics\"),\n]\nsource_df = spark.createDataFrame(events_data, [\"user_id\", \"action\", \"timestamp\", \"price\", \"category\"])\n\n# Build the pipeline\nbuilder = PipelineBuilder(spark=spark, schema=\"analytics\")\n\n# Bronze: Raw event ingestion with validation (using string rules)\nbuilder.with_bronze_rules(\n name=\"events\",\n rules={\n \"user_id\": [\"not_null\"],\n \"action\": [\"not_null\"],\n \"price\": [\"gt\", 0] # Greater than 0\n },\n incremental_col=\"timestamp\"\n)\n\n# Silver: Clean and enrich the data\nbuilder.add_silver_transform(\n name=\"enriched_events\",\n source_bronze=\"events\",\n transform=lambda spark, df, silvers: (\n df.withColumn(\"event_date\", F.to_date(\"timestamp\"))\n .withColumn(\"hour\", F.hour(\"timestamp\"))\n .withColumn(\"is_purchase\", F.col(\"action\") == \"purchase\")\n .filter(F.col(\"user_id\").isNotNull())\n ),\n rules={\n \"user_id\": [\"not_null\"],\n \"event_date\": [\"not_null\"]\n },\n table_name=\"enriched_events\"\n)\n\n# Gold: Business analytics\nbuilder.add_gold_transform(\n name=\"daily_revenue\",\n source_silvers=[\"enriched_events\"],\n transform=lambda spark, silvers: (\n silvers[\"enriched_events\"]\n .filter(F.col(\"is_purchase\"))\n .groupBy(\"event_date\")\n .agg(\n F.count(\"*\").alias(\"total_purchases\"),\n F.sum(\"price\").alias(\"total_revenue\"),\n F.countDistinct(\"user_id\").alias(\"unique_customers\")\n )\n .orderBy(\"event_date\")\n ),\n rules={\n \"event_date\": [\"not_null\"],\n \"total_revenue\": [\"gte\", 0] # Greater than or equal to 0\n },\n table_name=\"daily_revenue\"\n)\n\n# Execute the pipeline\npipeline = builder.to_pipeline()\nresult = pipeline.run_initial_load(bronze_sources={\"events\": source_df})\n\nprint(f\"\u2705 Pipeline completed: {result.status}\")\nprint(f\"\ud83d\udcca Processed {result.metrics.total_rows_written} rows\")\n```\n\n## \u26a1 Smart Parallel Execution\n\nSparkForge automatically analyzes your pipeline dependencies and executes independent steps in parallel for maximum performance. **No configuration needed** - it just works!\n\n### How It Works\n\n```python\n# Your pipeline with multiple independent steps\nbuilder = PipelineBuilder(spark=spark, schema=\"analytics\")\n\n# These 3 bronze steps will run in parallel\nbuilder.with_bronze_rules(name=\"events_a\", rules={\"id\": [\"not_null\"]})\nbuilder.with_bronze_rules(name=\"events_b\", rules={\"id\": [\"not_null\"]})\nbuilder.with_bronze_rules(name=\"events_c\", rules={\"id\": [\"not_null\"]})\n\n# These 3 silver steps will also run in parallel (after bronze completes)\nbuilder.add_silver_transform(name=\"clean_a\", source_bronze=\"events_a\", ...)\nbuilder.add_silver_transform(name=\"clean_b\", source_bronze=\"events_b\", ...)\nbuilder.add_silver_transform(name=\"clean_c\", source_bronze=\"events_c\", ...)\n\n# Gold step runs after all silver steps complete\nbuilder.add_gold_transform(name=\"analytics\", source_silvers=[\"clean_a\", \"clean_b\", \"clean_c\"], ...)\n\npipeline = builder.to_pipeline()\nresult = pipeline.run_initial_load(bronze_sources={...})\n\n# Check parallel execution metrics\nprint(f\"\u26a1 Parallel efficiency: {result.parallel_efficiency:.1f}%\")\nprint(f\"\ud83d\udcca Execution groups: {result.execution_groups_count}\")\nprint(f\"\ud83d\ude80 Max parallelism: {result.max_group_size} concurrent steps\")\n```\n\n### Execution Flow\n\n```\nTimeline (with 3 independent steps, 2s each):\n\nSequential Execution (old): Parallel Execution (new):\n\u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2510\n\u2502Step A\u2502 2s \u2502Step A\u251c\u2500\u2500\u2510\n\u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2524 \u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2524 \u2502\n\u2502Step B\u2502 2s \u2502Step B\u251c\u2500\u2500\u2524 All 3 run\n\u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2524 \u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2524 \u2502 in parallel!\n\u2502Step C\u2502 2s \u2502Step C\u251c\u2500\u2500\u2518\n\u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2518\nTotal: 6s Total: 2s \u26a1\n\n 3x faster!\n```\n\n### Performance Configuration\n\n```python\nfrom sparkforge.models import PipelineConfig\n\n# Default: Parallel enabled with 4 workers (recommended)\nbuilder = PipelineBuilder(spark=spark, schema=\"analytics\")\n\n# High-performance: 16 workers for maximum throughput\nconfig = PipelineConfig.create_high_performance(schema=\"analytics\")\n\n# Conservative: Sequential execution (1 worker)\nconfig = PipelineConfig.create_conservative(schema=\"analytics\")\n\n# Custom configuration\nfrom sparkforge.models import ParallelConfig, ValidationThresholds\n\nconfig = PipelineConfig(\n schema=\"analytics\",\n thresholds=ValidationThresholds.create_default(),\n parallel=ParallelConfig(enabled=True, max_workers=8, timeout_secs=600),\n verbose=True\n)\n```\n\n### Key Benefits\n\n- \ud83d\ude80 **Automatic optimization** - No manual configuration needed\n- \u26a1 **3-5x faster** for pipelines with independent steps\n- \ud83e\udde0 **Dependency-aware** - Automatically respects step dependencies\n- \ud83d\udcca **Observable** - Detailed metrics show parallelization effectiveness\n- \ud83d\udd12 **Thread-safe** - Built-in protection against race conditions\n- \ud83c\udfaf **Zero code changes** - Works with existing pipelines\n\n## \ud83c\udfa8 String Rules - Human-Readable Validation\n\nSparkForge supports both PySpark expressions and human-readable string rules:\n\n```python\n# String rules (automatically converted to PySpark expressions)\nrules = {\n \"user_id\": [\"not_null\"], # F.col(\"user_id\").isNotNull()\n \"age\": [\"gt\", 0], # F.col(\"age\") > 0\n \"status\": [\"in\", [\"active\", \"inactive\"]], # F.col(\"status\").isin([\"active\", \"inactive\"])\n \"score\": [\"between\", 0, 100], # F.col(\"score\").between(0, 100)\n \"email\": [\"like\", \"%@%.%\"] # F.col(\"email\").like(\"%@%.%\")\n}\n\n# Or use PySpark expressions directly\nrules = {\n \"user_id\": [F.col(\"user_id\").isNotNull()],\n \"age\": [F.col(\"age\") > 0],\n \"status\": [F.col(\"status\").isin([\"active\", \"inactive\"])]\n}\n```\n\n**Supported String Rules:**\n- `\"not_null\"` \u2192 `F.col(\"column\").isNotNull()`\n- `\"gt\", value` \u2192 `F.col(\"column\") > value`\n- `\"gte\", value` \u2192 `F.col(\"column\") >= value`\n- `\"lt\", value` \u2192 `F.col(\"column\") < value`\n- `\"lte\", value` \u2192 `F.col(\"column\") <= value`\n- `\"eq\", value` \u2192 `F.col(\"column\") == value`\n- `\"in\", [values]` \u2192 `F.col(\"column\").isin(values)`\n- `\"between\", min, max` \u2192 `F.col(\"column\").between(min, max)`\n- `\"like\", pattern` \u2192 `F.col(\"column\").like(pattern)`\n\n## \ud83c\udfaf Core Features\n\n### \ud83c\udfd7\ufe0f **Medallion Architecture Made Simple**\n- **Bronze Layer**: Raw data ingestion with validation\n- **Silver Layer**: Cleaned, enriched, and transformed data\n- **Gold Layer**: Business-ready analytics and metrics\n- **Automatic dependency management** between layers\n\n### \u26a1 **Developer Experience**\n- **70% less boilerplate** compared to raw Spark\n- **Auto-inference** of data dependencies\n- **Step-by-step debugging** for complex pipelines\n- **Preset configurations** for dev/prod/test environments\n- **Comprehensive error handling** with actionable messages\n\n### \ud83d\udee1\ufe0f **Production Ready**\n- **Robust validation system** with early error detection\n- **Configurable validation thresholds** (Bronze: 90%, Silver: 95%, Gold: 98%)\n- **Delta Lake integration** with ACID transactions\n- **Multi-schema support** for enterprise environments\n- **Performance monitoring** and optimization\n- **Comprehensive logging** and audit trails\n- **83% test coverage** with 1,441 comprehensive tests\n- **100% type safety** with mypy compliance\n- **Security hardened** with zero security vulnerabilities\n\n### \ud83d\udd27 **Advanced Capabilities**\n- **Smart parallel execution** - Automatic dependency analysis and concurrent step execution (enabled by default)\n- **String rules support** - Human-readable validation rules (`\"not_null\"`, `\"gt\", 0`, `\"in\", [\"active\", \"inactive\"]`)\n- **Column filtering control** - choose what gets preserved\n- **Incremental processing** with watermarking\n- **Schema evolution** support\n- **Time travel** and data versioning\n- **Concurrent write handling**\n\n## \ud83d\udcda Examples & Use Cases\n\n### \ud83c\udfaf **Core Examples**\n- **[Hello World](https://github.com/eddiethedean/sparkforge/blob/main/examples/core/hello_world.py)** - 3-line pipeline introduction\n- **[Basic Pipeline](https://github.com/eddiethedean/sparkforge/blob/main/examples/core/basic_pipeline.py)** - Complete Bronze \u2192 Silver \u2192 Gold flow\n- **[Step-by-Step Debugging](https://github.com/eddiethedean/sparkforge/blob/main/examples/core/step_by_step_execution.py)** - Debug individual steps\n\n### \ud83d\ude80 **Advanced Features**\n- **[Auto-Inference](https://github.com/eddiethedean/sparkforge/blob/main/examples/advanced/auto_infer_source_bronze_simple.py)** - Automatic dependency detection\n- **[Multi-Schema Support](https://github.com/eddiethedean/sparkforge/blob/main/examples/advanced/multi_schema_pipeline.py)** - Cross-schema data flows\n- **[Column Filtering](https://github.com/eddiethedean/sparkforge/blob/main/examples/specialized/column_filtering_behavior.py)** - Control data preservation\n\n### \ud83c\udfe2 **Real-World Use Cases**\n- **[E-commerce Analytics](https://github.com/eddiethedean/sparkforge/blob/main/examples/usecases/ecommerce_analytics.py)** - Order processing, customer insights\n- **[IoT Sensor Data](https://github.com/eddiethedean/sparkforge/blob/main/examples/usecases/iot_sensor_pipeline.py)** - Real-time sensor processing\n- **[Business Intelligence](https://github.com/eddiethedean/sparkforge/blob/main/examples/usecases/step_by_step_debugging.py)** - KPI dashboards, reporting\n\n## \ud83d\udcca LogWriter - Pipeline Execution Tracking\n\nTrack and analyze your pipeline executions with the simplified LogWriter API:\n\n### Quick Example\n\n```python\nfrom sparkforge import PipelineBuilder, LogWriter\n\n# Build and run your pipeline\nbuilder = PipelineBuilder(spark, schema=\"analytics\")\n# ... add steps ...\npipeline = builder.to_pipeline()\nreport = pipeline.run_initial_load(bronze_sources={\"events\": df})\n\n# Initialize LogWriter (simple API - just schema and table name!)\nwriter = LogWriter(spark, schema=\"logs\", table_name=\"pipeline_execution\")\n\n# Create log table from first report\nwriter.create_table(report)\n\n# Append subsequent runs\nreport2 = pipeline.run_incremental(bronze_sources={\"events\": df2})\nwriter.append(report2)\n\n# Query your logs\nlogs = spark.table(\"logs.pipeline_execution\")\nlogs.show()\n```\n\n### Key Features\n\n- \u2705 **Simple initialization** - Just provide `schema` and `table_name`\n- \u2705 **Works with PipelineReport** - Direct integration with pipeline results\n- \u2705 **Easy methods** - `create_table()` and `append()` for intuitive workflow\n- \u2705 **Comprehensive metrics** - Tracks rows processed, durations, success rates\n- \u2705 **Detailed metadata** - Layer durations, parallel efficiency, warnings, recommendations\n- \u2705 **Emoji-rich output** - Visual feedback during execution (\ud83d\udcca\u2705\u274c)\n\n### What Gets Logged\n\nEach pipeline execution is logged with:\n- **Run information**: run_id, mode (initial/incremental), timestamps\n- **Execution metrics**: total steps, successful/failed counts, durations by layer\n- **Data metrics**: rows processed, rows written, validation rates\n- **Performance**: parallel efficiency, execution groups, max parallelism\n- **Status**: success/failure, error messages, warnings, recommendations\n\n### Example Log Query\n\n```python\n# Get recent pipeline runs\nrecent_runs = spark.sql(\"\"\"\n SELECT run_id, run_mode, success, rows_written, duration_secs\n FROM logs.pipeline_execution\n WHERE run_started_at >= current_date() - 7\n ORDER BY run_started_at DESC\n\"\"\")\n\n# Analyze performance trends\nperformance = spark.sql(\"\"\"\n SELECT \n DATE(run_started_at) as date,\n COUNT(*) as runs,\n AVG(duration_secs) as avg_duration,\n SUM(rows_written) as total_rows\n FROM logs.pipeline_execution\n WHERE success = true\n GROUP BY DATE(run_started_at)\n ORDER BY date DESC\n\"\"\")\n```\n\nSee **[examples/specialized/logwriter_simple_example.py](https://github.com/eddiethedean/sparkforge/blob/main/examples/specialized/logwriter_simple_example.py)** for a complete working example.\n\n## \ud83d\udee0\ufe0f Installation & Setup\n\n### Prerequisites\n- **Python 3.8+** (tested with 3.8, 3.9, 3.10, 3.11)\n- **Java 8+** (for PySpark)\n- **PySpark 3.2.4+**\n- **Delta Lake 1.2.0+**\n\n### Quick Install\n```bash\n# Install from PyPI\npip install sparkforge\n\n# Verify installation\npython -c \"import sparkforge; print(f'SparkForge {sparkforge.__version__} installed!')\"\n```\n\n### Development Install\n```bash\n# Clone the repository\ngit clone https://github.com/eddiethedean/sparkforge.git\ncd sparkforge\n\n# Setup Python 3.8 environment with PySpark 3.2\npython3.8 -m venv venv38\nsource venv38/bin/activate\npip install --upgrade pip\n\n# Install with all dependencies\npip install -e \".[dev,test,docs]\"\n\n# Verify installation\npython test_environment.py\n```\n\n**Quick Setup Script** (Recommended):\n```bash\nbash setup.sh # Automated setup for development environment\n```\n\nSee [QUICKSTART.md](https://github.com/eddiethedean/sparkforge/blob/main/QUICKSTART.md) for detailed setup instructions.\n\n## \ud83d\udcd6 Documentation\n\n### \ud83d\udcda **Complete Documentation**\n- **[\ud83d\udcd6 Full Documentation](https://sparkforge.readthedocs.io/)** - Comprehensive guides and API reference\n- **[\u26a1 5-Minute Quick Start](https://sparkforge.readthedocs.io/en/latest/quick_start_5_min.html)** - Get running fast\n- **[\ud83c\udfaf User Guide](https://sparkforge.readthedocs.io/en/latest/user_guide.html)** - Complete feature walkthrough\n- **[\ud83d\udd27 API Reference](https://sparkforge.readthedocs.io/en/latest/api_reference.html)** - Detailed API documentation\n\n### \ud83c\udfaf **Use Case Guides**\n- **[\ud83d\uded2 E-commerce Analytics](https://sparkforge.readthedocs.io/en/latest/usecase_ecommerce.html)** - Order processing, customer analytics\n- **[\ud83d\udce1 IoT Data Processing](https://sparkforge.readthedocs.io/en/latest/usecase_iot.html)** - Sensor data, anomaly detection\n- **[\ud83d\udcca Business Intelligence](https://sparkforge.readthedocs.io/en/latest/usecase_bi.html)** - Dashboards, KPIs, reporting\n\n## \ud83e\uddea Testing & Quality\n\nSparkForge includes a comprehensive test suite with **1,441 tests** covering all functionality:\n\n```bash\n# Run all tests with coverage and type checking (recommended)\nmake test\n\n# Run all tests (standard)\npytest tests/ -v\n\n# Run by category\npytest tests/unit/ -v # Unit tests\npytest tests/integration/ -v # Integration tests\npytest tests/system/ -v # System tests\n\n# Run with coverage\npytest tests/ --cov=sparkforge --cov-report=html\n\n# Activate environment\nsource activate_env.sh # Loads Python 3.8 + PySpark 3.2\n\n# Verify environment\npython scripts/test_python38_environment.py # Comprehensive environment check\n\n# Code quality checks\nmake format # Format code with Black and isort\nmake lint # Run ruff and pylint\nmake type-check # Type checking with mypy\nmake security # Security scan with bandit\n```\n\n**Quality Metrics**:\n- \u2705 **1,441 tests passed** (100% pass rate)\n- \u2705 **83% test coverage** across all modules\n- \u2705 **100% type safety** with mypy compliance (43 source files)\n- \u2705 **Zero security vulnerabilities** (bandit clean)\n- \u2705 **Code formatting** compliant (Black + isort + ruff)\n- \u2705 **Python 3.8-3.11 compatible**\n\n## \ud83e\udd1d Contributing\n\nWe welcome contributions! Here's how to get started:\n\n### Quick Start for Contributors\n1. **Fork the repository**\n2. **Clone your fork**: `git clone https://github.com/yourusername/sparkforge.git`\n3. **Setup environment**: `bash setup.sh` or see [QUICKSTART.md](https://github.com/eddiethedean/sparkforge/blob/main/QUICKSTART.md)\n4. **Activate environment**: `source activate_env.sh`\n5. **Run tests**: `make test` (1,441 tests, 100% pass rate)\n6. **Create a feature branch**: `git checkout -b feature/amazing-feature`\n7. **Make your changes and add tests**\n8. **Format code**: `make format`\n9. **Submit a pull request**\n\n### Development Guidelines\n- Follow the existing code style (Black formatting + isort + ruff)\n- Add tests for new features (aim for 90%+ coverage)\n- Ensure type safety with mypy compliance\n- Run security scan with bandit\n- Update documentation as needed\n- Ensure all tests pass: `make test`\n- Python 3.8 required for development (as per project standards)\n\n## \ud83d\udcca Performance & Benchmarks\n\n| Metric | SparkForge | Raw Spark | Improvement |\n|--------|------------|-----------|-------------|\n| **Lines of Code** | 20 lines | 200+ lines | **90% reduction** |\n| **Development Time** | 30 minutes | 4+ hours | **87% faster** |\n| **Execution Speed** | Parallel (3-5x faster) | Sequential | **3-5x faster** |\n| **Test Coverage** | 83% (1,400 tests) | Manual | **Comprehensive** |\n| **Type Safety** | 100% mypy compliant | None | **Production-ready** |\n| **Security** | Zero vulnerabilities | Manual | **Enterprise-grade** |\n| **Error Handling** | Built-in + Early Validation | Manual | **Production-ready** |\n| **Debugging** | Step-by-step | Complex | **Developer-friendly** |\n| **Validation** | Automatic + Configurable | Manual | **Enterprise-grade** |\n\n### Real-World Performance Example\n\n```\nPipeline: 3 independent data sources \u2192 3 transformations \u2192 1 aggregation\n\nSequential Execution: Parallel Execution (SparkForge):\n\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\nSource A: 2s Group 1 (parallel): 2s\nSource B: 2s \u251c\u2500 Source A: 2s \u2510\nSource C: 2s \u251c\u2500 Source B: 2s \u251c\u2500 All concurrent\nTransform A: 3s \u2514\u2500 Source C: 2s \u2518\nTransform B: 3s Group 2 (parallel): 3s\nTransform C: 3s \u251c\u2500 Transform A: 3s \u2510\nAggregate: 1s \u251c\u2500 Transform B: 3s \u251c\u2500 All concurrent\n\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 \u2514\u2500 Transform C: 3s \u2518\nTotal: 16s Group 3: 1s\n \u2514\u2500 Aggregate: 1s\n \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n Total: 6s (2.7x faster!)\n```\n\n## \ud83d\ude80 What's New in v1.2.0\n\n### \ud83d\udcca **NEW: Enhanced Logging with Rich Metrics**\n- \u2705 **Unified logging format** - Consistent timestamps, emojis, and formatting\n- \u2705 **Detailed metrics** - Rows processed, rows written, invalid counts, validation rates\n- \u2705 **Visual indicators** - \ud83d\ude80 Starting, \u2705 Completed, \u274c Failed with clear status\n- \u2705 **Smart formatting** - Bronze shows \"processed\", Silver/Gold show \"written\"\n- \u2705 **Execution insights** - Duration tracking, parallel efficiency, group information\n\n```\n13:08:09 - PipelineRunner - INFO - \ud83d\ude80 Starting BRONZE step: bronze_events\n13:08:09 - PipelineRunner - INFO - \u2705 Completed BRONZE step: bronze_events (0.51s, 1,000 rows processed, validation: 100.0%)\n13:08:12 - PipelineRunner - INFO - \ud83d\ude80 Starting SILVER step: silver_purchases\n13:08:13 - PipelineRunner - INFO - \u2705 Completed SILVER step: silver_purchases (0.81s, 350 rows processed, 4 invalid, validation: 98.9%)\n```\n\n### \u26a1 **Smart Parallel Execution (Enhanced)**\n- \u2705 **Automatic parallel execution** - Independent steps run concurrently (3-5x faster!)\n- \u2705 **Dependency-aware scheduling** - Automatically respects step dependencies\n- \u2705 **Thread-safe execution** - Built-in protection against race conditions\n- \u2705 **Real-time parallel logging** - See concurrent step execution in action\n- \u2705 **Performance metrics** - Track parallel efficiency and throughput\n- \u2705 **Zero configuration** - Enabled by default with sensible defaults (4 workers)\n- \u2705 **Highly configurable** - Adjust workers from 1 (sequential) to 16+ (high-performance)\n\n### \ud83c\udfaf **Quality & Reliability**\n- \u2705 **100% type safety** - Complete mypy compliance across all 43 source files\n- \u2705 **Security hardened** - Zero vulnerabilities (bandit clean)\n- \u2705 **83% test coverage** - Comprehensive test suite with 1,441 tests\n- \u2705 **Code quality** - Black formatting + isort + ruff linting\n- \u2705 **Production ready** - All quality gates passed\n\n### \ud83d\udd27 **Enhanced Features**\n- \u2705 **Robust validation system** - Early error detection with clear messages\n- \u2705 **String rules support** - Human-readable validation rules\n- \u2705 **Comprehensive error handling** - Detailed error context and suggestions\n- \u2705 **Improved documentation** - Updated docstrings with examples\n- \u2705 **Mock Functions compatibility** - Enhanced mock-spark support for testing\n- \u2705 **Better test alignment** - Tests now reflect actual intended behavior\n- \u2705 **Optimized test runner** - Type checking only on source code, not tests\n\n## \ud83c\udfc6 What Makes SparkForge Different?\n\n### \u2705 **Built for Production**\n- **Enterprise-grade error handling** with detailed context\n- **Configurable validation thresholds** for data quality\n- **Multi-schema support** for complex environments\n- **Performance monitoring** and optimization\n- **100% type safety** with comprehensive mypy compliance\n- **Security hardened** with zero vulnerabilities\n- **83% test coverage** with 1,284 comprehensive tests\n\n### \u2705 **Developer-First Design**\n- **Clean, readable API** that's easy to understand\n- **Comprehensive documentation** with real-world examples\n- **Step-by-step debugging** for complex pipelines\n- **Auto-inference** reduces boilerplate by 70%\n\n### \u2705 **Modern Architecture**\n- **Delta Lake integration** with ACID transactions\n- **Medallion Architecture** best practices built-in\n- **Schema evolution** and time travel support\n- **Incremental processing** with watermarking\n\n## \ud83d\udcdd License\n\nThis project is licensed under the MIT License - see the [LICENSE](https://github.com/eddiethedean/sparkforge/blob/main/LICENSE) file for details.\n\n## \ud83d\ude4f Acknowledgments\n\n- Built on top of [Apache Spark](https://spark.apache.org/) - the industry standard for big data processing\n- Powered by [Delta Lake](https://delta.io/) - reliable data lakehouse storage\n- Inspired by the Medallion Architecture pattern for data lakehouse design\n- Thanks to the PySpark and Delta Lake communities for their excellent work\n\n---\n\n<div align=\"center\">\n\n**Made with \u2764\ufe0f for the data engineering community**\n\n[\u2b50 Star us on GitHub](https://github.com/eddiethedean/sparkforge) \u2022 [\ud83d\udcd6 Read the docs](https://sparkforge.readthedocs.io/) \u2022 [\ud83d\udc1b Report issues](https://github.com/eddiethedean/sparkforge/issues) \u2022 [\ud83d\udcac Join discussions](https://github.com/eddiethedean/sparkforge/discussions)\n\n</div>\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "A simplified, production-ready data pipeline builder for Apache Spark and Delta Lake",
"version": "1.3.3",
"project_urls": {
"Bug Tracker": "https://github.com/eddiethedean/sparkforge/issues",
"Documentation": "https://sparkforge.readthedocs.io/",
"Homepage": "https://github.com/eddiethedean/sparkforge",
"Repository": "https://github.com/eddiethedean/sparkforge"
},
"split_keywords": [
"spark",
" databricks",
" pipeline",
" etl",
" data-engineering",
" data-lakehouse",
" bronze-silver-gold",
" delta-lake",
" big-data"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "b743c5360dad8886cb889a2e1d459e2d7a6aab5b359f9d229c91be7298ea1687",
"md5": "7d6c17aabd0f92a151893e5856c85f6b",
"sha256": "35843cbd45a5f1586205db5650cd9fec10b7a021d917d55d70a5e3ce66b8c7d6"
},
"downloads": -1,
"filename": "sparkforge-1.3.3-py3-none-any.whl",
"has_sig": false,
"md5_digest": "7d6c17aabd0f92a151893e5856c85f6b",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 108980,
"upload_time": "2025-10-20T18:50:28",
"upload_time_iso_8601": "2025-10-20T18:50:28.064907Z",
"url": "https://files.pythonhosted.org/packages/b7/43/c5360dad8886cb889a2e1d459e2d7a6aab5b359f9d229c91be7298ea1687/sparkforge-1.3.3-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "777e04e54951ba11b3a1c3cadc2f159ef471b22507879c564982b1dc7d04cd9f",
"md5": "ac12958b56493e7bda43776ba4039172",
"sha256": "cf745f7b2c1233dc4666b5db676b5e027cf34c017f32d5ab6b77df57c1685cd0"
},
"downloads": -1,
"filename": "sparkforge-1.3.3.tar.gz",
"has_sig": false,
"md5_digest": "ac12958b56493e7bda43776ba4039172",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 466041,
"upload_time": "2025-10-20T18:50:29",
"upload_time_iso_8601": "2025-10-20T18:50:29.652991Z",
"url": "https://files.pythonhosted.org/packages/77/7e/04e54951ba11b3a1c3cadc2f159ef471b22507879c564982b1dc7d04cd9f/sparkforge-1.3.3.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-20 18:50:29",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "eddiethedean",
"github_project": "sparkforge",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "sparkforge"
}