| Name | unixevents JSON |
| Version |
0.0.2
JSON |
| download |
| home_page | None |
| Summary | Unixevents is a lightweight inter-process communication (IPC) library available in both JavaScript (Node.js) and Python. It enables real-time, event-driven communication across processes or applications â regardless of the language they are written in. |
| upload_time | 2025-09-11 16:35:56 |
| maintainer | None |
| docs_url | None |
| author | None |
| requires_python | >=3.9 |
| license | None |
| keywords |
communication
events
ipc
unixevents
|
| VCS |
 |
| bugtrack_url |
|
| requirements |
No requirements were recorded.
|
| Travis-CI |
No Travis.
|
| coveralls test coverage |
No coveralls.
|
# Unixevents
**Unixevents** is a lightweight inter-process communication (IPC) library available in both **JavaScript (Node.js)** and **Python**. It enables real-time, event-driven communication across processes or applications â regardless of the language they are written in.
## đ Cross-Language Compatibility
- One of the core strengths of Unixevents is its **cross-language support**. A message sent over a Unixevents channel from a process written in Python can be seamlessly received by a process written in JavaScript (Node.js), and vice versa.
- This makes Unixevents ideal for building hybrid applications where different parts of the system are implemented in different languages but need to communicate efficiently and in real time.
> âšī¸ **More language support is coming soon** â Unixevents is designed with extensibility in mind, with plans to support additional programming languages in the near future.
## â
Features
- Available in **Python** and **Node.js**
- Fully compatible **across both languages**
- Simple, **event-driven API**
- Uses **Unix domain sockets** under the hood for high performance and low latency
- Supports **JSON-based message exchange**
- Designed for **future multi-language support**
## Installation
```bash
pip install unixevents
```
## Quick Start
### Basic Server-Client Communication
**Server (server.py):**
```python
from unixevents import Linker
server = Linker('server', 'channel')
def handle_greeting(data):
print(f"Received greeting: {data}")
server.send('welcome', {'message': 'Hello from server!'})
server.receive('greeting', handle_greeting)
print("Server is running... Press Ctrl+C to stop")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.close()
```
**Client (client.py):**
```python
from unixevents import Linker
client = Linker('client', 'channel')
def handle_welcome(data):
print(f"Server says: {data['message']}")
client.receive('welcome', handle_welcome)
client.send('greeting', {'name': 'Alice', 'message': 'Hello!'})
time.sleep(2)
client.close()
```
## API Reference
### Creating Instances
#### `Linker(role, channel, debug=False)`
Create a new Linker instance.
- `role` (str): Either 'server' or 'client'
- `channel` (str): Unique channel name for communication
- `debug` (bool): Enable debug logging (optional)
### Sending Events
#### `send(event, data, callback=None)`
Send an event with data.
- `event` (str): Event name
- `data` (Any): Data to send (will be JSON serialized)
- `callback` (Callable): Optional callback function(error, success)
```python
linker.send('message', {'text': 'Hello'})
def on_sent(error, success):
if success:
print("Message sent successfully")
else:
print(f"Failed to send: {error}")
linker.send('message', {'text': 'Hello'}, on_sent)
```
#### `send_sync(event, payload)`
Synchronous version of send.
```python
success = linker.send_sync('message', {'text': 'Hello'})
```
#### `send_async(event, payload)`
Asynchronous version of send.
```python
import asyncio
async def main():
success = await linker.send_async('message', {'text': 'Hello'})
asyncio.run(main())
```
### Receiving Events
#### `receive(event, listener)`
Register an event handler that will be called every time the event is received.
- `event` (str): Event name to listen for
- `listener` (Callable): Function to handle the event
```python
def handle_message(data):
print(f"Received: {data}")
linker.receive('message', handle_message)
```
#### `receive_once(event, listener)`
Register an event handler that will be called only once.
```python
def handle_init(data):
print(f"Initialization data: {data}")
linker.receive_once('init', handle_init)
```
### Initialization
#### `init_sync(role, channel, debug=None)`
Initialize the Linker synchronously (called automatically if role and channel provided to constructor).
```python
linker = Linker()
success = linker.init_sync('server', 'mychannel', debug=True)
```
#### `init_async(role, channel, debug=None)`
Initialize the Linker asynchronously.
```python
async def setup():
linker = Linker()
success = await linker.init_async('client', 'mychannel')
```
### Lifecycle Management
#### `close()`
Close the connection and clean up resources.
```python
linker.close()
```
### Debug Methods
#### `enable_debug()`
Enable debug logging.
```python
linker.enable_debug()
```
#### `disable_debug()`
Disable debug logging.
```python
linker.disable_debug()
```
## Advanced Examples
### Request-Response Pattern
**Server:**
```python
from unixevents import Linker
import uuid
server = Linker('server', 'rpc-channel')
def handle_request(data):
request_id = data.get('id')
method = data.get('method')
params = data.get('params', {})
if method == 'add':
result = params.get('a', 0) + params.get('b', 0)
elif method == 'multiply':
result = params.get('a', 0) * params.get('b', 0)
else:
result = None
server.send('response', {
'id': request_id,
'result': result
})
server.receive('request', handle_request)
```
**Client:**
```python
from unixevents import Linker
import uuid
import threading
client = Linker('client', 'rpc-channel')
pending_requests = {}
def handle_response(data):
request_id = data.get('id')
if request_id in pending_requests:
event = pending_requests[request_id]
event.result = data.get('result')
event.set()
client.receive('response', handle_response)
def rpc_call(method, params):
request_id = str(uuid.uuid4())
event = threading.Event()
pending_requests[request_id] = event
client.send('request', {
'id': request_id,
'method': method,
'params': params
})
event.wait(timeout=5)
result = getattr(event, 'result', None)
del pending_requests[request_id]
return result
result = rpc_call('add', {'a': 5, 'b': 3})
print(f"5 + 3 = {result}")
result = rpc_call('multiply', {'a': 4, 'b': 7})
print(f"4 * 7 = {result}")
```
### Broadcasting to Multiple Clients
**Server:**
```python
from unixevents import Linker
import time
import threading
server = Linker('server', 'broadcast-channel')
clients = []
def handle_join(data):
client_name = data.get('name')
clients.append(client_name)
print(f"{client_name} joined")
server.send('user_joined', {'name': client_name})
server.receive('join', handle_join)
def broadcast_time():
while True:
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
server.send('time_update', {'time': current_time})
time.sleep(1)
broadcast_thread = threading.Thread(target=broadcast_time, daemon=True)
broadcast_thread.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.close()
```
**Client:**
```python
from unixevents import Linker
import time
client = Linker('client', 'broadcast-channel')
def handle_time_update(data):
print(f"Server time: {data['time']}")
def handle_user_joined(data):
print(f"New user joined: {data['name']}")
client.receive('time_update', handle_time_update)
client.receive('user_joined', handle_user_joined)
client.send('join', {'name': 'Client1'})
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.close()
```
### Async/Await Support
```python
import asyncio
from unixevents import Linker
async def async_server():
server = Linker()
await server.init_async('server', 'async-channel')
def handle_async_event(data):
print(f"Received async: {data}")
server.send_sync('processed', {'status': 'done'})
server.receive('process', handle_async_event)
await asyncio.sleep(60)
server.close()
async def async_client():
client = Linker()
await client.init_async('client', 'async-channel')
success = await client.send_async('process', {'data': 'test'})
print(f"Sent: {success}")
await asyncio.sleep(2)
client.close()
async def main():
await asyncio.gather(
async_server(),
async_client()
)
asyncio.run(main())
```
### Debug Mode
Enable debug mode to see detailed logs:
```python
linker = Linker('server', 'mychannel', debug=True)
linker.enable_debug()
linker.disable_debug()
```
Raw data
{
"_id": null,
"home_page": null,
"name": "unixevents",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "communication, events, ipc, unixevents",
"author": null,
"author_email": "Rohit Singh <rohsins@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/3b/d2/79ecaa1ff2c98b3433ef469f3ec418688508431c206caebba8b1047ef1dd/unixevents-0.0.2.tar.gz",
"platform": null,
"description": "# Unixevents\n\n**Unixevents** is a lightweight inter-process communication (IPC) library available in both **JavaScript (Node.js)** and **Python**. It enables real-time, event-driven communication across processes or applications \u2014 regardless of the language they are written in.\n\n## \ud83d\udd17 Cross-Language Compatibility\n\n - One of the core strengths of Unixevents is its **cross-language support**. A message sent over a Unixevents channel from a process written in Python can be seamlessly received by a process written in JavaScript (Node.js), and vice versa.\n - This makes Unixevents ideal for building hybrid applications where different parts of the system are implemented in different languages but need to communicate efficiently and in real time.\n > \u2139\ufe0f **More language support is coming soon** \u2014 Unixevents is designed with extensibility in mind, with plans to support additional programming languages in the near future.\n\n## \u2705 Features\n\n- Available in **Python** and **Node.js**\n- Fully compatible **across both languages**\n- Simple, **event-driven API**\n- Uses **Unix domain sockets** under the hood for high performance and low latency\n- Supports **JSON-based message exchange**\n- Designed for **future multi-language support**\n\n## Installation\n\n```bash\npip install unixevents\n```\n\n## Quick Start\n\n### Basic Server-Client Communication\n\n**Server (server.py):**\n```python\nfrom unixevents import Linker\n\nserver = Linker('server', 'channel')\n\ndef handle_greeting(data):\n print(f\"Received greeting: {data}\")\n server.send('welcome', {'message': 'Hello from server!'})\n\nserver.receive('greeting', handle_greeting)\n\nprint(\"Server is running... Press Ctrl+C to stop\")\ntry:\n while True:\n time.sleep(1)\nexcept KeyboardInterrupt:\n server.close()\n```\n\n**Client (client.py):**\n```python\nfrom unixevents import Linker\n\nclient = Linker('client', 'channel')\n\ndef handle_welcome(data):\n print(f\"Server says: {data['message']}\")\n\nclient.receive('welcome', handle_welcome)\n\nclient.send('greeting', {'name': 'Alice', 'message': 'Hello!'})\n\ntime.sleep(2)\nclient.close()\n```\n\n## API Reference\n\n### Creating Instances\n\n#### `Linker(role, channel, debug=False)`\nCreate a new Linker instance.\n\n- `role` (str): Either 'server' or 'client'\n- `channel` (str): Unique channel name for communication\n- `debug` (bool): Enable debug logging (optional)\n\n### Sending Events\n\n#### `send(event, data, callback=None)`\nSend an event with data.\n\n- `event` (str): Event name\n- `data` (Any): Data to send (will be JSON serialized)\n- `callback` (Callable): Optional callback function(error, success)\n\n```python\n\nlinker.send('message', {'text': 'Hello'})\n\ndef on_sent(error, success):\n if success:\n print(\"Message sent successfully\")\n else:\n print(f\"Failed to send: {error}\")\n\nlinker.send('message', {'text': 'Hello'}, on_sent)\n```\n\n#### `send_sync(event, payload)`\nSynchronous version of send.\n\n```python\nsuccess = linker.send_sync('message', {'text': 'Hello'})\n```\n\n#### `send_async(event, payload)`\nAsynchronous version of send.\n\n```python\nimport asyncio\n\nasync def main():\n success = await linker.send_async('message', {'text': 'Hello'})\n \nasyncio.run(main())\n```\n\n### Receiving Events\n\n#### `receive(event, listener)`\nRegister an event handler that will be called every time the event is received.\n\n- `event` (str): Event name to listen for\n- `listener` (Callable): Function to handle the event\n\n```python\ndef handle_message(data):\n print(f\"Received: {data}\")\n\nlinker.receive('message', handle_message)\n```\n\n#### `receive_once(event, listener)`\nRegister an event handler that will be called only once.\n\n```python\ndef handle_init(data):\n print(f\"Initialization data: {data}\")\n\nlinker.receive_once('init', handle_init)\n```\n\n### Initialization\n\n#### `init_sync(role, channel, debug=None)`\nInitialize the Linker synchronously (called automatically if role and channel provided to constructor).\n\n```python\nlinker = Linker()\nsuccess = linker.init_sync('server', 'mychannel', debug=True)\n```\n\n#### `init_async(role, channel, debug=None)`\nInitialize the Linker asynchronously.\n\n```python\nasync def setup():\n linker = Linker()\n success = await linker.init_async('client', 'mychannel')\n```\n\n### Lifecycle Management\n\n#### `close()`\nClose the connection and clean up resources.\n\n```python\nlinker.close()\n```\n### Debug Methods\n\n#### `enable_debug()`\nEnable debug logging.\n\n```python\nlinker.enable_debug()\n```\n\n#### `disable_debug()`\nDisable debug logging.\n\n```python\nlinker.disable_debug()\n```\n\n## Advanced Examples\n\n### Request-Response Pattern\n\n**Server:**\n```python\nfrom unixevents import Linker\nimport uuid\n\nserver = Linker('server', 'rpc-channel')\n\ndef handle_request(data):\n request_id = data.get('id')\n method = data.get('method')\n params = data.get('params', {})\n \n if method == 'add':\n result = params.get('a', 0) + params.get('b', 0)\n elif method == 'multiply':\n result = params.get('a', 0) * params.get('b', 0)\n else:\n result = None\n \n server.send('response', {\n 'id': request_id,\n 'result': result\n })\n\nserver.receive('request', handle_request)\n```\n\n**Client:**\n```python\nfrom unixevents import Linker\nimport uuid\nimport threading\n\nclient = Linker('client', 'rpc-channel')\npending_requests = {}\n\ndef handle_response(data):\n request_id = data.get('id')\n if request_id in pending_requests:\n event = pending_requests[request_id]\n event.result = data.get('result')\n event.set()\n\nclient.receive('response', handle_response)\n\ndef rpc_call(method, params):\n request_id = str(uuid.uuid4())\n event = threading.Event()\n pending_requests[request_id] = event\n \n client.send('request', {\n 'id': request_id,\n 'method': method,\n 'params': params\n })\n \n event.wait(timeout=5)\n result = getattr(event, 'result', None)\n del pending_requests[request_id]\n return result\n\nresult = rpc_call('add', {'a': 5, 'b': 3})\nprint(f\"5 + 3 = {result}\")\n\nresult = rpc_call('multiply', {'a': 4, 'b': 7})\nprint(f\"4 * 7 = {result}\")\n```\n\n### Broadcasting to Multiple Clients\n\n**Server:**\n```python\nfrom unixevents import Linker\nimport time\nimport threading\n\nserver = Linker('server', 'broadcast-channel')\nclients = []\n\ndef handle_join(data):\n client_name = data.get('name')\n clients.append(client_name)\n print(f\"{client_name} joined\")\n \n server.send('user_joined', {'name': client_name})\n\nserver.receive('join', handle_join)\n\ndef broadcast_time():\n while True:\n current_time = time.strftime('%Y-%m-%d %H:%M:%S')\n server.send('time_update', {'time': current_time})\n time.sleep(1)\n\nbroadcast_thread = threading.Thread(target=broadcast_time, daemon=True)\nbroadcast_thread.start()\n\ntry:\n while True:\n time.sleep(1)\nexcept KeyboardInterrupt:\n server.close()\n```\n\n**Client:**\n```python\nfrom unixevents import Linker\nimport time\n\nclient = Linker('client', 'broadcast-channel')\n\ndef handle_time_update(data):\n print(f\"Server time: {data['time']}\")\n\ndef handle_user_joined(data):\n print(f\"New user joined: {data['name']}\")\n\nclient.receive('time_update', handle_time_update)\nclient.receive('user_joined', handle_user_joined)\n\nclient.send('join', {'name': 'Client1'})\n\ntry:\n while True:\n time.sleep(1)\nexcept KeyboardInterrupt:\n client.close()\n```\n\n### Async/Await Support\n\n```python\nimport asyncio\nfrom unixevents import Linker\n\nasync def async_server():\n server = Linker()\n await server.init_async('server', 'async-channel')\n \n def handle_async_event(data):\n print(f\"Received async: {data}\")\n server.send_sync('processed', {'status': 'done'})\n \n server.receive('process', handle_async_event)\n \n await asyncio.sleep(60)\n server.close()\n\nasync def async_client():\n client = Linker()\n await client.init_async('client', 'async-channel')\n \n success = await client.send_async('process', {'data': 'test'})\n print(f\"Sent: {success}\")\n \n await asyncio.sleep(2)\n client.close()\n\nasync def main():\n await asyncio.gather(\n async_server(),\n async_client()\n )\n\nasyncio.run(main())\n```\n\n### Debug Mode\n\nEnable debug mode to see detailed logs:\n\n```python\n\nlinker = Linker('server', 'mychannel', debug=True)\n\nlinker.enable_debug()\nlinker.disable_debug()\n```\n\n",
"bugtrack_url": null,
"license": null,
"summary": "Unixevents is a lightweight inter-process communication (IPC) library available in both JavaScript (Node.js) and Python. It enables real-time, event-driven communication across processes or applications \u2014 regardless of the language they are written in.",
"version": "0.0.2",
"project_urls": {
"Homepage": "https://github.com/rohsins/unixevents_py",
"Issues": "https://github.com/rohsins/unixevents_py/issues"
},
"split_keywords": [
"communication",
" events",
" ipc",
" unixevents"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "84c88ee0e945e4a06ae7ba3f002bc5cc649d8d3fd6e7531b1a397082d8ca3607",
"md5": "a2ba37fd34fe9922634d1c92b3eea947",
"sha256": "55a136395e03466e7e0303d1f1faece4aab315ad04c763983f2d959b360954b9"
},
"downloads": -1,
"filename": "unixevents-0.0.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a2ba37fd34fe9922634d1c92b3eea947",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 6749,
"upload_time": "2025-09-11T16:35:53",
"upload_time_iso_8601": "2025-09-11T16:35:53.922648Z",
"url": "https://files.pythonhosted.org/packages/84/c8/8ee0e945e4a06ae7ba3f002bc5cc649d8d3fd6e7531b1a397082d8ca3607/unixevents-0.0.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "3bd279ecaa1ff2c98b3433ef469f3ec418688508431c206caebba8b1047ef1dd",
"md5": "9a5227de7a3ff5645a164d979b61896c",
"sha256": "57fb6eb5b082c89059178682068ede8b75386c5b5d96ded032aa91896f949478"
},
"downloads": -1,
"filename": "unixevents-0.0.2.tar.gz",
"has_sig": false,
"md5_digest": "9a5227de7a3ff5645a164d979b61896c",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 6225,
"upload_time": "2025-09-11T16:35:56",
"upload_time_iso_8601": "2025-09-11T16:35:56.159386Z",
"url": "https://files.pythonhosted.org/packages/3b/d2/79ecaa1ff2c98b3433ef469f3ec418688508431c206caebba8b1047ef1dd/unixevents-0.0.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-09-11 16:35:56",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "rohsins",
"github_project": "unixevents_py",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "unixevents"
}