[![PyPI](https://img.shields.io/pypi/v/zmq_tubes?color=green&style=plastic)](https://pypi.org/project/zmq-tubes/)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/zmq_tubes?style=plastic)
![License](https://img.shields.io/github/license/calcite/zmq_tubes?style=plastic)
# ZMQ Tubes
ZMQ Tubes is a managing system for ZMQ communication.
It can manage many ZMQ sockets by one interface.
The whole system is hierarchical, based on topics
(look at [MQTT topics](https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/)).
## Classes
- **TubeMessage** - This class represents a request/response message.
Some types of tubes require a response in this format.
- **Tube** - This class wraps a ZMQ socket.
It represents a connection between client and server.
- **TubeMonitor** - The class can sniff of the ZMQTube communication.
- **TubeNode** - This represents an application interface for communication via tubes.
## Asyncio / Threading
The library support bot method. Asyncio from Python 3.7.
```python
from zmq_tubes import TubeNode, Tube # Asyncio classes
from zmq_tubes.threads import TubeNode, Tube # Threads classes
```
## Usage:
### Node definitions in yml file
We can define all tubes for one TubeNode by yml file.
Next examples require install these packages `PyYAML`, `pyzmq` and `zmq_tubes`.
#### Client service (asyncio example)
```yaml
# client.yml
tubes:
- name: Client REQ
addr: ipc:///tmp/req.pipe
tube_type: REQ
topics:
- foo/bar
- name: Client PUB
addr: ipc:///tmp/pub.pipe
tube_type: PUB
topics:
- foo/pub/#
```
```python
# client.py
import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage
async def run():
with open('client.yml', 'r+') as fd:
schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
async with node:
print(await node.request('foo/bar', 'message 1'))
await node.publish('foo/pub/test', 'message 2')
if __name__ == "__main__":
asyncio.run(run())
```
```shell
> python client.py
topic: foo/bar, payload: response
```
#### Server service (threads example)
```yaml
# server.yml
tubes:
- name: server ROUTER
addr: ipc:///tmp/req.pipe
tube_type: ROUTER
server: True
topics:
- foo/bar
- name: server SUB
addr: ipc:///tmp/pub.pipe
tube_type: SUB
server: True
topics:
- foo/pub/#
```
```python
# server.py
import yaml
from zmq_tubes.threads import TubeNode, TubeMessage
def handler(request: TubeMessage):
print(request.payload)
if request.tube.tube_type_name == 'ROUTER':
return request.create_response('response')
def run():
with open('server.yml', 'r+') as fd:
schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
node.register_handler('foo/#', handler)
with node:
node.start().join()
if __name__ == "__main__":
run()
```
```shell
> python server.py
message 1
message 2
```
### YAML definition
The yaml file starts with a root element `tubes`, which contains list of all our tube definitions.
- `name` - string - name of the tube.
- `addr` - string - connection or bind address in format `transport://address` (see more http://api.zeromq.org/2-1:zmq-connect)
- `server` - bool - is this tube server side (bind to `addr`) or client side (connect to `addr`)
- `tube_type` - string - type of this tube (see more https://zguide.zeromq.org/docs/chapter2/#Messaging-Patterns)
- `identity` - string - (optional) we can setup custom tube identity
- `utf8_decoding` - bool - (default = True), if this is True, the payload is automatically UTF8 decode.
- `sockopts` - dict - (optional) we can setup sockopts for this tube (see more http://api.zeromq.org/4-2:zmq-setsockopt)
- `monitor` - string - (optional) bind address of tube monitor (see more [Debugging / Monitoring](#debugging-/-monitoring))
### Request / Response
This is a simple scenario, the server processes the requests serially.
#### Server:
```python
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
return 'answer'
# or return request.create_response('response')
tube = Tube(
name='Server',
addr='ipc:///tmp/req_resp.pipe',
server=True,
tube_type='REP'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'question'
```
#### Client:
```python
from zmq_tubes import Tube, TubeNode
tube = Tube(
name='Client',
addr='ipc:///tmp/req_resp.pipe',
tube_type='REQ'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'
```
The method `request` accepts the optional parameter `utf8_decoding`. When we set this parameter to `False` in previous
example, the returned payload is not automatically decoded, we get bytes.
### Subscribe / Publisher
#### Server:
```python
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
tube = Tube(
name='Server',
addr='ipc:///tmp/sub_pub.pipe',
server=True,
tube_type='SUB'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'
```
#### Client:
```python
from zmq_tubes import Tube, TubeNode
tube = Tube(
name='Client',
addr='ipc:///tmp/sub_pub.pipe',
tube_type='PUB'
)
# In the case of publishing, the first message is very often
# lost. The workaround is to connect the tube manually as soon as possible.
tube.connect()
node = TubeNode()
node.register_tube(tube, 'test/#')
node.publish('test/xxx', 'message')
```
### Request / Router
The server is asynchronous. It means it is able to process
more requests at the same time.
#### Server:
```python
import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
if request.payload == 'wait':
await asyncio.sleep(10)
return request.create_response(request.payload)
tube = Tube(
name='Server',
addr='ipc:///tmp/req_router.pipe',
server=True,
tube_type='ROUTER'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'
```
#### Client:
```python
import asyncio
from zmq_tubes import Tube, TubeNode
tube = Tube(
name='Client',
addr='ipc:///tmp/req_router.pipe',
tube_type='REQ'
)
async def task(node, text):
print(await node.request('test/xxx', text))
node = TubeNode()
node.register_tube(tube, 'test/#')
asyncio.create_task(task(node, 'wait'))
asyncio.create_task(task(node, 'message'))
# output: 'message'
# output: 'wait'
```
### Dealer / Response
The client is asynchronous. It means it is able to send
more requests at the same time.
#### Server:
```python
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
return 'response'
# or return requset.create_response('response')
tube = Tube(
name='Server',
addr='ipc:///tmp/dealer_resp.pipe',
server=True,
tube_type='REP'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'
```
#### Client:
```python
from zmq_tubes import Tube, TubeNode, TubeMessage
tube = Tube(
name='Client',
addr='ipc:///tmp/dealer_resp.pipe',
tube_type='DEALER'
)
async def handler(response: TubeMessage):
print(response.payload)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.send('test/xxx', 'message')
# output: 'response'
```
### Dealer / Router
The client and server are asynchronous. It means it is able to send and process
more requests/responses at the same time.
#### Server:
```python
import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
if request.payload == 'wait':
await asyncio.sleep(10)
return request.create_response(request.payload)
tube = Tube(
name='Server',
addr='ipc:///tmp/dealer_router.pipe',
server=True,
tube_type='ROUTER'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'
```
#### Client:
```python
from zmq_tubes import Tube, TubeNode, TubeMessage
tube = Tube(
name='Client',
addr='ipc:///tmp/dealer_router.pipe',
tube_type='DEALER'
)
async def handler(response: TubeMessage):
print(response.payload)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.send('test/xxx', 'wait')
await node.send('test/xxx', 'message')
# output: 'message'
# output: 'wait'
```
### Dealer / Dealer
The client and server are asynchronous. It means it is able to send and process
more requests/responses at the same time.
#### Server:
```python
from zmq_tubes import Tube, TubeNode, TubeMessage
tube = Tube(
name='Server',
addr='ipc:///tmp/dealer_dealer.pipe',
server=True,
tube_type='DEALER'
)
async def handler(response: TubeMessage):
print(response.payload)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.send('test/xxx', 'message from server')
# output: 'message from client'
```
#### Client:
```python
from zmq_tubes import Tube, TubeNode, TubeMessage
tube = Tube(
name='Client',
addr='ipc:///tmp/dealer_dealer.pipe',
tube_type='DEALER'
)
async def handler(response: TubeMessage):
print(response.payload)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.send('test/xxx', 'message from client')
# output: 'message from server'
```
## Debugging / Monitoring
We can assign a monitor socket to our zmq tubes. By this monitor socket, we can sniff zmq communication or get a zmq tube
configuration.
```yaml
tubes:
- name: ServerRouter
addr: ipc:///tmp/router.pipe
monitor: ipc:///tmp/test.monitor
tube_type: ROUTER
server: yes
topics:
- foo/#
```
This is example of a yaml definition. We can use the same monitor socket for more tubes in the same tubeNode.
When we add the monitor attribute to our tube definition, the application automatically create a new socket monitor:
`/tmp/test.monitor`. Your application works as a server side. The logs are sent to the socket only for the time, when the monitoring
tool is running.
### Monitoring tool
After enabling of the monitoring in the application, we can use the monitoring tool for sniff.
```shell
# get the server tube configuration
> zmqtube-monitor get_schema ipc:///tmp/display.monitor
tubes:
- addr: ipc:///tmp/router.pipe
monitor: ipc:///tmp/test.monitor
name: ServerRouter
server: 'yes'
tube_type: ROUTER
# the log tube communication. Logs will be saved to dump.rec as well.
> zmqtube-monitor logs -d ./dump.rec ipc:///tmp/display.monitor
0.28026580810546875 ServerRouter < foo/test Request
0.0901789665222168 ServerRouter > foo/test Response
# The format of output
# <relative time> <tube name> <direction> <topic> <message>`
```
### Simulation of the client side
When we have a dump file (e.g. `dump.rec`), we can simulate the communication with our app.
The first step is prepare the mock client schema file.
For this, We can get the tube node configuration from our application and after that edit it.
```shell
> zmqtube-monitor get_schema ipc:///tmp/display.monitor > mock_schema.yaml
> vim mock_schema.yaml
...
# Now, we have to update the file mock_schema.yaml.
# We change configuration to the mock client configuration.
# The names of the tubes must be the same as are in your app.
# We can remove monitoring attribute and change server and
# tube_type attributes. In this mock file, the topics are not
# required, they are ignored.
> cat mock_schema.yaml
tubes:
- addr: ipc:///tmp/router.pipe
name: ServerRouter
tube_type: REQ
```
Now, we can start the simulation of the client communication.
```shell
> zmqtube-monitor simulate mock_schema.yaml dump.rec
```
If the response of our app is not the same as tool expects (the response saved in dump file), then
the monitoring tool warns us.
We can modify speed of the simulation by the parameter `--speed`.
In the default configuration, is the simulation run the same
speed as original communication (parameter `--speed=1`).
| Speed | description |
| :-: | :- |
| 0 | no blocking simulation |
| 0.5 | twice faster than original |
| 1 | original speed |
| 2 | twice slower than original |
### Example of programming declaration of the monitoring.
```python
import zmq
from zmq_tubes.threads import Tube, TubeNode, TubeMessage, TubeMonitor
def handler(request: TubeMessage):
print(request.payload)
return request.create_response('response')
resp_tube = Tube(
name='REP',
addr='ipc:///tmp/rep.pipe',
server='yes',
tube_type=zmq.REP
)
req_tube = Tube(
name='REQ',
addr='ipc:///tmp/rep.pipe',
tube_type=zmq.REQ
)
node = TubeNode()
node.register_tube(resp_tube, f"foo/#")
node.register_tube(req_tube, f"foo/#")
node.register_handler(f"foo/#", handler)
node.register_monitor(resp_tube, TubeMonitor(addr='ipc:///tmp/test.monitor'))
with node:
print(node.request('foo/xxx', 'message 2'))
```
Raw data
{
"_id": null,
"home_page": "https://github.com/calcite/zmq_tubes",
"name": "zmq-tubes",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.6,<4.0",
"maintainer_email": "",
"keywords": "zmq,mqtt,tubes,zmq_tubes",
"author": "Martin Korbel",
"author_email": "mkorbel@alps.cz",
"download_url": "https://files.pythonhosted.org/packages/e4/a0/a0dd8e32ed1e4292b451473cb356ba5829823b38a98f5ab144fecaae4ba2/zmq_tubes-1.14.0.tar.gz",
"platform": null,
"description": "[![PyPI](https://img.shields.io/pypi/v/zmq_tubes?color=green&style=plastic)](https://pypi.org/project/zmq-tubes/)\n![PyPI - Python Version](https://img.shields.io/pypi/pyversions/zmq_tubes?style=plastic)\n![License](https://img.shields.io/github/license/calcite/zmq_tubes?style=plastic)\n# ZMQ Tubes\n\nZMQ Tubes is a managing system for ZMQ communication. \nIt can manage many ZMQ sockets by one interface. \nThe whole system is hierarchical, based on topics \n(look at [MQTT topics](https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/)).\n\n## Classes\n- **TubeMessage** - This class represents a request/response message. \n Some types of tubes require a response in this format.\n- **Tube** - This class wraps a ZMQ socket. \n It represents a connection between client and server.\n- **TubeMonitor** - The class can sniff of the ZMQTube communication.\n- **TubeNode** - This represents an application interface for communication via tubes.\n\n\n## Asyncio / Threading\nThe library support bot method. Asyncio from Python 3.7.\n\n```python\nfrom zmq_tubes import TubeNode, Tube # Asyncio classes\nfrom zmq_tubes.threads import TubeNode, Tube # Threads classes\n```\n\n\n## Usage:\n\n### Node definitions in yml file \nWe can define all tubes for one TubeNode by yml file.\nNext examples require install these packages `PyYAML`, `pyzmq` and `zmq_tubes`.\n#### Client service (asyncio example)\n```yaml\n# client.yml\ntubes:\n - name: Client REQ\n addr: ipc:///tmp/req.pipe\n tube_type: REQ\n topics:\n - foo/bar\n \n - name: Client PUB\n addr: ipc:///tmp/pub.pipe\n tube_type: PUB\n topics:\n - foo/pub/#\n```\n\n```python\n# client.py\nimport asyncio\nimport yaml\nfrom zmq_tubes import TubeNode, TubeMessage\n\n\nasync def run():\n with open('client.yml', 'r+') as fd:\n schema = yaml.safe_load(fd)\n node = TubeNode(schema=schema) \n async with node:\n print(await node.request('foo/bar', 'message 1'))\n await node.publish('foo/pub/test', 'message 2')\n\nif __name__ == \"__main__\":\n asyncio.run(run())\n```\n```shell\n> python client.py\ntopic: foo/bar, payload: response\n```\n\n\n#### Server service (threads example)\n```yaml\n# server.yml\ntubes:\n - name: server ROUTER\n addr: ipc:///tmp/req.pipe\n tube_type: ROUTER\n server: True\n topics:\n - foo/bar\n \n - name: server SUB\n addr: ipc:///tmp/pub.pipe\n tube_type: SUB\n server: True\n topics:\n - foo/pub/#\n```\n\n```python\n# server.py\nimport yaml\nfrom zmq_tubes.threads import TubeNode, TubeMessage\n\n\ndef handler(request: TubeMessage):\n print(request.payload)\n if request.tube.tube_type_name == 'ROUTER': \n return request.create_response('response')\n\n\ndef run():\n with open('server.yml', 'r+') as fd:\n schema = yaml.safe_load(fd)\n node = TubeNode(schema=schema)\n node.register_handler('foo/#', handler)\n with node:\n node.start().join()\n\nif __name__ == \"__main__\":\n run()\n```\n\n```shell\n> python server.py\nmessage 1\nmessage 2\n```\n\n### YAML definition\n\nThe yaml file starts with a root element `tubes`, which contains list of all our tube definitions.\n- `name` - string - name of the tube.\n- `addr` - string - connection or bind address in format `transport://address` (see more http://api.zeromq.org/2-1:zmq-connect)\n- `server` - bool - is this tube server side (bind to `addr`) or client side (connect to `addr`) \n- `tube_type` - string - type of this tube (see more https://zguide.zeromq.org/docs/chapter2/#Messaging-Patterns)\n- `identity` - string - (optional) we can setup custom tube identity\n- `utf8_decoding` - bool - (default = True), if this is True, the payload is automatically UTF8 decode.\n- `sockopts` - dict - (optional) we can setup sockopts for this tube (see more http://api.zeromq.org/4-2:zmq-setsockopt)\n- `monitor` - string - (optional) bind address of tube monitor (see more [Debugging / Monitoring](#debugging-/-monitoring))\n\n\n### Request / Response\nThis is a simple scenario, the server processes the requests serially.\n#### Server:\n\n```python\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\n\nasync def handler(request: TubeMessage):\n print(request.payload)\n return 'answer'\n # or return request.create_response('response')\n\n\ntube = Tube(\n name='Server',\n addr='ipc:///tmp/req_resp.pipe',\n server=True,\n tube_type='REP'\n)\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\nawait node.start()\n\n# output: 'question'\n```\n\n#### Client:\n```python\nfrom zmq_tubes import Tube, TubeNode\n\ntube = Tube(\n name='Client',\n addr='ipc:///tmp/req_resp.pipe',\n tube_type='REQ'\n)\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nresponse = await node.request('test/xxx', 'question')\nprint(response.payload)\n# output: 'answer'\n```\nThe method `request` accepts the optional parameter `utf8_decoding`. When we set this parameter to `False` in previous\nexample, the returned payload is not automatically decoded, we get bytes.\n\n\n### Subscribe / Publisher\n#### Server:\n\n```python\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\n\nasync def handler(request: TubeMessage):\n print(request.payload)\n\n\ntube = Tube(\n name='Server',\n addr='ipc:///tmp/sub_pub.pipe',\n server=True,\n tube_type='SUB'\n)\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\nawait node.start()\n# output: 'message'\n```\n\n#### Client:\n\n```python\nfrom zmq_tubes import Tube, TubeNode\n\ntube = Tube(\n name='Client',\n addr='ipc:///tmp/sub_pub.pipe',\n tube_type='PUB'\n)\n# In the case of publishing, the first message is very often\n# lost. The workaround is to connect the tube manually as soon as possible.\ntube.connect()\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.publish('test/xxx', 'message') \n```\n\n\n\n\n### Request / Router\nThe server is asynchronous. It means it is able to process \nmore requests at the same time.\n\n#### Server:\n\n```python\nimport asyncio\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\n\nasync def handler(request: TubeMessage):\n print(request.payload)\n if request.payload == 'wait':\n await asyncio.sleep(10)\n return request.create_response(request.payload)\n\n\ntube = Tube(\n name='Server',\n addr='ipc:///tmp/req_router.pipe',\n server=True,\n tube_type='ROUTER'\n)\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\nawait node.start()\n# output: 'wait'\n# output: 'message'\n```\n\n#### Client:\n\n```python\nimport asyncio\nfrom zmq_tubes import Tube, TubeNode\n\ntube = Tube(\n name='Client',\n addr='ipc:///tmp/req_router.pipe',\n tube_type='REQ'\n)\n\n\nasync def task(node, text):\n print(await node.request('test/xxx', text))\n\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nasyncio.create_task(task(node, 'wait'))\nasyncio.create_task(task(node, 'message'))\n# output: 'message'\n# output: 'wait'\n```\n\n\n\n\n### Dealer / Response\nThe client is asynchronous. It means it is able to send \nmore requests at the same time.\n\n#### Server:\n\n```python\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\n\nasync def handler(request: TubeMessage):\n print(request.payload)\n return 'response'\n # or return requset.create_response('response')\n\n\ntube = Tube(\n name='Server',\n addr='ipc:///tmp/dealer_resp.pipe',\n server=True,\n tube_type='REP'\n)\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\nawait node.start()\n# output: 'message'\n```\n\n#### Client:\n\n```python\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\ntube = Tube(\n name='Client',\n addr='ipc:///tmp/dealer_resp.pipe',\n tube_type='DEALER'\n)\n\n\nasync def handler(response: TubeMessage):\n print(response.payload)\n\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\n\nawait node.send('test/xxx', 'message')\n\n# output: 'response'\n```\n\n\n\n### Dealer / Router\nThe client and server are asynchronous. It means it is able to send and process \nmore requests/responses at the same time.\n\n#### Server:\n\n```python\nimport asyncio\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\n\nasync def handler(request: TubeMessage):\n print(request.payload)\n if request.payload == 'wait':\n await asyncio.sleep(10)\n return request.create_response(request.payload)\n\n\ntube = Tube(\n name='Server',\n addr='ipc:///tmp/dealer_router.pipe',\n server=True,\n tube_type='ROUTER'\n)\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\nawait node.start()\n# output: 'wait'\n# output: 'message'\n```\n\n#### Client:\n\n```python\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\ntube = Tube(\n name='Client',\n addr='ipc:///tmp/dealer_router.pipe',\n tube_type='DEALER'\n)\n\n\nasync def handler(response: TubeMessage):\n print(response.payload)\n\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\n\nawait node.send('test/xxx', 'wait')\nawait node.send('test/xxx', 'message')\n\n# output: 'message'\n# output: 'wait'\n```\n\n\n\n### Dealer / Dealer\nThe client and server are asynchronous. It means it is able to send and process \nmore requests/responses at the same time.\n\n#### Server:\n\n```python\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\ntube = Tube(\n name='Server',\n addr='ipc:///tmp/dealer_dealer.pipe',\n server=True,\n tube_type='DEALER'\n)\n\n\nasync def handler(response: TubeMessage):\n print(response.payload)\n\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\n\nawait node.send('test/xxx', 'message from server')\n# output: 'message from client'\n```\n\n#### Client:\n\n```python\nfrom zmq_tubes import Tube, TubeNode, TubeMessage\n\ntube = Tube(\n name='Client',\n addr='ipc:///tmp/dealer_dealer.pipe',\n tube_type='DEALER'\n)\n\n\nasync def handler(response: TubeMessage):\n print(response.payload)\n\n\nnode = TubeNode()\nnode.register_tube(tube, 'test/#')\nnode.register_handler('test/#', handler)\n\nawait node.send('test/xxx', 'message from client')\n# output: 'message from server'\n```\n\n\n## Debugging / Monitoring\nWe can assign a monitor socket to our zmq tubes. By this monitor socket, we can sniff zmq communication or get a zmq tube\nconfiguration. \n```yaml\ntubes:\n - name: ServerRouter\n addr: ipc:///tmp/router.pipe\n monitor: ipc:///tmp/test.monitor \n tube_type: ROUTER\n server: yes \n topics:\n - foo/# \n```\nThis is example of a yaml definition. We can use the same monitor socket for more tubes in the same tubeNode.\nWhen we add the monitor attribute to our tube definition, the application automatically create a new socket monitor: \n`/tmp/test.monitor`. Your application works as a server side. The logs are sent to the socket only for the time, when the monitoring\ntool is running.\n\n### Monitoring tool\n\nAfter enabling of the monitoring in the application, we can use the monitoring tool for sniff. \n\n```shell\n# get the server tube configuration\n> zmqtube-monitor get_schema ipc:///tmp/display.monitor\n tubes:\n - addr: ipc:///tmp/router.pipe\n monitor: ipc:///tmp/test.monitor \n name: ServerRouter\n server: 'yes'\n tube_type: ROUTER\n\n# the log tube communication. Logs will be saved to dump.rec as well. \n> zmqtube-monitor logs -d ./dump.rec ipc:///tmp/display.monitor\n 0.28026580810546875 ServerRouter < foo/test Request\n 0.0901789665222168 ServerRouter > foo/test Response\n\n# The format of output\n# <relative time> <tube name> <direction> <topic> <message>` \n```\n\n### Simulation of the client side\nWhen we have a dump file (e.g. `dump.rec`), we can simulate the communication with our app.\nThe first step is prepare the mock client schema file.\nFor this, We can get the tube node configuration from our application and after that edit it. \n```shell\n> zmqtube-monitor get_schema ipc:///tmp/display.monitor > mock_schema.yaml\n> vim mock_schema.yaml\n... \n# Now, we have to update the file mock_schema.yaml. \n# We change configuration to the mock client configuration. \n# The names of the tubes must be the same as are in your app. \n# We can remove monitoring attribute and change server and \n# tube_type attributes. In this mock file, the topics are not \n# required, they are ignored. \n\n> cat mock_schema.yaml\ntubes:\n- addr: ipc:///tmp/router.pipe\n name: ServerRouter\n tube_type: REQ\n```\n\nNow, we can start the simulation of the client communication.\n```shell\n> zmqtube-monitor simulate mock_schema.yaml dump.rec\n```\nIf the response of our app is not the same as tool expects (the response saved in dump file), then \nthe monitoring tool warns us. \nWe can modify speed of the simulation by the parameter `--speed`.\n\nIn the default configuration, is the simulation run the same\nspeed as original communication (parameter `--speed=1`). \n\n| Speed | description |\n| :-: | :- |\n| 0 | no blocking simulation |\n| 0.5 | twice faster than original |\n| 1 | original speed |\n| 2 | twice slower than original |\n\n\n### Example of programming declaration of the monitoring.\n```python\nimport zmq\nfrom zmq_tubes.threads import Tube, TubeNode, TubeMessage, TubeMonitor\n\n\ndef handler(request: TubeMessage):\n print(request.payload)\n return request.create_response('response')\n\nresp_tube = Tube(\n name='REP',\n addr='ipc:///tmp/rep.pipe',\n server='yes',\n tube_type=zmq.REP\n)\n\nreq_tube = Tube(\n name='REQ',\n addr='ipc:///tmp/rep.pipe', \n tube_type=zmq.REQ\n)\n\nnode = TubeNode()\nnode.register_tube(resp_tube, f\"foo/#\")\nnode.register_tube(req_tube, f\"foo/#\")\nnode.register_handler(f\"foo/#\", handler)\n\nnode.register_monitor(resp_tube, TubeMonitor(addr='ipc:///tmp/test.monitor'))\n \nwith node:\n print(node.request('foo/xxx', 'message 2'))\n\n```\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Wrapper for ZMQ comunication.",
"version": "1.14.0",
"project_urls": {
"Documentation": "https://github.com/calcite/zmq_tubes",
"Homepage": "https://github.com/calcite/zmq_tubes",
"Repository": "https://github.com/calcite/zmq_tubes"
},
"split_keywords": [
"zmq",
"mqtt",
"tubes",
"zmq_tubes"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "815ecd9681c763fdafbfd518b998aa8afc573237644a46da1f40c8d3aade38d3",
"md5": "a5da0b229f7cdfb060e8d6a2c8da9d34",
"sha256": "8dee0d676268938810b5debde4f7adb14606f731061e57adb4e154d9c2a37a34"
},
"downloads": -1,
"filename": "zmq_tubes-1.14.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a5da0b229f7cdfb060e8d6a2c8da9d34",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.6,<4.0",
"size": 18855,
"upload_time": "2023-07-25T09:14:32",
"upload_time_iso_8601": "2023-07-25T09:14:32.500373Z",
"url": "https://files.pythonhosted.org/packages/81/5e/cd9681c763fdafbfd518b998aa8afc573237644a46da1f40c8d3aade38d3/zmq_tubes-1.14.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "e4a0a0dd8e32ed1e4292b451473cb356ba5829823b38a98f5ab144fecaae4ba2",
"md5": "f9c533918582dd9c7f33a570f9980be1",
"sha256": "eaa2625ca00533e69755299ff9093dbfe45d43ec3c83b16e4be728d1ac673b2b"
},
"downloads": -1,
"filename": "zmq_tubes-1.14.0.tar.gz",
"has_sig": false,
"md5_digest": "f9c533918582dd9c7f33a570f9980be1",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.6,<4.0",
"size": 19126,
"upload_time": "2023-07-25T09:14:34",
"upload_time_iso_8601": "2023-07-25T09:14:34.125197Z",
"url": "https://files.pythonhosted.org/packages/e4/a0/a0dd8e32ed1e4292b451473cb356ba5829823b38a98f5ab144fecaae4ba2/zmq_tubes-1.14.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-07-25 09:14:34",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "calcite",
"github_project": "zmq_tubes",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "zmq-tubes"
}