distex


:Name: distex
:Version: 0.7.2
:Home page: https://github.com/erdewit/distex
:Summary: Async distributed process pool using asyncio
:Upload time: 2022-09-28 09:00:04
:Author: Ewald R. de Wit
:License: BSD
:Keywords: python asyncio parallel distributed computing process pool task queue
:Requirements: dill, cloudpickle, eventkit

|Build| |PyVersion| |Status| |PyPiVersion| |License| |Docs|

Introduction
============

Distex offers a distributed process pool to utilize multiple CPUs or machines.
It uses
`asyncio <https://docs.python.org/3.6/library/asyncio.html>`_
to efficiently manage the worker processes.

Features:

* Scales from 1 to thousands of processors;
* Can handle on the order of 50,000 small tasks per second;
* Easy to use with SSH (secure shell) hosts;
* Full async support;
* Maps over unbounded iterables;
* Compatible with
  `concurrent.futures.ProcessPoolExecutor <https://docs.python.org/3/library/concurrent.futures.html>`_
  (or PEP3148_).
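
As a rough illustration of what this compatibility means, the sketch below
writes a helper against the generic ``concurrent.futures.Executor`` interface
and runs it with the standard library's ``ThreadPoolExecutor``. The helper
name ``square_all`` is made up for this example. Passing a distex ``Pool``
instead is expected to work the same way given the compatibility stated
above, but that is an assumption and is not exercised here.

.. code-block:: python

    from concurrent.futures import Executor, ThreadPoolExecutor


    def square(x):
        return x * x


    def square_all(executor: Executor, values):
        # Relies only on Executor.map, which PEP 3148 defines.
        return list(executor.map(square, values))


    # Any PEP 3148 executor can be passed in; a stdlib one is used here.
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = square_all(executor, range(5))
    print(results)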


Installation
------------

::

    pip3 install -U distex

When using remote hosts, distex must be installed on them as well.
Make sure that the ``distex_proc`` script can be found in the path.

For SSH hosts, authentication must be done with SSH keys, since passwords
are not supported. The remote installation can be tested with::

    ssh <host> distex_proc

Dependencies:

* Python_ version 3.6 or higher;
* On Unix the ``uvloop`` package is recommended: ``pip3 install uvloop``;
* SSH client and server (optional).

Examples
--------

A process pool can have local and remote workers.
Here is a pool that uses 4 local workers:

.. code-block:: python

    from distex import Pool

    def f(x):
        return x*x

    pool = Pool(4)
    for y in pool.map(f, range(100)):
        print(y)

To create a pool that also uses 8 workers on host ``maxi``, using ssh:

.. code-block:: python

    pool = Pool(4, 'ssh://maxi/8')
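
To make the shape of such a host string explicit, here is a minimal sketch
that splits it into a host name and a worker count with the standard
library. The function ``parse_host_spec`` is hypothetical and only covers the
form shown above. Distex's own parsing may differ and may accept more
fields.

.. code-block:: python

    from urllib.parse import urlsplit


    def parse_host_spec(spec):
        """Split a spec such as 'ssh://maxi/8' into (host, num_workers)."""
        parts = urlsplit(spec)
        if parts.scheme != 'ssh':
            raise ValueError(f'unsupported scheme in {spec!r}')
        # The path carries the worker count, e.g. '/8'.
        count = parts.path.strip('/')
        # Returning None for a missing count is a choice of this sketch.
        num_workers = int(count) if count else None
        return parts.hostname, num_workers


    print(parse_host_spec('ssh://maxi/8'))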

To use a pool in combination with
`eventkit <https://github.com/erdewit/eventkit>`_:

.. code-block:: python

    from distex import Pool
    import eventkit as ev
    import bz2

    pool = Pool()
    # await pool  # un-comment in Jupyter
    data = [b'A' * 1000000] * 1000

    pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

    print(pipe.run())  # in Jupyter: print(await pipe)
    pool.shutdown()

Coroutines, asynchronous iterators and asynchronous context managers are
all supported:

.. code-block:: python

    import asyncio
    from distex import Pool

    def init():
        # pool initializer: set the start time for every worker
        import time
        import builtins
        builtins.t0 = time.time()

    async def timer(i=0):
        # async code running in the pool
        import time
        import asyncio
        await asyncio.sleep(1)
        return time.time() - t0

    async def ait():
        # async iterator running on the user side
        for i in range(20):
            await asyncio.sleep(0.1)
            yield i

    async def main():
        async with Pool(4, initializer=init, qsize=1) as pool:
            async for t in pool.map_async(timer, ait()):
                print(t)
            print(await pool.run_on_all_async(timer))


    asyncio.run(main())


High-level architecture
-----------------------

Distex does not use remote 'task servers'.
Instead it works the other way around: a local
server is started first; then the local and remote workers are started,
and each of them connects back to the server on its own. When all
workers have connected, the pool is ready for duty.
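
The connect-back pattern described above can be sketched with plain asyncio
streams. This is an illustration only and not distex's implementation: the
workers are coroutines in the same process rather than separate processes, a
TCP socket on ``127.0.0.1`` stands in for the Unix socket, and all names
(``on_connect``, ``worker``, ``all_connected``) are invented for the example.

.. code-block:: python

    import asyncio


    async def main(num_workers=3):
        connected = []
        all_connected = asyncio.Event()

        async def on_connect(reader, writer):
            # Called by the server each time a worker dials in.
            name = (await reader.readline()).decode().strip()
            connected.append(name)
            if len(connected) == num_workers:
                all_connected.set()
            writer.close()

        # 1. The local server is started first (port 0 picks a free port).
        server = await asyncio.start_server(on_connect, '127.0.0.1', 0)
        port = server.sockets[0].getsockname()[1]

        async def worker(name):
            # 2. Each worker connects back to the server on its own.
            reader, writer = await asyncio.open_connection('127.0.0.1', port)
            writer.write(f'{name}\n'.encode())
            await writer.drain()
            writer.close()
            await writer.wait_closed()

        workers = [asyncio.create_task(worker(f'worker-{i}'))
                   for i in range(num_workers)]

        # 3. The pool is ready once every worker has connected.
        await all_connected.wait()
        await asyncio.gather(*workers)
        server.close()
        return sorted(connected)


    ready = asyncio.run(main())
    print(ready)

Because the workers initiate the connection, the server only needs to listen
and count; it never has to know in advance how each worker can be reached.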

Each worker consists of a single-threaded process that is running
an asyncio event loop. This loop is used both for communication and for
running asynchronous tasks. Synchronous tasks are run in a blocking fashion.
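
The difference between the two kinds of task on a single-threaded event loop
can be seen in a small standalone sketch. It does not use distex. The
dispatcher ``run_task`` is a hypothetical stand-in for whatever a worker does
internally, and the task names are arbitrary.

.. code-block:: python

    import asyncio
    import inspect

    log = []


    async def async_task(name):
        log.append(f'{name} start')
        await asyncio.sleep(0)  # yields control back to the event loop
        log.append(f'{name} end')


    def sync_task(name):
        # No await points: nothing else can run until this returns.
        log.append(f'{name} start')
        log.append(f'{name} end')


    async def run_task(func, *args):
        # Hypothetical dispatcher: await coroutine functions,
        # call plain functions directly (blocking the loop meanwhile).
        if inspect.iscoroutinefunction(func):
            return await func(*args)
        return func(*args)


    async def main():
        await asyncio.gather(
            run_task(async_task, 'a'),
            run_task(async_task, 'b'),
            run_task(sync_task, 'c'),
        )


    asyncio.run(main())
    print(log)

The two asynchronous tasks interleave at their ``await`` points, while the
synchronous task runs from start to end without interruption. A long
synchronous task therefore occupies its worker completely until it returns.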

When using ssh, a remote (or 'reverse') tunnel is created from a remote Unix socket
to the local Unix socket that the local server is listening on.
Multiple workers on a remote machine will use the same Unix socket and
share the same ssh tunnel.

The plain ``ssh`` executable is used instead of much nicer solutions such
as `AsyncSSH <https://github.com/ronf/asyncssh>`_. This is to keep the
CPU usage of encrypting/decrypting outside of the event loop and offload
it to the ``ssh`` process(es).

Documentation
-------------

`Distex documentation <http://rawgit.com/erdewit/distex/master/docs/html/api.html>`_


:author: Ewald de Wit <ewald.de.wit@gmail.com>

.. _Python: http://www.python.org

.. _ssh-keygen: https://linux.die.net/man/1/ssh-keygen

.. _ssh-copy-id: https://linux.die.net/man/1/ssh-copy-id

.. _PEP3148: https://www.python.org/dev/peps/pep-3148

.. |PyPiVersion| image:: https://img.shields.io/pypi/v/distex.svg
   :alt: PyPi
   :target: https://pypi.python.org/pypi/distex

.. |Build| image:: https://github.com/erdewit/distex/actions/workflows/test.yml/badge.svg?branch=master
   :alt: Build
   :target: https://github.com/erdewit/distex/actions

.. |PyVersion| image:: https://img.shields.io/badge/python-3.6+-blue.svg
   :alt:

.. |Status| image:: https://img.shields.io/badge/status-beta-green.svg
   :alt:

.. |License| image:: https://img.shields.io/badge/license-BSD-blue.svg
   :alt:

.. |Docs| image:: https://readthedocs.org/projects/distex/badge/?version=latest
   :alt: Documentation
   :target: https://distex.readthedocs.io/

            
