![Python Version](https://img.shields.io/badge/python-3.8+-blue)
[![PyPI - Version](https://img.shields.io/pypi/v/dejaq)](https://pypi.org/project/dejaq/)
[![Conda Version](https://img.shields.io/conda/v/danionella/dejaq)](https://anaconda.org/danionella/dejaq)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
![GitHub last commit](https://img.shields.io/github/last-commit/danionella/dejaq)
# Déjà Queue
A fast alternative to `multiprocessing.Queue`. Faster, because it takes advantage of a shared memory ring buffer (rather than slow pipes) and [pickle protocol 5 out-of-band data](https://peps.python.org/pep-0574/) to minimize copies. [`dejaq.DejaQueue`](#dejaqdejaqueue) supports any type of [picklable](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled) Python object, including numpy arrays or nested dictionaries with mixed content.
<img src="https://github.com/user-attachments/assets/00465436-47f8-4b2a-a236-d288ee34df28" width="100%">
The speed advantage of `DejaQueue` becomes substantial for items larger than about 1 MB. It enables efficient inter-job communication in big-data processing pipelines, which can be implemented in a few lines of code with [`dejaq.Parallel`](#dejaqparallel).
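The copy-avoidance mechanism mentioned above can be illustrated with the standard library alone: with pickle protocol 5, buffer-backed data is serialized out-of-band, so the pickle payload carries only metadata. A minimal sketch, independent of DejaQueue:

```python
import pickle

# A large writable buffer; wrapping it in PickleBuffer lets pickle hand the
# raw memory to buffer_callback instead of copying it into the payload.
data = bytearray(b"x" * 1_000_000)
buffers = []
payload = pickle.dumps(pickle.PickleBuffer(data), protocol=5,
                       buffer_callback=buffers.append)

# The payload holds only metadata; the megabyte lives in `buffers`.
assert len(payload) < 100

# Out-of-band buffers must be supplied again at load time.
restored = pickle.loads(payload, buffers=buffers)
assert bytes(restored) == bytes(data)
```

`DejaQueue` combines this idea with a shared-memory ring buffer, so the out-of-band buffers never need to cross a pipe at all.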
Auto-generated (minimal) API documentation: https://danionella.github.io/dejaq
## Installation
- `conda install danionella::dejaq`
- or, if you prefer pip: `pip install dejaq`
- for development, clone this repository, navigate to the root directory and type `pip install -e .`
## Examples
### `dejaq.DejaQueue`
```python
import numpy as np
from multiprocessing import Process
from dejaq import DejaQueue

def produce(queue):
    for i in range(10):
        arr = np.random.randn(100, 200, 300)
        data = dict(array=arr, i=i)
        queue.put(data)
        print(f'produced {type(arr)} {arr.shape} {arr.dtype}; meta: {i}; hash: {hash(arr.tobytes())}\n', flush=True)

def consume(queue, pid):
    while True:
        data = queue.get()
        array, i = data['array'], data['i']
        print(f'consumer {pid} consumed {type(array)} {array.shape} {array.dtype}; index: {i}; hash: {hash(array.tobytes())}\n', flush=True)

queue = DejaQueue(buffer_bytes=100e6)
producer = Process(target=produce, args=(queue,))
consumers = [Process(target=consume, args=(queue, pid)) for pid in range(3)]
for c in consumers:
    c.start()
producer.start()
```
### `dejaq.Parallel`
The following examples show how to use `dejaq.Parallel` to parallelize a function or a class, and how to create job pipelines.
Here we execute a function and map iterable inputs across 10 workers. To enable pipelining, the results of each stage are provided as an iterable generator. Use the `.compute()` method to obtain the final result (note that each stage pre-fetches results from `n_workers` calls, so some of the execution starts before `.compute()` is called). Results are always ordered.
```python
from time import sleep
from dejaq import Parallel

def slow_function(arg):
    sleep(1.0)
    return arg + 5

input_iterable = range(100)
slow_function = Parallel(n_workers=10)(slow_function)
stage = slow_function(input_iterable)
result = stage.compute()  # or list(stage)

# or, more compactly:
result = Parallel(n_workers=10)(slow_function)(input_iterable).compute()
```
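The pre-fetching behaviour described above can be sketched with standard-library threads. This is illustrative only: `dejaq` uses worker processes and shared-memory queues, and `parallel_map` below is a hypothetical helper, not part of the dejaq API.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

def parallel_map(fn, iterable, n_workers=4):
    """Ordered parallel map that keeps at most n_workers calls in flight."""
    with ThreadPoolExecutor(n_workers) as pool:
        pending = deque()
        it = iter(iterable)
        # Pre-fetch: submit up to n_workers calls before yielding anything,
        # analogous to a Parallel stage starting work before .compute().
        for x in it:
            pending.append(pool.submit(fn, x))
            if len(pending) == n_workers:
                break
        for x in it:
            yield pending.popleft().result()  # oldest first -> ordered output
            pending.append(pool.submit(fn, x))
        while pending:  # drain remaining in-flight calls
            yield pending.popleft().result()

out = list(parallel_map(lambda v: v + 5, range(10), n_workers=3))
```

Because results are yielded oldest-first, the output order matches the input order regardless of which worker finishes first.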
You can also use `Parallel` as a function decorator:
```python
@Parallel(n_workers=10)
def slow_function_decorated(arg):
    sleep(1.0)
    return arg + 5

result = slow_function_decorated(input_iterable).compute()
```
Similarly, you can decorate a class. It will be instantiated within a worker, and iterable items will be fed to its `__call__` method. Note how the additional init arguments are provided:
```python
@Parallel(n_workers=1)
class Reader:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1

result = Reader(arg1=0.5)(input_iterable).compute()
```
Finally, you can create pipelines of chained jobs. In this example, we have a single-threaded producer and consumer, but a parallel processing stage (an example use case is sequentially reading a file, compressing chunks in parallel, and then sequentially writing them to an output file):
```python
@Parallel(n_workers=1)
class Producer:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, item):
        return item + self.arg1

@Parallel(n_workers=10)
class Processor:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, arg):
        sleep(1.0)  # simulating a slow function
        return arg * self.arg1

@Parallel(n_workers=1)
class Consumer:
    def __init__(self, arg1):
        self.arg1 = arg1
    def __call__(self, arg):
        return arg - self.arg1

input_iterable = range(100)
stage1 = Producer(0.5)(input_iterable)
stage2 = Processor(10.0)(stage1)
stage3 = Consumer(1000)(stage2)
result = stage3.compute()

# or, equivalently:
result = Consumer(1000)(Processor(10.0)(Producer(0.5)(input_iterable))).compute()
```
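For comparison, the same three-stage dataflow can be written as plain chained generators. This single-process sketch shows only the pipelining structure; `dejaq.Parallel` additionally runs each stage in its own worker processes connected by queues:

```python
# Plain generator pipeline (single process, no parallelism) mirroring the
# Producer -> Processor -> Consumer stages above.
def producer(items, offset):
    for x in items:
        yield x + offset

def processor(stream, factor):
    for x in stream:
        yield x * factor

def consumer(stream, sub):
    for x in stream:
        yield x - sub

stage1 = producer(range(100), 0.5)
stage2 = processor(stage1, 10.0)
stage3 = consumer(stage2, 1000)
result = list(stage3)  # analogous to .compute()
```

Each item flows lazily through all three stages, just as items stream between `Parallel` stages before `.compute()` collects them.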
## See also
- [ArrayQueues](https://github.com/portugueslab/arrayqueues)
- [joblib.Parallel](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
- [Déjà Q](https://en.wikipedia.org/wiki/Deja_Q)