# Realtime Pub/Sub Client for Python
The `realtime-pubsub-client` is a Python client library for interacting
with [Realtime Pub/Sub](https://realtime.21no.de) applications. It enables developers to manage real-time WebSocket
connections, handle subscriptions, and process messages efficiently. The library provides a simple and flexible API to
interact with realtime applications, supporting features like publishing/sending messages, subscribing to topics,
handling acknowledgements, and waiting for replies with timeout support.
## Features
- **WebSocket Connection Management**: Seamlessly connect and disconnect from the Realtime Pub/Sub service with
automatic reconnection support.
- **Topic Subscription**: Subscribe and unsubscribe to topics for receiving messages.
- **Topic Publishing**: [Publish](https://realtime.21no.de/documentation/#publishers) messages to specific topics with
optional message types and compression.
- **Message Sending**: [Send](https://realtime.21no.de/documentation/#websocket-inbound-messaging) messages to backend
applications with optional message types and compression.
- **Event Handling**: Handle incoming messages with custom event listeners.
- **Acknowledgements and Replies**: Wait for gateway acknowledgements or replies to messages with timeout support.
- **Error Handling**: Robust error handling and logging capabilities.
- **Asynchronous Support**: Built using `asyncio` for efficient asynchronous programming.
## Installation
Install the `realtime-pubsub-client` library via pip:
```bash
pip install realtime-pubsub-client
```
## Getting Started
This guide will help you set up and use the `realtime-pubsub-client` library in your Python project.
### Connecting to the Server
First, import the `RealtimeClient` class and create a new instance with the required configuration:
```python
import asyncio
import logging
import os
from realtime_pubsub_client import RealtimeClient
async def main():
async def get_url():
# replace with your access token retrieval strategy
access_token = os.environ.get('ACCESS_TOKEN')
app_id = os.environ.get('APP_ID')
# return the WebSocket URL with the access token
return f"wss://genesis.r7.21no.de/apps/{app_id}?access_token={access_token}"
client_options = {
'logger': logging.getLogger('RealtimeClient'),
'websocket_options': {
'url_provider': get_url,
},
}
client = RealtimeClient(client_options)
async def on_session_started(connection_info):
print('Connection ID:', connection_info['id'])
# Subscribe to topics here
await client.subscribe_remote_topic('topic1')
await client.subscribe_remote_topic('topic2')
client.on('session.started', on_session_started)
await client.connect()
await client.wait_for('session.started')
asyncio.run(main())
```
### Subscribing to Incoming Messages
You can handle messages for specific topics and message types:
> **Note**: The topic and message type are separated by a dot (`.`) in the event name.
```python
def handle_message(message, reply_fn):
# Message handling logic here
print('Received message:', message['data']['payload'])
client.on('topic1.action1', handle_message)
```
Wildcard subscriptions are also supported:
```python
client.on('topic1.*', handle_message)
```
### Publishing Messages
Publish messages to a topic:
```python
await client.publish('topic1', 'Hello, world!', message_type='text-message')
```
### Responding to Incoming Messages
Set up event listeners to handle incoming messages:
```python
async def handle_message(message, reply_fn):
# Processing the message
print('Received message:', message['data']['payload'])
# Sending a reply
await reply_fn('Message received!', 'ok')
client.on('topic1.text-message', handle_message)
```
### Waiting for Acknowledgements and Replies
- **`wait_for_ack(timeout=None)`**: Waits for an acknowledgement of the message, with an optional timeout in seconds.
- **`wait_for_reply(timeout=None)`**: Waits for a reply to the message, with an optional timeout in seconds.
Wait for the Realtime Gateway acknowledgement after publishing a message:
```python
waiter = await client.publish('secure/peer-to-peer1', 'Hi', message_type='text-message')
await waiter.wait_for_ack()
```
Wait for the Realtime Gateway acknowledgement after sending a message:
```python
waiter = await client.send({
# Message payload
}, message_type='create')
await waiter.wait_for_ack()
```
Wait for a reply with a timeout:
```python
waiter = await client.send({
# Message payload
}, message_type='create')
await waiter.wait_for_reply(timeout=5) # Wait for up to 5 seconds
```
### Error Handling
Handle errors and disconnections:
```python
def on_error(error):
print('WebSocket error:', error)
def on_close(event):
print('WebSocket closed:', event)
client.on('error', on_error)
client.on('close', on_close)
```
## API Reference
### RealtimeClient
#### Constructor
```python
RealtimeClient(config)
```
Creates a new `RealtimeClient` instance.
- **`config`**: Configuration options for the client.
#### Methods
- **`connect()`**: Connects the client to the WebSocket Messaging Gateway.
```python
await client.connect()
```
- **`disconnect()`**: Terminates the WebSocket connection.
```python
await client.disconnect()
```
- **`subscribe_remote_topic(topic)`**: [Subscribes](https://realtime.21no.de/documentation/#subscribers) the connection
to a remote topic.
```python
await client.subscribe_remote_topic(topic)
```
- **`unsubscribe_remote_topic(topic)`**: [Unsubscribes](https://realtime.21no.de/documentation/#subscribers) the
connection from a remote topic.
```python
await client.unsubscribe_remote_topic(topic)
```
- **`publish(topic, payload, message_type="broadcast", compress=False, message_id=None)`**: Publishes a message to a topic.
```python
waiter = await client.publish(topic, payload)
```
Returns a `WaitFor` instance to wait for acknowledgements or replies.
- **`send(payload, compress=False, message_id=None)`**: Sends a message to the server.
```python
waiter = await client.send(payload, options)
```
Returns a `WaitFor` instance to wait for acknowledgements or replies.
- **`wait(ms)`**: Waits for a specified duration in milliseconds. Utility function for waiting in async functions.
```python
await wait(ms)
```
#### Events
- **`'session.started'`**: Emitted when the session starts.
```python
client.on('session.started', on_session_started)
```
- **`'error'`**: Emitted on WebSocket errors.
```python
client.on('error', on_error)
```
- **`'close'`**: Emitted when the WebSocket connection closes.
```python
client.on('close', on_close)
```
- **Custom Events**: Handle custom events based on topic and message type.
```python
client.on('TOPIC_NAME.MESSAGE_TYPE', handle_message)
```
> **Note**: Wildcard subscriptions are also supported.
## License
This library is licensed under the MIT License.
---
For more detailed examples and advanced configurations, please refer to
the [documentation](https://realtime.21no.de/docs).
## Notes
- Ensure that you have an account and an app set up with [Realtime Pub/Sub](https://realtime.21no.de).
- Customize the `url_provider` or URL to retrieve the access token for connecting to your realtime application.
- Implement the `get_auth_token` function according to your authentication mechanism.
- Optionally use the `logger` option to integrate with your application's logging system.
- Handle errors and disconnections gracefully to improve the robustness of your application.
- Make sure to handle timeouts when waiting for replies to avoid hanging operations.
---
Feel free to contribute to this project by submitting issues or pull requests
on [GitHub](https://github.com/BackendStack21/realtime-pubsub-client-python).
Raw data
{
"_id": null,
"home_page": null,
"name": "realtime-pubsub-client",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.6",
"maintainer_email": null,
"keywords": "client, messaging, pubsub, python, realtime, websocket",
"author": null,
"author_email": "Rolando Santamaria Maso <kyberneees@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/53/66/9551fd8eb7c37da79e71eec669d1bea1493a18c6d75f5afdfb0be49d1d0f/realtime_pubsub_client-1.0.0.tar.gz",
"platform": null,
"description": "# Realtime Pub/Sub Client for Python\n\nThe `realtime-pubsub-client` is a Python client library for interacting\nwith [Realtime Pub/Sub](https://realtime.21no.de) applications. It enables developers to manage real-time WebSocket\nconnections, handle subscriptions, and process messages efficiently. The library provides a simple and flexible API to\ninteract with realtime applications, supporting features like publishing/sending messages, subscribing to topics,\nhandling acknowledgements, and waiting for replies with timeout support.\n\n## Features\n\n- **WebSocket Connection Management**: Seamlessly connect and disconnect from the Realtime Pub/Sub service with\n automatic reconnection support.\n- **Topic Subscription**: Subscribe and unsubscribe to topics for receiving messages.\n- **Topic Publishing**: [Publish](https://realtime.21no.de/documentation/#publishers) messages to specific topics with\n optional message types and compression.\n- **Message Sending**: [Send](https://realtime.21no.de/documentation/#websocket-inbound-messaging) messages to backend\n applications with optional message types and compression.\n- **Event Handling**: Handle incoming messages with custom event listeners.\n- **Acknowledgements and Replies**: Wait for gateway acknowledgements or replies to messages with timeout support.\n- **Error Handling**: Robust error handling and logging capabilities.\n- **Asynchronous Support**: Built using `asyncio` for efficient asynchronous programming.\n\n## Installation\n\nInstall the `realtime-pubsub-client` library via pip:\n\n```bash\npip install realtime-pubsub-client\n```\n\n## Getting Started\n\nThis guide will help you set up and use the `realtime-pubsub-client` library in your Python project.\n\n### Connecting to the Server\n\nFirst, import the `RealtimeClient` class and create a new instance with the required configuration:\n\n```python\nimport asyncio\nimport logging\nimport os\nfrom realtime_pubsub_client import RealtimeClient\n\n\nasync def main():\n async def get_url():\n # replace with your access token retrieval strategy\n access_token = os.environ.get('ACCESS_TOKEN')\n app_id = os.environ.get('APP_ID')\n\n # return the WebSocket URL with the access token\n return f\"wss://genesis.r7.21no.de/apps/{app_id}?access_token={access_token}\"\n\n client_options = {\n 'logger': logging.getLogger('RealtimeClient'),\n 'websocket_options': {\n 'url_provider': get_url,\n },\n }\n client = RealtimeClient(client_options)\n\n async def on_session_started(connection_info):\n print('Connection ID:', connection_info['id'])\n # Subscribe to topics here\n await client.subscribe_remote_topic('topic1')\n await client.subscribe_remote_topic('topic2')\n\n client.on('session.started', on_session_started)\n\n await client.connect()\n await client.wait_for('session.started')\n\n\nasyncio.run(main())\n```\n\n### Subscribing to Incoming Messages\n\nYou can handle messages for specific topics and message types:\n\n> **Note**: The topic and message type are separated by a dot (`.`) in the event name.\n\n```python\ndef handle_message(message, reply_fn):\n # Message handling logic here\n print('Received message:', message['data']['payload'])\n\n\nclient.on('topic1.action1', handle_message)\n```\n\nWildcard subscriptions are also supported:\n\n```python\nclient.on('topic1.*', handle_message)\n```\n\n### Publishing Messages\n\nPublish messages to a topic:\n\n```python\nawait client.publish('topic1', 'Hello, world!', message_type='text-message')\n```\n\n### Responding to Incoming Messages\n\nSet up event listeners to handle incoming messages:\n\n```python\nasync def handle_message(message, reply_fn):\n # Processing the message\n print('Received message:', message['data']['payload'])\n\n # Sending a reply\n await reply_fn('Message received!', 'ok')\n\n\nclient.on('topic1.text-message', handle_message)\n```\n\n### Waiting for Acknowledgements and Replies\n\n- **`wait_for_ack(timeout=None)`**: Waits for an acknowledgement of the message, with an optional timeout in seconds.\n- **`wait_for_reply(timeout=None)`**: Waits for a reply to the message, with an optional timeout in seconds.\n\nWait for the Realtime Gateway acknowledgement after publishing a message:\n\n```python\nwaiter = await client.publish('secure/peer-to-peer1', 'Hi', message_type='text-message')\nawait waiter.wait_for_ack()\n```\n\nWait for the Realtime Gateway acknowledgement after sending a message:\n\n```python\nwaiter = await client.send({\n # Message payload\n}, message_type='create')\nawait waiter.wait_for_ack()\n```\n\nWait for a reply with a timeout:\n\n```python\nwaiter = await client.send({\n # Message payload\n}, message_type='create')\nawait waiter.wait_for_reply(timeout=5) # Wait for up to 5 seconds\n```\n\n### Error Handling\n\nHandle errors and disconnections:\n\n```python\ndef on_error(error):\n print('WebSocket error:', error)\n\n\ndef on_close(event):\n print('WebSocket closed:', event)\n\n\nclient.on('error', on_error)\nclient.on('close', on_close)\n```\n\n## API Reference\n\n### RealtimeClient\n\n#### Constructor\n\n```python\nRealtimeClient(config)\n```\n\nCreates a new `RealtimeClient` instance.\n\n- **`config`**: Configuration options for the client.\n\n#### Methods\n\n- **`connect()`**: Connects the client to the WebSocket Messaging Gateway.\n\n ```python\n await client.connect()\n ```\n\n- **`disconnect()`**: Terminates the WebSocket connection.\n\n ```python\n await client.disconnect()\n ```\n\n- **`subscribe_remote_topic(topic)`**: [Subscribes](https://realtime.21no.de/documentation/#subscribers) the connection\n to a remote topic.\n\n ```python\n await client.subscribe_remote_topic(topic)\n ```\n\n- **`unsubscribe_remote_topic(topic)`**: [Unsubscribes](https://realtime.21no.de/documentation/#subscribers) the\n connection from a remote topic.\n\n ```python\n await client.unsubscribe_remote_topic(topic)\n ```\n\n- **`publish(topic, payload, message_type=\"broadcast\", compress=False, message_id=None)`**: Publishes a message to a topic.\n\n ```python\n waiter = await client.publish(topic, payload)\n ```\n\n Returns a `WaitFor` instance to wait for acknowledgements or replies.\n\n- **`send(payload, compress=False, message_id=None)`**: Sends a message to the server.\n\n ```python\n waiter = await client.send(payload, options)\n ```\n\n Returns a `WaitFor` instance to wait for acknowledgements or replies.\n\n- **`wait(ms)`**: Waits for a specified duration in milliseconds. Utility function for waiting in async functions.\n\n ```python\n await wait(ms)\n ```\n\n#### Events\n\n- **`'session.started'`**: Emitted when the session starts.\n\n ```python\n client.on('session.started', on_session_started)\n ```\n\n- **`'error'`**: Emitted on WebSocket errors.\n\n ```python\n client.on('error', on_error)\n ```\n\n- **`'close'`**: Emitted when the WebSocket connection closes.\n\n ```python\n client.on('close', on_close)\n ```\n\n- **Custom Events**: Handle custom events based on topic and message type.\n\n ```python\n client.on('TOPIC_NAME.MESSAGE_TYPE', handle_message)\n ```\n\n > **Note**: Wildcard subscriptions are also supported.\n\n## License\n\nThis library is licensed under the MIT License.\n\n---\n\nFor more detailed examples and advanced configurations, please refer to\nthe [documentation](https://realtime.21no.de/docs).\n\n## Notes\n\n- Ensure that you have an account and an app set up with [Realtime Pub/Sub](https://realtime.21no.de).\n- Customize the `url_provider` or URL to retrieve the access token for connecting to your realtime application.\n- Implement the `get_auth_token` function according to your authentication mechanism.\n- Optionally use the `logger` option to integrate with your application's logging system.\n- Handle errors and disconnections gracefully to improve the robustness of your application.\n- Make sure to handle timeouts when waiting for replies to avoid hanging operations.\n\n---\n\nFeel free to contribute to this project by submitting issues or pull requests\non [GitHub](https://github.com/BackendStack21/realtime-pubsub-client-python).",
"bugtrack_url": null,
"license": "MIT",
"summary": "The official Realtime Pub/Sub client for Python",
"version": "1.0.0",
"project_urls": null,
"split_keywords": [
"client",
" messaging",
" pubsub",
" python",
" realtime",
" websocket"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "4e8e21c351ee0cbb50c5ee4fe4eff76f1acc5e6cc2fcdd28313a1ffcf22aef5e",
"md5": "4bfb83f5861473eedab0c0e5456fc81b",
"sha256": "be9c04c0f7ed9be24c50bf604343e455506b897169466f013bed84633f28443f"
},
"downloads": -1,
"filename": "realtime_pubsub_client-1.0.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "4bfb83f5861473eedab0c0e5456fc81b",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.6",
"size": 11248,
"upload_time": "2024-10-20T12:11:58",
"upload_time_iso_8601": "2024-10-20T12:11:58.594172Z",
"url": "https://files.pythonhosted.org/packages/4e/8e/21c351ee0cbb50c5ee4fe4eff76f1acc5e6cc2fcdd28313a1ffcf22aef5e/realtime_pubsub_client-1.0.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "53669551fd8eb7c37da79e71eec669d1bea1493a18c6d75f5afdfb0be49d1d0f",
"md5": "84e07c06fd6f79acd3634d806aab3cad",
"sha256": "f7636b4bb916d0a5c0a4d0bb34103eb811b274ab80c5ca3f3811a1348f38bdc0"
},
"downloads": -1,
"filename": "realtime_pubsub_client-1.0.0.tar.gz",
"has_sig": false,
"md5_digest": "84e07c06fd6f79acd3634d806aab3cad",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.6",
"size": 10610,
"upload_time": "2024-10-20T12:11:59",
"upload_time_iso_8601": "2024-10-20T12:11:59.950860Z",
"url": "https://files.pythonhosted.org/packages/53/66/9551fd8eb7c37da79e71eec669d1bea1493a18c6d75f5afdfb0be49d1d0f/realtime_pubsub_client-1.0.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-10-20 12:11:59",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "realtime-pubsub-client"
}