| Field | Value |
| --- | --- |
| Name | relais |
| Version | 0.2.1 |
| Summary | A practical tool for managing async pipelines. |
| Upload time | 2025-09-03 22:35:25 |
| Author | None |
| Maintainer | None |
| Home page | None |
| Docs URL | None |
| Requires Python | >=3.11 |
| Keywords | async, asyncio, concurrency, data-processing, functional, pipeline, streaming |
| Requirements | No requirements were recorded. |

**License**

Copyright © 2025 mattbit

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# relais
[PyPI version](https://badge.fury.io/py/relais) · [PyPI project](https://pypi.org/project/relais/) · [Python versions](https://pypi.org/project/relais/) · [CI](https://github.com/Giskard-AI/relais/actions/workflows/ci.yml) · [Coverage](https://codecov.io/gh/Giskard-AI/relais)
A high-performance async streaming pipeline library for Python.
**Key Features:**
- 🚀 **True Streaming**: Process data as it flows through the pipeline
- ⚡ **Directional Cancellation**: Early termination optimizations (e.g., `take(5)` stops upstream processing)
- 🔄 **Concurrent Processing**: All operations run concurrently with proper backpressure
- 🛡️ **Memory Efficient**: Bounded memory usage with configurable stream buffers
- 🎯 **Flexible Ordering**: Choose between ordered and unordered processing for optimal performance
**Perfect for:**
- LLM evaluation pipelines
- API data processing
- Real-time data transformation
- I/O-bound concurrent operations
# Usage
```py
import relais as r

# Simple pipeline
pipeline = range(10) | r.Map(lambda x: x * 2) | r.Take(5)
result = await pipeline.collect()  # [0, 2, 4, 6, 8]

# Streaming processing
async for item in (range(100) | r.Map(lambda x: x * 2) | r.Take(5)).stream():
    print(item)  # Prints results as they become available
```
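Note: bare `await` as written above only works where top-level `await` is available (e.g. a Jupyter notebook or the `python -m asyncio` REPL); in a regular script, wrap the calls in an `async def main()` and drive it with `asyncio.run(main())`, as in the Quick Start below.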
## Installation
```bash
pip install relais
```
## Requirements
- Python 3.11+
## Quick Start
```py
import asyncio
import relais as r

async def main():
    # Transform and filter data
    result = await (
        range(20)
        | r.Map(lambda x: x * 2)
        | r.Filter(lambda x: x > 10)
        | r.Take(5)
    ).collect()
    print(result)  # [12, 14, 16, 18, 20]

    # Stream processing with async operations
    async def slow_square(x):
        await asyncio.sleep(0.1)  # Simulate I/O
        return x * x

    # Process items as they complete
    async for item in (range(5) | r.Map(slow_square)).stream():
        print(f"Completed: {item}")

asyncio.run(main())
```
# API Reference
## Core Operations
### Transform Operations
- `r.Map(fn)` - Apply function to each item (supports async functions)
- `r.Filter(fn)` - Filter items based on condition
- `r.FlatMap(fn)` - Flatten iterables returned by function
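Since `r.FlatMap` is the one transform without an example elsewhere in this README, here is a minimal sketch using only the constructors listed above (the output comment is illustrative; arrival order may differ in the default unordered mode):

```py
import relais as r

# Each input expands into two outputs; FlatMap flattens the returned lists.
pipeline = range(1, 3) | r.FlatMap(lambda x: [x, x * 10])

result = await pipeline.collect()  # Expected items: 1, 10, 2, 20
```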
### Collection Operations
- `r.Take(n, ordered=False)` - Take first N items (with early cancellation)
- `r.Skip(n, ordered=False)` - Skip first N items
- `r.Distinct(key_fn=None)` - Remove duplicates
- `r.Sort(key_fn=None)` - Sort items (stateful operation)
- `r.Batch(size)` - Group items into batches
- `r.Reduce(fn, initial)` - Accumulate values
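A short sketch combining the stateful operations above, assuming they behave like their classic functional counterparts (`Batch` emitting a final partial batch, `Reduce` producing the final accumulator):

```py
import relais as r

data = [3, 1, 3, 2, 2]

# Deduplicate, sort, then group into pairs: expected [[1, 2], [3]].
batches = await (data | r.Distinct() | r.Sort() | r.Batch(2)).collect()

# Fold the stream into a single value: expected accumulator 10.
total = await (range(5) | r.Reduce(lambda acc, x: acc + x, 0)).collect()
```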
### Processing Modes
- **Unordered** (default): Maximum performance, items processed as available
- **Ordered**: Preserves input order, may be slower for some operations
**All operations support async functions and run concurrently by default.**
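For example, with the `ordered` flag documented above, `Take(5)` keeps whichever five results finish first, while `Take(5, ordered=True)` keeps the results of the first five inputs in input order (`urls` and `fetch_async` below are hypothetical placeholders):

```py
import relais as r

# Default unordered mode: the five fastest responses win.
fastest_five = await (urls | r.Map(fetch_async) | r.Take(5)).collect()

# Ordered mode: results for the first five inputs, in input order.
first_five = await (urls | r.Map(fetch_async) | r.Take(5, ordered=True)).collect()
```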
## Performance Features
### Directional Cancellation
```py
# Only processes first 5 items, cancels upstream automatically
result = await (large_data_source | r.expensive_operation() | r.Take(5)).collect()
```
### Memory Efficiency
```py
# Streams through millions of items with bounded memory
async for batch in (huge_dataset | r.Map(transform) | r.Batch(100)).stream():
    process_batch(batch)  # Constant memory usage
```
### Concurrent Processing
```py
# All async operations run concurrently
pipeline = (
    data_source
    | r.Map(async_api_call)  # Multiple concurrent API calls
    | r.Filter(validate)     # Filters results as they arrive
    | r.Take(10)             # Stops processing after 10 valid results
)
```
## Pipeline Composition
Pipelines are built using the intuitive `|` operator for chaining operations:
### Basic Usage
```py
# Data source | operations | collection
result = await (range(5) | r.Map(lambda x: x * 2) | r.Filter(lambda x: x > 4)).collect()
# [6, 8]
```
### Runtime Input
```py
# Define pipeline without input data
pipeline = r.Map(lambda x: x * 2) | r.Take(3)
# Apply to different data sources
result1 = await pipeline.collect(range(10)) # [0, 2, 4]
result2 = await pipeline.collect([5, 6, 7, 8]) # [10, 12, 14]
```
### Streaming Results
```py
import asyncio
import random

import relais as r

async def slow_process(x):
    await asyncio.sleep(random.uniform(0.1, 0.5))
    return x * x

pipeline = range(10) | r.Map(slow_process) | r.Filter(lambda x: x % 2 == 0)

# Process results as they become available
async for result in pipeline.stream():
    print(f"Got result: {result}")
    # Results appear in completion order, not input order
```
### Error Handling
```py
import relais as r
from relais.errors import ErrorPolicy

# Fail fast (default) - stops on first error
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Filter(lambda x: x > 0)],
    error_policy=ErrorPolicy.FAIL_FAST
)

# Collect errors for later inspection
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Take(10)],
    error_policy=ErrorPolicy.COLLECT
)
combined = await pipeline.collect(data, error_policy=ErrorPolicy.COLLECT)
results = [x for x in combined if not isinstance(x, Exception)]
errors = [x for x in combined if isinstance(x, Exception)]

# Ignore errors and continue processing
pipeline = r.Pipeline(
    [r.Map(might_fail), r.Take(10)],
    error_policy=ErrorPolicy.IGNORE
)
```
### Advanced: Context Manager Usage
```py
# For fine-grained control over pipeline execution
async with await pipeline.open(data_source) as stream:
    async for event in stream:
        if isinstance(event, StreamItemEvent):
            print(f"Item: {event.item}")
        elif isinstance(event, StreamErrorEvent):
            print(f"Error: {event.error}")

        # Early termination
        if some_condition:
            break
# Pipeline automatically cleans up resources
```
## Architecture & Performance
### Streaming Architecture
Relais uses a true streaming architecture where:
- **Data flows through bounded queues** between pipeline steps
- **Operations run concurrently** - each step processes items as they arrive
- **Memory usage is bounded** - configurable queue sizes prevent memory explosions
- **Backpressure handling** - upstream producers slow down when downstream is busy
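The backpressure behavior is conceptually the same as a bounded `asyncio.Queue`, where a producer's `put` suspends while the queue is full. A toy sketch of that mechanism in plain asyncio (this illustrates the concept, not relais internals):

```py
import asyncio

async def producer(queue: asyncio.Queue) -> None:
    for i in range(10):
        await queue.put(i)  # Suspends whenever the consumer falls behind
        print(f"produced {i}")

async def consumer(queue: asyncio.Queue) -> None:
    for _ in range(10):
        item = await queue.get()
        await asyncio.sleep(0.1)  # Slow consumer throttles the producer
        print(f"consumed {item}")

async def main() -> None:
    queue = asyncio.Queue(maxsize=2)  # Bounded buffer, like a pipeline stream
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())
```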
### Directional Cancellation
Optimizations flow backwards through the pipeline:
```py
# take(5) signals upstream to stop after 5 items
# This prevents processing millions of unnecessary items
huge_dataset | expensive_computation | r.Take(5)
```
### Memory Efficiency
- **Bounded queues**: Default 1000 items per stream (configurable)
- **Streaming processing**: Items are processed and released immediately
- **Resource cleanup**: Automatic cleanup via context managers
### Performance Characteristics
- **Best for**: I/O-bound operations with 100-100K items
- **Concurrent**: All async operations run in parallel
- **Memory bounded**: Constant memory usage regardless of input size
- **Early termination**: Operations like `take()` provide significant optimizations
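A quick way to check the concurrency claim: ten 0.1-second tasks mapped through a pipeline should finish in roughly 0.1 s of wall-clock time rather than 1 s, because the sleeps overlap (a rough sketch; exact timings depend on your machine and event loop):

```py
import asyncio
import time

import relais as r

async def io_task(x):
    await asyncio.sleep(0.1)  # Simulated I/O latency
    return x

async def main():
    start = time.perf_counter()
    await (range(10) | r.Map(io_task)).collect()
    print(f"elapsed: {time.perf_counter() - start:.2f}s")  # ~0.1s, not ~1.0s

asyncio.run(main())
```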
## Use Cases
### LLM Evaluation Pipeline
```py
# Generate test cases → Run model → Evaluate results
test_cases | r.Map(run_llm_async) | r.Map(evaluate_response) | r.Take(100)
```
### API Data Processing
```py
# Fetch → Transform → Validate → Store
api_endpoints | r.Map(fetch_async) | r.Map(transform) | r.Filter(validate) | r.Batch(10)
```
### Real-time Stream Processing
```py
# Process events as they arrive
event_stream | r.Filter(important) | r.Map(enrich) | r.Batch(5)
```
## Support
- 📝 [Issues & Bug Reports](https://github.com/Giskard-AI/relais/issues)
- 💡 [Feature Requests](https://github.com/Giskard-AI/relais/discussions)
- 📚 [Documentation](https://github.com/Giskard-AI/relais/blob/main/README.md)
- 🤝 [Contributing](https://github.com/Giskard-AI/relais/blob/main/CONTRIBUTING.md)