rembus


Namerembus JSON
Version 0.5.0 PyPI version JSON
download
home_pageNone
SummaryRembus for python
upload_time2025-10-16 08:34:33
maintainerNone
docs_urlNone
authorNone
requires_python>=3.12
licenseNone
keywords rembus rpc publish subscribe websocket cbor
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Rembus for Python

[![Build Status](https://github.com/cardo-org/rembus.python/actions/workflows/python-app.yml/badge.svg?branch=main)](https://github.com/cardo-org/rembus.python/actions/workflows/CI.yml?query=branch%3Amain)
[![Coverage](https://codecov.io/gh/cardo-org/rembus.python/branch/main/graph/badge.svg)](https://codecov.io/gh/cardo-org/rembus.python)

Rembus is a Pub/Sub and RPC middleware.

There are few key concepts to get confident with Rembus:

- A Component is a distributed application that communicate with Pub/Sub or RPC styles;
- A Component connect to a Broker;
- A Broker dispatch messages between Components;
- A Component expose RPC services and/or subscribe to Pub/Sub topics;
- A Component make RPC requests and/or publish messages to Pub/Sub topics;

This API version supports only the WebSocket protocol.

## Getting Started

Start the [Rembus](https://cardo-org.github.io/Rembus.python/stable/) broker.

Install the package:

```shell
pip install rembus
```

```python
import rembus

rb = rembus.node()
rb.publish({'name': 'sensor_1', 'metric': 'T', 'value':21.6})
rb.close()
```

or call `component("myname")` for the asynchronous Python API:

```python
import asyncio
import rembus

async def main():
    rb = await rembus.component("myname")
    await rb.publish("mytopic", {'name': 'sensor_1','metric': 'T','value':21.6})
    await rb.close()


loop = asyncio.new_event_loop()
loop.run_until_complete(main())

```

## Initialize a Component

Currently the Python API provides the WebSocket protocol for connecting to the Rembus broker.

The url argument of the `component` function define the component identity and the broker endpoint to connect:

```python
import rembus

# Broker endpoint and named component
rb = await rembus.component('ws://hostname:port/component_name')

# Broker endpoint and anonymous component 
rb = await rembus.component('ws://hostname:port')

# Default broker and named component 
rb = await rembus.component('component_name')

# Default broker and anonymous component 
rb = await rembus.component()
```

The `component` builder function returns a Rembus handler that will be used for interacting with the components via Pub/Sub and RPC messages.

`component_name` is the unique name that assert the component identity between online sessions (connect/disconnect windows).

`component_name` is optional: if it is missing then a random identifier that changes at each connection event is used as the component identifier. In this case the broker is unable to bind the component to a persistent twin and messages published when the component is offline get not broadcasted to the component when it gets online again.

The default broker endpoint is set by `REMBUS_BASE_URL` environment variable and default to `ws://127.0.0.1:8000`.

## Pub/Sub example

A message is published with `publish` function.

```python
rb.publish('mytopic', arg_1, arg_2, ..., arg_N)
```

Where the arguments `arg_i` comprise the message data payload that gets received by the subscribed components.

A subscribed component interested to the topic `mytopic` have to define a function named as the topic of interest and with the same numbers of arguments:

```python
# do something each time a message published to topic mytopic is published
def mytopic(arg_1, arg_2, ..., arg_N):
    ...

rb.subscribe(mytopic)

rb.wait()
```

The first argument to `subscribe` is the function, named as the topic of interest, that will be called each time a message is published.

The optional second argument of `subscribe` define the "retroactive" feature of the
subscribed topic.

If the second argument is `True` then the messages published when the component is offline will be delivered as soon as the component will get online again, otherwise
the messages published before connecting will be lost.

> **NOTE**: To cache messages for an offline component the broker needs to know that such component has subscribed for a specific topic. This imply that messages published before the first subscribe happens will be lost. If you want all message will be delivered subscribe first and publish after.  

## RPC example

A RPC service is implemented with a function named as the exposed service.

```python
import rembus as rembus

def add(x,y):
    return x+y

rb = rembus.node('calculator')

rb.expose(add)

rb.wait()
```

The `calculator` component expose the `add` service, the RPC client will invoke as:

```python
import rembus as rembus

rb = rembus.node()
result = rb.rpc('add', 1, 2)
```

The asynchronous client and server implementations will be something like:

```python
#server.py
import asyncio
import rembus

async def add(x, y):
    return x+y

async def main():
    rb = await rembus.component()
    
    await rb.expose(add)
    await rb.wait()

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
```

```python
# client.py
import asyncio
import rembus

async def main():
    rb = await rembus.component()
    result = await rb.rpc('add', 1, 2)
    print(f'result={result}')
    await rb.close()


loop = asyncio.new_event_loop()
loop.run_until_complete(main())
```

## Test

```shell
pytest --cov=rembus --cov-report=lcov:lcov.info
```

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "rembus",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.12",
    "maintainer_email": null,
    "keywords": "rembus, rpc, publish, subscribe, websocket, cbor",
    "author": null,
    "author_email": "Attilio Don\u00e0 <attilio.dona@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/35/a8/a6c66451c47c486cf5b65c67fd0bbedc9390288be8bfcf44be72088f89fe/rembus-0.5.0.tar.gz",
    "platform": null,
    "description": "# Rembus for Python\n\n[![Build Status](https://github.com/cardo-org/rembus.python/actions/workflows/python-app.yml/badge.svg?branch=main)](https://github.com/cardo-org/rembus.python/actions/workflows/CI.yml?query=branch%3Amain)\n[![Coverage](https://codecov.io/gh/cardo-org/rembus.python/branch/main/graph/badge.svg)](https://codecov.io/gh/cardo-org/rembus.python)\n\nRembus is a Pub/Sub and RPC middleware.\n\nThere are few key concepts to get confident with Rembus:\n\n- A Component is a distributed application that communicate with Pub/Sub or RPC styles;\n- A Component connect to a Broker;\n- A Broker dispatch messages between Components;\n- A Component expose RPC services and/or subscribe to Pub/Sub topics;\n- A Component make RPC requests and/or publish messages to Pub/Sub topics;\n\nThis API version supports only the WebSocket protocol.\n\n## Getting Started\n\nStart the [Rembus](https://cardo-org.github.io/Rembus.python/stable/) broker.\n\nInstall the package:\n\n```shell\npip install rembus\n```\n\n```python\nimport rembus\n\nrb = rembus.node()\nrb.publish({'name': 'sensor_1', 'metric': 'T', 'value':21.6})\nrb.close()\n```\n\nor call `component(\"myname\")` for the asynchronous Python API:\n\n```python\nimport asyncio\nimport rembus\n\nasync def main():\n    rb = await rembus.component(\"myname\")\n    await rb.publish(\"mytopic\", {'name': 'sensor_1','metric': 'T','value':21.6})\n    await rb.close()\n\n\nloop = asyncio.new_event_loop()\nloop.run_until_complete(main())\n\n```\n\n## Initialize a Component\n\nCurrently the Python API provides the WebSocket protocol for connecting to the Rembus broker.\n\nThe url argument of the `component` function define the component identity and the broker endpoint to connect:\n\n```python\nimport rembus\n\n# Broker endpoint and named component\nrb = await rembus.component('ws://hostname:port/component_name')\n\n# Broker endpoint and anonymous component \nrb = await rembus.component('ws://hostname:port')\n\n# Default broker and named component \nrb = await rembus.component('component_name')\n\n# Default broker and anonymous component \nrb = await rembus.component()\n```\n\nThe `component` builder function returns a Rembus handler that will be used for interacting with the components via Pub/Sub and RPC messages.\n\n`component_name` is the unique name that assert the component identity between online sessions (connect/disconnect windows).\n\n`component_name` is optional: if it is missing then a random identifier that changes at each connection event is used as the component identifier. In this case the broker is unable to bind the component to a persistent twin and messages published when the component is offline get not broadcasted to the component when it gets online again.\n\nThe default broker endpoint is set by `REMBUS_BASE_URL` environment variable and default to `ws://127.0.0.1:8000`.\n\n## Pub/Sub example\n\nA message is published with `publish` function.\n\n```python\nrb.publish('mytopic', arg_1, arg_2, ..., arg_N)\n```\n\nWhere the arguments `arg_i` comprise the message data payload that gets received by the subscribed components.\n\nA subscribed component interested to the topic `mytopic` have to define a function named as the topic of interest and with the same numbers of arguments:\n\n```python\n# do something each time a message published to topic mytopic is published\ndef mytopic(arg_1, arg_2, ..., arg_N):\n    ...\n\nrb.subscribe(mytopic)\n\nrb.wait()\n```\n\nThe first argument to `subscribe` is the function, named as the topic of interest, that will be called each time a message is published.\n\nThe optional second argument of `subscribe` define the \"retroactive\" feature of the\nsubscribed topic.\n\nIf the second argument is `True` then the messages published when the component is offline will be delivered as soon as the component will get online again, otherwise\nthe messages published before connecting will be lost.\n\n> **NOTE**: To cache messages for an offline component the broker needs to know that such component has subscribed for a specific topic. This imply that messages published before the first subscribe happens will be lost. If you want all message will be delivered subscribe first and publish after.  \n\n## RPC example\n\nA RPC service is implemented with a function named as the exposed service.\n\n```python\nimport rembus as rembus\n\ndef add(x,y):\n    return x+y\n\nrb = rembus.node('calculator')\n\nrb.expose(add)\n\nrb.wait()\n```\n\nThe `calculator` component expose the `add` service, the RPC client will invoke as:\n\n```python\nimport rembus as rembus\n\nrb = rembus.node()\nresult = rb.rpc('add', 1, 2)\n```\n\nThe asynchronous client and server implementations will be something like:\n\n```python\n#server.py\nimport asyncio\nimport rembus\n\nasync def add(x, y):\n    return x+y\n\nasync def main():\n    rb = await rembus.component()\n    \n    await rb.expose(add)\n    await rb.wait()\n\nloop = asyncio.new_event_loop()\nloop.run_until_complete(main())\n```\n\n```python\n# client.py\nimport asyncio\nimport rembus\n\nasync def main():\n    rb = await rembus.component()\n    result = await rb.rpc('add', 1, 2)\n    print(f'result={result}')\n    await rb.close()\n\n\nloop = asyncio.new_event_loop()\nloop.run_until_complete(main())\n```\n\n## Test\n\n```shell\npytest --cov=rembus --cov-report=lcov:lcov.info\n```\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Rembus for python",
    "version": "0.5.0",
    "project_urls": {
        "Bug Tracker": "https://github.com/cardo-org/rembus.python/issues",
        "Homepage": "https://github.com/cardo-org/rembus.python"
    },
    "split_keywords": [
        "rembus",
        " rpc",
        " publish",
        " subscribe",
        " websocket",
        " cbor"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "fdf0e3dad687f19cfad26237318b66baf5492078a564c9db32cf15cc602b699c",
                "md5": "6e87b5042656cc373e22a43e815c8a8f",
                "sha256": "678195c51f58d283d782cf78f6149c81a536fe7d5bab2ad6d46ac08734d730e4"
            },
            "downloads": -1,
            "filename": "rembus-0.5.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "6e87b5042656cc373e22a43e815c8a8f",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.12",
            "size": 33003,
            "upload_time": "2025-10-16T08:34:32",
            "upload_time_iso_8601": "2025-10-16T08:34:32.848512Z",
            "url": "https://files.pythonhosted.org/packages/fd/f0/e3dad687f19cfad26237318b66baf5492078a564c9db32cf15cc602b699c/rembus-0.5.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "35a8a6c66451c47c486cf5b65c67fd0bbedc9390288be8bfcf44be72088f89fe",
                "md5": "0c68fceacd412f2ab7b529039442e273",
                "sha256": "e5b58232b021320a11bddfcccad89661c7550bdf7955e37b7142b5ce0ba6b8b2"
            },
            "downloads": -1,
            "filename": "rembus-0.5.0.tar.gz",
            "has_sig": false,
            "md5_digest": "0c68fceacd412f2ab7b529039442e273",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.12",
            "size": 33939,
            "upload_time": "2025-10-16T08:34:33",
            "upload_time_iso_8601": "2025-10-16T08:34:33.689407Z",
            "url": "https://files.pythonhosted.org/packages/35/a8/a6c66451c47c486cf5b65c67fd0bbedc9390288be8bfcf44be72088f89fe/rembus-0.5.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-16 08:34:33",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "cardo-org",
    "github_project": "rembus.python",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "rembus"
}
        
Elapsed time: 4.83175s