# DataGhost 👻
> **Time-Travel Debugger for Data Pipelines**
Debug your data pipelines like you debug your code. DataGhost captures complete execution snapshots, enables precise replay of historical runs, and provides rich comparison tools - all with zero configuration.
[PyPI package](https://badge.fury.io/py/dataghost)
[Python 3.8+](https://www.python.org/downloads/)
[MIT License](LICENSE)
[Downloads](https://pepy.tech/project/dataghost)
---
## 🚀 Quick Start
### Installation
```bash
# Basic installation
pip install dataghost
# With dashboard support
pip install dataghost[dashboard]
# For Google Colab/Jupyter
pip install dataghost[colab]
```
### 30-Second Demo
```python
from ttd import snapshot
@snapshot(task_id="my_pipeline")
def process_data(data: list, multiplier: int = 2) -> dict:
    processed = [x * multiplier for x in data]
    return {
        "processed_data": processed,
        "count": len(processed),
        "average": sum(processed) / len(processed)
    }
# Run your function normally - snapshots are captured automatically
result = process_data([1, 2, 3, 4, 5], multiplier=3)
```
### View Your Data
```bash
# See all your pipeline runs
dataghost overview
# Launch interactive dashboard
dataghost dashboard
# For Google Colab (creates public URL)
dataghost dashboard --tunnel
```
---
## ✨ Key Features
### 🎯 **Zero-Config Snapshots**
Just add the `@snapshot` decorator - no setup required. Captures inputs, outputs, metadata, and execution context automatically.
### 🔄 **Time-Travel Debugging**
Replay any historical execution with identical conditions. Perfect for debugging failures and testing changes.
### 📊 **Rich Comparisons**
Compare runs side-by-side with structured diffing. See exactly what changed between executions.
### 🌐 **Cloud-Ready**
Works seamlessly in Google Colab, Jupyter notebooks, and remote environments with automatic tunnel support.
### 📱 **Beautiful Dashboard**
Interactive web interface with real-time monitoring, performance analytics, and one-click operations.
### 🏗️ **Framework Integration**
First-class support for Apache Airflow, with more integrations coming soon.
---
## 🎮 Interactive Demo
Try DataGhost in your browser:
[Open in Colab](https://colab.research.google.com/github/dataghost/dataghost/blob/main/examples/colab_demo.ipynb)
---
## 📖 Complete Guide
### 1. Basic Usage
#### Capture Snapshots
```python
from ttd import snapshot
@snapshot(task_id="data_processing")
def transform_data(raw_data: list, config: dict) -> dict:
    # Your data processing logic
    filtered = [x for x in raw_data if x > config['threshold']]
    return {
        "filtered_count": len(filtered),
        "data": filtered,
        "metadata": {"threshold": config['threshold']}
    }
# Every call is automatically captured
result = transform_data([1, 5, 10, 2, 8], {"threshold": 4})
```
#### Advanced Snapshot Options
```python
@snapshot(
    task_id="advanced_task",
    capture_env=True,        # Capture environment variables
    capture_system=True,     # Capture system info
    storage_backend=None     # Or pass a custom storage backend
)
def advanced_processing(data):
    return process_complex_data(data)
```
### 2. Command Line Interface
#### Overview & Monitoring
```bash
# Comprehensive dashboard overview
dataghost overview
# List all captured snapshots
dataghost snapshot --list
# List snapshots for specific task
dataghost snapshot --task-id my_task
# Show all replayable tasks
dataghost tasks
```
#### Debugging & Replay
```bash
# Replay latest run of a task
dataghost replay my_task
# Replay specific run
dataghost replay my_task --run-id 20241201_143022
# Replay with validation
dataghost replay my_task --validate
```
#### Comparison & Analysis
```bash
# Compare latest two runs
dataghost diff my_task
# Compare specific runs
dataghost diff my_task --run-id1 run1 --run-id2 run2
# Output comparison as JSON
dataghost diff my_task --format json
```
### 3. Interactive Dashboard
#### Local Development
```bash
# Start dashboard (opens browser automatically)
dataghost dashboard
# Custom port and host
dataghost dashboard --port 3000 --host 0.0.0.0
# Run without opening browser
dataghost dashboard --no-browser
```
#### Cloud Environments
```bash
# Auto-detects Colab/Jupyter and creates public tunnel
dataghost dashboard --tunnel
# Use specific tunnel service
dataghost dashboard --tunnel --tunnel-service localtunnel
```
#### Dashboard Features
- **📊 Real-time Overview**: Live statistics and health metrics
- **🎯 Task Health Monitoring**: Success rates and performance trends
- **⚡ Recent Activity**: Latest pipeline executions with filtering
- **📋 Interactive Task Management**: Browse, replay, and compare runs
- **🔄 One-click Operations**: Replay tasks directly from the UI
- **📊 Visual Diffs**: Side-by-side run comparisons
- **🔍 Detailed Snapshots**: Drill down into execution details
- **📈 Performance Analytics**: Execution time trends and statistics
- **📱 Mobile Responsive**: Works on all devices
### 4. Programmatic API
#### Replay Engine
```python
from ttd import ReplayEngine
engine = ReplayEngine()
# Replay latest run
result = engine.replay(task_id="my_task")
# Replay specific run
result = engine.replay(task_id="my_task", run_id="20241201_143022")
# Replay with custom validation
result = engine.replay(task_id="my_task", validate_output=True)
print(f"Replay successful: {result['success']}")
print(f"Original output: {result['original_output']}")
print(f"Replayed output: {result['replayed_output']}")
```
#### Diff Engine
```python
from ttd import DiffEngine
diff_engine = DiffEngine()
# Compare latest two runs
diff = diff_engine.diff_task_runs("my_task")
# Compare specific runs
diff = diff_engine.diff_task_runs("my_task", run_id1="run1", run_id2="run2")
# Generate human-readable report
report = diff_engine.generate_diff_report(diff, format="text")
print(report)
```
#### Custom Storage
```python
from ttd.storage import DuckDBStorageBackend
# Custom database location
storage = DuckDBStorageBackend(
    db_path="my_pipeline_snapshots.db",
    data_dir="./snapshot_data"
)
# Use with snapshots
@snapshot(task_id="custom_storage", storage_backend=storage)
def my_task(data):
    return process_data(data)
```
---
## 🌐 Google Colab & Jupyter
### Quick Setup
```python
# Install in Colab
!pip install dataghost[colab]
# Your DataGhost code
from ttd import snapshot
@snapshot(task_id="colab_analysis")
def analyze_data(dataset):
    # Your analysis logic
    return {"mean": sum(dataset) / len(dataset)}
# Run analysis
result = analyze_data([1, 2, 3, 4, 5])
```
### Launch Dashboard
```python
# Auto-detects environment and creates public tunnel
!dataghost dashboard --tunnel
```
**What happens:**
1. 🔍 Detects Google Colab environment
2. 🌐 Creates secure ngrok tunnel
3. 📱 Generates public URL (e.g., `https://abc123.ngrok.io`)
4. 🔗 Share the URL with teammates for collaborative debugging
### Advanced Colab Usage
```python
# Programmatic setup
from ttd.dashboard.colab_utils import setup_colab_dashboard
public_url, success = setup_colab_dashboard(port=8080)
if success:
    print(f"🚀 Dashboard: {public_url}")
    print("💡 Share this URL with your team!")
```
---
## 🏗️ Framework Integrations
### Apache Airflow
```python
from ttd.integrations.airflow import DataGhostPythonOperator, create_datahost_dag
from datetime import datetime
# Create DataGhost-enabled DAG
dag = create_datahost_dag(
    dag_id='my_etl_pipeline',
    default_args={'owner': 'data-team'},
    schedule_interval='@daily'
)
# Use DataGhost operators
extract_task = DataGhostPythonOperator(
    task_id='extract_data',
    python_callable=extract_data_function,
    dag=dag
)
transform_task = DataGhostPythonOperator(
    task_id='transform_data',
    python_callable=transform_data_function,
    dag=dag
)
# Set dependencies
extract_task >> transform_task
```
### Coming Soon
- 🔥 **Prefect Integration**
- 🚀 **Dagster Integration**
- 📓 **Native Jupyter Support**
- 🔧 **VS Code Extension**
---
## 🎯 Use Cases
### 🔍 Debug Pipeline Failures
```python
# When a pipeline fails, replay the exact conditions
from ttd import ReplayEngine
engine = ReplayEngine()
result = engine.replay(task_id="failed_task", run_id="failure_run_id")
if not result['success']:
print(f"Error: {result['error']}")
print(f"Inputs: {result['inputs']}")
print(f"Stack trace: {result['stack_trace']}")
```
### 📈 Monitor Performance Changes
```bash
# Compare performance between deployments
dataghost diff my_etl_task --run-id1 yesterday --run-id2 today
# See execution time changes, output differences, etc.
```
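The same comparison can be driven from Python via the `DiffEngine` shown earlier; a minimal sketch, where `yesterday` and `today` stand in for real run IDs (e.g. taken from `dataghost snapshot --list`):
```python
from ttd import DiffEngine

diff_engine = DiffEngine()
# Placeholder run IDs; substitute the IDs of the runs you want to compare
diff = diff_engine.diff_task_runs("my_etl_task", run_id1="yesterday", run_id2="today")
print(diff_engine.generate_diff_report(diff, format="text"))
```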
### 🧪 Test Changes Safely
```python
# Test new logic against historical data
# (get_historical_inputs, new_function, and historical_output are placeholders
#  for however your project loads a captured run's inputs and outputs)
from ttd import DiffEngine

historical_inputs = get_historical_inputs("my_task", "specific_run")
new_result = new_function(historical_inputs)
# Compare with historical output
diff_engine = DiffEngine()
diff = diff_engine.compare_outputs(historical_output, new_result)
```
### 📊 Data Quality Monitoring
```python
@snapshot(task_id="data_quality_check")
def check_data_quality(df):
    return {
        "row_count": len(df),
        "null_count": df.isnull().sum().sum(),
        "duplicate_count": df.duplicated().sum(),
        "completeness": 1 - (df.isnull().sum().sum() / df.size)
    }
# Track quality metrics over time
quality_result = check_data_quality(daily_data)
```
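`daily_data` above is whatever pandas DataFrame your pipeline produces; a self-contained usage sketch (the sample frame is purely illustrative):
```python
import pandas as pd

daily_data = pd.DataFrame({
    "user_id": [1, 2, 2, None],
    "amount": [10.0, 5.5, 5.5, None],
})
quality_result = check_data_quality(daily_data)
print(quality_result)  # row_count, null_count, duplicate_count, completeness
```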
---
## 🔧 Configuration
### Environment Variables
```bash
export DATAGHOST_DB_PATH="./my_snapshots.db"
export DATAGHOST_DATA_DIR="./my_data"
export DATAGHOST_CAPTURE_ENV="true"
export DATAGHOST_CAPTURE_SYSTEM="true"
```
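If you prefer to wire these variables up explicitly in code, they can be fed into the global-settings API shown in the next section; a minimal sketch (the fallback values are illustrative, not DataGhost defaults):
```python
import os
from ttd import set_storage_backend
from ttd.storage import DuckDBStorageBackend

set_storage_backend(DuckDBStorageBackend(
    db_path=os.environ.get("DATAGHOST_DB_PATH", "./snapshots.db"),
    data_dir=os.environ.get("DATAGHOST_DATA_DIR", "./snapshot_data"),
))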
### Global Settings
```python
from ttd import set_storage_backend
from ttd.storage import DuckDBStorageBackend
# Set global storage backend
set_storage_backend(DuckDBStorageBackend("global.db"))
```
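Once a global backend is set, decorators that omit `storage_backend` are expected to write to it; a short usage sketch, assuming that default-resolution behavior:
```python
from ttd import snapshot

@snapshot(task_id="uses_global_backend")  # no storage_backend: assumed to fall back to the global one
def summarize(values):
    return {"total": sum(values)}

summarize([1, 2, 3])
```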
---
## 🗄️ Storage Backends
### DuckDB (Default)
```python
from ttd.storage import DuckDBStorageBackend
# Default configuration
storage = DuckDBStorageBackend()
# Custom configuration
storage = DuckDBStorageBackend(
    db_path="custom_snapshots.db",
    data_dir="./custom_data"
)
```
### S3 (Coming Soon)
```python
from ttd.storage import S3StorageBackend
storage = S3StorageBackend(
    bucket="my-dataghost-bucket",
    prefix="snapshots/",
    region="us-west-2"
)
```
---
## 🚀 Advanced Features
### Custom Snapshot Metadata
```python
@snapshot(task_id="custom_meta")
def process_with_metadata(data):
    # Add custom metadata to snapshots
    snapshot_metadata = {
        "data_source": "production_db",
        "processing_mode": "batch",
        "quality_score": calculate_quality(data)
    }
    return {
        "result": process_data(data),
        "_metadata": snapshot_metadata
    }
```
### Conditional Snapshots
```python
@snapshot(task_id="conditional", condition=lambda inputs: inputs[0] > 100)
def process_large_datasets(data):
    # Only capture snapshots for large datasets
    return expensive_processing(data)
```
### Performance Optimization
```python
@snapshot(
    task_id="optimized",
    compress_data=True,        # Compress large outputs
    sample_large_data=True,    # Sample large inputs
    max_snapshot_size="10MB"   # Limit snapshot size
)
def memory_efficient_task(large_data):
    return process_efficiently(large_data)
```
---
## 🛠️ Development & Contributing
### Development Setup
```bash
# Clone repository
git clone https://github.com/dataghost/dataghost.git
cd dataghost
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run linting
black .
isort .
flake8
```
### Running Examples
```bash
# Basic example
python examples/basic_example.py
# Airflow DAG example
python examples/airflow_dag.py
# Google Colab example
python examples/colab_example.py
```
### Contributing Guidelines
1. Fork the repository
2. Create a feature branch
3. Write tests for new functionality
4. Ensure all tests pass
5. Submit a pull request
---
## 🗺️ Roadmap
### ✅ **v0.1.0 - Core Engine** (Current)
- [x] Snapshot decorator with metadata capture
- [x] DuckDB storage backend
- [x] CLI with rich commands
- [x] Replay engine
- [x] Diff engine
- [x] Interactive web dashboard
- [x] Google Colab support
### 🚧 **v0.2.0 - Enhanced Features** (In Progress)
- [ ] S3 storage backend
- [ ] Advanced diff algorithms
- [ ] Performance optimizations
- [ ] Extended Airflow integration
- [ ] Prefect integration
### 📋 **v0.3.0 - Ecosystem Integration**
- [ ] Dagster integration
- [ ] Native Jupyter support
- [ ] VS Code extension
- [ ] Slack/Teams notifications
### 🎨 **v0.4.0 - Advanced UI**
- [ ] Advanced dashboard features
- [ ] Custom dashboard themes
- [ ] Real-time collaboration
- [ ] Mobile app
---
## 📊 Performance
DataGhost is designed for minimal overhead:
- **Snapshot capture**: < 1ms overhead per function call
- **Storage**: Efficient compression and deduplication
- **Memory usage**: < 50MB for typical workloads
- **Dashboard**: Sub-second response times
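These figures depend on data size and storage backend; a quick, illustrative way to sanity-check the decorator overhead on your own workload (the function and data below are placeholders):
```python
import time
from ttd import snapshot

def plain(data):
    return sum(data)

@snapshot(task_id="overhead_check")
def wrapped(data):
    return sum(data)

def per_call_ms(fn, data, n=20):
    # Average wall-clock time per call, in milliseconds
    start = time.perf_counter()
    for _ in range(n):
        fn(data)
    return (time.perf_counter() - start) / n * 1e3

data = list(range(1_000))
print(f"plain:   {per_call_ms(plain, data):.3f} ms/call")
print(f"wrapped: {per_call_ms(wrapped, data):.3f} ms/call")
```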
---
## 🤝 Support & Community
### Getting Help
- 📚 **Documentation**: [GitHub Wiki](https://github.com/dataghost/dataghost/wiki)
- 💬 **Discussions**: [GitHub Discussions](https://github.com/dataghost/dataghost/discussions)
- 🐛 **Issues**: [GitHub Issues](https://github.com/dataghost/dataghost/issues)
- 📧 **Email**: [2003kshah@gmail.com](mailto:2003kshah@gmail.com)
### Community
- ⭐ **Star us on GitHub**: [dataghost/dataghost](https://github.com/dataghost/dataghost)
- 🐦 **Follow updates**: [@dataghost](https://twitter.com/dataghost) (coming soon)
- 📺 **YouTube tutorials**: [DataGhost Channel](https://youtube.com/dataghost) (coming soon)
---
## 📄 License
MIT License - see [LICENSE](LICENSE) file for details.
---
## 🙏 Acknowledgments
- Built with ❤️ by [Krish Shah](https://github.com/2003kshah)
- Inspired by time-travel debugging concepts from software engineering
- Thanks to the Apache Airflow community for pipeline orchestration patterns
- Special thanks to the Python data engineering community
---
<div align="center">
**Happy Time-Travel Debugging! 👻✨**
[Get Started](#-quick-start) • [Documentation](https://github.com/dataghost/dataghost/wiki) • [Examples](./examples/) • [Contributing](#-development--contributing)
</div>