pyqanat


Name: pyqanat
Version: 0.0.0
home_page: https://github.com/jarombouts/qanat
Summary: Create simple event-driven, distributed data pipelines in Python
upload_time: 2023-11-13 13:22:44
author: Your Name
requires_python: >=3.11
license: MIT
keywords: data, pipelines, event-driven, architecture, microservices, mqtt
requirements: No requirements were recorded.

# Qanat

Qanat: Create simple event-driven, distributed data pipelines in Python.

Qanat is a lightweight tool for setting up event-driven, distributed data pipelines.
It lets you link components together into a type-checked, decoupled pipeline.
Inter-process communication is handled automatically through MQTT (RabbitMQ is a
possible future addition), and messages are serialized as JSON.
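
To make the JSON serialization step concrete, here is a minimal sketch of a dataclass round trip using only the standard library. It is not Qanat's own code: the helper names `frame_to_json` and `frame_from_json` are hypothetical, and base64-encoding the `bytes` field is an assumption made here because JSON has no binary type.

```python
import base64
import json
from dataclasses import asdict, dataclass


@dataclass
class Frame:
    image: bytes
    timestamp: str


def frame_to_json(frame: Frame) -> str:
    """Encode a Frame as a JSON string (hypothetical helper, not part of Qanat)."""
    payload = asdict(frame)
    # JSON cannot carry raw bytes, so the image is base64-encoded (an assumption).
    payload["image"] = base64.b64encode(frame.image).decode("ascii")
    return json.dumps(payload)


def frame_from_json(text: str) -> Frame:
    """Rebuild a Frame from the JSON produced by frame_to_json."""
    payload = json.loads(text)
    return Frame(
        image=base64.b64decode(payload["image"]),
        timestamp=payload["timestamp"],
    )


original = Frame(image=b"some_image_data", timestamp="2021-07-01T00:00:00Z")
wire = frame_to_json(original)    # what would travel over the broker
restored = frame_from_json(wire)  # what the receiving component would see
```

Whatever encoding a real pipeline uses, both ends of a topic have to agree on it, which is one reason to tie each topic to a single message type.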

## Usage Example

```python
from qanat import QanatPipeline
from dataclasses import dataclass
from typing import List

@dataclass
class Frame:
    image: bytes
    timestamp: str

@dataclass
class Detection:
    type: str
    confidence: float

@dataclass
class DetectionResults:
    objects: List[Detection]
    frame_timestamp: str

# Initialize the pipeline with the MQTT broker connection
pipeline = QanatPipeline(broker="mqtt://broker.hivemq.com:1883")

@pipeline.component(output="qanat-demo/frames/raw")
def frame_producer() -> Frame:
    """
    Simulates producing a frame and its metadata.
    """
    image_data = b'some_image_data'  # Replace with actual image data
    return Frame(image=image_data, timestamp="2021-07-01T00:00:00Z")

@pipeline.component(
    input="qanat-demo/frames/raw", 
    output="qanat-demo/detections/results"
)
def detector(frame: Frame) -> DetectionResults:
    """
    Processes a frame and detects objects, outputting detection results.
    """
    detected_objects = [Detection(type="smoke", confidence=0.98)]
    return DetectionResults(objects=detected_objects, frame_timestamp=frame.timestamp)

@pipeline.component(input="qanat-demo/detections/results")
def result_publisher(detection_results: DetectionResults):
    """
    Publishes the detection results.
    """
    print(f"Publishing results: {detection_results}")

# Start the event loop for testing and demonstration
if __name__ == "__main__":
    # For demo or testing purposes, this runs all components in the pipeline
    pipeline.start_event_loop()  # Blocks until the event loop terminates

    # In real-world distributed usage, you would start three separate processes,
    # one for each component, e.g.:
    # frame_producer.start()  # Blocks until this specific component terminates
```


            

Raw data

{
    "_id": null,
    "home_page": "https://github.com/jarombouts/qanat",
    "name": "pyqanat",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": "",
    "keywords": "data pipelines event-driven architecture microservices mqtt",
    "author": "Your Name",
    "author_email": "",
    "download_url": "https://files.pythonhosted.org/packages/45/44/f3a051697902b88f6407ac4ccd13c13a78468c7a9370a6bbd1c8d54956a5/pyqanat-0.0.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Create simple event-driven, distributed data pipelines in Python",
    "version": "0.0.0",
    "project_urls": {
        "Homepage": "https://github.com/jarombouts/qanat"
    },
    "split_keywords": [
        "data",
        "pipelines",
        "event-driven",
        "architecture",
        "microservices",
        "mqtt"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "c1985e04804916af2e8b4c54b83e40a667b9fe0d78709b2ff54e32c19416a997",
                "md5": "a587b5596b5b3cd1ec70a6e418466b66",
                "sha256": "ab437df6f6ce2aa9785fc90acb296eb05a43d6431a3ac43560a1c78cfa95ad40"
            },
            "downloads": -1,
            "filename": "pyqanat-0.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a587b5596b5b3cd1ec70a6e418466b66",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 4605,
            "upload_time": "2023-11-13T13:22:43",
            "upload_time_iso_8601": "2023-11-13T13:22:43.101458Z",
            "url": "https://files.pythonhosted.org/packages/c1/98/5e04804916af2e8b4c54b83e40a667b9fe0d78709b2ff54e32c19416a997/pyqanat-0.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "4544f3a051697902b88f6407ac4ccd13c13a78468c7a9370a6bbd1c8d54956a5",
                "md5": "1cf766009ccc932a795f7b157325cb29",
                "sha256": "8b4121f792847d293622c3fba58bcb26937ddc1c9b39ed8318d2be92665901e3"
            },
            "downloads": -1,
            "filename": "pyqanat-0.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "1cf766009ccc932a795f7b157325cb29",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 4231,
            "upload_time": "2023-11-13T13:22:44",
            "upload_time_iso_8601": "2023-11-13T13:22:44.862956Z",
            "url": "https://files.pythonhosted.org/packages/45/44/f3a051697902b88f6407ac4ccd13c13a78468c7a9370a6bbd1c8d54956a5/pyqanat-0.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-11-13 13:22:44",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "jarombouts",
    "github_project": "qanat",
    "github_not_found": true,
    "lcname": "pyqanat"
}
        