dejaq

Name: dejaq
Version: 0.0.8
Summary: Déjà Queue – A fast multiprocessing queue for Python
Upload time: 2024-09-16 14:26:19
Author: jlab.berlin, Benjamin Judkewitz
Requires Python: >=3.8
License: MIT
Keywords: multiprocessing, queue, shared_memory
Requirements: none recorded

![Python Version](https://img.shields.io/badge/python-3.8+-blue)
[![PyPI - Version](https://img.shields.io/pypi/v/dejaq)](https://pypi.org/project/dejaq/)
[![Conda Version](https://img.shields.io/conda/v/danionella/dejaq)](https://anaconda.org/danionella/dejaq)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
![GitHub last commit](https://img.shields.io/github/last-commit/danionella/dejaq)

# Déjà Queue

A fast alternative to `multiprocessing.Queue`. It is faster because it uses a shared-memory ring buffer (rather than slow pipes) and [pickle protocol 5 out-of-band data](https://peps.python.org/pep-0574/) to minimize copies. [`dejaq.DejaQueue`](#dejaqdejaqueue) supports any [picklable](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled) Python object, including NumPy arrays and nested dictionaries with mixed content.
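The two ideas can be illustrated with the standard library alone. The sketch below is not dejaq's implementation: it uses a single `multiprocessing.shared_memory.SharedMemory` block instead of a ring buffer, a `bytearray` wrapped in `pickle.PickleBuffer` stands in for a large NumPy array, and all variable names are illustrative. Pickling with `protocol=5` and a `buffer_callback` keeps the large payload out of the (tiny) pickle stream; the payload is copied once into shared memory, and the receiving side passes views into that memory back to `pickle.loads`, which reuses them instead of copying again.

```python
import pickle
from multiprocessing import shared_memory

# A large payload that supports the buffer protocol (NumPy arrays behave similarly).
payload = bytearray(b"x" * 1_000_000)
item = {"meta": 42, "data": pickle.PickleBuffer(payload)}

# Producer side: protocol 5 + buffer_callback keeps the payload out of the pickle stream.
out_of_band = []
header = pickle.dumps(item, protocol=5, buffer_callback=out_of_band.append)
raw = [buf.raw() for buf in out_of_band]  # flat byte views of the out-of-band buffers
total = sum(m.nbytes for m in raw)

# Copy the payload once into shared memory (one block here; dejaq uses a ring buffer).
shm = shared_memory.SharedMemory(create=True, size=total)
views = []
offset = 0
for m in raw:
    shm.buf[offset:offset + m.nbytes] = m
    views.append(shm.buf[offset:offset + m.nbytes])
    offset += m.nbytes

# Consumer side: hand the shared-memory views to pickle; no further copy is made.
restored = pickle.loads(header, buffers=views)
header_size = len(header)
roundtrip_ok = restored["meta"] == 42 and bytes(restored["data"]) == bytes(payload)
backed_by_shm = restored["data"].obj is shm.buf.obj  # the result points into shared memory

# Release every view into the block before closing and unlinking it.
del restored
for v in views:
    v.release()
shm.close()
shm.unlink()

print(header_size, roundtrip_ok, backed_by_shm)
```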

<img src="https://github.com/user-attachments/assets/00465436-47f8-4b2a-a236-d288ee34df28" width="100%">

The speed advantage of `DejaQueue` becomes substantial for items larger than about 1 MB. It enables efficient inter-job communication in big-data processing pipelines, which can be implemented in a few lines of code with [`dejaq.Parallel`](#dejaqparallel).

Auto-generated (minimal) API documentation: https://danionella.github.io/dejaq


## Installation
- `conda install danionella::dejaq`

- or, if you prefer pip: `pip install dejaq`

- for development, clone this repository, navigate to its root directory and run `pip install -e .`

## Examples
### `dejaq.DejaQueue`
```python
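import zlib
from multiprocessing import Process

import numpy as np
from dejaq import DejaQueue

N_CONSUMERS = 3

def produce(queue):
    for i in range(10):
        arr = np.random.randn(100, 200, 300)  # ~48 MB of float64 data
        data = dict(array=arr, i=i)
        queue.put(data)
        # zlib.crc32 gives the same value in every process; hash() of bytes is randomized per interpreter
        print(f'produced {type(arr)} {arr.shape} {arr.dtype}; meta: {i}; checksum: {zlib.crc32(arr.tobytes())}\n', flush=True)
    for _ in range(N_CONSUMERS):
        queue.put(None)  # one sentinel per consumer marks the end of the stream

def consume(queue, pid):
    while True:
        data = queue.get()
        if data is None:  # sentinel received: stop this consumer
            break
        array, i = data['array'], data['i']
        print(f'consumer {pid} consumed {type(array)} {array.shape} {array.dtype}; index: {i}; checksum: {zlib.crc32(array.tobytes())}\n', flush=True)

if __name__ == '__main__':  # required when processes are started with 'spawn' (Windows, macOS)
    queue = DejaQueue(buffer_bytes=100e6)  # size of the shared-memory ring buffer in bytes
    producer = Process(target=produce, args=(queue,))
    consumers = [Process(target=consume, args=(queue, pid)) for pid in range(N_CONSUMERS)]
    for c in consumers:
        c.start()
    producer.start()
    producer.join()
    for c in consumers:
        c.join()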
```
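As a rough sizing check for `buffer_bytes`, the arithmetic below estimates how many of the example's items fit in the buffer at once. It assumes that each queued item occupies roughly the raw bytes of its array (the out-of-band payload) and ignores the small pickle header and any bookkeeping overhead; what happens when the buffer is full is not covered here, so consult the API documentation for that.

```python
# One item from the example: a 100 x 200 x 300 array of float64 (8 bytes per value).
item_bytes = 100 * 200 * 300 * 8
buffer_bytes = int(100e6)  # the buffer size used in the example

print(item_bytes)                  # 48000000, i.e. 48 MB per item
print(buffer_bytes // item_bytes)  # 2: at most two such items fit at once (overhead ignored)
```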


### `dejaq.Parallel`
The following examples show how to use `dejaq.Parallel` to parallelize a function or a class, and how to create job pipelines.

Here we execute a function and map iterable inputs across 10 workers. To enable pipelining, the results of each stage are provided as an iterable generator. Use the `.compute()` method to get the final result. Note that each stage pre-fetches results from `n_workers` calls, so part of the execution already starts before `.compute()` is called. Results are always returned in input order.

```python
from time import sleep
from dejaq import Parallel

def slow_function(arg):
    sleep(1.0)  # simulate a slow computation
    return arg + 5

input_iterable = range(100)
# wrap under a new name, so that `slow_function` still refers to the plain function
parallel_slow_function = Parallel(n_workers=10)(slow_function)
stage = parallel_slow_function(input_iterable)  # iterable stage
result = stage.compute()  # or: list(stage)
# or, in one line:
result = Parallel(n_workers=10)(slow_function)(input_iterable).compute()
```
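The ordered, pre-fetching behavior described above can be approximated with the standard library. This is a minimal sketch, not dejaq's implementation: `ordered_parallel_map` and `slow_add_five` are hypothetical names, and the sketch uses threads (suitable for I/O-bound or GIL-releasing work) where dejaq uses worker processes. Unlike dejaq, it only starts work once iteration begins. It keeps at most `n_workers` calls in flight and always waits for the oldest call first, which is what keeps the output ordered; stages chain because each one is a generator.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def ordered_parallel_map(fn, iterable, n_workers):
    """Yield fn(item) for every item, in input order, with at most n_workers calls in flight."""
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        in_flight = deque()
        for item in iterable:
            in_flight.append(pool.submit(fn, item))
            if len(in_flight) >= n_workers:
                # Wait for the oldest call, not the fastest one, so the output stays ordered.
                yield in_flight.popleft().result()
        while in_flight:  # drain the calls still running after the input is exhausted
            yield in_flight.popleft().result()

def slow_add_five(arg):
    sleep(0.01 * (arg % 3))  # uneven run times, so calls finish out of order
    return arg + 5

stage1 = ordered_parallel_map(slow_add_five, range(20), n_workers=4)
stage2 = ordered_parallel_map(lambda x: x * 2, stage1, n_workers=2)  # chained stage
result = list(stage2)
```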

You can also use `Parallel` as a function decorator:
```python
@Parallel(n_workers=10)
def slow_function_decorated(arg):
    sleep(1.0)
    return arg + 5

result = slow_function_decorated(input_iterable).compute()
```

Similarly, you can decorate a class. It is instantiated within a worker, and the items of the input iterable are fed to its `__call__` method. Note how the additional `__init__` arguments are provided:
```python
@Parallel(n_workers=1)
class Reader:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1

result = Reader(arg1=0.5)(input_iterable).compute()
```
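Constructing the instance inside the worker matters for per-worker state such as open file handles. The pattern of "build once per worker, then call once per item" can be sketched with `concurrent.futures.ThreadPoolExecutor` and its `initializer` argument. This is an illustration under assumptions, not how dejaq does it internally: `map_with_worker_instances`, `_init_worker` and `_call_instance` are hypothetical helpers, threads replace dejaq's worker processes, and the `instances` counter exists only to show that `__init__` runs once per worker.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_worker_state = threading.local()  # one slot per worker thread

def _init_worker(cls, args, kwargs):
    # Runs once in each worker as it starts: the instance is built inside the worker.
    _worker_state.instance = cls(*args, **kwargs)

def _call_instance(item):
    return _worker_state.instance(item)

def map_with_worker_instances(cls, iterable, n_workers, *args, **kwargs):
    """Hypothetical helper: feed items to cls(*args, **kwargs).__call__ inside each worker."""
    with ThreadPoolExecutor(max_workers=n_workers, initializer=_init_worker,
                            initargs=(cls, args, kwargs)) as pool:
        # Executor.map returns results in input order (it submits all items up front).
        yield from pool.map(_call_instance, iterable)

class Reader:
    instances = 0  # counts how many times __init__ ran

    def __init__(self, arg1):
        self.arg1 = arg1
        Reader.instances += 1

    def __call__(self, item):
        return item + self.arg1

result = list(map_with_worker_instances(Reader, range(100), 1, arg1=0.5))
```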

Finally, you can chain jobs into pipelines. In this example, the first stage (`Producer`) and the last stage (`Consumer`) each run in a single worker, while the middle stage (`Processor`) runs in parallel. A typical use case is reading a file sequentially, compressing chunks in parallel and then writing them sequentially to an output file:
```python
@Parallel(n_workers=1)
class Producer:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1

@Parallel(n_workers=10)
class Processor:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, arg):
        sleep(1.0)  # simulate a slow function
        return arg * self.arg1

@Parallel(n_workers=1)
class Consumer:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, arg):
        return arg - self.arg1

input_iterable = range(100)
stage1 = Producer(0.5)(input_iterable)
stage2 = Processor(10.0)(stage1)
stage3 = Consumer(1000)(stage2)
result = stage3.compute()

# or:
result = Consumer(1000)(Processor(10.0)(Producer(0.5)(input_iterable))).compute()
```
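The file-compression use case mentioned above can be sketched as three chained generator stages using only the standard library. This is a hedged illustration rather than dejaq code: the function names and the 4-byte length-prefix file layout are assumptions made for the example, and threads are used for the parallel stage because CPython's `zlib` releases the GIL while compressing. Note that `Executor.map` submits every chunk up front, so this sketch holds the whole input in memory; a stage with bounded pre-fetching, as dejaq's stages have, avoids that.

```python
import io
import zlib
from concurrent.futures import ThreadPoolExecutor

def read_chunks(src, chunk_size):
    """Stage 1 (sequential): read fixed-size chunks from a binary file object."""
    while chunk := src.read(chunk_size):
        yield chunk

def compress_chunks(chunks, n_workers):
    """Stage 2 (parallel): compress chunks concurrently; Executor.map keeps input order."""
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        yield from pool.map(zlib.compress, chunks)

def write_chunks(compressed, dst):
    """Stage 3 (sequential): write each compressed chunk with a 4-byte length prefix."""
    for block in compressed:
        dst.write(len(block).to_bytes(4, "big"))
        dst.write(block)

def read_back(src):
    """Decompress a file written by write_chunks, chunk by chunk."""
    while header := src.read(4):
        yield zlib.decompress(src.read(int.from_bytes(header, "big")))

data = bytes(range(256)) * 4096  # 1 MiB of sample input
src, dst = io.BytesIO(data), io.BytesIO()
write_chunks(compress_chunks(read_chunks(src, 64 * 1024), n_workers=4), dst)
dst.seek(0)
restored = b"".join(read_back(dst))
```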

## See also
- [ArrayQueues](https://github.com/portugueslab/arrayqueues) 
- [joblib.Parallel](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
- [Déjà Q](https://en.wikipedia.org/wiki/Deja_Q)

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "dejaq",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "multiprocessing, queue, shared_memory",
    "author": "jlab.berlin, Benjamin Judkewitz",
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/a1/e6/5751cbfbb53e1bcc4c258c8ea5f1a834ae4d1fe70166ffb5703f7f22020a/dejaq-0.0.8.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "D\u00e9j\u00e0 Queue \u2013 A fast multiprocessing queue for Python",
    "version": "0.0.8",
    "project_urls": {
        "Homepage": "https://github.com/danionella/dejaq"
    },
    "split_keywords": [
        "multiprocessing",
        " queue",
        " shared_memory"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "68ed03332b7a5e0f026b67335123e4cb9c530b80ee6855034129b5fc25aa9ccf",
                "md5": "f275f595cb0a07259d5b85068feaa626",
                "sha256": "e11a59c86fbc521ddddc86fd2d09f67208ec6d8558c210295d630c4975d5aa63"
            },
            "downloads": -1,
            "filename": "dejaq-0.0.8-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "f275f595cb0a07259d5b85068feaa626",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 8672,
            "upload_time": "2024-09-16T14:26:18",
            "upload_time_iso_8601": "2024-09-16T14:26:18.258991Z",
            "url": "https://files.pythonhosted.org/packages/68/ed/03332b7a5e0f026b67335123e4cb9c530b80ee6855034129b5fc25aa9ccf/dejaq-0.0.8-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "a1e65751cbfbb53e1bcc4c258c8ea5f1a834ae4d1fe70166ffb5703f7f22020a",
                "md5": "63976954fcb5b3f0d3701e040c4e2010",
                "sha256": "9c55943c89bc30274eb42f3f866524f3e7660274ad81160ba892a989702b0273"
            },
            "downloads": -1,
            "filename": "dejaq-0.0.8.tar.gz",
            "has_sig": false,
            "md5_digest": "63976954fcb5b3f0d3701e040c4e2010",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 8066,
            "upload_time": "2024-09-16T14:26:19",
            "upload_time_iso_8601": "2024-09-16T14:26:19.681761Z",
            "url": "https://files.pythonhosted.org/packages/a1/e6/5751cbfbb53e1bcc4c258c8ea5f1a834ae4d1fe70166ffb5703f7f22020a/dejaq-0.0.8.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-16 14:26:19",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "danionella",
    "github_project": "dejaq",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "dejaq"
}
        