streamz-zmq


| Field | Value |
|---|---|
| Name | streamz-zmq |
| Version | 0.1.0 |
| Summary | ZeroMQ integration for streamz - high-performance streaming data processing |
| upload_time | 2025-07-24 16:51:28 |
| requires_python | >=3.10 |
| license | MIT |
| keywords | data-processing, real-time, streaming, zeromq, zmq |

# streamz-zmq

[![PyPI version](https://badge.fury.io/py/streamz-zmq.svg)](https://badge.fury.io/py/streamz-zmq)
[![GitHub release](https://img.shields.io/github/v/release/izzet/streamz-zmq)](https://github.com/izzet/streamz-zmq/releases)
[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)

ZeroMQ integration for [streamz](https://streamz.readthedocs.io/) - enabling high-performance streaming data processing with distributed messaging.

## Features

- **ZMQ Source (`from_zmq`)**: Receive data streams from ZeroMQ publishers
- **ZMQ Sink (`to_zmq`)**: Send processed data to ZeroMQ sockets  
- **Async Support**: Built with asyncio for high-performance streaming
- **Multiple Patterns**: Support for PUB/SUB, PUSH/PULL, and other ZMQ patterns
- **Seamless Integration**: Extends streamz with familiar API patterns

## Installation

```bash
pip install streamz-zmq
```

Or with uv:

```bash
uv add streamz-zmq
```

## Quick Start

### Receiving data from ZMQ (Source)

```python
from streamz import Stream
import streamz_zmq  # Register the ZMQ extensions

# Create a stream that receives from a ZMQ publisher
source = Stream.from_zmq("tcp://localhost:5555")
source.sink(print)  # Print received messages

# Start the stream
source.start()
```
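
The subscriber above only prints something once a publisher is sending on port 5555. A minimal sketch of such a publisher, written directly against pyzmq rather than streamz-zmq, might look like the following. The `publish` helper and its `warmup` parameter are illustrative names, not part of this package.

```python
import time

import zmq


def publish(endpoint: str, messages: list[bytes], warmup: float = 0.5) -> int:
    """Bind a PUB socket to `endpoint`, send each message, return the count."""
    context = zmq.Context.instance()
    socket = context.socket(zmq.PUB)
    socket.bind(endpoint)
    try:
        # A PUB socket silently drops messages while no subscriber is
        # connected, so give subscribers a moment to join first.
        time.sleep(warmup)
        for message in messages:
            socket.send(message)
        return len(messages)
    finally:
        # Allow up to one second for queued messages to be flushed.
        socket.close(linger=1000)
```

Calling `publish("tcp://*:5555", [b"hello", b"world"])` from a second process while the subscriber is running should deliver both messages, provided the subscriber connected within the warm-up window.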

### Sending data to ZMQ (Sink)

```python
from streamz import Stream
import streamz_zmq  # Register the ZMQ extensions

# Create a stream and send results to ZMQ
source = Stream.from_iterable([1, 2, 3, 4, 5])
source.map(lambda x: x * 2).to_zmq("tcp://*:5556")

# Start the stream
source.start()
```
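
To see what the sink emits, something has to read from the other end. The sketch below assumes that `to_zmq("tcp://*:5556")` binds a PUSH socket, as the wildcard address and the default `sock_type=zmq.PUSH` suggest. It uses plain pyzmq, and `pull_messages` is a hypothetical helper name.

```python
import zmq


def pull_messages(endpoint: str, count: int, timeout_ms: int = 2000) -> list[bytes]:
    """Connect a PULL socket to `endpoint` and collect up to `count` messages."""
    context = zmq.Context.instance()
    socket = context.socket(zmq.PULL)
    socket.connect(endpoint)
    received: list[bytes] = []
    try:
        while len(received) < count:
            # poll() returns 0 when nothing arrives within the timeout.
            if not socket.poll(timeout_ms):
                break
            received.append(socket.recv())
    finally:
        socket.close(linger=0)
    return received
```

For the sink above, `pull_messages("tcp://localhost:5556", 5)` would be started in a separate process. What the received bytes look like depends on how `to_zmq` encodes each element.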

### Complete Example: Pipeline with ZMQ

```python
import asyncio
from streamz import Stream
import streamz_zmq

async def main():
    # Receive from one ZMQ socket, process, send to another
    source = Stream.from_zmq("tcp://localhost:5555")
    
    processed = (source
                .map(lambda x: x.decode('utf-8'))  # Decode bytes
                .map(str.upper)                    # Process data
                .map(str.encode))                  # Encode back to bytes
    
    processed.to_zmq("tcp://*:5556")
    
    # Start processing
    await source.start()

if __name__ == "__main__":
    asyncio.run(main())
```
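
The three `map` steps in the pipeline are ordinary functions on bytes and strings, so the processing logic can be checked without any sockets. A minimal sketch, where `transform` is a hypothetical helper and not part of streamz-zmq:

```python
def transform(message: bytes) -> bytes:
    """Apply the same decode -> upper -> encode chain as the pipeline above."""
    text = message.decode("utf-8")   # bytes -> str
    shouted = text.upper()           # the actual processing step
    return shouted.encode("utf-8")   # str -> bytes, ready to send


print(transform(b"hello zmq"))  # b'HELLO ZMQ'
```

A single `source.map(transform)` could then stand in for the three chained `map` calls, which keeps the socket wiring separate from the logic under test.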

## Examples

Check out the `examples/` directory for demonstrations:

- **`simple_example.py`**: Basic example with a ZMQ publisher thread feeding a streamz subscriber
- **`comprehensive_example.py`**: Advanced demonstration showing multiple ZMQ patterns:
  - **PUB/SUB**: Publisher broadcasts weather updates to topic-specific subscribers
  - **PUSH/PULL**: Load balancing work distribution across multiple workers  
  - **Pipeline**: Multi-stage data processing pipeline

Run the simple example:
```bash
uv run python examples/simple_example.py
```

Run the comprehensive example:
```bash
uv run python examples/comprehensive_example.py
```

## API Reference

### `Stream.from_zmq(connect_str, sock_type=zmq.SUB, subscribe=b"")`

Creates a stream source that receives messages from a ZMQ socket.

**Parameters:**
- `connect_str` (str): ZMQ connection string (e.g., "tcp://localhost:5555")
- `sock_type` (int, optional): ZMQ socket type. Defaults to `zmq.SUB`
- `subscribe` (bytes, optional): Topic prefix filter for SUB sockets; only messages that start with these bytes are delivered. Defaults to `b""` (all messages)
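
ZeroMQ SUB sockets filter by byte prefix, which is why the default `b""` receives everything. The sketch below is a pure-Python model of that rule, not the library's code. `sub_accepts` and the sample topics are made up for illustration.

```python
def sub_accepts(subscription: bytes, message: bytes) -> bool:
    """Model of SUB filtering: a message passes if it starts with the prefix."""
    return message.startswith(subscription)


messages = [b"weather.nyc 21C", b"weather.sf 17C", b"stocks.acme 42"]

# A topic prefix selects only the matching messages.
print([m for m in messages if sub_accepts(b"weather.", m)])

# The empty prefix matches every message.
print(len([m for m in messages if sub_accepts(b"", m)]))  # 3
```

For multipart messages, ZeroMQ applies the prefix check to the first frame only.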

### `stream.to_zmq(connect_str, sock_type=zmq.PUSH)`

Sends stream elements to a ZMQ socket.

**Parameters:**
- `connect_str` (str): ZMQ connection string (e.g., "tcp://*:5556") 
- `sock_type` (int, optional): ZMQ socket type. Defaults to `zmq.PUSH`
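
ZeroMQ frames carry raw bytes, and the pipeline example earlier encodes strings before calling `to_zmq`. Whether `to_zmq` serializes other Python objects itself is not stated here, so a cautious approach is to convert elements to bytes explicitly before the sink. The sketch below uses JSON. `to_frame` and `from_frame` are hypothetical helper names.

```python
import json


def to_frame(obj) -> bytes:
    """Serialize a JSON-compatible object to compact UTF-8 bytes."""
    return json.dumps(obj, separators=(",", ":")).encode("utf-8")


def from_frame(frame: bytes):
    """Inverse of to_frame, for use on the receiving side."""
    return json.loads(frame.decode("utf-8"))


frame = to_frame({"sensor": "a1", "value": 2.5})
print(frame)              # b'{"sensor":"a1","value":2.5}'
print(from_frame(frame))  # {'sensor': 'a1', 'value': 2.5}
```

In a stream this would be mapped just before the sink, for example `stream.map(to_frame).to_zmq(...)`, with `from_frame` mapped right after `from_zmq` on the receiving side.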

## ZMQ Patterns Supported

- **PUB/SUB**: Publisher broadcasts to multiple subscribers
- **PUSH/PULL**: Load balancing across workers
- **REQ/REP**: Request-response (less common for streaming)
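
The difference between the first two patterns is in who receives each message. The following is an idealized pure-Python model with no sockets involved. The function names are illustrative, and real PUSH sockets also skip peers whose queues are full.

```python
from itertools import cycle


def pub_sub(messages, subscribers):
    """PUB/SUB model: every subscriber gets its own copy of every message."""
    return {name: list(messages) for name in subscribers}


def push_pull(messages, workers):
    """PUSH/PULL model: messages are dealt out round-robin, one worker each."""
    assigned = {name: [] for name in workers}
    for message, name in zip(messages, cycle(workers)):
        assigned[name].append(message)
    return assigned


jobs = [b"job-1", b"job-2", b"job-3", b"job-4"]
print(pub_sub(jobs, ["a", "b"]))    # both "a" and "b" see all four jobs
print(push_pull(jobs, ["a", "b"]))  # "a" gets jobs 1 and 3, "b" gets 2 and 4
```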

## Requirements

- Python 3.10+
- streamz >= 0.6.4
- pyzmq >= 27.0.0

## Development

```bash
# Clone the repository
git clone https://github.com/izzet/streamz-zmq.git
cd streamz-zmq

# Install with uv (uses uv.lock for reproducible builds)
uv sync --dev

# Set up pre-commit hooks (recommended)
uv run pre-commit install

# Run tests
uv run pytest

# Format code
uv run ruff format .

# Check linting
uv run ruff check .

# Build package
uv build
```

**Note**: This project uses `uv.lock` for reproducible dependency management. The lock file is committed to ensure all developers and CI/CD use identical dependency versions.

**Pre-commit hooks**: The project includes pre-commit hooks that automatically format code, check linting, and run tests before each commit to maintain code quality.

## License

MIT License. See the [LICENSE](LICENSE) file for details.

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "streamz-zmq",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": "Izzet Yildirim <izzetcyildirim@gmail.com>",
    "keywords": "data-processing, real-time, streaming, zeromq, zmq",
    "author": null,
    "author_email": "Izzet Yildirim <izzetcyildirim@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/df/71/8b9d07180f23eae876ab70910330fc913f3402ff68a9c1fdd7b785b055c2/streamz_zmq-0.1.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "ZeroMQ integration for streamz - high-performance streaming data processing",
    "version": "0.1.0",
    "project_urls": {
        "Bug Tracker": "https://github.com/izzet/streamz-zmq/issues",
        "Documentation": "https://github.com/izzet/streamz-zmq#readme",
        "Homepage": "https://github.com/izzet/streamz-zmq",
        "Repository": "https://github.com/izzet/streamz-zmq"
    },
    "split_keywords": [
        "data-processing",
        " real-time",
        " streaming",
        " zeromq",
        " zmq"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "c592f0282fd08d63e5185aa026df82e181c9790578a4d65e176dc5e56658c728",
                "md5": "615a7cfbdd33ad352a728d8ea4918569",
                "sha256": "0972be6d9c37c0b6f43c4429f0a613d68a4d1c8543a79ab383e230e8e48a3d6d"
            },
            "downloads": -1,
            "filename": "streamz_zmq-0.1.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "615a7cfbdd33ad352a728d8ea4918569",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 6268,
            "upload_time": "2025-07-24T16:51:27",
            "upload_time_iso_8601": "2025-07-24T16:51:27.149591Z",
            "url": "https://files.pythonhosted.org/packages/c5/92/f0282fd08d63e5185aa026df82e181c9790578a4d65e176dc5e56658c728/streamz_zmq-0.1.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "df718b9d07180f23eae876ab70910330fc913f3402ff68a9c1fdd7b785b055c2",
                "md5": "815f07395021ea89514441c816a07a76",
                "sha256": "2d70a7851bf69709d31491697247d9823077a062280f703cd8cf74ff614ed38e"
            },
            "downloads": -1,
            "filename": "streamz_zmq-0.1.0.tar.gz",
            "has_sig": false,
            "md5_digest": "815f07395021ea89514441c816a07a76",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 36565,
            "upload_time": "2025-07-24T16:51:28",
            "upload_time_iso_8601": "2025-07-24T16:51:28.428405Z",
            "url": "https://files.pythonhosted.org/packages/df/71/8b9d07180f23eae876ab70910330fc913f3402ff68a9c1fdd7b785b055c2/streamz_zmq-0.1.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-07-24 16:51:28",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "izzet",
    "github_project": "streamz-zmq",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "streamz-zmq"
}
        