# Distilled
A data stream reduction middleware that intelligently reduces large multivariate data streams to representative subsets while maintaining proportional characteristics.
## Overview
Distilled combines spatial vector analysis with A/B testing strategies to analyze streaming data continuously. It passes along a configurable fraction of the stream (10% by default), chosen to represent the full dataset's characteristics across a sliding time window.
### Key Features
- **Proportional Representation**: Maintains statistical accuracy of multivariate characteristics
- **Sliding Time Window**: Configurable time horizon (60-3600 seconds) for analysis
- **Extensible Design**: Object-oriented architecture with customizable grading functions
- **Generator/Coroutine Pattern**: Efficient streaming data processing
- **Real-time Processing**: Optimized for high-throughput data streams
## How It Works
1. **Data Ingestion**: Receives batches of raw data points
2. **Vector Evaluation**: Applies grading functions to extract characteristics
3. **Proportion Analysis**: Calculates current vs. sent data proportions
4. **Optimal Selection**: Uses vector analysis to select representative subset
5. **Time Horizon Management**: Maintains FIFO queues with automatic cleanup
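The time horizon step can be pictured as a FIFO queue that drops entries once they fall outside the window. The following is a minimal sketch, not the library's implementation: the class name `SlidingWindow` and its methods are hypothetical, and it assumes timestamps arrive in non-decreasing order.

```python
from collections import deque


class SlidingWindow:
    """Hypothetical FIFO buffer that drops entries older than the horizon."""

    def __init__(self, horizon_seconds):
        self.horizon_seconds = horizon_seconds
        self._items = deque()  # (timestamp, item) pairs, oldest first

    def add(self, item, timestamp):
        """Append a new entry, then discard anything that has expired."""
        self._items.append((timestamp, item))
        self.evict(now=timestamp)

    def evict(self, now):
        """Pop entries from the left while they are older than the cutoff."""
        cutoff = now - self.horizon_seconds
        while self._items and self._items[0][0] < cutoff:
            self._items.popleft()

    def items(self):
        return [item for _, item in self._items]


window = SlidingWindow(horizon_seconds=60)
window.add("a", timestamp=0)
window.add("b", timestamp=30)
window.add("c", timestamp=90)  # "a" is now 90s old and is evicted
print(window.items())
```

A `deque` keeps both the append and the eviction at O(1) per entry, which matters when the window holds many points.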
## Architecture
```
Raw Data Stream → Grading Functions → Vector Analysis → Optimal Selection → Reduced Stream
                         ↓                   ↓                  ↓
                     DataPoints         Proportions        Time Horizon
                                        Comparison         Management
```
## Installation
```bash
pip install -r requirements.txt
```
## Quick Start
```python
from distilled import DistilledProcessor, NumericGrader, CategoricalGrader

# Define how to evaluate your data characteristics
grading_functions = {
    "age": NumericGrader("age", lambda person: float(person.age)),
    "gender": CategoricalGrader("gender", ["male", "female", "other"],
                                lambda person: person.gender.lower()),
    "income": NumericGrader("income", lambda person: person.income)
}

# Create processor
processor = DistilledProcessor(
    grading_functions=grading_functions,
    time_horizon_seconds=3600,  # 1 hour window
    reduction_percentage=0.1,   # 10% pass-through
    batch_size=100
)

# Process data batches
batch_data = get_your_data_batch()  # Your data source
selected_points = processor.process_batch(batch_data)

# selected_points now contains ~10% of input that best represents full dataset
```
## Core Classes
### DistilledProcessor
Main processor class implementing the generator/coroutine pattern:
- **process_batch()**: Process a batch and return selected subset
- **get_current_stats()**: Get current proportion statistics
- **reset()**: Reset processor state
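For readers unfamiliar with the generator/coroutine pattern, the sketch below shows the general shape in plain Python. It is an illustration only and does not reflect how `DistilledProcessor` is written: `batch_reducer` and `keep_every_tenth` are hypothetical names, and the selection policy is a trivial placeholder rather than the proportional selection described in this README.

```python
def batch_reducer(select):
    """Coroutine: receive a batch via send(), yield the selected subset."""
    selected = []
    while True:
        batch = yield selected
        selected = select(batch)


def keep_every_tenth(batch):
    # Placeholder policy for illustration; not a representative sample.
    return batch[::10]


reducer = batch_reducer(keep_every_tenth)
next(reducer)  # prime the coroutine so it is waiting at the first yield

first = reducer.send(list(range(100)))
second = reducer.send(list(range(100, 130)))
print(len(first), second)
```

The coroutine keeps its local state between calls to `send()`, which is what makes the pattern a natural fit for a processor that must remember earlier batches.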
### DataPoint
Represents individual data points with metadata:
- `data`: Raw data payload
- `timestamp`: Processing timestamp
- `vector_values`: Evaluated characteristics
- `sent_previously`: Tracking flag
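The four fields above map naturally onto a dataclass. This is a sketch under assumptions: the field names come from the list, but the class name `DataPointSketch`, the types, and the defaults are guesses for illustration.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class DataPointSketch:
    """Hypothetical stand-in for DataPoint; types and defaults are assumed."""

    data: Any                                             # raw data payload
    timestamp: float = field(default_factory=time.time)   # processing time
    vector_values: Dict[str, Any] = field(default_factory=dict)
    sent_previously: bool = False                         # tracking flag


point = DataPointSketch(data={"age": 42}, timestamp=1000.0)
point.vector_values["age"] = 42.0  # filled in by a grading function
point.sent_previously = True       # marked once the point is passed along
print(point)
```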
### Grading Functions
Define how to evaluate data characteristics:
- **NumericGrader**: Extracts numerical values
- **CategoricalGrader**: Classifies into predefined categories
- **LambdaGrader**: Quick wrapper for simple functions
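To make the grader roles concrete, here is a minimal sketch of a numeric and a categorical grader. The constructor argument order mirrors the Quick Start, but the class names, the `evaluate` method, and the error handling for unknown categories are assumptions, not the library's API.

```python
from typing import Any, Callable, Sequence


class NumericGraderSketch:
    """Hypothetical grader that extracts a number from an item."""

    def __init__(self, name: str, extractor: Callable[[Any], Any]):
        self.name = name
        self.extractor = extractor

    def evaluate(self, item: Any) -> float:
        return float(self.extractor(item))


class CategoricalGraderSketch:
    """Hypothetical grader that maps an item to one of a fixed set of labels."""

    def __init__(self, name: str, categories: Sequence[str],
                 extractor: Callable[[Any], str]):
        self.name = name
        self.categories = list(categories)
        self.extractor = extractor

    def evaluate(self, item: Any) -> str:
        value = self.extractor(item)
        if value not in self.categories:
            # Assumed behavior: reject labels outside the predefined set.
            raise ValueError(f"{value!r} is not a known {self.name} category")
        return value


person = {"age": 42, "gender": "Female"}
age = NumericGraderSketch("age", lambda p: p["age"])
gender = CategoricalGraderSketch("gender", ["male", "female", "other"],
                                 lambda p: p["gender"].lower())
print(age.evaluate(person), gender.evaluate(person))
```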
## Selection Algorithm
The core algorithm works as follows:
1. **Gap Analysis**: Calculate representation gaps between current and sent data
2. **Point Scoring**: Score each candidate point's improvement potential
3. **Greedy Selection**: Select points that best minimize representation gaps
4. **Tie Breaking**: Prioritize smallest proportional buckets first
5. **Update Tracking**: Maintain sent data proportions for next iteration
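The five steps above can be sketched as a small greedy loop. This is one possible reading of the description, not the project's algorithm: the function name `greedy_select`, the additive gap score, and the input shape (each candidate is a dict mapping a characteristic name to a bucket label, with the same keys on every candidate) are all assumptions.

```python
from collections import Counter


def greedy_select(candidates, k):
    """Pick k candidates whose bucket mix tracks the mix of all candidates."""
    if not candidates:
        return []
    n = len(candidates)
    features = list(candidates[0])
    # Bucket counts over the full input (the proportions to reproduce).
    target = {f: Counter(c[f] for c in candidates) for f in features}
    # Bucket counts over what has been selected so far.
    sent = {f: Counter() for f in features}
    remaining = list(range(n))
    selected = []

    def score(i):
        total_sent = len(selected)
        gap = 0.0
        bucket_share = 0.0
        for f in features:
            bucket = candidates[i][f]
            target_share = target[f][bucket] / n
            sent_share = sent[f][bucket] / total_sent if total_sent else 0.0
            gap += target_share - sent_share  # positive = under-represented
            bucket_share += target_share
        # Largest gap wins; on a tie, the smaller bucket wins.
        return (gap, -bucket_share)

    for _ in range(min(k, n)):
        best = max(remaining, key=score)
        remaining.remove(best)
        selected.append(best)
        for f in features:
            sent[f][candidates[best][f]] += 1  # update tracking
    return [candidates[i] for i in selected]


stream = [{"color": "red"}] * 8 + [{"color": "blue"}] * 2
picked = greedy_select(stream, k=5)
print(Counter(p["color"] for p in picked))
```

With an 80/20 input and `k=5`, this sketch picks four red points and one blue point, so the selected subset keeps the same proportions. Rescoring every remaining candidate on each pick costs O(n·k), which is fine for illustration but would need optimizing for high-throughput streams.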
## Configuration
### Time Horizon
Controls how long data is retained for analysis:
```python
processor = DistilledProcessor(
    grading_functions=functions,
    time_horizon_seconds=1800  # 30 minutes
)
```
### Reduction Percentage
Controls the fraction of data that passes through, given as a decimal (for example, `0.05` for 5%):
```python
processor = DistilledProcessor(
    grading_functions=functions,
    reduction_percentage=0.05  # 5% pass-through
)
```
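A fractional rate rarely divides a batch evenly, so some rule is needed to turn it into a whole number of points per batch. The sketch below shows one common approach, carrying the remainder forward between batches. It is an assumption about how such a rate could be applied, not a description of what Distilled does, and `PassThroughQuota` is a hypothetical name.

```python
class PassThroughQuota:
    """Hypothetical helper: converts a fractional rate into per-batch counts."""

    def __init__(self, fraction):
        if not 0.0 < fraction <= 1.0:
            raise ValueError("fraction must be in (0, 1]")
        self.fraction = fraction
        self._credit = 0.0  # fractional points carried over between batches

    def take(self, batch_len):
        """Return how many points of this batch may pass through."""
        self._credit += batch_len * self.fraction
        count = int(self._credit)
        self._credit -= count
        return count


quota = PassThroughQuota(fraction=0.25)
counts = [quota.take(10) for _ in range(4)]
print(counts, sum(counts))
```

Carrying the remainder keeps the long-run pass-through rate at the configured fraction even when individual batches are small.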
### Batch Size
Controls internal processing batch size:
```python
processor = DistilledProcessor(
    grading_functions=functions,
    batch_size=50  # Smaller batches
)
```
## Examples
See `examples/basic_usage.py` for a complete working example with sample data.
## Testing
Run tests with:
```bash
python -m pytest tests/
```
Or run individual test files:
```bash
python tests/test_basic.py
```
## Development Status
**Current Status**: The architecture and API design are complete, but the methods are still stubs. The processing steps described above are not yet implemented.
**Next Steps**:
1. Implement grading function evaluation
2. Implement proportion calculation algorithms
3. Implement vector analysis and selection logic
4. Implement time horizon management
5. Add comprehensive testing
6. Performance optimization
## Contributing
This is an open source project. Contributions are welcome!
## License
MIT License - see LICENSE file for details.
---
*Distilled - Intelligent data stream reduction for the modern data pipeline.*
Raw data
{
"_id": null,
"home_page": "https://github.com/yourusername/distilled",
"name": "distilled",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": "Distilled Team <team@distilled.dev>",
"keywords": "data, stream, reduction, middleware, sampling, representation",
"author": "Distilled Team",
"author_email": "Distilled Team <team@distilled.dev>",
"download_url": "https://files.pythonhosted.org/packages/82/fb/a06beb52bf0410ce7df6ea1aeee443cfb58497775991d1c057c55eca3158/distilled-0.1.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "A data stream reduction middleware that maintains proportional representation",
"version": "0.1.0",
"project_urls": {
"Bug Tracker": "https://github.com/yourusername/distilled/issues",
"Changelog": "https://github.com/yourusername/distilled/blob/main/CHANGELOG.md",
"Documentation": "https://github.com/yourusername/distilled#readme",
"Homepage": "https://github.com/yourusername/distilled",
"Repository": "https://github.com/yourusername/distilled"
},
"split_keywords": [
"data",
" stream",
" reduction",
" middleware",
" sampling",
" representation"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "3f851858a947e2a89515847a1970a972d9fc77d7d0e23efeb173f26f2733d9bc",
"md5": "ec65607a73c2dd6517454fbc4c4b0e30",
"sha256": "0a374487e54de7fb6759cf1d7755ae15fe2aaffdc77cbd48427b7ac2deba076a"
},
"downloads": -1,
"filename": "distilled-0.1.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "ec65607a73c2dd6517454fbc4c4b0e30",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 15850,
"upload_time": "2025-08-29T20:37:00",
"upload_time_iso_8601": "2025-08-29T20:37:00.840347Z",
"url": "https://files.pythonhosted.org/packages/3f/85/1858a947e2a89515847a1970a972d9fc77d7d0e23efeb173f26f2733d9bc/distilled-0.1.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "82fba06beb52bf0410ce7df6ea1aeee443cfb58497775991d1c057c55eca3158",
"md5": "1519fcba36c783f88b6360afd4065226",
"sha256": "effe675a3313dd5fc1ce9d2ec29000a4950aa58f9beb403c91be04cdcb096606"
},
"downloads": -1,
"filename": "distilled-0.1.0.tar.gz",
"has_sig": false,
"md5_digest": "1519fcba36c783f88b6360afd4065226",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 29199,
"upload_time": "2025-08-29T20:37:02",
"upload_time_iso_8601": "2025-08-29T20:37:02.280065Z",
"url": "https://files.pythonhosted.org/packages/82/fb/a06beb52bf0410ce7df6ea1aeee443cfb58497775991d1c057c55eca3158/distilled-0.1.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-29 20:37:02",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "yourusername",
"github_project": "distilled",
"github_not_found": true,
"lcname": "distilled"
}