- **Name:** unixevents
- **Version:** 0.0.2
- **Summary:** Unixevents is a lightweight inter-process communication (IPC) library available in both JavaScript (Node.js) and Python. It enables real-time, event-driven communication across processes or applications, regardless of the language they are written in.
- **Upload time:** 2025-09-11 16:35:56
- **Requires Python:** >=3.9
- **Keywords:** communication, events, ipc, unixevents
- **Requirements:** No requirements were recorded.

# Unixevents

**Unixevents** is a lightweight inter-process communication (IPC) library available in both **JavaScript (Node.js)** and **Python**. It enables real-time, event-driven communication across processes or applications — regardless of the language they are written in.

## 🔗 Cross-Language Compatibility

 - One of the core strengths of Unixevents is its **cross-language support**. A message sent over a Unixevents channel from a process written in Python can be seamlessly received by a process written in JavaScript (Node.js), and vice versa.
 - This makes Unixevents ideal for building hybrid applications where different parts of the system are implemented in different languages but need to communicate efficiently and in real time.
 > ℹ️ **More language support is coming soon.** Unixevents is designed with extensibility in mind, with plans to support additional programming languages in the near future.

## ✅ Features

- Available in **Python** and **Node.js**
- Fully compatible **across both languages**
- Simple, **event-driven API**
- Uses **Unix domain sockets** under the hood for high performance and low latency
- Supports **JSON-based message exchange**
- Designed for **future multi-language support**
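
To make the two transport bullets above more concrete, here is a minimal standard-library sketch of JSON messages travelling over a Unix domain socket. It does not use Unixevents and does not reproduce its wire format: the newline-delimited framing, the `event`/`payload` field names, and the helper names are all assumptions chosen for illustration.

```python
import json
import socket


def send_event(sock, event, payload):
    """Serialize one event as a single line of JSON and write it to the socket."""
    line = json.dumps({"event": event, "payload": payload}) + "\n"
    sock.sendall(line.encode("utf-8"))


def read_event(sock):
    """Read bytes until a newline arrives, then decode one JSON message."""
    buffer = b""
    while not buffer.endswith(b"\n"):
        chunk = sock.recv(4096)
        if not chunk:
            raise ConnectionError("peer closed the socket before a full message arrived")
        buffer += chunk
    return json.loads(buffer.decode("utf-8"))


def round_trip(event, payload):
    """Send one event across a connected pair of Unix domain sockets."""
    # socketpair() returns two already-connected AF_UNIX sockets on Unix systems.
    left, right = socket.socketpair()
    try:
        send_event(left, event, payload)
        return read_event(right)
    finally:
        left.close()
        right.close()


if __name__ == "__main__":
    print(round_trip("greeting", {"name": "Alice"}))
```

Because the message is plain JSON text, any language with a Unix socket API and a JSON parser could read it, which is the property the cross-language claim relies on.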

## Installation

```bash
pip install unixevents
```

## Quick Start

### Basic Server-Client Communication

**Server (server.py):**
```python
import time

from unixevents import Linker

# Create the server side of the channel named 'channel'
server = Linker('server', 'channel')

def handle_greeting(data):
    print(f"Received greeting: {data}")
    # Reply to the client with a 'welcome' event
    server.send('welcome', {'message': 'Hello from server!'})

server.receive('greeting', handle_greeting)

print("Server is running... Press Ctrl+C to stop")
try:
    # Keep the process alive so incoming events can be handled
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    server.close()
```

**Client (client.py):**
```python
import time

from unixevents import Linker

# Create the client side of the same channel
client = Linker('client', 'channel')

def handle_welcome(data):
    print(f"Server says: {data['message']}")

client.receive('welcome', handle_welcome)

client.send('greeting', {'name': 'Alice', 'message': 'Hello!'})

# Give the server a moment to reply before closing
time.sleep(2)
client.close()
```

## API Reference

### Creating Instances

#### `Linker(role, channel, debug=False)`
Create a new Linker instance.

- `role` (str): Either 'server' or 'client'
- `channel` (str): Unique channel name for communication
- `debug` (bool): Enable debug logging (optional)

### Sending Events

#### `send(event, data, callback=None)`
Send an event with data.

- `event` (str): Event name
- `data` (Any): Data to send (will be JSON serialized)
- `callback` (Callable): Optional callback function(error, success)

```python
# Send without a callback
linker.send('message', {'text': 'Hello'})

# Send with a completion callback
def on_sent(error, success):
    if success:
        print("Message sent successfully")
    else:
        print(f"Failed to send: {error}")

linker.send('message', {'text': 'Hello'}, on_sent)
```

#### `send_sync(event, payload)`
Synchronous version of send.

```python
success = linker.send_sync('message', {'text': 'Hello'})
```
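
Because the example above captures a return value, a caller can retry when a send does not go through. The helper below is a sketch, not part of Unixevents: `send_with_retry`, `attempts`, and `delay` are hypothetical names, and it assumes `send_sync` returns a truthy value on success and a falsy one on failure.

```python
import time


def send_with_retry(linker, event, payload, attempts=3, delay=0.1):
    """Call linker.send_sync until it reports success or attempts run out.

    Assumes send_sync returns a truthy value on success; this document
    only shows the result being stored in a variable named `success`.
    """
    for attempt in range(1, attempts + 1):
        if linker.send_sync(event, payload):
            return True
        if attempt < attempts:
            time.sleep(delay)  # brief pause before the next try
    return False
```

A call would look like `send_with_retry(linker, 'message', {'text': 'Hello'})`. If `send_sync` turns out to raise on failure instead of returning a falsy value, the loop would need a `try`/`except` around the call.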

#### `send_async(event, payload)`
Asynchronous version of send.

```python
import asyncio

async def main():
    success = await linker.send_async('message', {'text': 'Hello'})
    
asyncio.run(main())
```
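
Since `send_async` is awaited, it can be combined with standard `asyncio` tools. The sketch below bounds the wait with `asyncio.wait_for`; `send_with_timeout` and its `timeout` parameter are hypothetical names, and returning `False` on timeout is a choice made for this example rather than documented Unixevents behavior.

```python
import asyncio


async def send_with_timeout(linker, event, payload, timeout=2.0):
    """Await linker.send_async, giving up after `timeout` seconds.

    Returns whatever send_async returns, or False if the timeout expires.
    """
    try:
        return await asyncio.wait_for(linker.send_async(event, payload), timeout)
    except asyncio.TimeoutError:
        return False
```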

### Receiving Events

#### `receive(event, listener)`
Register an event handler that will be called every time the event is received.

- `event` (str): Event name to listen for
- `listener` (Callable): Function to handle the event

```python
def handle_message(data):
    print(f"Received: {data}")

linker.receive('message', handle_message)
```
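
Since payloads arrive as decoded JSON, a listener may want to check their shape before using them. The decorator below is a hypothetical helper (`require_keys` is not part of Unixevents) that skips messages missing expected fields. It assumes the payload is delivered to the listener as a single argument, as in the example above.

```python
import functools


def require_keys(*keys):
    """Build a decorator that calls the listener only for dicts containing all of `keys`."""
    def decorator(listener):
        @functools.wraps(listener)
        def wrapper(data):
            if not isinstance(data, dict):
                print(f"Ignoring non-object payload: {data!r}")
                return None
            missing = [key for key in keys if key not in data]
            if missing:
                print(f"Ignoring payload without {missing}: {data!r}")
                return None
            return listener(data)
        return wrapper
    return decorator


@require_keys("text")
def handle_message(data):
    print(f"Received: {data['text']}")
    return data["text"]
```

The decorated function is registered the usual way, for example `linker.receive('message', handle_message)`.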

#### `receive_once(event, listener)`
Register an event handler that will be called only once.

```python
def handle_init(data):
    print(f"Initialization data: {data}")

linker.receive_once('init', handle_init)
```
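
A one-shot listener pairs naturally with a blocking wait, for instance when a client must not continue until an `init` event has arrived. The sketch below is an assumption-laden illustration: `wait_for_event` is a hypothetical helper, and it only works if Unixevents invokes listeners on a different thread from the caller, which this document does not state.

```python
import threading


def wait_for_event(linker, event, timeout=5.0):
    """Block until `event` arrives once, then return its payload.

    Raises TimeoutError if nothing arrives within `timeout` seconds.
    """
    arrived = threading.Event()
    box = {}

    def on_event(data):
        box["data"] = data
        arrived.set()

    linker.receive_once(event, on_event)
    if not arrived.wait(timeout):
        raise TimeoutError(f"no {event!r} event within {timeout} seconds")
    return box["data"]
```

Note that on timeout the listener stays registered, because no method for removing a listener is described here.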

### Initialization

#### `init_sync(role, channel, debug=None)`
Initialize the Linker synchronously (called automatically if role and channel provided to constructor).

```python
linker = Linker()
success = linker.init_sync('server', 'mychannel', debug=True)
```

#### `init_async(role, channel, debug=None)`
Initialize the Linker asynchronously.

```python
async def setup():
    linker = Linker()
    success = await linker.init_async('client', 'mychannel')
```

### Lifecycle Management

#### `close()`
Close the connection and clean up resources.

```python
linker.close()
```
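
To make sure `close()` runs even when an exception is raised, the standard library's `contextlib.closing` can wrap any object that has a `close()` method. The sketch below uses a hypothetical helper, `send_and_close`, and takes the linker as an argument. Whether `Linker` itself supports the `with` statement is not stated in this document, so that is not assumed.

```python
from contextlib import closing


def send_and_close(linker, event, payload):
    """Send one event, then close the linker even if the send raises."""
    # closing() calls linker.close() when the with-block exits.
    with closing(linker) as active:
        return active.send_sync(event, payload)
```

For example: `send_and_close(Linker('client', 'channel'), 'greeting', {'name': 'Alice'})`.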

### Debug Methods

#### `enable_debug()`
Enable debug logging.

```python
linker.enable_debug()
```

#### `disable_debug()`
Disable debug logging.

```python
linker.disable_debug()
```

## Advanced Examples

### Request-Response Pattern

**Server:**
```python
import time

from unixevents import Linker

server = Linker('server', 'rpc-channel')

def handle_request(data):
    request_id = data.get('id')
    method = data.get('method')
    params = data.get('params', {})

    # Dispatch on the requested method name
    if method == 'add':
        result = params.get('a', 0) + params.get('b', 0)
    elif method == 'multiply':
        result = params.get('a', 0) * params.get('b', 0)
    else:
        result = None

    # Echo the request id so the client can match the response
    server.send('response', {
        'id': request_id,
        'result': result
    })

server.receive('request', handle_request)

# Keep the process alive so requests can be handled
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    server.close()
```

**Client:**
```python
from unixevents import Linker
import uuid
import threading

client = Linker('client', 'rpc-channel')
pending_requests = {}

def handle_response(data):
    request_id = data.get('id')
    if request_id in pending_requests:
        event = pending_requests[request_id]
        event.result = data.get('result')
        event.set()

client.receive('response', handle_response)

def rpc_call(method, params):
    request_id = str(uuid.uuid4())
    event = threading.Event()
    pending_requests[request_id] = event
    
    client.send('request', {
        'id': request_id,
        'method': method,
        'params': params
    })
    
    event.wait(timeout=5)
    result = getattr(event, 'result', None)
    del pending_requests[request_id]
    return result

result = rpc_call('add', {'a': 5, 'b': 3})
print(f"5 + 3 = {result}")

result = rpc_call('multiply', {'a': 4, 'b': 7})
print(f"4 * 7 = {result}")

client.close()
```
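
An alternative to attaching `result` to a `threading.Event` is to hold one `concurrent.futures.Future` per request, which raises on timeout instead of silently returning `None`. The class below is a sketch only: `RpcClient` is a hypothetical name, it reuses the `request`/`response` event names and the `id`, `method`, `params`, and `result` fields from the example above, and it assumes listeners run on a different thread from the caller.

```python
import threading
import uuid
from concurrent.futures import Future


class RpcClient:
    """Match `response` events to pending `request` events by id."""

    def __init__(self, linker):
        self._linker = linker
        self._pending = {}
        self._lock = threading.Lock()
        linker.receive('response', self._on_response)

    def _on_response(self, data):
        # Look up and remove the waiting future; ignore unknown or late ids.
        with self._lock:
            future = self._pending.pop(data.get('id'), None)
        if future is not None:
            future.set_result(data.get('result'))

    def call(self, method, params, timeout=5.0):
        request_id = str(uuid.uuid4())
        future = Future()
        with self._lock:
            self._pending[request_id] = future
        try:
            self._linker.send('request', {
                'id': request_id,
                'method': method,
                'params': params
            })
            # Raises concurrent.futures.TimeoutError if no response arrives in time.
            return future.result(timeout=timeout)
        finally:
            # Drop the entry whether the call succeeded, timed out, or failed.
            with self._lock:
                self._pending.pop(request_id, None)
```

With a connected client this would be used as `rpc = RpcClient(client)` followed by `rpc.call('add', {'a': 5, 'b': 3})`.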

### Broadcasting to Multiple Clients

**Server:**
```python
from unixevents import Linker
import time
import threading

server = Linker('server', 'broadcast-channel')
clients = []

def handle_join(data):
    client_name = data.get('name')
    clients.append(client_name)
    print(f"{client_name} joined")
    
    server.send('user_joined', {'name': client_name})

server.receive('join', handle_join)

def broadcast_time():
    while True:
        current_time = time.strftime('%Y-%m-%d %H:%M:%S')
        server.send('time_update', {'time': current_time})
        time.sleep(1)

broadcast_thread = threading.Thread(target=broadcast_time, daemon=True)
broadcast_thread.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    server.close()
```

**Client:**
```python
from unixevents import Linker
import time

client = Linker('client', 'broadcast-channel')

def handle_time_update(data):
    print(f"Server time: {data['time']}")

def handle_user_joined(data):
    print(f"New user joined: {data['name']}")

client.receive('time_update', handle_time_update)
client.receive('user_joined', handle_user_joined)

client.send('join', {'name': 'Client1'})

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    client.close()
```

### Async/Await Support

```python
import asyncio
from unixevents import Linker

async def async_server():
    server = Linker()
    await server.init_async('server', 'async-channel')
    
    def handle_async_event(data):
        print(f"Received async: {data}")
        server.send_sync('processed', {'status': 'done'})
    
    server.receive('process', handle_async_event)
    
    await asyncio.sleep(60)
    server.close()

async def async_client():
    client = Linker()
    await client.init_async('client', 'async-channel')
    
    success = await client.send_async('process', {'data': 'test'})
    print(f"Sent: {success}")
    
    await asyncio.sleep(2)
    client.close()

async def main():
    await asyncio.gather(
        async_server(),
        async_client()
    )

asyncio.run(main())
```
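
The example above registers a plain function as the listener. If a coroutine needs to consume events, one option is to forward them into an `asyncio.Queue`. This is a sketch under the assumption that Unixevents may invoke listeners on a thread other than the event loop's, which this document does not confirm; `events_to_queue` is a hypothetical helper.

```python
import asyncio


def events_to_queue(linker, event):
    """Forward every `event` payload from linker into an asyncio.Queue.

    Must be called from inside a coroutine, so that a running loop exists.
    """
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def on_event(data):
        # asyncio.Queue is not thread-safe, so schedule the put on the loop thread.
        loop.call_soon_threadsafe(queue.put_nowait, data)

    linker.receive(event, on_event)
    return queue
```

Inside a coroutine, `queue = events_to_queue(server, 'process')` followed by `data = await queue.get()` would then wait for the next payload without blocking the loop.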

### Debug Mode

Enable debug mode to see detailed logs:

```python
# Enable debug logging at construction time
linker = Linker('server', 'mychannel', debug=True)

# Or toggle it at runtime
linker.enable_debug()
linker.disable_debug()
```


            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "unixevents",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": null,
    "keywords": "communication, events, ipc, unixevents",
    "author": null,
    "author_email": "Rohit Singh <rohsins@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/3b/d2/79ecaa1ff2c98b3433ef469f3ec418688508431c206caebba8b1047ef1dd/unixevents-0.0.2.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "Unixevents is a lightweight inter-process communication (IPC) library available in both JavaScript (Node.js) and Python. It enables real-time, event-driven communication across processes or applications \u2014 regardless of the language they are written in.",
    "version": "0.0.2",
    "project_urls": {
        "Homepage": "https://github.com/rohsins/unixevents_py",
        "Issues": "https://github.com/rohsins/unixevents_py/issues"
    },
    "split_keywords": [
        "communication",
        " events",
        " ipc",
        " unixevents"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "84c88ee0e945e4a06ae7ba3f002bc5cc649d8d3fd6e7531b1a397082d8ca3607",
                "md5": "a2ba37fd34fe9922634d1c92b3eea947",
                "sha256": "55a136395e03466e7e0303d1f1faece4aab315ad04c763983f2d959b360954b9"
            },
            "downloads": -1,
            "filename": "unixevents-0.0.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a2ba37fd34fe9922634d1c92b3eea947",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 6749,
            "upload_time": "2025-09-11T16:35:53",
            "upload_time_iso_8601": "2025-09-11T16:35:53.922648Z",
            "url": "https://files.pythonhosted.org/packages/84/c8/8ee0e945e4a06ae7ba3f002bc5cc649d8d3fd6e7531b1a397082d8ca3607/unixevents-0.0.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "3bd279ecaa1ff2c98b3433ef469f3ec418688508431c206caebba8b1047ef1dd",
                "md5": "9a5227de7a3ff5645a164d979b61896c",
                "sha256": "57fb6eb5b082c89059178682068ede8b75386c5b5d96ded032aa91896f949478"
            },
            "downloads": -1,
            "filename": "unixevents-0.0.2.tar.gz",
            "has_sig": false,
            "md5_digest": "9a5227de7a3ff5645a164d979b61896c",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 6225,
            "upload_time": "2025-09-11T16:35:56",
            "upload_time_iso_8601": "2025-09-11T16:35:56.159386Z",
            "url": "https://files.pythonhosted.org/packages/3b/d2/79ecaa1ff2c98b3433ef469f3ec418688508431c206caebba8b1047ef1dd/unixevents-0.0.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-09-11 16:35:56",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "rohsins",
    "github_project": "unixevents_py",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "unixevents"
}