Name | pyeventhub |
Version | 0.1.2 |
home_page | https://github.com/fadedreams/pyeventhub |
Summary | This Python module implements an event handling framework (`EventEmitter`) with support for prioritization (`PriorityEvent`), event filtering, and processing using threading, queues (`queue` module), heap operations (`heapq` module), and logging (`logging` module) capabilities. |
upload_time | 2024-06-21 10:28:03 |
maintainer | None |
docs_url | None |
author | fadedreams7 |
requires_python | <4.0,>=3.8 |
license | MIT |
keywords | event emitter, event hub, utilities |
VCS | |
bugtrack_url | |
requirements | No requirements were recorded. |
Travis-CI | No Travis. |
coveralls test coverage | No coveralls. |
## Event Handling Framework with EventEmitter
This Python module implements an event handling framework built around `EventEmitter`. It supports event prioritization (`PriorityEvent`), event filtering, and threaded processing, and it relies on the standard-library `queue`, `heapq`, and `logging` modules.
## Classes
### EventHandler
Defines an interface for handling events.
### NotificationHandler
Extends `EventHandler` and handles notifications with specific `ID` and `Message` attributes.
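The relationship between an abstract handler interface and a concrete notification handler can be sketched as follows. This is a minimal illustration and not pyeventhub's source: `BaseHandler` and `PrintingHandler` are hypothetical names, and the only details taken from this README are the `handle` method, the `name` argument, and the `ID` and `Message` attributes.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace


class BaseHandler(ABC):
    """Hypothetical stand-in for an event-handler interface."""

    @abstractmethod
    def handle(self, event):
        """Process one event payload."""


class PrintingHandler(BaseHandler):
    """Hypothetical concrete handler that formats ID and Message."""

    def __init__(self, name):
        self.name = name

    def handle(self, event):
        # Read the two attributes the README mentions: ID and Message.
        line = f"[{self.name}] #{event.ID}: {event.Message}"
        print(line)
        return line


# Any object with ID and Message attributes works for this sketch.
demo = SimpleNamespace(ID=1, Message="New email received")
result = PrintingHandler("EmailHandler").handle(demo)
```

Declaring `handle` as abstract means a subclass that forgets to implement it fails at instantiation time rather than when the first event arrives.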
### Notification
Represents a notification entity with attributes `ID`, `Message`, and `CreatedAt`.
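A notification entity with these three attributes could be modelled as a dataclass. The sketch below is an assumption about shape only: `NotificationSketch` is a hypothetical name, and defaulting `CreatedAt` to the current time is a choice made for this example, not documented behavior of the library's `Notification`.

```python
import time
from dataclasses import dataclass, field


@dataclass
class NotificationSketch:
    """Hypothetical stand-in; attribute names follow the README."""

    ID: int
    Message: str
    # Assumption for this sketch: default to the creation timestamp.
    CreatedAt: float = field(default_factory=time.time)


note = NotificationSketch(ID=1, Message="New email received")
```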
### PriorityEvent
Represents an event with its associated priority for queueing and processing.
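One common way to queue events by priority with `heapq` is to push tuples whose first element is the sort key. The sketch below assumes that a larger number means a higher priority, as the comments in the usage example suggest; the library's actual ordering is not confirmed here. The helper names `push` and `pop` are hypothetical.

```python
import heapq
import itertools

# Monotonic counter: keeps FIFO order among equal priorities and
# ensures payloads are never compared with each other.
_sequence = itertools.count()


def push(heap, name, data, priority):
    # heapq is a min-heap, so negate the priority to pop the largest first.
    heapq.heappush(heap, (-priority, next(_sequence), name, data))


def pop(heap):
    neg_priority, _seq, name, data = heapq.heappop(heap)
    return name, data, -neg_priority


heap = []
push(heap, "sms_notification", "sms", 1)
push(heap, "email_notification", "email", 2)
push(heap, "email_notification", "second email", 2)

order = [pop(heap)[1] for _ in range(3)]
print(order)  # ['email', 'second email', 'sms']
```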
### EventEmitter
Manages event registration, emission, and handling. Features include:
- **Logging:** Enables or disables logging and logs events with their priorities.
- **Event Registration:** Registers listeners and handlers for specific events.
- **Event Emission:** Emits events with context for cancellation and timeout.
- **Event Processing:** Processes events in priority order using heap operations.
- **Event Handling:** Dispatches events to registered handlers.
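The registration, filtering, and dispatch features listed above can be illustrated with a deliberately small emitter. This is a simplified sketch under stated assumptions and not pyeventhub's implementation: `MiniEmitter` and its `emit` method are hypothetical, handlers are plain callables rather than handler objects, and priorities, logging, and contexts are left out.

```python
import queue
import threading


class MiniEmitter:
    """Hypothetical, simplified emitter used only for illustration."""

    def __init__(self):
        self._lock = threading.Lock()
        self._listeners = {}  # event name -> list of (queue, filter)
        self._handlers = {}   # event name -> list of callables

    def on(self, name, event_filter=None):
        """Register a listener and return the queue it receives on."""
        channel = queue.Queue()
        with self._lock:
            self._listeners.setdefault(name, []).append((channel, event_filter))
        return channel

    def on_event(self, name, handler):
        """Register a callable invoked for every event with this name."""
        with self._lock:
            self._handlers.setdefault(name, []).append(handler)

    def emit(self, name, data):
        # Copy under the lock, then deliver without holding it.
        with self._lock:
            listeners = list(self._listeners.get(name, []))
            handlers = list(self._handlers.get(name, []))
        for channel, event_filter in listeners:
            if event_filter is None or event_filter(data):
                channel.put(data)
        for handler in handlers:
            handler(data)


emitter = MiniEmitter()
seen = []
emitter.on_event("tick", seen.append)
positives = emitter.on("tick", lambda n: n > 0)
for n in (-1, 2, 3):
    emitter.emit("tick", n)
```

In this sketch, filters apply only to listener queues; handlers registered with `on_event` see every event.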
## Example Usage
main.py
```python
import logging
import threading
import time

from pyeventhub import EventEmitter, NotificationHandler, Notification

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    emitter = EventEmitter(logging=True)

    # Register notification handlers
    handler1 = NotificationHandler(name="EmailHandler")
    handler2 = NotificationHandler(name="SMSHandler")
    emitter.on_event("email_notification", handler1)
    emitter.on_event("sms_notification", handler2)

    # Register listeners; the email listener uses a filter
    def email_filter(data):
        # Example filter: accept only notifications with ID > 0
        return isinstance(data, Notification) and data.ID > 0

    chEmail = emitter.on("email_notification", email_filter)
    chSMS = emitter.on("sms_notification")

    # Simulate sending notifications with priority
    def send_notifications():
        emitter.emit_with_context(None, "email_notification", Notification(ID=1, Message="New email received", CreatedAt=time.time()), 2)  # Higher priority
        emitter.emit_with_context(None, "sms_notification", Notification(ID=2, Message="You have a new SMS", CreatedAt=time.time()), 1)  # Lower priority

    # Set by the main thread to tell the consumer loop to exit
    stop = threading.Event()

    # Handle notifications asynchronously
    def handle_notifications():
        while not stop.is_set():
            if not chEmail.empty():
                notification = chEmail.get()
                print("Received email notification")
                handler1.handle(notification)
            if not chSMS.empty():
                notification = chSMS.get()
                print("Received SMS notification")
                handler2.handle(notification)
            time.sleep(0.01)  # Avoid a busy loop while both channels are empty

    send_thread = threading.Thread(target=send_notifications)
    handle_thread = threading.Thread(target=handle_notifications)
    send_thread.start()
    handle_thread.start()

    # Allow some time for notifications to be processed
    time.sleep(2)

    # Clean up: stop the consumer loop so the program can exit
    stop.set()
    send_thread.join()
    handle_thread.join()
    emitter.close()
```
## Raw data
{
"_id": null,
"home_page": "https://github.com/fadedreams/pyeventhub",
"name": "pyeventhub",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.8",
"maintainer_email": null,
"keywords": "event emitter, event hub, utilities",
"author": "fadedreams7",
"author_email": "fadedreams7@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/53/9e/e59d5308d8940bb0f20a6724a27d23f722bc3359e8ce9916dc1e3b53101e/pyeventhub-0.1.2.tar.gz",
"platform": null,
"description": "\n## Event Handling Framework with EventEmitter\n\nThis Python module implements an event handling framework (`EventEmitter`) that supports prioritization (`PriorityEvent`), event filtering, and processing using threading, queues (`queue` module), heap operations (`heapq` module), and logging (`logging` module) capabilities.\n\n## Classes\n\n### EventHandler\n\nDefines an interface for handling events.\n\n### NotificationHandler\n\nExtends `EventHandler` and handles notifications with specific `ID` and `Message` attributes.\n\n### Notification\n\nRepresents a notification entity with attributes `ID`, `Message`, and `CreatedAt`.\n\n### PriorityEvent\n\nRepresents an event with its associated priority for queueing and processing.\n\n### EventEmitter\n\nManages event registration, emission, and handling. Features include:\n\n- **Logging:** Enables or disables logging and logs events with their priorities.\n- **Event Registration:** Registers listeners and handlers for specific events.\n- **Event Emission:** Emits events with context for cancellation and timeout.\n- **Event Processing:** Processes events in priority order using heap operations.\n- **Event Handling:** Dispatches events to registered handlers.\n\n## Example Usage\nmain.py\n```python\nimport threading\nimport time\nimport queue\nimport logging\nfrom pyeventhub import EventEmitter, NotificationHandler, Notification\n\nif __name__ == \"__main__\":\n logging.basicConfig(level=logging.INFO)\n\n # Example usage\n emitter = EventEmitter(logging=True)\n\n # Register notification handlers\n handler1 = NotificationHandler(name=\"EmailHandler\")\n handler2 = NotificationHandler(name=\"SMSHandler\")\n\n emitter.on_event(\"email_notification\", handler1)\n emitter.on_event(\"sms_notification\", handler2)\n\n # Registering listeners with filters\n def email_filter(data):\n if isinstance(data, Notification):\n return data.ID > 0 # Example filter: process only notifications with ID > 0\n return False\n\n 
chEmail = emitter.on(\"email_notification\", email_filter)\n chSMS = emitter.on(\"sms_notification\")\n\n # Simulate sending notifications with priority\n def send_notifications():\n emitter.emit_with_context(None, \"email_notification\", Notification(ID=1, Message=\"New email received\", CreatedAt=time.time()), 2) # Higher priority\n emitter.emit_with_context(None, \"sms_notification\", Notification(ID=2, Message=\"You have a new SMS\", CreatedAt=time.time()), 1) # Lower priority\n\n # Handle notifications asynchronously\n def handle_notifications():\n while True:\n if not chEmail.empty():\n notification = chEmail.get()\n print(\"Received email notification\")\n handler1.handle(notification)\n if not chSMS.empty():\n notification = chSMS.get()\n print(\"Received SMS notification\")\n handler2.handle(notification)\n\n send_thread = threading.Thread(target=send_notifications)\n handle_thread = threading.Thread(target=handle_notifications)\n\n send_thread.start()\n handle_thread.start()\n\n # Allow some time for notifications to be processed\n time.sleep(2)\n\n # Clean up\n emitter.close()\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "This Python module implements an event handling framework (`EventEmitter`) with support for prioritization (`PriorityEvent`), event filtering, and processing using threading, queues (`queue` module), heap operations (`heapq` module), and logging (`logging` module) capabilities.",
"version": "0.1.2",
"project_urls": {
"Homepage": "https://github.com/fadedreams/pyeventhub",
"Repository": "https://github.com/fadedreams/pyeventhub"
},
"split_keywords": [
"event emitter",
" event hub",
" utilities"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "aa67bf1060d13dd123b866365f6c7eb7c9252e7032ec8d49952fd85b49c32e2c",
"md5": "ead7669043ec4dae20335d282f093d24",
"sha256": "c87888d95b54fff4f4567dd23e199141f64478baba431ddd7bc68ca5da37966f"
},
"downloads": -1,
"filename": "pyeventhub-0.1.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "ead7669043ec4dae20335d282f093d24",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.8",
"size": 6632,
"upload_time": "2024-06-21T10:28:01",
"upload_time_iso_8601": "2024-06-21T10:28:01.858731Z",
"url": "https://files.pythonhosted.org/packages/aa/67/bf1060d13dd123b866365f6c7eb7c9252e7032ec8d49952fd85b49c32e2c/pyeventhub-0.1.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "539ee59d5308d8940bb0f20a6724a27d23f722bc3359e8ce9916dc1e3b53101e",
"md5": "f0ca488284802a1742ad2130415a0530",
"sha256": "9667b57746ed236a57248c85e60e8560700d62bf7d25f82e49ac57afd0c2e8a6"
},
"downloads": -1,
"filename": "pyeventhub-0.1.2.tar.gz",
"has_sig": false,
"md5_digest": "f0ca488284802a1742ad2130415a0530",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.8",
"size": 3974,
"upload_time": "2024-06-21T10:28:03",
"upload_time_iso_8601": "2024-06-21T10:28:03.123763Z",
"url": "https://files.pythonhosted.org/packages/53/9e/e59d5308d8940bb0f20a6724a27d23f722bc3359e8ce9916dc1e3b53101e/pyeventhub-0.1.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-06-21 10:28:03",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "fadedreams",
"github_project": "pyeventhub",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "pyeventhub"
}