BluePyParallel

- Name: BluePyParallel
- Version: 0.2.2
- Summary: Provides an embarrassingly parallel tool with an SQL backend.
- Upload time: 2024-03-13 09:53:49
- Author: Blue Brain Project, EPFL
- Requires Python: >=3.8
- License: Apache License 2.0
- Homepage: https://BluePyParallel.readthedocs.io
- Repository: https://github.com/BlueBrain/BluePyParallel

# BluePyParallel: Bluebrain Python Embarrassingly Parallel library

Provides an embarrassingly parallel tool with an SQL backend.

## Introduction

Provides an embarrassingly parallel tool with an SQL backend, inspired by [BluePyMM](https://github.com/BlueBrain/BluePyMM) by @wvangeit.


## Installation

This package should be installed using pip:

```bash
pip install bluepyparallel
```


## Usage

### General computation

```python
from bluepyparallel import init_parallel_factory  # the factory initializer is exposed at the package top level

factory_name = "multiprocessing"  # Can also be None, "dask", "dask_dataframe" or "ipyparallel"
batch_size = 10  # The data are split into batches of this size before being processed
chunk_size = 1000  # Elements to process are gathered into chunks of this size before being sent to the workers

# Setup the parallel factory
parallel_factory = init_parallel_factory(
    factory_name,
    batch_size=batch_size,
    chunk_size=chunk_size,
    processes=4,  # This parameter is specific to the multiprocessing factory
)

# Get the mapper from the factory
mapper = parallel_factory.get_mapper()

# Use the mapper to map the given function to each element of mapped_data and gather the results
result = sorted(mapper(function, mapped_data, *function_args, **function_kwargs))
```
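
For instance, a minimal self-contained sketch using the `multiprocessing` factory could look like the following; the `square` function and the data are purely illustrative, and the top-level import of `init_parallel_factory` is assumed:

```python
from bluepyparallel import init_parallel_factory  # assumed top-level import

def square(value):
    """Toy function applied to each element."""
    return value * value

if __name__ == "__main__":
    # "multiprocessing" factory with 2 worker processes, as in the snippet above
    parallel_factory = init_parallel_factory("multiprocessing", processes=2)
    mapper = parallel_factory.get_mapper()
    print(sorted(mapper(square, range(10))))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```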

### Working with Pandas

This library provides a specific function for working with large :class:`pandas.DataFrame` objects: :func:`bluepyparallel.evaluator.evaluate`.
This function converts the DataFrame into a list of dicts (one per row), maps the given function to each element and finally gathers the results.

Example:

```python
import pandas as pd

from bluepyparallel import evaluate

input_df = pd.DataFrame(index=[1, 2], columns=['data'], data=[100, 200])

def evaluation_function(row):
    # compute_something is a placeholder for the actual computation
    result_1, result_2 = compute_something(row['data'])
    return {'new_column_1': result_1, 'new_column_2': result_2}

# Use evaluate to map the given function to each row of the DataFrame
result_df = evaluate(
    input_df,  # The DataFrame to process
    evaluation_function,  # The function applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # Could also be a factory created with init_parallel_factory
    new_columns=[['new_column_1', 0], ['new_column_2', None]],  # New columns and their default values
)
assert result_df.columns.tolist() == ['data', 'new_column_1', 'new_column_2']
```
In a way, it is a generalisation of the pandas `.apply` method.
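
For comparison, a roughly equivalent sequential computation with plain pandas (using the same hypothetical `compute_something` helper, and without the default columns or parallelism) might look like:

```python
# Sequential equivalent of the evaluate() call above, using plain pandas
new_columns = input_df['data'].apply(
    lambda value: pd.Series(
        dict(zip(('new_column_1', 'new_column_2'), compute_something(value)))
    )
)
sequential_df = pd.concat([input_df, new_columns], axis=1)
```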


### Working with an SQL backend

Since it is aimed at time-consuming functions, the library also provides a checkpoint and resume mechanism using an SQL backend.
The SQL backend uses the [SQLAlchemy](https://docs.sqlalchemy.org) library, so it can work with a large variety of database types (SQLite, PostgreSQL, MySQL, ...).
To activate this feature, just pass a [URL that can be processed by SQLAlchemy](https://docs.sqlalchemy.org/en/latest/core/engines.html?highlight=url#database-urls) to the ``db_url`` parameter of :func:`bluepyparallel.evaluator.evaluate`.

.. note:: A specific driver might have to be installed to access the database (e.g. `psycopg2 <https://www.psycopg.org/docs/>`_ for PostgreSQL).

Example:

```python
# Use evaluate with an SQL backend so that the results are checkpointed
result_df = evaluate(
    input_df,  # The DataFrame to process
    evaluation_function,  # The function applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # Could also be a factory created with init_parallel_factory
    db_url="sqlite:///db.sql",  # Could also just be "db.sql", which is automatically turned into a SQLite URL
)
```

Now, if the computation crashes for any reason, the partial results are stored in the ``db.sql`` file.
If the crash was due to an external cause (so that running the code again should work), the computation can be resumed
from the last computed element: only the missing elements are computed, which can save a lot of time.
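
A minimal sketch of resuming such an interrupted run is shown below; note that the `resume` argument used here is an assumption about the :func:`bluepyparallel.evaluator.evaluate` signature, so check the API of your installed version:

```python
# Re-run the same evaluation against the same database to pick up where it stopped.
result_df = evaluate(
    input_df,
    evaluation_function,
    parallel_factory="multiprocessing",
    db_url="sqlite:///db.sql",  # same database as the interrupted run
    resume=True,  # assumed flag: only rows missing from the database are recomputed
)
```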


## Running with distributed Dask MPI on HPC systems

This is an example of an [sbatch](https://slurm.schedmd.com/sbatch.html) script that can be
adapted to execute a script on multiple nodes and workers with distributed Dask and MPI.
In this example, the code called by ``run.py`` should be parallelized using BluePyParallel.

The Dask environment variables are not strictly required but are highly recommended, and they can be fine-tuned.


```bash
#!/bin/bash -l

# Dask configuration
export DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info"
export DASK_DISTRIBUTED__WORKER__USE_FILE_LOCKING=False
export DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=False  # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=False  # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE=0.80  # pause execution at 80% memory use
export DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE=0.95  # restart the worker at 95% use
export DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=spawn
export DASK_DISTRIBUTED__WORKER__DAEMON=True
# Reduce dask profile memory usage/leak (see https://github.com/dask/distributed/issues/4091)
export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=10000ms  # Time between statistical profiling queries
export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms  # Time between starting new profile

# Split tasks to avoid some dask errors (e.g. Event loop was unresponsive in Worker)
export PARALLEL_BATCH_SIZE=1000

srun -v run.py
```

To ensure that only the `evaluate` function runs in parallel with Dask, the parallel factory has to be initialised
before anything else is done in the code. For example, ``run.py`` could look like:

```python
import pandas as pd

from bluepyparallel import evaluate, init_parallel_factory

if __name__ == "__main__":
    parallel_factory = init_parallel_factory('dask_dataframe')
    df = pd.read_csv("input_data.csv")
    df = some_preprocessing(df)  # placeholder for any preprocessing step
    df = evaluate(df, function_to_evaluate, parallel_factory=parallel_factory)
    df.to_csv("output_data.csv")
```

This is because everything before `init_parallel_factory` is executed by every MPI rank, as MPI is not initialized yet.

.. note:: We recommend using `dask_dataframe` instead of `dask`, as it is more stable in practice for large computations.

## Funding & Acknowledgment

The development of this software was supported by funding to the Blue Brain Project, a research
center of the École polytechnique fédérale de Lausanne (EPFL), from the Swiss government’s ETH
Board of the Swiss Federal Institutes of Technology.

For license and authors, see `LICENSE.txt` and `AUTHORS.md` respectively.

Copyright © 2023-2024 Blue Brain Project/EPFL