streamdaq


- Name: streamdaq
- Version: 0.1.10
- Summary: Plug-and-play real-time quality monitoring for data streams!
- Upload time: 2025-08-06 15:31:06
- Author: Apostolos Giannoulidis
- Requires Python: >=3.11
- License: MIT
- Keywords: data-quality, streaming, real-time, monitoring, data-engineering
- Requirements: pathway, matplotlib, datasketch, datasketches

# Stream DaQ

## TL;DR
Plug-and-play quality monitoring for high-volume, high-velocity data streams! In Python, of course!

```python
# pip install streamdaq

from streamdaq import StreamDaQ, DaQMeasures as dqm, Windows
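from some_existing_workflow import check_most_frequent_items  # placeholder for your own assessment function

# Step 1: Configure the monitoring (windowing and time settings)
daq = StreamDaQ().configure(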
    window=Windows.tumbling(3),
    instance="user_id",
    time_column="timestamp",
    wait_for_late=1,
    time_format='%Y-%m-%d %H:%M:%S'
)

# Step 2: Define what Data Quality means for you
# (the enclosing parentheses let the chained calls span several lines)
(
    daq.add(dqm.count('interaction_events'), assess="(5, 15]", name="count")
    .add(dqm.max('interaction_events'), assess=">5.09", name="max_interact")
    .add(dqm.most_frequent('interaction_events'), assess=check_most_frequent_items, name="freq_interact")
    .add(dqm.distinct_count_approx('interaction_events'), assess="==9", name="approx_dist_interact")
)

# Complete list of Data Quality Measures (dqm): https://github.com/Bilpapster/stream-DaQ/blob/main/streamdaq/DaQMeasures.py

# Step 3: Kick off monitoring and let Stream DaQ do the work while you focus on what matters
daq.watch_out()
```
More examples can be found in the [examples directory](https://github.com/Bilpapster/stream-DaQ/tree/main/examples) of the project, and more are on their way. Thank you for your patience!

![Stream DaQ logo](https://github.com/user-attachments/assets/ebe3a950-5fbb-49d8-b6b1-f232ca7dc362)

## Motivation
Remember the joy of bath time with those trusty rubber ducks, keeping us company while floating through the bubbles?
Well, think of **Stream DaQ** as the duck for your data — keeping your streaming data clean and afloat in a sea of
information. Just like those bath ducks helped make our playtime fun and carefree, Stream DaQ keeps an eye on your data
and lets you know the moment things get messy, so you can take action ***in real time***!


## The project

**Stream DaQ** is developed in Python and builds internally on
[Pathway](https://github.com/pathwaycom/pathway), an open-source stream processing library.
Earlier versions of the project also used two other Python stream processing libraries,
namely [Faust](https://faust-streaming.github.io/faust/) and [Bytewax](https://bytewax.io/). You can find source code
using these frameworks in the `faust-vs-bytewax` branch of the repository, along with comparisons between the two
libraries. Our immediate plan is to extend the functionality of Stream DaQ primarily in Pathway. The latest
advancements of the tool will always be available in the `main` branch (you are here).

The project is developed by the Data Engineering Team (DELAB) of [Datalab AUTh](https://datalab.csd.auth.gr/), under the
supervision of [Prof. Anastasios Gounaris](https://datalab-old.csd.auth.gr/~gounaris/).

## Key functionality

**Stream DaQ** keeps an eye on your data stream and lets you know, in **real time**, when the data flowing through it
are not as expected, so that you can take action. Several key aspects make the tool a powerful option for data
quality monitoring on data streams:

1. *Highly configurable*: Stream DaQ comes with plenty of built-in data quality measurements, so that you can choose
   which of them fit your use case. We know that every data-centric application is different, so being able to
   **define** what "data quality" means for you is precious.
2. *Real-time alerts*: Stream DaQ defines meaningful data quality checks for data streams and emits the check
   results as a stream of their own. This architectural choice enables real-time alerts whenever the standards
   or thresholds you have defined are not met!

## Stream DaQ's architecture

![StreamDaQ architecture animation](https://github.com/user-attachments/assets/df59f529-c74d-425e-a19f-66a95482c716)

## Current Suite of Measurements
The following list provides a brief overview of the current Suite of Measurements supported by **Stream DaQ**. For every measurement below, **Stream DaQ** offers a range of _implementation variations_, in order to cover a broad and heterogeneous spectrum of DQ needs, depending on the domain, the task and the system at hand. Each of them leaves room for extensive customization, via custom thresholds and variation selection, in only a couple of lines of code. Stay tuned, since new measurements are on their way to be incorporated into **Stream DaQ**! Until then, happy qua(ck)litying!

- **Descriptive statistics**: min, max, mean, median, std, number/fraction above mean
- **Availability**: at least one value in a window
- **Frozen stream detection**: all/most values are the same in a window
- **Range/Set validation**: values fall within a range or set of accepted values
- **Regex validation**: values comply with a provided regex
- **Unordered stream detection**: elements violate (time) ordering
- **Volume & Cardinalities**: count, distinct, unique, most_frequent, heavy hitters (approx)
- **Distribution percentiles**
- **Correlation analysis** between fields
- **Value Lengths**: {min, max, mean, median} length (_String-specific_)
- **Integer - Fractional parts**: {min, max, mean, median} integer or fractional part (_Float-specific_)
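
To make a few of these measurements concrete, here is a minimal plain-Python sketch of what they compute over the values of a single window. The function names are hypothetical and are not part of the Stream DaQ API; the library's own implementations and variations may well differ.

```python
import re
from statistics import mean


def is_frozen(values: list) -> bool:
    """Frozen stream check: every value in the (non-empty) window is identical."""
    return len(values) > 0 and len(set(values)) == 1


def fraction_above_mean(values: list[float]) -> float:
    """Fraction of window values strictly greater than the window mean."""
    if not values:
        return 0.0
    window_mean = mean(values)
    return sum(v > window_mean for v in values) / len(values)


def fraction_matching(values: list[str], pattern: str) -> float:
    """Fraction of window values that fully match the given regex."""
    if not values:
        return 0.0
    compiled = re.compile(pattern)
    return sum(compiled.fullmatch(v) is not None for v in values) / len(values)


# Toy windows, made up for illustration
print(is_frozen([4.0, 4.0, 4.0]))
print(fraction_above_mean([1.0, 2.0, 3.0, 10.0]))
print(fraction_matching(["ab12", "zz99", "oops"], r"[a-z]{2}\d{2}"))
```

Each function returns a plain value, so a threshold or custom check can then be applied to it separately.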

## Example code

Data quality monitoring is a highly case-specific task. We feel you! That's why we have tried to make it easier than
ever to define what data quality means for your *very own case*. You just need a couple of lines to define what you want
to be monitored and that's it! Sit back and focus on the important stuff: the **real-time** results. Stream DaQ
reliably handles all the rest for you. See it in a toy example!

```python
# pip install streamdaq

from collections.abc import Iterable

from streamdaq import StreamDaQ, DaQMeasures as dqm, Windows


def is_seven_frequent(most_frequent: int | float | tuple) -> bool:
    """Return True if 7 is the most frequent value, or one of them."""
    # if there are several most frequent items, check for membership
    if isinstance(most_frequent, Iterable):
        return 7 in most_frequent

    # if there is a single most frequent item, check just for equality
    return most_frequent == 7


# Step 1: Configure the monitoring (windowing and time settings)
daq = StreamDaQ().configure(
    window=Windows.tumbling(3),
    instance="user_id",
    time_column="timestamp",
    wait_for_late=1,
    time_format='%Y-%m-%d %H:%M:%S'
)

# Step 2: Define what Data Quality means for you
# (the enclosing parentheses let the chained calls span several lines)
(
    daq.add(dqm.count('interaction_events'), assess="(5, 15]", name="count")
    .add(dqm.most_frequent('interaction_events'), assess=is_seven_frequent, name="freq_interact")
    .add(dqm.distinct_count_approx('interaction_events'), assess="==9", name="approx_dist_interact")
)

# Step 3: Kick off monitoring and let Stream DaQ do the work while you focus on what matters
daq.watch_out()
```

In this simple example, we define three different data quality checks:

- the number of elements (`count`): we expect it to be in the range `(5, 15]`;
- the most frequent values (`most_frequent`): we expect the number 7 to be among them;
- the approximate number of distinct values (`distinct_count_approx`): we expect exactly 9 distinct values (`==9`).
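
The string-valued `assess` arguments read like interval notation and simple comparisons. As an illustration only, the sketch below shows one way such strings could be turned into predicates in plain Python. `make_assessor` is a hypothetical helper, and this is not a description of how Stream DaQ parses these strings internally.

```python
import operator
import re
from typing import Callable

NUMBER = r"-?\d+(?:\.\d+)?"
COMPARATORS = {
    "==": operator.eq, "!=": operator.ne,
    ">=": operator.ge, "<=": operator.le,
    ">": operator.gt, "<": operator.lt,
}


def make_assessor(spec: str) -> Callable[[float], bool]:
    """Turn a spec such as "(5, 15]" or ">5.09" into a predicate on numbers."""
    spec = spec.strip()

    # Interval notation: round brackets exclude the bound, square brackets include it
    interval = re.fullmatch(rf"([\[(])\s*({NUMBER})\s*,\s*({NUMBER})\s*([\])])", spec)
    if interval:
        left, low_text, high_text, right = interval.groups()
        low, high = float(low_text), float(high_text)
        above = operator.ge if left == "[" else operator.gt
        below = operator.le if right == "]" else operator.lt
        return lambda value: above(value, low) and below(value, high)

    # Single comparison against a threshold, e.g. "==9" or ">5.09"
    comparison = re.fullmatch(rf"(==|!=|>=|<=|>|<)\s*({NUMBER})", spec)
    if comparison:
        symbol, threshold_text = comparison.groups()
        compare, threshold = COMPARATORS[symbol], float(threshold_text)
        return lambda value: compare(value, threshold)

    raise ValueError(f"Unsupported assessment spec: {spec!r}")


in_range = make_assessor("(5, 15]")
print(in_range(5), in_range(15), in_range(16))
```

Under this reading, `(5, 15]` rejects 5 but accepts 15, which is the usual half-open interval convention.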

These checks are monitored in real time for every stream window. Windows can be tumbling, sliding, or session-based and are
a fundamental notion of the *Stream DaQ* ecosystem. That's why *Stream DaQ* gives you full control over configuring windows
that are suitable for your use case!
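
As a rough illustration of the tumbling case, the sketch below assigns timestamped events to fixed, non-overlapping windows in plain Python. It assumes that `Windows.tumbling(3)` means three-second windows and that timestamps are in UTC, and it ignores late-arrival handling. `tumbling_window` and `count_per_window` are hypothetical helpers, not library code.

```python
from collections import defaultdict
from datetime import datetime, timezone

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # same format string as in the example configuration


def tumbling_window(timestamp: str, size_seconds: int = 3) -> tuple[float, float]:
    """Return the [start, end) bounds, in epoch seconds, of the window holding timestamp."""
    parsed = datetime.strptime(timestamp, TIME_FORMAT).replace(tzinfo=timezone.utc)
    epoch = parsed.timestamp()
    start = epoch - (epoch % size_seconds)
    return start, start + size_seconds


def count_per_window(events: list[tuple[str, str, int]]) -> dict:
    """Count events per (user_id, window_start, window_end) key."""
    counts: dict = defaultdict(int)
    for user_id, timestamp, _value in events:
        counts[(user_id, *tumbling_window(timestamp))] += 1
    return dict(counts)


# Made-up events: (user_id, timestamp, interaction_events)
events = [
    ("UserA", "2025-04-09 10:37:18", 6),
    ("UserA", "2025-04-09 10:37:20", 7),
    ("UserA", "2025-04-09 10:37:21", 3),
]
for key, count in count_per_window(events).items():
    print(key, count)
```

Because tumbling windows do not overlap, every event lands in exactly one window. Sliding and session windows would need a different assignment rule.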

Based on your window settings, Stream DaQ continuously monitors (`watch_out`) the stream as new data arrives.
The monitoring results are reported in real time as a meta stream: every row of the result is itself a new stream
element, and each check column holds a `(value, passed)` pair. The sample output below includes `max_interact` and
`med_interact` columns, which are not among the three checks defined above:

```markdown
user_id | window_start | window_end   | count       | max_interact | med_interact | freq_interact   
UserA   | 1744195038.0 | 1744195041.0 | (14, True)  | (10, True)   | (6.5, True)  | (6, False)      
UserA   | 1744195041.0 | 1744195044.0 | (16, False) | (10, True)   | (5.0, True)  | (7, True)      
UserB   | 1744195044.0 | 1744195047.0 | (16, False) | (10, True)   | (7.0, True)  | (9, False)      
UserB   | 1744195047.0 | 1744195050.0 | (8, True)   | (8, True)    | (5.0, True)  | ((2, 7), True)
```
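
Each check cell in the sample pairs the measured value with a pass/fail flag, so reacting to failures comes down to filtering on that flag. The sketch below does this over two rows copied from the sample output and represented as plain dictionaries. How Stream DaQ actually exposes its results is not shown here, so the row structure and the `failed_checks` helper are assumptions made for illustration.

```python
META_COLUMNS = {"user_id", "window_start", "window_end"}


def failed_checks(row: dict) -> list[str]:
    """Names of the checks whose (value, passed) pair has passed == False."""
    return [
        name
        for name, cell in row.items()
        if name not in META_COLUMNS and not cell[1]
    ]


# Rows taken from the sample output, written as dictionaries (assumed structure)
rows = [
    {"user_id": "UserA", "window_start": 1744195038.0, "window_end": 1744195041.0,
     "count": (14, True), "max_interact": (10, True),
     "med_interact": (6.5, True), "freq_interact": (6, False)},
    {"user_id": "UserB", "window_start": 1744195047.0, "window_end": 1744195050.0,
     "count": (8, True), "max_interact": (8, True),
     "med_interact": (5.0, True), "freq_interact": ((2, 7), True)},
]

for row in rows:
    failures = failed_checks(row)
    if failures:
        print(f"{row['user_id']} [{row['window_start']}, {row['window_end']}): failed {failures}")
```

In a real deployment the `print` call would be replaced by whatever alerting channel you use.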

The above checks are just a small subset of the many built-in, plug-and-play data quality validations *Stream DaQ* comes with. A detailed
list of all the available checks will be included shortly.

## Execution

Just install the Python library (Python >= 3.11) and ... happy quacklitying!

```bash
pip install streamdaq
```

## Work in progress

The project is currently under active development, so more functionality, documentation, examples and demonstrations
are on their way. We thank you for your patience.

## Acknowledgements

Special thanks to [Maria Kavouridou](https://www.linkedin.com/in/maria-kavouridou/) for the effort and love she put
into creating the Stream DaQ logo.


            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "streamdaq",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": "Vassilis Papastergios <papster@csd.auth.gr>",
    "keywords": "data-quality, streaming, real-time, monitoring, data-engineering",
    "author": "Apostolos Giannoulidis",
    "author_email": "Vassilis Papastergios <papster@csd.auth.gr>, Anastasios Gounaris <gounaria@csd.auth.gr>",
    "download_url": "https://files.pythonhosted.org/packages/2f/58/922625bc3cc075dc0fe5f16d76fd0d66677e13d4ab632bf36d3ffc59d2f2/streamdaq-0.1.10.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Plug-and-play real-time quality monitoring for data streams!",
    "version": "0.1.10",
    "project_urls": {
        "Bug Tracker": "https://github.com/Bilpapster/stream-DaQ/issues",
        "Documentation": "https://bilpapster.github.io/stream-DaQ/",
        "Homepage": "https://bilpapster.github.io/stream-DaQ/",
        "Repository": "https://github.com/Bilpapster/stream-DaQ"
    },
    "split_keywords": [
        "data-quality",
        " streaming",
        " real-time",
        " monitoring",
        " data-engineering"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "2edfdc3a8cbcdd3f7b454a3b549c294c77b8c65f6b5340a89e8a0f06f8e6a2aa",
                "md5": "ad543aeebd3e361826c0c009446a0b0a",
                "sha256": "45ce17a8b4b01b82635230e12159745fd2057f862883e252f7150c99e49f0a4f"
            },
            "downloads": -1,
            "filename": "streamdaq-0.1.10-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "ad543aeebd3e361826c0c009446a0b0a",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 25174,
            "upload_time": "2025-08-06T15:31:05",
            "upload_time_iso_8601": "2025-08-06T15:31:05.710826Z",
            "url": "https://files.pythonhosted.org/packages/2e/df/dc3a8cbcdd3f7b454a3b549c294c77b8c65f6b5340a89e8a0f06f8e6a2aa/streamdaq-0.1.10-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "2f58922625bc3cc075dc0fe5f16d76fd0d66677e13d4ab632bf36d3ffc59d2f2",
                "md5": "dc3705d9b65d41d720593ca7097ef743",
                "sha256": "b4880887d3bb6925480d2f0cd1f3ff7b9ccbda48d03575f6bb8748077207e3d4"
            },
            "downloads": -1,
            "filename": "streamdaq-0.1.10.tar.gz",
            "has_sig": false,
            "md5_digest": "dc3705d9b65d41d720593ca7097ef743",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 27111,
            "upload_time": "2025-08-06T15:31:06",
            "upload_time_iso_8601": "2025-08-06T15:31:06.789462Z",
            "url": "https://files.pythonhosted.org/packages/2f/58/922625bc3cc075dc0fe5f16d76fd0d66677e13d4ab632bf36d3ffc59d2f2/streamdaq-0.1.10.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-06 15:31:06",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "Bilpapster",
    "github_project": "stream-DaQ",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [
        {
            "name": "pathway",
            "specs": []
        },
        {
            "name": "matplotlib",
            "specs": []
        },
        {
            "name": "datasketch",
            "specs": []
        },
        {
            "name": "datasketches",
            "specs": []
        }
    ],
    "lcname": "streamdaq"
}
        