# MLPool

Offloading CPU-intensive model scoring to a pool of workers.

Version: 0.1.0 | Author: ET | Keywords: python, machine learning, multiprocessing

### Use case / why


Say we have an ML model that is CPU hungry and takes time to score, and we want to 
serve it through an API / web sockets while:

- keeping overall application complexity low (no dedicated service to serve ML models)
- minimising latency
- maximising throughput (the number of requests a single Cloud Run instance / container can handle)
- avoiding blocking calls that would block coroutines / web sockets

A typical simple API serving ML model(s) instantiates the models right in the main process, which
means the models consume the API process's RAM and CPU cycles for scoring. This directly 
affects API latency and throughput. It would be helpful if we could move model scoring away 
from the API process onto other cores. This would let us:

- use CPU resources more efficiently (loading multiple cores instead of a single one, provided the container has more than one core at its disposal)
- avoid hurting API performance by moving memory- and CPU-heavy model scoring away from the API process
- decrease latency
- increase throughput

On top of that, the solution seems to integrate well with the Cloud Run autoscaling strategy. When 
the load is low, the workers serving the models are pretty much idle. As the load increases,
the workers get busy, which in turn increases overall Cloud Run instance CPU usage. Once
it reaches a certain threshold, Cloud Run will spin up another instance to handle the load.


---

### How to use / Examples

- Instantiating MLPool

To use MLPool, for every ML model a user wants to run on the pool, they need to provide 
a callable that loads the model into memory and prepares it. Under the hood, MLPool ensures that 
every worker in the pool (each running in a dedicated process) loads the model(s) into memory when it starts (done only once).

Consider this function that loads a text classification model as an example:

```python
def load_text_classifier(weights_path: str) -> TextClassifier:
    return TextClassifier(weights_path)
```

When instantiating MLPool, a user needs to pass the `models_to_load` parameter: a dictionary where the keys are
the model names MLPool will serve and the values are the callables loading those models (such as the example function above).

In addition, the user can specify the number of workers, the scoring results' TTL, the number of jobs
that can be queued on the pool, etc.

```python
if __name__ == "__main__":
    with MLPool(
        models_to_load={
            "text_classifier": partial(load_text_classifier, "text_classification.pt"),
            "any_other_model": load_any_other_model_callable
        },
        nb_workers=5,
    ) as pool:
        ...
```

! IMPORTANT: instantiate MLPool under `if __name__ == "__main__":`

- Scoring on MLPool

MLPool gives the user full control of what gets executed on the pool, so to score a model
on the pool, the user needs to provide a callable such as:

```python
def score_text_classifier(model: TextClassifier, text):
    return model.classify_text(text)
```
where the first parameter is the model, followed by anything else the user wants to pass.

To schedule execution of the function above on the pool, the user can call the `.create_job()` method or the
asyncio-friendly `.create_job_async()` method, passing:

- `score_model_function` - a function to run on the pool, which accepts a model as the first argument
- `model_name` - the name of the model to pass to the function as its first argument; must be one of the model names provided when instantiating MLPool (a key of `models_to_load`)
- `args`, `kwargs` - any other arguments to pass to the function

```python
job_id = pool.create_job(
    score_model_function=score_text_classifier,
    model_name="text_classifier",
    kwargs={
        "text": "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \
        enduring the season’s worst weather conditions on Sunday at The \
        Open on his way to a closing 75 at Royal Portrush, which \
        considering the wind and the rain was a respectable showing"
    }
)
print(pool.get_result(job_id))
```

To get execution results back, the user can call `get_result` or the asyncio-friendly `get_result_async`,
passing the id of the job.
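
For instance, a minimal async sketch, assuming the `pool` and the `score_text_classifier` function from the examples above are in scope:

```python
# Minimal async usage sketch; assumes `pool` and `score_text_classifier`
# from the examples above are in scope.
async def classify(text: str):
    # Schedule scoring on the pool without blocking the event loop
    job_id = await pool.create_job_async(
        score_model_function=score_text_classifier,
        model_name="text_classifier",
        kwargs={"text": text},
    )
    # Await the result once a worker has scored the model
    return await pool.get_result_async(job_id)
```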


- Cancelling

Once a job has been created, it is possible to cancel the scoring with `.cancel_job()` as long as the job
hasn't run yet. This is convenient if, say, your web socket client disconnects (a sketch of that scenario follows below).
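
A hedged sketch of such a scenario with a FastAPI web socket endpoint (the endpoint itself is hypothetical; `pool` and `score_text_classifier` are assumed to come from the examples above):

```python
# Hypothetical FastAPI web socket endpoint illustrating job cancellation;
# `pool` and `score_text_classifier` are assumed from the examples above.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/classify")
async def classify_ws(websocket: WebSocket):
    await websocket.accept()
    job_id = None
    try:
        while True:
            text = await websocket.receive_text()
            job_id = await pool.create_job_async(
                score_model_function=score_text_classifier,
                model_name="text_classifier",
                kwargs={"text": text},
            )
            result = await pool.get_result_async(job_id)
            job_id = None  # result retrieved, nothing left to cancel
            await websocket.send_text(str(result))
    except WebSocketDisconnect:
        if job_id is not None:
            # The client went away while the job was still pending - drop it
            pool.cancel_job(job_id)
```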

---

### End-to-end example:

Say we have two models we want to serve.

```python
from functools import partial

from fastapi import FastAPI
import pydantic
import uvicorn

from ml_pool import MLPool
from ml_pool.logger import get_logger
from examples.models import HungryIris, HungryDiabetesClassifier


logger = get_logger("api")

app = FastAPI()


# ------------------------ user-provided functions ---------------------------
def load_iris(model_path: str) -> HungryIris:
    # TODO: Loading and preparing ML model goes here

    return HungryIris(model_path)


def score_iris(model: HungryIris, features):
    # TODO: Feature engineering etc goes here

    return model.predict(features)


def load_diabetes_classifier(model_path: str) -> HungryDiabetesClassifier:
    # TODO: Loading and preparing ML model goes here

    return HungryDiabetesClassifier(model_path)


def score_diabetes_classifier(model: HungryDiabetesClassifier, features):
    # TODO: Feature engineering etc goes here

    return model.predict(features)


# ------------------------------- schemas ------------------------------------
class Request(pydantic.BaseModel):
    features: list[float]


class Response(pydantic.BaseModel):
    prediction: int


# ------------------------------- endpoints ----------------------------------
@app.post("/iris")
async def serve_iris(request: Request) -> Response:
    features = request.features
    job_id = await pool.create_job_async(
        score_model_function=score_iris,
        model_name="iris",
        kwargs={"features": features},
    )
    result = await pool.get_result_async(job_id)
    return Response(prediction=result)


@app.post("/diabetes")
async def serve_diabetes_classifier(request: Request) -> Response:
    features = request.features
    job_id = await pool.create_job_async(
        score_model_function=score_diabetes_classifier,
        model_name="diabetes_classifier",
        args=(features,),
    )
    result = await pool.get_result_async(job_id)
    return Response(prediction=result)


if __name__ == "__main__":
    with MLPool(
        models_to_load={
            "iris": partial(load_iris, "../models/iris_xgb.json"),
            "diabetes_classifier": partial(
                load_diabetes_classifier, "../models/diabetes_xgb.json"
            ),
        },
        nb_workers=5,
    ) as pool:
        uvicorn.run(app, workers=1)

```

---

### Gotchas:

- MLPool comes with significant overhead due to IPC in Python (pickling/unpickling, etc.), 
so if your model is light and takes very little time to run (~milliseconds), introducing MLPool will
only slow things down. In that case it might make sense to score the model directly in the API process. 
If there is heavy feature engineering work associated with scoring the model, offloading could still 
pay off; measure for your case (see the timing sketch after this list).

- ! It is common to spin up multiple instances of the API app inside a container using tools such as
gunicorn. Be careful when using MLPool in such a configuration, as you could overload the CPU and constantly trigger
Cloud Run to scale up. In our case each container runs only a single instance of the API app; spinning up more instances
within the same container won't help, as the bottleneck is the CPU-hungry model scoring.
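
A rough way to check whether the overhead is worth it is to time a direct call against a pooled call. This is a sketch under the assumptions of the text classifier example above; it only surfaces per-call latency, while the throughput benefit of the pool shows up under concurrent load:

```python
# Rough per-call latency comparison: direct scoring vs. scoring via MLPool.
# Assumes `pool`, `load_text_classifier` and `score_text_classifier` from the
# examples above; the pooled numbers include the IPC overhead.
import time


def time_direct(text: str, n: int = 100) -> float:
    model = load_text_classifier("text_classification.pt")
    start = time.perf_counter()
    for _ in range(n):
        score_text_classifier(model, text)
    return (time.perf_counter() - start) / n


def time_pooled(text: str, n: int = 100) -> float:
    start = time.perf_counter()
    for _ in range(n):
        job_id = pool.create_job(
            score_model_function=score_text_classifier,
            model_name="text_classifier",
            kwargs={"text": text},
        )
        pool.get_result(job_id)
    return (time.perf_counter() - start) / n
```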

---

### Known issues:

- If a worker dies while processing a job, the caller will wait for the result forever! (A possible client-side guard is sketched after this list.)

- Worker monitoring threads run too rarely; it is possible to create a new job after the workers have already failed (the window is ~0.2 sec).

- DO NOT terminate workers outright: it might corrupt the queue and the shared dict (a worker can die with locks acquired). 
Try stopping them gracefully first; only if a worker doesn't join within the timeout, terminate it.
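
Until the first issue is fixed, a possible client-side guard (a sketch, not part of the library; it assumes `get_result_async` is awaitable and picks an arbitrary timeout):

```python
# Client-side guard against waiting forever if a worker died mid-job;
# a sketch, not part of MLPool itself. The timeout value is an assumption.
import asyncio


async def get_result_or_give_up(pool, job_id, timeout_s: float = 30.0):
    try:
        return await asyncio.wait_for(
            pool.get_result_async(job_id), timeout=timeout_s
        )
    except asyncio.TimeoutError:
        # Best effort: drop the job in case it never started
        pool.cancel_job(job_id)
        raise
```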

---

### TODO:

- Release as a package

- Test with your WS project

- Test with proper CPU hungry model

---

### Brainstorming (maybe TODO / extras):

- Check the size of user-provided args and kwargs. If they are too large, instead of copying them, put them in a memory store (Apache Arrow, Manager.dict?)
and pass the object ID through the queue? The worker then needs to check whether it received the object itself or the object's ID.
Consider MPIRE's approach to copy-on-write (https://github.com/Slimmer-AI/mpire) + an excellent read by the author (https://towardsdatascience.com/mpire-for-python-multiprocessing-is-really-easy-d2ae7999a3e9)

- What if a user-provided callable relies on other objects/clients, such as a BigQuery client?

            
