# Rembus for Python
Rembus is a Pub/Sub and RPC middleware.
A few key concepts help you get comfortable with Rembus:
- A Component is a distributed application that communicates in Pub/Sub or RPC style;
- A Component connects to a Broker;
- A Broker dispatches messages between Components;
- A Component exposes RPC services and/or subscribes to Pub/Sub topics;
- A Component makes RPC requests and/or publishes messages to Pub/Sub topics.
This API version supports only the WebSocket protocol.
## Getting Started
Start the [Rembus](https://cardo-org.github.io/Rembus.python/stable/) broker.
Install the package:
```shell
pip install rembus
```
```python
import rembus
rb = rembus.node()
rb.publish('mytopic', {'name': 'sensor_1', 'metric': 'T', 'value': 21.6})
rb.close()
```
Alternatively, call `component("myname")` to use the asynchronous Python API:
```python
import asyncio
import rembus
async def main():
    # Connect as the named component "myname", publish one message, then disconnect
    rb = await rembus.component("myname")
    await rb.publish("mytopic", {'name': 'sensor_1', 'metric': 'T', 'value': 21.6})
    await rb.close()

asyncio.run(main())
```
## Initialize a Component
Currently the Python API connects to the Rembus broker over the WebSocket protocol only.
The URL argument of the `component` function defines the component identity and the broker endpoint to connect to:
```python
import rembus
# Broker endpoint and named component
rb = await rembus.component('ws://hostname:port/component_name')
# Broker endpoint and anonymous component
rb = await rembus.component('ws://hostname:port')
# Default broker and named component
rb = await rembus.component('component_name')
# Default broker and anonymous component
rb = await rembus.component()
```
The `component` builder function returns a Rembus handle that is used to interact with other components via Pub/Sub and RPC messages.
`component_name` is the unique name that asserts the component identity across online sessions (connect/disconnect windows).
`component_name` is optional: if it is missing, a random identifier that changes at each connection is used as the component identifier. In this case the broker cannot bind the component to a persistent twin, and messages published while the component is offline are not delivered to it when it comes back online.
The default broker endpoint is set by the `REMBUS_BASE_URL` environment variable and defaults to `ws://127.0.0.1:8000`.
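The four URL forms above can be pictured with a small standalone sketch. The helper `resolve_component_url` is hypothetical and is not part of the `rembus` package; it only mirrors the rules described in this section (explicit endpoint versus `REMBUS_BASE_URL`, named versus anonymous component), assuming the component name is the URL path.

```python
import os
from urllib.parse import urlsplit

DEFAULT_BASE_URL = "ws://127.0.0.1:8000"


def resolve_component_url(url=None, env=None):
    """Return (broker_endpoint, component_name) for a component URL.

    Hypothetical helper for illustration only; the real package may
    resolve URLs differently. A name of None means "anonymous".
    """
    env = os.environ if env is None else env
    base = env.get("REMBUS_BASE_URL", DEFAULT_BASE_URL)
    if not url:
        # Default broker, anonymous component.
        return base, None
    if "://" not in url:
        # Bare name: default broker, named component.
        return base, url
    parts = urlsplit(url)
    # An empty path means no component name was given.
    name = parts.path.strip("/") or None
    return f"{parts.scheme}://{parts.netloc}", name


print(resolve_component_url("ws://broker.example:9000/sensor_1", env={}))
print(resolve_component_url("ws://broker.example:9000", env={}))
print(resolve_component_url("sensor_1", env={}))
print(resolve_component_url(env={}))
```

Passing `env` explicitly keeps the sketch independent of the real process environment; by default it reads `os.environ`.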
## Pub/Sub example
A message is published with the `publish` function.
```python
rb.publish('mytopic', arg_1, arg_2, ..., arg_N)
```
The arguments `arg_i` make up the message payload that is received by the subscribed components.
A component interested in the topic `mytopic` has to define a function named after the topic, with the same number of arguments:
```python
# do something each time a message is published to the topic mytopic
def mytopic(arg_1, arg_2, ..., arg_N):
    ...
rb.subscribe(mytopic)
rb.wait()
```
The first argument to `subscribe` is the function, named after the topic of interest, that is called each time a message is published.
The optional second argument of `subscribe` controls the "retroactive" feature of the subscribed topic.
If the second argument is `True`, messages published while the component is offline are delivered as soon as the component comes back online; otherwise, messages published before connecting are lost.
> **NOTE**: To cache messages for an offline component, the broker needs to know that the component has subscribed to a specific topic. This implies that messages published before the first subscribe are lost. If you want every message to be delivered, subscribe first and publish afterwards.
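
The delivery rules above (function name as topic, retroactive caching, loss before the first subscribe) can be illustrated with a toy in-memory model. `ToyBroker` and its methods are hypothetical names invented for this sketch; it does not use the network or the `rembus` package, and the real broker may behave differently in details.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker; hypothetical, not the rembus API."""

    def __init__(self):
        self.handlers = defaultdict(dict)     # topic -> {component: callback}
        self.retroactive = defaultdict(set)   # topic -> retroactive components
        self.pending = defaultdict(list)      # component -> [(topic, args)]
        self.online = set()

    def connect(self, component):
        self.online.add(component)
        # Deliver whatever was cached while the component was offline.
        for topic, args in self.pending.pop(component, []):
            self.handlers[topic][component](*args)

    def disconnect(self, component):
        self.online.discard(component)

    def subscribe(self, component, callback, retroactive=False):
        topic = callback.__name__  # the function name is the topic
        self.handlers[topic][component] = callback
        if retroactive:
            self.retroactive[topic].add(component)

    def publish(self, topic, *args):
        for component, callback in self.handlers[topic].items():
            if component in self.online:
                callback(*args)
            elif component in self.retroactive[topic]:
                self.pending[component].append((topic, args))
            # Otherwise the message is lost for this component.


received = []


def mytopic(name, value):
    received.append((name, value))


broker = ToyBroker()
broker.publish("mytopic", "sensor_1", 20.0)  # before the first subscribe: lost
broker.connect("logger")
broker.subscribe("logger", mytopic, retroactive=True)
broker.publish("mytopic", "sensor_1", 21.6)  # delivered immediately
broker.disconnect("logger")
broker.publish("mytopic", "sensor_1", 22.1)  # cached for the offline component
broker.connect("logger")                     # cached message delivered now
print(received)
```

With `retroactive=False` in the `subscribe` call, the message published while `logger` is offline would be dropped instead of cached.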
## RPC example
An RPC service is implemented with a function named after the exposed service.
```python
import rembus

def add(x, y):
    return x + y

rb = rembus.node('calculator')
rb.expose(add)
rb.wait()
```
The `calculator` component exposes the `add` service, which an RPC client invokes as:
```python
import rembus

rb = rembus.node()
result = rb.rpc('add', 1, 2)
rb.close()
```
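As a rough mental model of the name-based lookup, the sketch below keeps exposed functions in a dictionary and resolves a request by service name. `ToyRpcRegistry` is a hypothetical class for illustration; it runs in a single process and says nothing about how the broker actually routes requests or reports errors.

```python
class ToyRpcRegistry:
    """Maps service names to functions; hypothetical, not the rembus API."""

    def __init__(self):
        self.services = {}

    def expose(self, func):
        # The function name becomes the service name.
        self.services[func.__name__] = func

    def rpc(self, service, *args):
        # Unlike publish, a request targets one service and returns its result.
        if service not in self.services:
            raise LookupError(f"service not exposed: {service}")
        return self.services[service](*args)


def add(x, y):
    return x + y


registry = ToyRpcRegistry()
registry.expose(add)
result = registry.rpc("add", 1, 2)
print(f"result={result}")
```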
The asynchronous server and client implementations look like this:
```python
# server.py
import asyncio
import rembus

async def add(x, y):
    return x + y

async def main():
    rb = await rembus.component()
    # Expose the add service and serve requests until stopped
    await rb.expose(add)
    await rb.wait()

asyncio.run(main())
```
```python
# client.py
import asyncio
import rembus
async def main():
    rb = await rembus.component()
    # Invoke the add service exposed by the server
    result = await rb.rpc('add', 1, 2)
    print(f'result={result}')
    await rb.close()

asyncio.run(main())
```
## Test
```shell
pytest --cov=rembus --cov-report=lcov:lcov.info
```