# Qanat
Qanat: Create simple event-driven, distributed data pipelines in Python.
Qanat is a lightweight tool for setting up event-driven, distributed data pipelines with
ease. It lets you link components together into a type-checked, decoupled pipeline.
Inter-process communication is handled automatically through MQTT (RabbitMQ may be
supported in the future), and serialization is handled through JSON.
## Usage Example
```python
from qanat import QanatPipeline
from dataclasses import dataclass
from typing import List
@dataclass
class Frame:
    """A single frame together with the time it was produced."""

    # Raw image payload.
    image: bytes
    # Time of the frame as a string; the example uses "2021-07-01T00:00:00Z".
    timestamp: str
@dataclass
class Detection:
    """One detected object."""

    # Label of the detected object, e.g. "smoke".
    type: str
    # Score attached to the detection, e.g. 0.98 (presumably in [0, 1] -- confirm).
    confidence: float
@dataclass
class DetectionResults:
    """All detections found for one frame."""

    # Detections found in the frame.
    objects: List[Detection]
    # Timestamp string of the frame the detections belong to.
    frame_timestamp: str
# Create the pipeline object, giving it the URL of the MQTT broker to connect to.
# The @pipeline.component decorators further down are attached to this instance.
pipeline = QanatPipeline(broker="mqtt://broker.hivemq.com:1883")
@pipeline.component(output="qanat-demo/frames/raw")
def frame_producer() -> Frame:
    """
    Simulate producing a frame and its metadata.

    Registered on ``pipeline`` with ``output="qanat-demo/frames/raw"``.

    Returns:
        A ``Frame`` carrying placeholder image bytes and a fixed timestamp.
    """
    image_data = b'some_image_data'  # Replace with actual image data
    return Frame(image=image_data, timestamp="2021-07-01T00:00:00Z")
@pipeline.component(
    input="qanat-demo/frames/raw",
    output="qanat-demo/detections/results"
)
def detector(frame: Frame) -> DetectionResults:
    """
    Process a frame and detect objects, outputting detection results.

    Registered on ``pipeline`` with the frame producer's output as its input.

    Args:
        frame: The frame to analyse; only its ``timestamp`` is read here.

    Returns:
        A ``DetectionResults`` holding one hard-coded detection and the
        timestamp copied from ``frame``.
    """
    # Placeholder result: a real detector would derive this from frame.image.
    detected_objects = [Detection(type="smoke", confidence=0.98)]
    return DetectionResults(objects=detected_objects, frame_timestamp=frame.timestamp)
@pipeline.component(input="qanat-demo/detections/results")
def result_publisher(detection_results: DetectionResults):
    """
    Publish the detection results.

    Registered on ``pipeline`` with the detector's output as its input and no
    output of its own.

    Args:
        detection_results: The results to report; they are printed to
            standard output.
    """
    print(f"Publishing results: {detection_results}")
# Start the event loop for testing and demonstration
if __name__ == "__main__":
    # For demo or testing purposes, this runs all components in the pipeline
    pipeline.start_event_loop()  # Blocks until the event loop terminates

    # In real-world distributed usage, you would start three separate processes,
    # one for each component, e.g.:
    # frame_producer.start()  # Blocks until this specific component terminates
```
Raw data
{
"_id": null,
"home_page": "https://github.com/jarombouts/qanat",
"name": "pyqanat",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": "",
"keywords": "data pipelines event-driven architecture microservices mqtt",
"author": "Your Name",
"author_email": "",
"download_url": "https://files.pythonhosted.org/packages/45/44/f3a051697902b88f6407ac4ccd13c13a78468c7a9370a6bbd1c8d54956a5/pyqanat-0.0.0.tar.gz",
"platform": null,
"description": "# Qanat\n\nQanat: Create simple event-driven, distributed data pipelines in Python.\n\nQanat is a lightweight tool for setting up event-driven, distributed data pipelines with\nease. It lets you link together components into a type-checked, decoupled pipeline, with\ninter-process communication handled automatically through MQTT (in the future RabbitMQ?)\nand serialization handled through JSON.\n\n## Usage Example\n\n```python\nfrom qanat import QanatPipeline\nfrom dataclasses import dataclass\nfrom typing import List\n\n@dataclass\nclass Frame:\n image: bytes\n timestamp: str\n\n@dataclass\nclass Detection:\n type: str\n confidence: float\n\n@dataclass\nclass DetectionResults:\n objects: List[Detection]\n frame_timestamp: str\n\n# Initialize the pipeline with the MQTT broker connection\npipeline = QanatPipeline(broker=\"mqtt://broker.hivemq.com:1883\")\n\n@pipeline.component(output=\"qanat-demo/frames/raw\")\ndef frame_producer() -> Frame:\n \"\"\"\n Simulates producing a frame and its metadata.\n \"\"\"\n image_data = b'some_image_data' # Replace with actual image data\n return Frame(image=image_data, timestamp=\"2021-07-01T00:00:00Z\")\n\n@pipeline.component(\n input=\"qanat-demo/frames/raw\", \n output=\"qanat-demo/detections/results\"\n)\ndef detector(frame: Frame) -> DetectionResults:\n \"\"\"\n Processes a frame and detects objects, outputting detection results.\n \"\"\"\n detected_objects = [Detection(type=\"smoke\", confidence=0.98)]\n return DetectionResults(objects=detected_objects, frame_timestamp=frame.timestamp)\n\n@pipeline.component(input=\"qanat-demo/detections/results\")\ndef result_publisher(detection_results: DetectionResults):\n \"\"\"\n Publishes the detection results.\n \"\"\"\n print(f\"Publishing results: {detection_results}\")\n\n# Start the event loop for testing and demonstration\nif __name__ == \"__main__\":\n # For demo or testing purposes, this runs all components in the pipeline\n pipeline.start_event_loop() # Blocks 
until the event loop terminates\n\n # In real-world distributed usage, you would start three separate processes, \n # one for each component, e.g.:\n # frame_producer.start() # Blocks until this specific component terminates\n```\n\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Create simple event-driven, distributed data pipelines in Python",
"version": "0.0.0",
"project_urls": {
"Homepage": "https://github.com/jarombouts/qanat"
},
"split_keywords": [
"data",
"pipelines",
"event-driven",
"architecture",
"microservices",
"mqtt"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "c1985e04804916af2e8b4c54b83e40a667b9fe0d78709b2ff54e32c19416a997",
"md5": "a587b5596b5b3cd1ec70a6e418466b66",
"sha256": "ab437df6f6ce2aa9785fc90acb296eb05a43d6431a3ac43560a1c78cfa95ad40"
},
"downloads": -1,
"filename": "pyqanat-0.0.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a587b5596b5b3cd1ec70a6e418466b66",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 4605,
"upload_time": "2023-11-13T13:22:43",
"upload_time_iso_8601": "2023-11-13T13:22:43.101458Z",
"url": "https://files.pythonhosted.org/packages/c1/98/5e04804916af2e8b4c54b83e40a667b9fe0d78709b2ff54e32c19416a997/pyqanat-0.0.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "4544f3a051697902b88f6407ac4ccd13c13a78468c7a9370a6bbd1c8d54956a5",
"md5": "1cf766009ccc932a795f7b157325cb29",
"sha256": "8b4121f792847d293622c3fba58bcb26937ddc1c9b39ed8318d2be92665901e3"
},
"downloads": -1,
"filename": "pyqanat-0.0.0.tar.gz",
"has_sig": false,
"md5_digest": "1cf766009ccc932a795f7b157325cb29",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 4231,
"upload_time": "2023-11-13T13:22:44",
"upload_time_iso_8601": "2023-11-13T13:22:44.862956Z",
"url": "https://files.pythonhosted.org/packages/45/44/f3a051697902b88f6407ac4ccd13c13a78468c7a9370a6bbd1c8d54956a5/pyqanat-0.0.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-11-13 13:22:44",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "jarombouts",
"github_project": "qanat",
"github_not_found": true,
"lcname": "pyqanat"
}