aligned 0.0.93 (PyPI)

Summary: A data management and lineage tool for ML applications.
Home page: https://github.com/MatsMoll/aligned
Author: Mats E. Mollestad
Requires Python: <4.0,>=3.10
License: Apache-2.0
Keywords: python, typed, ml, prediction, feature store, feature-store, feast, tecton, dbt, data lineage
Upload time: 2024-04-16 19:42:08
# Aligned

A data management tool for ML applications.

Similar to how dbt is a data management tool for business analytics, Aligned manages ML projects.

Aligned does this through two things:
1. A lightweight data management system, making it possible to query a data lake and databases.
2. Tooling to define a `model_contract`, clearing up common unanswered questions through code.


Furthermore, Aligned collects data lineage between models and basic feature transformations, while also making it easy to reduce data leakage with point-in-time valid data and to fix other problems described in [Sculley et al. [2015]](https://papers.nips.cc/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf).
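To make the point-in-time idea concrete, here is a plain-Python sketch (the helper below is an illustration, not part of Aligned's API): for each prediction event, only feature rows observed at or before the event time may be used, otherwise future information leaks into the training data.

```python
from datetime import datetime

def point_in_time_value(feature_rows, event_time):
    """Return the latest feature value observed at or before event_time.

    feature_rows: list of (observed_at, value) tuples.
    Rows observed after event_time are excluded to avoid data leakage.
    """
    valid = [(ts, v) for ts, v in feature_rows if ts <= event_time]
    if not valid:
        return None
    return max(valid)[1]

rows = [
    (datetime(2024, 1, 1), 10.0),
    (datetime(2024, 2, 1), 12.5),
    (datetime(2024, 3, 1), 99.0),  # observed after the event below
]

# A prediction made on Feb 15 must not see the March value.
print(point_in_time_value(rows, datetime(2024, 2, 15)))  # 12.5
```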

## Examples

Below are some examples of how Aligned can be used.

### Aligned UI

Aligned provides a UI to view which data exists, which expectations we have, and to find faults.

[View the example UI](https://aligned-catalog.azurewebsites.net/).
However, this is still under development, so sign up for a [wait list](https://aligned-managed-web.vercel.app/) to get access.


### Example Repo

Want to look at examples of how to use `aligned`?
View the [`MatsMoll/aligned-example` repo](https://github.com/MatsMoll/aligned-example).

Or see how you could query a file in a data lake.

```python
from aligned import ContractStore

store = await ContractStore.from_dir(".")
df = await store.execute_sql("SELECT * FROM titanic LIMIT 10").to_polars()
```

## Docs

Check out the [Aligned Docs](https://www.aligned.codes), but keep in mind that they are still a work in progress.

---

### Available Features

Below are some of the features Aligned offers:

- [Data Catalog](https://aligned-managed-web.vercel.app/)
- [Data Lineage](https://aligned-managed-web.vercel.app/)
- [Model Performance Monitoring](https://aligned-managed-web.vercel.app/)
- [Data Freshness](#data-freshness)
- [Data Quality Assurance](#data-quality)
- [Feature Store](https://matsmoll.github.io/posts/understanding-the-chaotic-landscape-of-mlops#feature-store)
- [Exposing Models](#exposed-models)


All from the simple API of defining
- [Data Sources](#data-sources)
- [Feature Views](#feature-views)
- [Models](#describe-models)

As a result, loading model features is as easy as:

```python
entities = {"passenger_id": [1, 2, 3, 4]}
await store.model("titanic").features_for(entities).to_pandas()
```

Aligned is still in active development, so changes are likely.

## Model Contract

Aligned introduces a new concept called the "model contract", which tries to answer the following questions.

- What is predicted?
- What is associated with a prediction? A user id?
- Where do we store predictions?
- Does the model depend on other models?
- Is the model exposed through an API?
- What needs to be sent in to use the model?
- Is it classification, regression, or generative AI?
- Where is the ground truth stored, if any?
- Who owns the model?
- Where do we store data sets?

All this is described through a `model_contract`, as shown below.

```python
@model_contract(
    name="eta_taxi",
    input_features=[
        trips.eucledian_distance,
        trips.number_of_passengers,
        traffic.expected_delay
    ],
    output_source=FileSource.delta_at("titanic_model/predictions")
)
class EtaTaxi:
    trip_id = Int32().as_entity()
    predicted_at = EventTimestamp()
    predicted_duration = trips.duration.as_regression_target()
```

## Data Sources

Aligned makes handling data sources easy, as you do not have to think about how the reading and writing is done.

Furthermore, Aligned makes it easy to switch parts of the business logic to a local setup for debugging purposes.

```python
from aligned import FileSource, AwsS3Config, AzureBlobConfig

dir_type: Literal["local", "aws", "azure"] = ...

if dir_type == "aws":
    aws_config = AwsS3Config(...)
    root_directory = aws_config.directory("my-awesome-project")

elif dir_type == "azure":
    azure_config = AzureBlobConfig(...)
    root_directory = azure_config.directory("my-awesome-project")
else:
    root_directory = FileSource.directory("my-awesome-project")


taxi_project = root_directory.sub_directory("eta_taxi")

csv_source = taxi_project.csv_at("predictions.csv")
parquet_source = taxi_project.parquet_at("predictions.parquet")
delta_source = taxi_project.delta_at("predictions")
```

### Date Formatting
Managing a data lake can be hard, and a common problem when using file formats is handling date formats. Aligned therefore provides a way to standardise this, so you can focus on what matters.

```python
from aligned import FileSource
from aligned.schemas.date_formatter import DateFormatter

iso_formatter = DateFormatter.iso_8601()
unix_formatter = DateFormatter.unix_timestamp(time_unit="us", time_zone="UTC")
custom_strtime_formatter = DateFormatter.string_format("%Y/%m/%d %H:%M:%S")

FileSource.csv_at("my/file.csv", date_formatter=unix_formatter)
```
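To see what these three formats correspond to, independent of Aligned's `DateFormatter`, here is the same timestamp rendered with the Python standard library:

```python
from datetime import datetime, timezone

dt = datetime(2024, 4, 16, 19, 42, 8, tzinfo=timezone.utc)

# ISO 8601: an unambiguous string representation.
iso = dt.isoformat()

# Unix timestamp in microseconds, matching time_unit="us" above.
unix_us = int(dt.timestamp() * 1_000_000)

# The custom string format from the example.
custom = dt.strftime("%Y/%m/%d %H:%M:%S")

print(iso)      # 2024-04-16T19:42:08+00:00
print(custom)   # 2024/04/16 19:42:08
```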

## Feature Views

Aligned also makes it possible to define data and features through `feature_view`s, and to get code completion and type safety by referencing them in other features.

This makes the features lightweight, data source independent, and flexible.

```python
@feature_view(
    name="passenger",
    description="Some features from the titanic dataset",
    source=FileSource.csv_at("titanic.csv"),
    materialized_source=FileSource.parquet_at("titanic.parquet"),
)
class TitanicPassenger:

    passenger_id = Int32().as_entity()

    age = (
        Float()
            .description("A float as some have decimals")
            .lower_bound(0)
            .upper_bound(110)
    )

    name = String()
    sex = String().accepted_values(["male", "female"])
    did_survive = Bool().description("If the passenger survived")
    sibsp = Int32().lower_bound(0).description("Number of siblings on titanic")
    cabin = String().is_optional()

    # Creates two one-hot encoded boolean features
    is_male, is_female = sex.one_hot_encode(['male', 'female'])
```
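The `one_hot_encode` line above produces one boolean column per listed value. In plain-Python terms (this helper is only an illustration, not Aligned's implementation):

```python
def one_hot_encode(values, categories):
    """Return one boolean column per category."""
    return {cat: [v == cat for v in values] for cat in categories}

sex = ["male", "female", "female"]
encoded = one_hot_encode(sex, ["male", "female"])
print(encoded["male"])    # [True, False, False]
print(encoded["female"])  # [False, True, True]
```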

### Exposed models

Aligned mainly focuses on defining the expected input and output of different models. However, a contract alone does not make a model usable. This is why Aligned makes it possible to define how our ML models are exposed, by setting an `exposed_model` attribute.


```python
from aligned.exposed_model.mlflow import mlflow_server

@model_contract(
    name="eta_taxi",
    exposed_model=mlflow_server(
        host="http://localhost:8000",
    ),
    ...
)
class EtaTaxi:
    trip_id = Int32().as_entity()
    predicted_at = EventTimestamp()
    predicted_duration = trips.duration.as_regression_target()
```

This also makes it possible to get predictions with the following command:

```python
await store.model("eta_taxi").predict_over({
    "trip_id": [...]
}).to_polars()
```

Or store them directly in the `output_source` with something like:

```python
await store.model("eta_taxi").predict_over({
    "trip_id": [...]
}).upsert_into_output_source()
```

Some of the existing implementations are:
- MLflow server
- MLflow model run in memory
- Ollama completion endpoint
- Ollama embedding endpoint
- Sending entities to a generic endpoint

## Data Freshness
Making sure a source contains fresh data is a crucial part of creating proper ML applications.
Therefore, Aligned provides an easy way to check how fresh a source is.

```python
from datetime import datetime, timedelta

@feature_view(
    name="departures",
    description="Features related to the departure of a taxi ride",
    source=taxi_db.table("departures"),
)
class TaxiDepartures:

    trip_id = UUID().as_entity()

    pickuped_at = EventTimestamp()

    number_of_passengers = Int32()

    dropoff_latitude = Float().is_required()
    dropoff_longitude = Float().is_required()

    pickup_latitude = Float().is_required()
    pickup_longitude = Float().is_required()


freshness = await TaxiDepartures.freshness_in_batch_source()

if freshness < datetime.now() - timedelta(days=2):
    raise ValueError("Too old data to create an ML model")
```
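The freshness check above boils down to comparing the newest event timestamp in the source against a staleness threshold. A minimal stdlib sketch of the same logic (the function name is for illustration only):

```python
from datetime import datetime, timedelta

def check_freshness(event_timestamps, max_age):
    """Return the newest timestamp, raising if it is older than max_age."""
    freshness = max(event_timestamps)
    if freshness < datetime.now() - max_age:
        raise ValueError("Too old data to create an ML model")
    return freshness

recent = [datetime.now() - timedelta(hours=h) for h in (1, 5, 30)]
print(check_freshness(recent, timedelta(days=2)))  # newest of the three
```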

## Data quality
Aligned will make sure all the different features are formatted as the correct datatype.
In addition, Aligned will make sure that the returned features align with the defined constraints.

```python
@feature_view(...)
class TitanicPassenger:

    ...

    age = (
        Float()
            .is_required()
            .lower_bound(0)
            .upper_bound(110)
    )
    sibsp = Int32().lower_bound(0, is_inclusive=True)
```

Since our feature view has an `is_required` and a `lower_bound` constraint, the `.validate(...)` command will filter out the entities that do not satisfy them.

```python
from aligned.validation.pandera import PanderaValidator

df = await store.model("titanic_model").features_for({
    "passenger_id": [1, 50, 110]
}).validate(
    PanderaValidator()  # Validates all features
).to_pandas()
```
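To make the filtering semantics concrete, here is a plain-Python equivalent of what a validator does for the `age` constraints above (required, and within [0, 110]); the helper is an illustration, not Aligned's implementation:

```python
def filter_valid(rows, lower=0, upper=110):
    """Keep only rows where age is present and within [lower, upper]."""
    return [
        row for row in rows
        if row.get("age") is not None and lower <= row["age"] <= upper
    ]

rows = [
    {"passenger_id": 1, "age": 22.0},
    {"passenger_id": 50, "age": None},    # fails is_required
    {"passenger_id": 110, "age": 130.0},  # fails upper_bound
]
print(filter_valid(rows))  # only passenger 1 remains
```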

## Contract Store

Aligned collects all the feature views and model contracts in a contract store. You can generate this in a few different ways, and each method serves some different use-cases.

For experimental use-cases, `await ContractStore.from_dir(".")` will probably make the most sense. However, this scans the full directory, which can lead to slow startup times.

Therefore, it is also possible to manually add the different feature views and contracts:

```python
store = ContractStore.empty()
store.add_feature_view(MyView)
store.add_model(MyModel)
```

This makes it possible to define different contracts per project or team. You can also combine different stores:

```python
combined_store = recommendation_store.combined_with(forecasting_store)
```

Lastly, we can also load all the contracts from a serializable format, such as a JSON file.

```python
await FileSource.json_at("contracts.json").as_contract_store()
```
