# DDR - Dynamic MapReduce Framework
A flexible framework for distributed data processing using MapReduce patterns.
## Installation
### Prerequisites
This project requires Python 3.13+ and uses conda for dependency management. We recommend using the provided `environment.yml` file to create a consistent development environment.
### Setting up the Conda Environment
The project includes an `environment.yml` file with the following dependencies:
```yaml
name: ddr
channels:
  - conda-forge
dependencies:
  - coffea>=2025.3.0
  - fsspec-xrootd>=0.5.1
  - ndcctools>=7.15.8
  - python>=3.12
  - rich>=13.9.4
  - uproot>=5.6.0
  - xrootd>=5.8.1
  - setuptools<81
```
1. **Create the conda environment from the provided `environment.yml` file:**
   ```bash
   conda env create -f environment.yml
   ```
2. **Activate the environment:**
   ```bash
   conda activate ddr
   ```
3. **Verify the installation:**
   ```bash
   python --version  # Should show Python 3.13 or newer
   conda list | grep -E "(coffea|ndcctools)"  # Should list the installed packages
   ```
### From PyPI
```bash
pip install dynamic_data_reduction
```
### Installing from Source
Once you have the conda environment set up:
```bash
# Clone the repository
git clone https://github.com/cooperative-computing-lab/dynamic_data_reduction.git
cd dynamic_data_reduction
# Activate the conda environment (if not already active)
conda activate ddr
# Install the package in development mode
pip install -e .
```
## Quick Start
A minimal toy example to get started:
```python
from dynamic_data_reduction import DynamicDataReduction
import ndcctools.taskvine as vine
import getpass
# Simple data: process two datasets
data = {
    "datasets": {
        "numbers": {"values": [1, 2, 3, 4, 5]},
        "more_numbers": {"values": [10, 20, 30]},
    }
}

# Define functions
def preprocess(dataset_info, **kwargs):
    for val in dataset_info["values"]:
        yield (val, 1)

def postprocess(val, **kwargs):
    return val  # Just return the value

def processor(x):
    return x * 2  # Double each number

def reducer(a, b):
    return a + b  # Sum the results

# Run
mgr = vine.Manager(port=[9123, 9129], name=f"{getpass.getuser()}-quick-start-ddr")
print(f"Manager started on port {mgr.port}")
ddr = DynamicDataReduction(
    mgr,
    data=data,
    source_preprocess=preprocess,
    source_postprocess=postprocess,
    processors=processor,
    accumulator=reducer,
)

# Use local workers; swap in condor, slurm, or sge to scale out
workers = vine.Factory("local", manager=mgr)
workers.max_workers = 2
workers.min_workers = 0
workers.cores = 4
workers.memory = 2000
workers.disk = 8000
with workers:
    result = ddr.compute()

print(f"Result: {result}")  # Expected: (1+2+3+4+5)*2 + (10+20+30)*2 = 150
```
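Conceptually, DDR maps the processor over every item produced by the preprocessor and folds the mapped values with the accumulator. The serial sketch below (plain Python, no TaskVine) reproduces the arithmetic of the quick-start example; it illustrates the MapReduce pattern only, not DDR's actual execution path, and the real return shape of `ddr.compute()` may differ.

```python
from functools import reduce

# Same toy data as the quick-start example
data = {
    "datasets": {
        "numbers": {"values": [1, 2, 3, 4, 5]},
        "more_numbers": {"values": [10, 20, 30]},
    }
}

def processor(x):
    return x * 2  # map step: double each number

def reducer(a, b):
    return a + b  # reduce step: sum the results

# Map every value from every dataset, then fold the results with the reducer
mapped = [processor(v) for ds in data["datasets"].values() for v in ds["values"]]
result = reduce(reducer, mapped)
print(result)  # 150
```

The distributed run performs the same map and fold, but DDR splits the work into tasks that TaskVine schedules across workers and combines partial results with the accumulator.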
## Usage
- General use example: [examples/simple/simple-example.py](https://github.com/cooperative-computing-lab/dynamic_data_reduction/blob/main/examples/simple/simple-example.py)
- Using coffea processor classes directly: [examples/coffea_processor/example_with_preprocess.py](https://github.com/cooperative-computing-lab/dynamic_data_reduction/blob/main/examples/coffea_processor/example_with_preprocess.py)
- Using coffea in an analysis: [examples/cortado/ddr_cortado.py](https://github.com/cooperative-computing-lab/dynamic_data_reduction/blob/main/examples/cortado/ddr_cortado.py)
## License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.