# streamz-zmq
ZeroMQ integration for [streamz](https://streamz.readthedocs.io/) - enabling high-performance streaming data processing with distributed messaging.
## Features
- **ZMQ Source (`from_zmq`)**: Receive data streams from ZeroMQ publishers
- **ZMQ Sink (`to_zmq`)**: Send processed data to ZeroMQ sockets
- **Async Support**: Built with asyncio for high-performance streaming
- **Multiple Patterns**: Support for PUB/SUB, PUSH/PULL, and other ZMQ patterns
- **Seamless Integration**: Extends streamz with familiar API patterns
## Installation
```bash
pip install streamz-zmq
```
Or with uv:
```bash
uv add streamz-zmq
```
## Quick Start
### Receiving data from ZMQ (Source)
```python
from streamz import Stream
import streamz_zmq # Register the ZMQ extensions
# Create a stream that receives from a ZMQ publisher
source = Stream.from_zmq("tcp://localhost:5555")
source.sink(print) # Print received messages
# Start the stream
source.start()
```
### Sending data to ZMQ (Sink)
```python
from streamz import Stream
import streamz_zmq # Register the ZMQ extensions
# Create a stream and send results to ZMQ
source = Stream.from_iterable([1, 2, 3, 4, 5])
source.map(lambda x: x * 2).to_zmq("tcp://*:5556")
# Start the stream
source.start()
```
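ZeroMQ frames carry raw bytes, and this README does not say whether `to_zmq` serializes Python objects such as the integers above. If it does not, one cautious option is to encode each value in a `map` step before the sink. A minimal sketch, where `encode_int` and `decode_int` are hypothetical helpers (not part of streamz-zmq):

```python
def encode_int(value: int) -> bytes:
    """Turn an integer into an ASCII byte string suitable for a ZMQ frame."""
    return str(value).encode("ascii")


def decode_int(payload: bytes) -> int:
    """Inverse of encode_int, for use on the receiving side."""
    return int(payload.decode("ascii"))


if __name__ == "__main__":
    # Same values the sink example produces: each input doubled.
    doubled = [x * 2 for x in [1, 2, 3, 4, 5]]
    frames = [encode_int(x) for x in doubled]
    print(frames)
    print([decode_int(f) for f in frames])
```

With these helpers, the sink line would become `source.map(lambda x: x * 2).map(encode_int).to_zmq("tcp://*:5556")`, and a receiver would apply `decode_int` to each message.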
### Complete Example: Pipeline with ZMQ
```python
import asyncio
from streamz import Stream
import streamz_zmq  # Register the ZMQ extensions


async def main():
    # Receive from one ZMQ socket, process, send to another
    source = Stream.from_zmq("tcp://localhost:5555")

    processed = (source
                 .map(lambda x: x.decode('utf-8'))  # Decode bytes
                 .map(str.upper)                    # Process data
                 .map(str.encode))                  # Encode back to bytes

    processed.to_zmq("tcp://*:5556")

    # Start processing
    await source.start()


if __name__ == "__main__":
    asyncio.run(main())
```
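The three `map` stages in the pipeline above can be checked without any sockets by composing them into one plain function. The sketch below is illustrative only; `transform` is a hypothetical name, not part of streamz-zmq.

```python
def transform(message: bytes) -> bytes:
    """Apply the same decode -> upper -> encode steps as the pipeline's map stages."""
    text = message.decode("utf-8")  # bytes -> str
    shouted = text.upper()          # process the text
    return shouted.encode()         # str -> bytes (str.encode defaults to UTF-8)


if __name__ == "__main__":
    for raw in [b"hello", b"temp=21.5", "caf\u00e9".encode("utf-8")]:
        print(raw, "->", transform(raw))
```

Testing the transformation in isolation like this makes it easier to tell a logic bug from a connectivity problem once the ZMQ endpoints are involved.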
## Examples
Check out the `examples/` directory for demonstrations:
- **`simple_example.py`**: Basic example in which a ZMQ publisher thread feeds a streamz subscriber
- **`comprehensive_example.py`**: Advanced demonstration showing multiple ZMQ patterns:
  - **PUB/SUB**: Publisher broadcasts weather updates to topic-specific subscribers
  - **PUSH/PULL**: Load balancing work distribution across multiple workers
  - **Pipeline**: Multi-stage data processing pipeline
Run the simple example:
```bash
uv run python examples/simple_example.py
```
Run the comprehensive example:
```bash
uv run python examples/comprehensive_example.py
```
## API Reference
### `Stream.from_zmq(connect_str, sock_type=zmq.SUB, subscribe=b"")`
Creates a stream source that receives messages from a ZMQ socket.
**Parameters:**
- `connect_str` (str): ZMQ connection string (e.g., "tcp://localhost:5555")
- `sock_type` (int, optional): ZMQ socket type. Defaults to `zmq.SUB`
- `subscribe` (bytes, optional): Subscription topic for SUB sockets. Defaults to `b""` (all messages)
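
ZeroMQ SUB sockets filter by prefix: a message is delivered when its leading bytes equal the subscription, which is why `b""` matches everything. Assuming `subscribe` is handed to the SUB socket unchanged (this README does not spell that out), the filtering can be sketched in plain Python. `matches` and `accepted` are hypothetical helpers for illustration.

```python
def matches(subscription: bytes, message: bytes) -> bool:
    """Return True if a SUB socket with this subscription would accept the message."""
    # ZeroMQ compares the subscription against the leading bytes of the message,
    # so the empty prefix b"" matches every message.
    return message.startswith(subscription)


def accepted(subscription: bytes, messages: list[bytes]) -> list[bytes]:
    """Keep only the messages that pass the prefix filter."""
    return [m for m in messages if matches(subscription, m)]


if __name__ == "__main__":
    feed = [b"weather.nyc 21C", b"weather.sf 17C", b"stocks.acme 101.5"]
    print(accepted(b"", feed))            # all three messages
    print(accepted(b"weather.", feed))    # the two weather messages
    print(accepted(b"weather.sf", feed))  # only the San Francisco message
```

Because matching is by prefix rather than exact topic, a subscription of `b"weather"` would also accept a message starting with `b"weatherman"`; ending topics with a delimiter such as `.` avoids that.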
### `stream.to_zmq(connect_str, sock_type=zmq.PUSH)`
Sends stream elements to a ZMQ socket.
**Parameters:**
- `connect_str` (str): ZMQ connection string (e.g., "tcp://*:5556")
- `sock_type` (int, optional): ZMQ socket type. Defaults to `zmq.PUSH`
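
The two example connection strings differ in an important way: in ZeroMQ's TCP transport, a `*` host is a wildcard interface that is only valid when binding, while a named host such as `localhost` is what a connecting socket uses. The `tcp://*:5556` example suggests the sink binds its socket, although this README does not say so explicitly. The sketch below splits such strings apart; `parse_tcp_endpoint` and `is_bind_only` are hypothetical helpers, and only numeric ports are handled.

```python
from typing import NamedTuple


class TcpEndpoint(NamedTuple):
    host: str
    port: int


def parse_tcp_endpoint(endpoint: str) -> TcpEndpoint:
    """Split a 'tcp://host:port' string into host and numeric port."""
    scheme, sep, address = endpoint.partition("://")
    if not sep or scheme != "tcp":
        raise ValueError(f"not a tcp endpoint: {endpoint!r}")
    host, sep, port = address.rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected host:port in {endpoint!r}")
    return TcpEndpoint(host, int(port))  # int() rejects non-numeric ports


def is_bind_only(endpoint: str) -> bool:
    """A '*' host is a wildcard interface, which only makes sense for bind()."""
    return parse_tcp_endpoint(endpoint).host == "*"


if __name__ == "__main__":
    for ep in ["tcp://*:5556", "tcp://localhost:5555"]:
        print(ep, parse_tcp_endpoint(ep), "bind-only:", is_bind_only(ep))
```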
## ZMQ Patterns Supported
- **PUB/SUB**: Publisher broadcasts to multiple subscribers
- **PUSH/PULL**: Load balancing across workers
- **REQ/REP**: Request-response (less common for streaming)
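
The difference between the first two patterns is easy to blur: with PUB/SUB every subscriber sees every matching message, whereas a PUSH socket hands each message to exactly one connected PULL peer in round-robin order. The pure-Python simulation below illustrates only that distribution rule; `distribute` is a hypothetical helper, and real sockets also take peer readiness and queue limits into account.

```python
from itertools import cycle


def distribute(tasks, workers):
    """Assign each task to exactly one worker, cycling through workers in order."""
    if not workers:
        raise ValueError("at least one worker is required")
    assignments = {worker: [] for worker in workers}
    # zip() stops when tasks run out; cycle() repeats the worker list forever.
    for task, worker in zip(tasks, cycle(workers)):
        assignments[worker].append(task)
    return assignments


if __name__ == "__main__":
    result = distribute(range(1, 8), ["worker-a", "worker-b", "worker-c"])
    for worker, tasks in result.items():
        print(worker, tasks)
```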
## Requirements
- Python 3.10+
- streamz >= 0.6.4
- pyzmq >= 27.0.0
## Development
```bash
# Clone the repository
git clone https://github.com/izzet/streamz-zmq.git
cd streamz-zmq
# Install with uv (uses uv.lock for reproducible builds)
uv sync --dev
# Set up pre-commit hooks (recommended)
uv run pre-commit install
# Run tests
uv run pytest
# Format code
uv run ruff format .
# Check linting
uv run ruff check .
# Build package
uv build
```
**Note**: This project uses `uv.lock` for reproducible dependency management. The lock file is committed to ensure all developers and CI/CD use identical dependency versions.
**Pre-commit hooks**: The project includes pre-commit hooks that automatically format code, check linting, and run tests before each commit to maintain code quality.
## License
MIT License. See the [LICENSE](LICENSE) file for details.
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Raw data
{
"_id": null,
"home_page": null,
"name": "streamz-zmq",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": "Izzet Yildirim <izzetcyildirim@gmail.com>",
"keywords": "data-processing, real-time, streaming, zeromq, zmq",
"author": null,
"author_email": "Izzet Yildirim <izzetcyildirim@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/df/71/8b9d07180f23eae876ab70910330fc913f3402ff68a9c1fdd7b785b055c2/streamz_zmq-0.1.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "ZeroMQ integration for streamz - high-performance streaming data processing",
"version": "0.1.0",
"project_urls": {
"Bug Tracker": "https://github.com/izzet/streamz-zmq/issues",
"Documentation": "https://github.com/izzet/streamz-zmq#readme",
"Homepage": "https://github.com/izzet/streamz-zmq",
"Repository": "https://github.com/izzet/streamz-zmq"
},
"split_keywords": [
"data-processing",
" real-time",
" streaming",
" zeromq",
" zmq"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "c592f0282fd08d63e5185aa026df82e181c9790578a4d65e176dc5e56658c728",
"md5": "615a7cfbdd33ad352a728d8ea4918569",
"sha256": "0972be6d9c37c0b6f43c4429f0a613d68a4d1c8543a79ab383e230e8e48a3d6d"
},
"downloads": -1,
"filename": "streamz_zmq-0.1.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "615a7cfbdd33ad352a728d8ea4918569",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10",
"size": 6268,
"upload_time": "2025-07-24T16:51:27",
"upload_time_iso_8601": "2025-07-24T16:51:27.149591Z",
"url": "https://files.pythonhosted.org/packages/c5/92/f0282fd08d63e5185aa026df82e181c9790578a4d65e176dc5e56658c728/streamz_zmq-0.1.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "df718b9d07180f23eae876ab70910330fc913f3402ff68a9c1fdd7b785b055c2",
"md5": "815f07395021ea89514441c816a07a76",
"sha256": "2d70a7851bf69709d31491697247d9823077a062280f703cd8cf74ff614ed38e"
},
"downloads": -1,
"filename": "streamz_zmq-0.1.0.tar.gz",
"has_sig": false,
"md5_digest": "815f07395021ea89514441c816a07a76",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 36565,
"upload_time": "2025-07-24T16:51:28",
"upload_time_iso_8601": "2025-07-24T16:51:28.428405Z",
"url": "https://files.pythonhosted.org/packages/df/71/8b9d07180f23eae876ab70910330fc913f3402ff68a9c1fdd7b785b055c2/streamz_zmq-0.1.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-24 16:51:28",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "izzet",
"github_project": "streamz-zmq",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "streamz-zmq"
}