aioprocessing

Name: aioprocessing
Version: 2.0.1
Summary: A Python 3.5+ library that integrates the multiprocessing module with asyncio.
Upload time: 2022-09-15 20:33:26
Requires Python: >=3.5
Keywords: asyncio, multiprocessing, coroutine
Requirements: flake8
aioprocessing
=============
[![Build Status](https://github.com/dano/aioprocessing/workflows/aioprocessing%20tests/badge.svg?branch=master)](https://github.com/dano/aioprocessing/actions)


`aioprocessing` provides asynchronous, [`asyncio`](https://docs.python.org/3/library/asyncio.html) compatible, coroutine 
versions of many blocking instance methods on objects in the [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html) 
library. To use [`dill`](https://pypi.org/project/dill) for universal pickling, install using `pip install aioprocessing[dill]`. Here's an example demonstrating the `aioprocessing` versions of 
`Event`, `Queue`, and `Lock`:

```python
import time
import asyncio
import aioprocessing


def func(queue, event, lock, items):
    """ Demo worker function.

    This worker function runs in its own process, and uses
    normal blocking calls to aioprocessing objects, exactly 
    the way you would use ordinary multiprocessing objects.

    """
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()


async def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = await queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    await p.coro_join()

async def example2(queue, event, lock):
    await event.coro_wait()
    async with lock:
        await queue.coro_put(78)
        await queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        asyncio.ensure_future(example(queue, event, lock)), 
        asyncio.ensure_future(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
```

The aioprocessing objects can be used just like their multiprocessing
equivalents - as they are in `func` above - but they can also be 
seamlessly used inside of `asyncio` coroutines, without ever blocking
the event loop.


What's new
----------
`v2.0.1`
- Fixed a bug that kept the `AioBarrier` and `AioEvent` proxies returned from `AioManager` instances from working. Thanks to Giorgos Apostolopoulos for the fix.

`v2.0.0`

- Add support for universal pickling using [`dill`](https://github.com/uqfoundation/dill), installable with `pip install aioprocessing[dill]`. The library will now attempt to import [`multiprocess`](https://github.com/uqfoundation/multiprocess), falling back to stdlib `multiprocessing`. Force stdlib behaviour by setting a non-empty environment variable `AIOPROCESSING_DILL_DISABLED=1`. This can be used to avoid [errors](https://github.com/dano/aioprocessing/pull/36#discussion_r631178933) when attempting to combine `aioprocessing[dill]` with stdlib `multiprocessing` based objects like `concurrent.futures.ProcessPoolExecutor`.


How does it work?
-----------------

In most cases, this library makes blocking calls to `multiprocessing` methods
asynchronous by executing the call in a [`ThreadPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor), using
[`asyncio.run_in_executor()`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.run_in_executor). 
It does *not* re-implement multiprocessing using asynchronous I/O. This means 
there is extra overhead added when you use `aioprocessing` objects instead of 
`multiprocessing` objects, because each one is generally introducing a
`ThreadPoolExecutor` containing at least one [`threading.Thread`](https://docs.python.org/3/library/threading.html#thread-objects). It also means 
that all the normal risks you get when you mix threads with fork apply here, too 
(See http://bugs.python.org/issue6721 for more info).

The one exception to this is `aioprocessing.AioPool`, which makes use of the 
existing `callback` and `error_callback` keyword arguments in the various 
[`Pool.*_async`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async) methods to run them as `asyncio` coroutines. Note that 
`multiprocessing.Pool` is actually using threads internally, so the thread/fork
mixing caveat still applies.

Each `multiprocessing` class is replaced by an equivalent `aioprocessing` class,
distinguished by the `Aio` prefix. So, `Pool` becomes `AioPool`, etc. All methods
that could block on I/O also have a coroutine version that can be used with `asyncio`. For example, `multiprocessing.Lock.acquire()` can be replaced with `aioprocessing.AioLock.coro_acquire()`. You can pass an `asyncio` EventLoop object to any `coro_*` method using the `loop` keyword argument. For example, `lock.coro_acquire(loop=my_loop)`.

Note that you can also use the `aioprocessing` synchronization primitives as replacements 
for their equivalent `threading` primitives, in single-process, multi-threaded programs 
that use `asyncio`.


What parts of multiprocessing are supported?
--------------------------------------------

Most of them! All methods that could do blocking I/O in the following objects
have equivalent versions in `aioprocessing` that extend the `multiprocessing`
versions by adding coroutine versions of all the blocking methods.

- `Pool`
- `Process`
- `Pipe`
- `Lock`
- `RLock`
- `Semaphore`
- `BoundedSemaphore`
- `Event`
- `Condition`
- `Barrier`
- `connection.Connection`
- `connection.Listener`
- `connection.Client`
- `Queue`
- `JoinableQueue`
- `SimpleQueue`
- All `managers.SyncManager` `Proxy` versions of the items above (`SyncManager.Queue()`, `SyncManager.Lock()`, etc.).


What versions of Python are compatible?
---------------------------------------

`aioprocessing` will work out of the box on Python 3.5+.

Gotchas
-------
Keep in mind that, while the API exposes coroutines for interacting with
`multiprocessing` APIs, internally they are almost always being delegated
to a `ThreadPoolExecutor`. This means the caveats that apply to using a
`ThreadPoolExecutor` with `asyncio` apply here as well: namely, you won't be able to
cancel any of the coroutines, because the work being done in the worker
thread can't be interrupted.


            
