pyeventhub


Namepyeventhub JSON
Version 0.1.2 PyPI version JSON
download
home_pagehttps://github.com/fadedreams/pyeventhub
SummaryThis Python module implements an event handling framework (`EventEmitter`) with support for prioritization (`PriorityEvent`), event filtering, and processing using threading, queues (`queue` module), heap operations (`heapq` module), and logging (`logging` module) capabilities.
upload_time2024-06-21 10:28:03
maintainerNone
docs_urlNone
authorfadedreams7
requires_python<4.0,>=3.8
licenseMIT
keywords event emitter event hub utilities
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            
## Event Handling Framework with EventEmitter

This Python module implements an event handling framework (`EventEmitter`) that supports prioritization (`PriorityEvent`), event filtering, and processing using threading, queues (`queue` module), heap operations (`heapq` module), and logging (`logging` module) capabilities.

## Classes

### EventHandler

Defines an interface for handling events.

### NotificationHandler

Extends `EventHandler` and handles notifications with specific `ID` and `Message` attributes.

### Notification

Represents a notification entity with attributes `ID`, `Message`, and `CreatedAt`.

### PriorityEvent

Represents an event with its associated priority for queueing and processing.

### EventEmitter

Manages event registration, emission, and handling. Features include:

- **Logging:** Enables or disables logging and logs events with their priorities.
- **Event Registration:** Registers listeners and handlers for specific events.
- **Event Emission:** Emits events with context for cancellation and timeout.
- **Event Processing:** Processes events in priority order using heap operations.
- **Event Handling:** Dispatches events to registered handlers.

## Example Usage
main.py
```python
import threading
import time
import queue
import logging
from pyeventhub import EventEmitter, NotificationHandler, Notification

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Example usage
    emitter = EventEmitter(logging=True)

    # Register notification handlers
    handler1 = NotificationHandler(name="EmailHandler")
    handler2 = NotificationHandler(name="SMSHandler")

    emitter.on_event("email_notification", handler1)
    emitter.on_event("sms_notification", handler2)

    # Registering listeners with filters
    def email_filter(data):
        if isinstance(data, Notification):
            return data.ID > 0  # Example filter: process only notifications with ID > 0
        return False

    chEmail = emitter.on("email_notification", email_filter)
    chSMS = emitter.on("sms_notification")

    # Simulate sending notifications with priority
    def send_notifications():
        emitter.emit_with_context(None, "email_notification", Notification(ID=1, Message="New email received", CreatedAt=time.time()), 2)  # Higher priority
        emitter.emit_with_context(None, "sms_notification", Notification(ID=2, Message="You have a new SMS", CreatedAt=time.time()), 1)  # Lower priority

    # Handle notifications asynchronously
    def handle_notifications():
        while True:
            if not chEmail.empty():
                notification = chEmail.get()
                print("Received email notification")
                handler1.handle(notification)
            if not chSMS.empty():
                notification = chSMS.get()
                print("Received SMS notification")
                handler2.handle(notification)

    send_thread = threading.Thread(target=send_notifications)
    handle_thread = threading.Thread(target=handle_notifications)

    send_thread.start()
    handle_thread.start()

    # Allow some time for notifications to be processed
    time.sleep(2)

    # Clean up
    emitter.close()

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/fadedreams/pyeventhub",
    "name": "pyeventhub",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.8",
    "maintainer_email": null,
    "keywords": "event emitter, event hub, utilities",
    "author": "fadedreams7",
    "author_email": "fadedreams7@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/53/9e/e59d5308d8940bb0f20a6724a27d23f722bc3359e8ce9916dc1e3b53101e/pyeventhub-0.1.2.tar.gz",
    "platform": null,
    "description": "\n## Event Handling Framework with EventEmitter\n\nThis Python module implements an event handling framework (`EventEmitter`) that supports prioritization (`PriorityEvent`), event filtering, and processing using threading, queues (`queue` module), heap operations (`heapq` module), and logging (`logging` module) capabilities.\n\n## Classes\n\n### EventHandler\n\nDefines an interface for handling events.\n\n### NotificationHandler\n\nExtends `EventHandler` and handles notifications with specific `ID` and `Message` attributes.\n\n### Notification\n\nRepresents a notification entity with attributes `ID`, `Message`, and `CreatedAt`.\n\n### PriorityEvent\n\nRepresents an event with its associated priority for queueing and processing.\n\n### EventEmitter\n\nManages event registration, emission, and handling. Features include:\n\n- **Logging:** Enables or disables logging and logs events with their priorities.\n- **Event Registration:** Registers listeners and handlers for specific events.\n- **Event Emission:** Emits events with context for cancellation and timeout.\n- **Event Processing:** Processes events in priority order using heap operations.\n- **Event Handling:** Dispatches events to registered handlers.\n\n## Example Usage\nmain.py\n```python\nimport threading\nimport time\nimport queue\nimport logging\nfrom pyeventhub import EventEmitter, NotificationHandler, Notification\n\nif __name__ == \"__main__\":\n    logging.basicConfig(level=logging.INFO)\n\n    # Example usage\n    emitter = EventEmitter(logging=True)\n\n    # Register notification handlers\n    handler1 = NotificationHandler(name=\"EmailHandler\")\n    handler2 = NotificationHandler(name=\"SMSHandler\")\n\n    emitter.on_event(\"email_notification\", handler1)\n    emitter.on_event(\"sms_notification\", handler2)\n\n    # Registering listeners with filters\n    def email_filter(data):\n        if isinstance(data, Notification):\n            return data.ID > 0  # Example filter: process only notifications with ID > 0\n        return False\n\n    chEmail = emitter.on(\"email_notification\", email_filter)\n    chSMS = emitter.on(\"sms_notification\")\n\n    # Simulate sending notifications with priority\n    def send_notifications():\n        emitter.emit_with_context(None, \"email_notification\", Notification(ID=1, Message=\"New email received\", CreatedAt=time.time()), 2)  # Higher priority\n        emitter.emit_with_context(None, \"sms_notification\", Notification(ID=2, Message=\"You have a new SMS\", CreatedAt=time.time()), 1)  # Lower priority\n\n    # Handle notifications asynchronously\n    def handle_notifications():\n        while True:\n            if not chEmail.empty():\n                notification = chEmail.get()\n                print(\"Received email notification\")\n                handler1.handle(notification)\n            if not chSMS.empty():\n                notification = chSMS.get()\n                print(\"Received SMS notification\")\n                handler2.handle(notification)\n\n    send_thread = threading.Thread(target=send_notifications)\n    handle_thread = threading.Thread(target=handle_notifications)\n\n    send_thread.start()\n    handle_thread.start()\n\n    # Allow some time for notifications to be processed\n    time.sleep(2)\n\n    # Clean up\n    emitter.close()\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "This Python module implements an event handling framework (`EventEmitter`) with support for prioritization (`PriorityEvent`), event filtering, and processing using threading, queues (`queue` module), heap operations (`heapq` module), and logging (`logging` module) capabilities.",
    "version": "0.1.2",
    "project_urls": {
        "Homepage": "https://github.com/fadedreams/pyeventhub",
        "Repository": "https://github.com/fadedreams/pyeventhub"
    },
    "split_keywords": [
        "event emitter",
        " event hub",
        " utilities"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "aa67bf1060d13dd123b866365f6c7eb7c9252e7032ec8d49952fd85b49c32e2c",
                "md5": "ead7669043ec4dae20335d282f093d24",
                "sha256": "c87888d95b54fff4f4567dd23e199141f64478baba431ddd7bc68ca5da37966f"
            },
            "downloads": -1,
            "filename": "pyeventhub-0.1.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "ead7669043ec4dae20335d282f093d24",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.8",
            "size": 6632,
            "upload_time": "2024-06-21T10:28:01",
            "upload_time_iso_8601": "2024-06-21T10:28:01.858731Z",
            "url": "https://files.pythonhosted.org/packages/aa/67/bf1060d13dd123b866365f6c7eb7c9252e7032ec8d49952fd85b49c32e2c/pyeventhub-0.1.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "539ee59d5308d8940bb0f20a6724a27d23f722bc3359e8ce9916dc1e3b53101e",
                "md5": "f0ca488284802a1742ad2130415a0530",
                "sha256": "9667b57746ed236a57248c85e60e8560700d62bf7d25f82e49ac57afd0c2e8a6"
            },
            "downloads": -1,
            "filename": "pyeventhub-0.1.2.tar.gz",
            "has_sig": false,
            "md5_digest": "f0ca488284802a1742ad2130415a0530",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.8",
            "size": 3974,
            "upload_time": "2024-06-21T10:28:03",
            "upload_time_iso_8601": "2024-06-21T10:28:03.123763Z",
            "url": "https://files.pythonhosted.org/packages/53/9e/e59d5308d8940bb0f20a6724a27d23f722bc3359e8ce9916dc1e3b53101e/pyeventhub-0.1.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-06-21 10:28:03",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "fadedreams",
    "github_project": "pyeventhub",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "pyeventhub"
}
        
Elapsed time: 4.56247s