streamz-zmq


Namestreamz-zmq JSON
Version 0.1.5 PyPI version JSON
download
home_pageNone
SummaryZeroMQ integration for streamz - high-performance streaming data processing
upload_time2025-09-02 15:44:24
maintainerNone
docs_urlNone
authorNone
requires_python>3.8
licenseMIT
keywords data-processing real-time streaming zeromq zmq
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # streamz-zmq

[![PyPI version](https://badge.fury.io/py/streamz-zmq.svg)](https://badge.fury.io/py/streamz-zmq)
[![GitHub release](https://img.shields.io/github/v/release/izzet/streamz-zmq)](https://github.com/izzet/streamz-zmq/releases)
[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)

ZeroMQ integration for [streamz](https://streamz.readthedocs.io/) - enabling high-performance streaming data processing with distributed messaging.

## Features

- **ZMQ Source (`from_zmq`)**: Receive data streams from ZeroMQ publishers
- **ZMQ Sink (`to_zmq`)**: Send processed data to ZeroMQ sockets
- **Async Support**: Built with asyncio for high-performance streaming
- **Multiple Patterns**: Support for PUB/SUB, PUSH/PULL, and other ZMQ patterns
- **Seamless Integration**: Extends streamz with familiar API patterns

## Installation

```bash
pip install streamz-zmq
```

Or with uv:

```bash
uv add streamz-zmq
```

## Quick Start

### Receiving data from ZMQ (Source)

```python
from streamz import Stream
import streamz_zmq  # Register the ZMQ extensions
import zmq

# Create a stream that receives from a ZMQ publisher (connect mode, default)
source = Stream.from_zmq("tcp://localhost:5555")
source.sink(print)  # Print received messages

# Start the stream
source.start()

# Or, act as a collector/server and accept connections from publishers:
collector = Stream.from_zmq("tcp://*:6000", sock_type=zmq.PULL, bind=True)
collector.sink(print)
collector.start()
```

### Sending data to ZMQ (Sink)

```python
from streamz import Stream
import streamz_zmq  # Register the ZMQ extensions

# Create a stream and send results to an existing ZMQ service (default: connect mode)
source = Stream.from_iterable([1, 2, 3, 4, 5])
source.map(lambda x: x * 2).to_zmq("tcp://localhost:5556")

# Or, act as a service and accept connections from ZMQ clients (bind mode)
source.map(...).to_zmq("tcp://*:5556", bind=True)

# Start the stream
source.start()
```

### Complete Example: Pipeline with ZMQ

```python
import asyncio
from streamz import Stream
import streamz_zmq

async def main():
    # Receive from one ZMQ socket, process, send to another
    source = Stream.from_zmq("tcp://localhost:5555")

    processed = (source
                .map(lambda x: x.decode('utf-8'))  # Decode bytes
                .map(str.upper)                    # Process data
                .map(str.encode))                  # Encode back to bytes

    processed.to_zmq("tcp://*:5556")

    # Start processing
    await source.start()

if __name__ == "__main__":
    asyncio.run(main())
```

## Examples

Check out the `examples/` directory for demonstrations:

- **`simple_example.py`**: Basic example showing ZMQ publisher thread + streamz subscriber
- **`comprehensive_example.py`**: Advanced demonstration showing multiple ZMQ patterns:
  - **PUB/SUB**: Publisher broadcasts weather updates to topic-specific subscribers
  - **PUSH/PULL**: Load balancing work distribution across multiple workers
  - **Pipeline**: Multi-stage data processing pipeline

Run the simple example:

```bash
uv run python examples/simple_example.py
```

Run the comprehensive example:

```bash
uv run python examples/comprehensive_example.py
```

## API Reference

### `Stream.from_zmq(connect_str, sock_type=zmq.SUB, subscribe=b"", bind=False)`

Creates a stream source that receives messages from a ZMQ socket.

**Parameters:**

- `connect_str` (str): ZMQ connection string (e.g., "tcp://localhost:5555" for connect, or "tcp://\*:5555" for bind)
- `sock_type` (int, optional): ZMQ socket type. Defaults to `zmq.SUB`
- `subscribe` (bytes, optional): Subscription topic for SUB sockets. Defaults to `b""` (all messages)
- `bind` (bool, optional): If True, bind the socket (act as a server/collector). If False (default), connect to the address.

### `stream.to_zmq(connect_str, sock_type=zmq.PUSH, bind=False)`

Sends stream elements to a ZMQ socket.

**Parameters:**

- `connect_str` (str): ZMQ connection string (e.g., "tcp://\*:5556" for bind, or "tcp://localhost:5556" for connect)
- `sock_type` (int, optional): ZMQ socket type. Defaults to `zmq.PUSH`
- `bind` (bool, optional): If True, bind the socket (act as a service). If False (default), connect to the address.

## ZMQ Patterns Supported

- **PUB/SUB**: Publisher broadcasts to multiple subscribers
- **PUSH/PULL**: Load balancing across workers
- **REQ/REP**: Request-response (less common for streaming)

## Requirements

- Python 3.9+
- streamz >= 0.6.4
- pyzmq >= 27.0.0

## Development

```bash
# Clone the repository
git clone https://github.com/izzet/streamz-zmq.git
cd streamz-zmq

# Install with uv (uses uv.lock for reproducible builds)
uv sync --dev

# Set up pre-commit hooks (recommended)
uv run pre-commit install

# Run tests
uv run pytest

# Format code
uv run ruff format .

# Check linting
uv run ruff check .

# Build package
uv build
```

**Note**: This project uses `uv.lock` for reproducible dependency management. The lock file is committed to ensure all developers and CI/CD use identical dependency versions.

**Pre-commit hooks**: The project includes pre-commit hooks that automatically format code, check linting, and run tests before each commit to maintain code quality.

## License

MIT License. See [LICENSE](LICENSE) file for details.

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "streamz-zmq",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">3.8",
    "maintainer_email": "Izzet Yildirim <izzetcyildirim@gmail.com>",
    "keywords": "data-processing, real-time, streaming, zeromq, zmq",
    "author": null,
    "author_email": "Izzet Yildirim <izzetcyildirim@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/d8/d8/117d077de8fdbbc65bc24f7cc41276e3199877bb959c83bac26a9a9dd0f2/streamz_zmq-0.1.5.tar.gz",
    "platform": null,
    "description": "# streamz-zmq\n\n[![PyPI version](https://badge.fury.io/py/streamz-zmq.svg)](https://badge.fury.io/py/streamz-zmq)\n[![GitHub release](https://img.shields.io/github/v/release/izzet/streamz-zmq)](https://github.com/izzet/streamz-zmq/releases)\n[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)\n\nZeroMQ integration for [streamz](https://streamz.readthedocs.io/) - enabling high-performance streaming data processing with distributed messaging.\n\n## Features\n\n- **ZMQ Source (`from_zmq`)**: Receive data streams from ZeroMQ publishers\n- **ZMQ Sink (`to_zmq`)**: Send processed data to ZeroMQ sockets\n- **Async Support**: Built with asyncio for high-performance streaming\n- **Multiple Patterns**: Support for PUB/SUB, PUSH/PULL, and other ZMQ patterns\n- **Seamless Integration**: Extends streamz with familiar API patterns\n\n## Installation\n\n```bash\npip install streamz-zmq\n```\n\nOr with uv:\n\n```bash\nuv add streamz-zmq\n```\n\n## Quick Start\n\n### Receiving data from ZMQ (Source)\n\n```python\nfrom streamz import Stream\nimport streamz_zmq  # Register the ZMQ extensions\nimport zmq\n\n# Create a stream that receives from a ZMQ publisher (connect mode, default)\nsource = Stream.from_zmq(\"tcp://localhost:5555\")\nsource.sink(print)  # Print received messages\n\n# Start the stream\nsource.start()\n\n# Or, act as a collector/server and accept connections from publishers:\ncollector = Stream.from_zmq(\"tcp://*:6000\", sock_type=zmq.PULL, bind=True)\ncollector.sink(print)\ncollector.start()\n```\n\n### Sending data to ZMQ (Sink)\n\n```python\nfrom streamz import Stream\nimport streamz_zmq  # Register the ZMQ extensions\n\n# Create a stream and send results to an existing ZMQ service (default: connect mode)\nsource = Stream.from_iterable([1, 2, 3, 4, 5])\nsource.map(lambda x: x * 2).to_zmq(\"tcp://localhost:5556\")\n\n# Or, act as a service and accept connections from ZMQ clients (bind mode)\nsource.map(...).to_zmq(\"tcp://*:5556\", bind=True)\n\n# Start the stream\nsource.start()\n```\n\n### Complete Example: Pipeline with ZMQ\n\n```python\nimport asyncio\nfrom streamz import Stream\nimport streamz_zmq\n\nasync def main():\n    # Receive from one ZMQ socket, process, send to another\n    source = Stream.from_zmq(\"tcp://localhost:5555\")\n\n    processed = (source\n                .map(lambda x: x.decode('utf-8'))  # Decode bytes\n                .map(str.upper)                    # Process data\n                .map(str.encode))                  # Encode back to bytes\n\n    processed.to_zmq(\"tcp://*:5556\")\n\n    # Start processing\n    await source.start()\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n## Examples\n\nCheck out the `examples/` directory for demonstrations:\n\n- **`simple_example.py`**: Basic example showing ZMQ publisher thread + streamz subscriber\n- **`comprehensive_example.py`**: Advanced demonstration showing multiple ZMQ patterns:\n  - **PUB/SUB**: Publisher broadcasts weather updates to topic-specific subscribers\n  - **PUSH/PULL**: Load balancing work distribution across multiple workers\n  - **Pipeline**: Multi-stage data processing pipeline\n\nRun the simple example:\n\n```bash\nuv run python examples/simple_example.py\n```\n\nRun the comprehensive example:\n\n```bash\nuv run python examples/comprehensive_example.py\n```\n\n## API Reference\n\n### `Stream.from_zmq(connect_str, sock_type=zmq.SUB, subscribe=b\"\", bind=False)`\n\nCreates a stream source that receives messages from a ZMQ socket.\n\n**Parameters:**\n\n- `connect_str` (str): ZMQ connection string (e.g., \"tcp://localhost:5555\" for connect, or \"tcp://\\*:5555\" for bind)\n- `sock_type` (int, optional): ZMQ socket type. Defaults to `zmq.SUB`\n- `subscribe` (bytes, optional): Subscription topic for SUB sockets. Defaults to `b\"\"` (all messages)\n- `bind` (bool, optional): If True, bind the socket (act as a server/collector). If False (default), connect to the address.\n\n### `stream.to_zmq(connect_str, sock_type=zmq.PUSH, bind=False)`\n\nSends stream elements to a ZMQ socket.\n\n**Parameters:**\n\n- `connect_str` (str): ZMQ connection string (e.g., \"tcp://\\*:5556\" for bind, or \"tcp://localhost:5556\" for connect)\n- `sock_type` (int, optional): ZMQ socket type. Defaults to `zmq.PUSH`\n- `bind` (bool, optional): If True, bind the socket (act as a service). If False (default), connect to the address.\n\n## ZMQ Patterns Supported\n\n- **PUB/SUB**: Publisher broadcasts to multiple subscribers\n- **PUSH/PULL**: Load balancing across workers\n- **REQ/REP**: Request-response (less common for streaming)\n\n## Requirements\n\n- Python 3.9+\n- streamz >= 0.6.4\n- pyzmq >= 27.0.0\n\n## Development\n\n```bash\n# Clone the repository\ngit clone https://github.com/izzet/streamz-zmq.git\ncd streamz-zmq\n\n# Install with uv (uses uv.lock for reproducible builds)\nuv sync --dev\n\n# Set up pre-commit hooks (recommended)\nuv run pre-commit install\n\n# Run tests\nuv run pytest\n\n# Format code\nuv run ruff format .\n\n# Check linting\nuv run ruff check .\n\n# Build package\nuv build\n```\n\n**Note**: This project uses `uv.lock` for reproducible dependency management. The lock file is committed to ensure all developers and CI/CD use identical dependency versions.\n\n**Pre-commit hooks**: The project includes pre-commit hooks that automatically format code, check linting, and run tests before each commit to maintain code quality.\n\n## License\n\nMIT License. See [LICENSE](LICENSE) file for details.\n\n## Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "ZeroMQ integration for streamz - high-performance streaming data processing",
    "version": "0.1.5",
    "project_urls": {
        "Bug Tracker": "https://github.com/izzet/streamz-zmq/issues",
        "Documentation": "https://github.com/izzet/streamz-zmq#readme",
        "Homepage": "https://github.com/izzet/streamz-zmq",
        "Repository": "https://github.com/izzet/streamz-zmq"
    },
    "split_keywords": [
        "data-processing",
        " real-time",
        " streaming",
        " zeromq",
        " zmq"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "1d44da2a9cf1bad781f084a01dd4daf95218027ad25b1be2255eab47a73fc534",
                "md5": "4ae027acb42cb823df6fcdc0a114a3dc",
                "sha256": "a006a072109f68e5a4617ddd45b26c1f8f3fbc39526131f09c1dc83f9591aa97"
            },
            "downloads": -1,
            "filename": "streamz_zmq-0.1.5-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4ae027acb42cb823df6fcdc0a114a3dc",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">3.8",
            "size": 7615,
            "upload_time": "2025-09-02T15:44:22",
            "upload_time_iso_8601": "2025-09-02T15:44:22.430760Z",
            "url": "https://files.pythonhosted.org/packages/1d/44/da2a9cf1bad781f084a01dd4daf95218027ad25b1be2255eab47a73fc534/streamz_zmq-0.1.5-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "d8d8117d077de8fdbbc65bc24f7cc41276e3199877bb959c83bac26a9a9dd0f2",
                "md5": "1d0b954b6cb1eabd38e928a0f864ec99",
                "sha256": "7f9381a49a801aeb1898fe5166a3d26629b265dc46d8e34cc1750b56a258aa5a"
            },
            "downloads": -1,
            "filename": "streamz_zmq-0.1.5.tar.gz",
            "has_sig": false,
            "md5_digest": "1d0b954b6cb1eabd38e928a0f864ec99",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">3.8",
            "size": 48996,
            "upload_time": "2025-09-02T15:44:24",
            "upload_time_iso_8601": "2025-09-02T15:44:24.247030Z",
            "url": "https://files.pythonhosted.org/packages/d8/d8/117d077de8fdbbc65bc24f7cc41276e3199877bb959c83bac26a9a9dd0f2/streamz_zmq-0.1.5.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-09-02 15:44:24",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "izzet",
    "github_project": "streamz-zmq",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "streamz-zmq"
}
        
Elapsed time: 2.62689s