conveyor-streaming 1.2.1

- **Home page**: https://github.com/ju-bezdek/conveyor
- **Summary**: A Python library for streamlining asynchronous streaming tasks and pipelines.
- **Upload time**: 2025-08-10 22:19:21
- **Author**: ju-bezdek
- **Requires Python**: <4.0,>=3.11
- **License**: MIT
- **Keywords**: async, streaming, pipeline, data processing
# Conveyor Streaming

[![Tests](https://github.com/ju-bezdek/conveyor/workflows/Tests/badge.svg)](https://github.com/ju-bezdek/conveyor/actions)
[![codecov](https://codecov.io/gh/ju-bezdek/conveyor/branch/main/graph/badge.svg)](https://codecov.io/gh/ju-bezdek/conveyor)
[![Python 3.11+](https://img.shields.io/badge/python-3.11+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

A Python library for efficient asynchronous pipeline processing with real-time streaming capabilities.

This library implements streaming of intermediate results between pipeline stages, allowing subsequent tasks to begin processing as soon as any individual item completes a stage. By default, original input ordering is preserved; you can also opt into as-completed streaming, or wait for all results before consuming them.

### The fundamental challenge:
> ```text
> # This pattern is everywhere, but streaming is hard:
> # Especially if different chunks can take different time to process
> 
> Individual Processing → Batch Processing → Individual Processing
>        (fast)              (slower)           (fast)
> 
> 
> # Examples:
> # • Fetch RAG results → Re-Rank → Enrich ( → stream to UI)
> # • Compress → ML batch inference → Apply
> # • Validate → Batch database ops → Send notifications
> ```

## Key Benefits

- **Early Results Delivery**: Get initial results quickly without waiting for all items to finish processing
- **Order Preservation**: Stream results in original order while maintaining maximum parallelism
- **Optimal Resource Usage**: Process items concurrently through all pipeline stages
- **Flexible Consumption**: Stream results as they're ready or collect them all at completion

## Use Cases

- **API Response Streaming**: Show initial results to users immediately while processing continues (see the sketch after this list)
- **Batch Processing with Previews**: Process large datasets in batches but deliver completed batch results as they finish
- **Long-Running Pipeline Visibility**: Monitor progress of multi-stage transformations in real-time
- **Efficient Resource Management**: Maximize throughput for IO-bound workloads by ensuring all stages remain active
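
For instance, the API-streaming use case might look like the following minimal sketch. FastAPI and the `enrich` task are assumptions for illustration, not part of this library:

```python
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from conveyor import single_task

app = FastAPI()

@single_task
async def enrich(item: int) -> str:
    await asyncio.sleep(0.1)  # simulate per-item io-bound work
    return f"enriched-{item}"

@app.get("/stream")
async def stream_results():
    async def body():
        # Each result is flushed to the client as soon as the pipeline yields it
        async for result in enrich(list(range(10))):
            yield f"{result}\n"
    return StreamingResponse(body(), media_type="text/plain")
```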

## Features

- Define atomic tasks that process single items or batches of items
- Chain tasks together to create powerful asynchronous pipelines
- Flexible output handling: consume results as an async stream or collect them all at once
- Robust error handling: Configure retry logic and error recovery strategies at the task level
- Designed for extensibility

## Installation

```bash
pip install conveyor-streaming
```


## Quick Start

```python
import asyncio
from conveyor import single_task, batch_task

# Define some tasks
@single_task
async def multiply_by_two(x: int) -> int:
    print(f"Multiplying {x} by 2")
    await asyncio.sleep(0.01) # simulate io-bound work
    return x * 2

@batch_task(max_size=3)
async def sum_batch(batch: list[int]) -> int:
    print(f"Summing batch: {batch}")
    await asyncio.sleep(0.05) # simulate io-bound work
    s = sum(batch)
    print(f"Sum of batch {batch} is {s}")
    return s

@single_task
async def add_ten(x: int) -> int:
    print(f"Adding 10 to {x}")
    await asyncio.sleep(0.01)
    return x + 10

async def main():
    # Create a pipeline
    pipeline = multiply_by_two | sum_batch | add_ten

    data_source = [1, 2, 3, 4, 5, 6, 7]

    # Option 1: Process results as they come
    print("Streaming results as they come...")
    stream = pipeline(data_source)
    async for item in stream:
        print(f"Streamed item: {item}")
        
    # Option 2: Collect all results
    print("\nCollecting all results...")
    # Re-create stream for fresh iteration
    results = await pipeline(data_source).collect()
    print(f"Collected results: {results}")
    # Expected:
    # 1*2=2, 2*2=4, 3*2=6 -> sum_batch([2,4,6]) = 12 -> add_ten(12) = 22
    # 4*2=8, 5*2=10, 6*2=12 -> sum_batch([8,10,12]) = 30 -> add_ten(30) = 40
    # 7*2=14 -> sum_batch([14]) = 14 -> add_ten(14) = 24
    # Results: [22, 40, 24]

if __name__ == "__main__":
    asyncio.run(main())
```

## Stream Processing vs. Collecting Results

Conveyor offers three main approaches to consuming pipeline results, depending on your specific use case:

### Option 1A: Ordered Streaming (Default)

Use async iteration when you want to handle results as they become available while preserving the original input order:

```python
# Process results as they're available, in original input order
async for result in pipeline(data_source):
    print(f"Got a result in original order: {result}")
    # Process each result immediately
```

**🚀 Ordered Streaming Performance Benefits:**

Conveyor's ordered processing uses an **ordered queue approach** that enables true streaming while preserving order:

- **Early yielding**: Each result is yielded as soon as it becomes the next one in the original order
- **Maximum parallelism**: All items process concurrently, not sequentially  
- **Pipeline efficiency**: Subsequent stages can start processing early results immediately

**Timeline Example (Optimal Scenario):**
```
Input tasks with processing times: [0.1s, 0.2s, 0.3s, 0.4s, 0.5s, 2.0s, 0.7s, 0.8s]

Traditional approach (waits for all):
Time: 2.0s → ALL results yielded at once

Conveyor's streaming approach:
Time: 0.1s → yield result 1 ⚡ IMMEDIATE
Time: 0.2s → yield result 2 ⚡ IMMEDIATE  
Time: 0.3s → yield result 3 ⚡ IMMEDIATE
Time: 0.4s → yield result 4 ⚡ IMMEDIATE
Time: 0.5s → yield result 5 ⚡ IMMEDIATE
Time: 2.0s → yield results 6,7,8 (7,8 were buffered, waiting for 6)

Benefits: 🎯 First 5 results available 75% faster!
```

**Order Preservation with Outliers:**
- Fast tasks stream immediately when they're next in order
- Slow outliers block subsequent results to maintain order  
- Once outliers complete, buffered results yield immediately
- **Never performs worse** than traditional batch processing (see the timing sketch below)
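
A minimal timing sketch of this behavior (the `variable_work` task and its delays are invented for illustration; a single task is used directly as a one-stage pipeline, as elsewhere in this README):

```python
import asyncio
import time

from conveyor import single_task

DELAYS = {1: 0.1, 2: 0.2, 3: 1.0, 4: 0.1}  # item 3 is the slow outlier

@single_task
async def variable_work(x: int) -> int:
    await asyncio.sleep(DELAYS[x])
    return x

async def demo():
    start = time.perf_counter()
    async for item in variable_work([1, 2, 3, 4]):
        print(f"{time.perf_counter() - start:.2f}s -> {item}")
    # Expected shape: items 1 and 2 stream at ~0.1s and ~0.2s; item 4
    # finishes early but is buffered behind the outlier, so 3 and 4
    # both appear at ~1.0s.

asyncio.run(demo())
```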

### Option 1B: Unordered Streaming (as_completed)

When you care about processing results as soon as possible, regardless of their original order:

```python
# Process results in the order they complete, ignoring original input order
async for result in pipeline.as_completed(data_source):
    print(f"Got a completed result (fastest first): {result}")
    # Process results in completion order
```

Alternatively, you can create a stream with a specific execution mode:

```python
# Create a stream with as_completed execution mode
stream = pipeline.with_execution_mode("as_completed")(data_source)
async for result in stream:
    print(f"Got result in completion order: {result}")
```

**⚡ Unordered Streaming Benefits:**
- **Maximum responsiveness**: Get results immediately as they complete
- **No blocking**: Fast results are never delayed by slow outliers
- **Optimized for speed**: Ideal when result order doesn't matter
- **Real-time processing**: Perfect for displaying immediate progress
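
As a counterpart to the ordered timing sketch above, here is a minimal sketch of completion-order streaming (same invented task and delays; calling `as_completed` on a single task acting as a one-stage pipeline is an assumption based on the pipeline API shown above):

```python
import asyncio

from conveyor import single_task

DELAYS = {1: 1.0, 2: 0.1, 3: 0.2}  # item 1 is the slow outlier

@single_task
async def variable_work(x: int) -> int:
    await asyncio.sleep(DELAYS[x])
    return x

async def demo():
    pipeline = variable_work  # a single task acting as a one-stage pipeline
    async for item in pipeline.as_completed([1, 2, 3]):
        print(item)  # expected completion order: 2, 3, 1

asyncio.run(demo())
```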

### Option 2: Collecting All Results

Use `collect()` when you need all results available at once:

```python
# Wait for all results to be processed
results = await pipeline(data_source).collect()
print(f"All results are ready: {results}")

# Or collect results in completion order
results = await pipeline.as_completed(data_source).collect()
print(f"All results collected in completion order: {results}")
```


**Pipeline Flow Diagram:**

```mermaid
graph TD
    A["Input Data: 1, 2, 3, 4, 5, 6, 7"] --> B(multiply_by_two - single_task);
    B --> C["Processed Items: 2, 4, 6, 8, 10, 12, 14"];
    C --> D(sum_batch - batch_task, max_size=3);
    D -- "batch [2,4,6]" --> E((12));
    D -- "batch [8,10,12]" --> F((30));
    D -- "batch [14]" --> G((14));
    E --> H(add_ten - single_task);
    F --> H;
    G --> H;
    H --> I["Final Results: 22, 40, 24"];
```

**Understanding Task Types:**

*   **`@single_task`**: Processes one item at a time.

*   **`@batch_task(max_size=N, min_size=M)`**: Collects items into a batch (up to `max_size`) and processes the batch. If the remaining items are fewer than `max_size` but at least `min_size`, they are processed as a final batch.
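
For example, a minimal sketch of the remainder behavior (the `pair_sum` task and values are hypothetical):

```python
import asyncio

from conveyor import batch_task

@batch_task(max_size=2, min_size=1)
async def pair_sum(batch: list[int]) -> int:
    return sum(batch)

async def demo():
    # Batches formed: [1, 2], [3, 4], then the remainder [5],
    # which is below max_size but at least min_size.
    results = await pair_sum([1, 2, 3, 4, 5]).collect()
    print(results)  # expected: [3, 7, 5]

asyncio.run(demo())
```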


## Handling Multiple Input Sources

When building pipelines, you might need to incorporate data from multiple sources. Conveyor Streaming offers two approaches:

**Side Inputs Diagram:**

```mermaid
graph LR
    DataSource["data_source <br/>(primary pipeline arg)"] --> Input[Main Input Item]
    subgraph Pipeline with Side Input
        Input --> multiply_by_two_1
        multiply_by_two_1 --> add_value_from_side_input
        add_value_from_side_input --> multiply_by_two_2
    end
    multiply_by_two_2 --> OutputItem[Processed Stream]
    SideInputSrc["Side Input Source<br>(value, coroutine, AsyncStream)"] --> add_value_from_side_input
    
```

### 1. Direct Stream Integration in Tasks

You can create tasks that explicitly process data from multiple sources:

```python
import asyncio
from conveyor import single_task, batch_task, Pipeline

@single_task
async def multiply_by_two(x: int) -> int:
    return x * 2

@batch_task(max_size=3)
async def sum_batch(batch: list[int]) -> int:
    return sum(batch)

# A task that waits for another data source
@single_task
async def wait_for_other_source(x: int, other_stream) -> int:
    other_value = await anext(other_stream)
    print(f"Combined main value {x} with other value {other_value}")
    return x + other_value

async def generate_secondary_data():
    for i in range(10, 40, 10):
        yield i
        await asyncio.sleep(0.02)

async def main_multiple_sources():
    # Create the secondary stream
    secondary_stream = generate_secondary_data()
    
    # Create pipeline with the custom task
    pipeline = multiply_by_two | wait_for_other_source(secondary_stream) | multiply_by_two
    
    data = [1, 2, 3]
    results = await pipeline(data).collect()
    print(f"Results: {results}")
    # Expected flow:
    # 1. multiply_by_two: [2, 4, 6]
    # 2. wait_for_other_source combines with [10, 20, 30]: [12, 24, 36]
    # 3. multiply_by_two: [24, 48, 72]

if __name__ == "__main__":
    asyncio.run(main_multiple_sources())
```

This approach passes the secondary stream as an argument to the task directly in the pipeline definition.

### 2. Declarative Side Inputs with `with_inputs`

Alternatively, you can use the more elegant `with_inputs` method to specify additional inputs in a declarative way, as shown below.

Tasks can be configured to accept additional "side inputs" that are resolved when the task is executed. This is useful for incorporating data that isn't part of the main pipeline flow or needs to be fetched/calculated at the time of processing.

The `with_inputs` method on a task allows you to specify these side inputs. They can be regular values, coroutines, or `AsyncStream` instances (from which the first item will be taken). The example below uses a stream; a sketch with a plain value and a coroutine follows it.

```python
...

async def main_multiple_sources():
    # Create the secondary stream
    secondary_stream = generate_secondary_data()
    
    # Create pipeline with the custom task
    pipeline = multiply_by_two | wait_for_other_source.with_inputs(secondary_stream) | multiply_by_two
    
    data = [1, 2, 3]
    results = await pipeline(data).collect()
    print(f"Results: {results}")
    # Expected flow:
    # 1. multiply_by_two: [2, 4, 6]
    # 2. wait_for_other_source combines with [10, 20, 30]: [12, 24, 36]
    # 3. multiply_by_two: [24, 48, 72]

```
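
As mentioned above, side inputs can also be plain values or coroutines. A minimal sketch of those two kinds (the `add_offset` task and `fetch_offset` coroutine are hypothetical, and the configured task is invoked directly as a one-stage pipeline, as single tasks are elsewhere in this README):

```python
import asyncio

from conveyor import single_task

@single_task
async def add_offset(x: int, offset: int) -> int:
    return x + offset

async def fetch_offset() -> int:
    await asyncio.sleep(0.01)  # e.g. a config or database lookup
    return 100

async def demo():
    # Plain value side input
    print(await add_offset.with_inputs(5)([1, 2, 3]).collect())
    # expected: [6, 7, 8]

    # Coroutine side input, resolved when the task executes
    print(await add_offset.with_inputs(fetch_offset())([1, 2, 3]).collect())
    # expected: [101, 102, 103]

asyncio.run(demo())
```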






## More Complex Pipeline Examples

Here are a few examples demonstrating more intricate pipeline constructions.

### 1. Pipeline with Filtering

Tasks can filter out items by returning `None`. These `None` values are not passed to subsequent tasks.

```python
import asyncio
from conveyor import single_task, batch_task, Pipeline

@single_task
async def initial_processor(x: int) -> int:
    print(f"Initial: Processing {x}")
    await asyncio.sleep(0.01)
    return x * 10

@single_task
async def filter_multiples_of_three(x: int) -> int | None:
    print(f"Filter: Checking {x}")
    await asyncio.sleep(0.01)
    if x % 3 == 0:
        print(f"Filter: {x} is a multiple of 3, keeping.")
        return x
    print(f"Filter: {x} is not a multiple of 3, discarding.")
    return None # This item will be filtered out

@batch_task(max_size=2)
async def batch_adder(batch: list[int]) -> int:
    print(f"BatchAdd: Summing batch {batch}")
    await asyncio.sleep(0.02)
    return sum(batch)

@single_task
async def final_touch(x: int) -> str:
    print(f"Final: Decorating {x}")
    await asyncio.sleep(0.01)
    return f"Result: {x}"

async def main_filtering_pipeline():
    filtering_pipeline = (
        initial_processor |
        filter_multiples_of_three |
        batch_adder |
        final_touch
    )

    data = [1, 2, 3, 4, 5, 6]
    # initial_processor: [10, 20, 30, 40, 50, 60]
    # filter_multiples_of_three: [None, None, 30, None, None, 60] -> effectively [30, 60]
    # batch_adder (max_size=2): batch_adder([30, 60]) -> 90
    # final_touch: "Result: 90"

    results = await filtering_pipeline(data).collect()
    print(f"\nFiltering Pipeline Results: {results}")

if __name__ == "__main__":
    asyncio.run(main_filtering_pipeline())

```


### 2. Pipeline with Multiple Batching Stages

You can have multiple batch tasks in a pipeline, allowing for different grouping strategies at various points.

```python
import asyncio
from conveyor import single_task, batch_task, Pipeline

@batch_task(max_size=3, min_size=1)
async def initial_batch_summarizer(batch: list[int]) -> tuple[int, int]: # Returns (sum, count)
    print(f"InitialBatch: Processing {batch}")
    await asyncio.sleep(0.02)
    return (sum(batch), len(batch))

@single_task
async def process_summary(summary: tuple[int, int]) -> float: # Calculates average
    s, c = summary
    print(f"ProcessSummary: Sum={s}, Count={c}")
    await asyncio.sleep(0.01)
    return s / c if c > 0 else 0

@batch_task(max_size=2, min_size=1)
async def final_batch_aggregator(averages: list[float]) -> float: # Sum of averages
    print(f"FinalBatch: Aggregating averages {averages}")
    await asyncio.sleep(0.02)
    return sum(averages)

async def main_multi_batch_pipeline():
    multi_batch_pipeline = (
        initial_batch_summarizer |
        process_summary |
        final_batch_aggregator
    )

    data = [1, 2, 3, 4, 5, 6, 7]
    # initial_batch_summarizer (max_size=3):
    #   [1,2,3] -> (6,3)
    #   [4,5,6] -> (15,3)
    #   [7]     -> (7,1)
    # process_summary:
    #   (6,3) -> 2.0
    #   (15,3) -> 5.0
    #   (7,1) -> 7.0
    # final_batch_aggregator (max_size=2):
    #   [2.0, 5.0] -> 7.0
    #   [7.0]      -> 7.0
    # Results: [7.0, 7.0]

    results = await multi_batch_pipeline(data).collect()
    print(f"\nMulti-Batch Pipeline Results: {results}")

if __name__ == "__main__":
    asyncio.run(main_multi_batch_pipeline())
```

**Multi-Batch Pipeline Flow:**

```mermaid
graph TD
    A["Input: 1,2,3,4,5,6,7"] --> IB("initial_batch_summarizer max_size=3")
    IB -- "batch [1,2,3]" --> S1["(6,3)"]
    IB -- "batch [4,5,6]" --> S2["(15,3)"]
    IB -- "batch [7]" --> S3["(7,1)"]
    S1 --> PS1(process_summary)
    S2 --> PS2(process_summary)
    S3 --> PS3(process_summary)
    PS1 -- "avg 2.0" --> AV1((2.0))
    PS2 -- "avg 5.0" --> AV2((5.0))
    PS3 -- "avg 7.0" --> AV3((7.0))
    AV1 --> FB("final_batch_aggregator max_size=2")
    AV2 --> FB
    AV3 --> FB
    FB -- "batch [2.0, 5.0]" --> R1((7.0))
    FB -- "batch [7.0]" --> R2((7.0))
    R1 --> FR[Final Results]
    R2 --> FR
```

## Error Handling

Conveyor provides comprehensive error handling capabilities that can be configured directly in task decorators. You can control retry behavior and specify what should happen when errors occur.

### Error Handling Options

- **`on_error`**: What to do when an error occurs after all retry attempts
  - `"fail"` (default): Raise the error and stop the pipeline
  - `"skip_item"`: Skip the failing item and continue processing
  - `"skip_batch"`: For batch tasks, skip the entire batch if any item fails

- **Retry Configuration**: Independent retry logic with exponential backoff (a backoff sketch follows this list)
  - `retry_attempts`: Total number of attempts (default: 1, i.e. a single attempt with no retries)
  - `retry_delay`: Base delay between retries in seconds
  - `retry_exponential_backoff`: Whether to use exponential backoff (default: True)
  - `retry_max_delay`: Maximum delay between retries

- **Custom Error Handlers**: For complex error handling scenarios
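
To make the retry options concrete, here is a rough sketch of the kind of schedule these parameters describe. The doubling formula is an assumption for illustration, not taken from the library's source:

```python
def backoff_schedule(retry_attempts: int, retry_delay: float,
                     retry_max_delay: float | None = None) -> list[float]:
    """Assumed delays before each retry: base delay doubled per retry,
    optionally capped at retry_max_delay."""
    delays = []
    for retry in range(retry_attempts - 1):  # no delay before the first attempt
        delay = retry_delay * (2 ** retry)
        if retry_max_delay is not None:
            delay = min(delay, retry_max_delay)
        delays.append(delay)
    return delays

print(backoff_schedule(retry_attempts=3, retry_delay=0.1))
# [0.1, 0.2] -- waits 0.1s before the 2nd attempt, 0.2s before the 3rd
```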

### Basic Error Handling Example

```python
import asyncio
import random
from typing import Any

from conveyor import single_task, batch_task, ErrorHandler

# Task that skips failing items
@single_task(on_error="skip_item")
async def unreliable_task(x: int) -> int:
    await asyncio.sleep(0.01)
    if random.random() < 0.3:  # 30% failure rate
        raise ValueError(f"Random failure processing {x}")
    return x * 2

# Task with retry logic
@single_task(
    retry_attempts=3,
    retry_delay=0.1,
    retry_exponential_backoff=True,
    on_error="skip_item"
)
async def task_with_retry(x: int) -> int:
    if random.random() < 0.5:  # 50% failure rate
        raise ValueError(f"Temporary failure processing {x}")
    return x + 100

# Batch task that skips entire batch on any failure
@batch_task(max_size=3, on_error="skip_batch")
async def sensitive_batch_task(batch: list[int]) -> int:
    if any(x < 0 for x in batch):
        raise ValueError("Negative numbers not allowed in batch")
    return sum(batch)

# Custom error handler for business logic
class BusinessErrorHandler(ErrorHandler):
    async def handle_error(self, error: Exception, item, task_name: str, attempt: int) -> tuple[bool, Any]:
        if isinstance(error, ValueError) and "business rule" in str(error):
            print(f"Business rule violation in {task_name}: {error}")
            return True, -1  # Continue with sentinel value
        return False, None  # Re-raise other errors

@single_task(error_handler=BusinessErrorHandler())
async def business_task(x: int) -> int:
    if x % 7 == 0:
        raise ValueError("business rule: multiples of 7 not allowed")
    return x * 5

async def main_error_handling():
    data = [1, 2, -1, 3, 4, 5, 6, 7, 8]
    
    print("1. Skip failing items:")
    pipeline1 = unreliable_task | sensitive_batch_task
    results1 = await pipeline1(data).collect()
    print(f"Results: {results1}")
    
    print("\n2. Retry with backoff:")
    pipeline2 = task_with_retry
    results2 = await pipeline2([1, 2, 3]).collect()
    print(f"Results: {results2}")
    
    print("\n3. Custom error handler:")
    pipeline3 = business_task
    results3 = await pipeline3([1, 2, 7, 14, 21]).collect()
    print(f"Results: {results3}")

if __name__ == "__main__":
    asyncio.run(main_error_handling())
```


## Alternatives

While there are several libraries in adjacent spaces, none directly address the specific challenge of streaming results from mixed single/batch task pipelines. Here's how Conveyor compares to existing solutions:

### Workflow Orchestration Platforms

**Prefect, Airflow, Dagster**
- **What they do**: Orchestrate complex DAG-based workflows with scheduling, monitoring, and error recovery
- **Gap**: Designed for batch-oriented ETL workflows, not real-time streaming of intermediate results
- **Use case**: Great for scheduled data pipelines, but won't help with streaming progressive results to users

**When to use instead**: Large-scale ETL orchestration with complex dependencies and scheduling requirements

### Distributed Computing Frameworks

**Ray, Dask**
- **What they do**: Distributed computing with parallel task execution across clusters
- **Gap**: Much heavier weight, focused on distributed computing rather than pipeline streaming coordination
- **Use case**: Large-scale distributed processing, not single-machine streaming pipelines

**When to use instead**: When you need to scale across multiple machines or have very large datasets

### Task Queue Systems

**Celery, RQ, Dramatiq**
- **What they do**: Distributed task queues for background job processing
- **Gap**: Focused on job distribution and execution, not streaming intermediate results between pipeline stages
- **Use case**: Background job processing, not real-time result streaming

**When to use instead**: When you need distributed task execution across multiple workers

### Stream Processing Frameworks

**Apache Kafka Streams, Apache Beam**
- **What they do**: Event-driven stream processing for real-time data pipelines
- **Gap**: Different paradigm (event streaming), not async Python task pipelines
- **Use case**: Event-driven architectures with message queues

**When to use instead**: Event-driven systems with external message brokers

            
