just-distribute


- Name: just-distribute
- Version: 0.1.2
- Home page: https://github.com/jakubgajski/just_distribute
- Summary: Provides a @distribute decorator that enables concurrent execution of functions without boilerplate code.
- Upload time: 2024-07-14 08:27:37
- Maintainer: None
- Author: Jakub Gajski
- Requires Python: <4.0,>=3.10
- License: MIT
- Keywords: concurrency, parallelism, concurrent, parallel, decorator, distributed
- Requirements: No requirements were recorded.

### Overview  
  
Haven't planned your functions for large workloads or don't feel comfortable with various options for concurrency?  
Please, don't be sad - just distribute.

### Installation  
  
```bash
pip install just-distribute
```
   
### TL;DR  
   
```python
from just_distribute import distribute

@distribute(job='compute', workers=8)  # job in ('compute', 'io', 'web', 'ray')
def your_time_consuming_func(*args):
    ...
```
  
### Getting Started  
   
Always make sure the function you want to distribute has proper type hints, because just_distribute makes some 
assumptions based on type annotations. Also, the data to be distributed must be passed as positional arguments; 
keyword arguments are treated as constants.
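
The calling convention can be pictured with a plain, sequential stand-in. This is only a sketch of the semantics suggested by the examples in this README, not the library's implementation; `sequential_equivalent` and `scale_and_shift` are hypothetical names introduced for illustration.

```python
from typing import Any, Callable, Iterable


def sequential_equivalent(func: Callable[..., Any], *distributed: Iterable[Any], **constants: Any) -> list[Any]:
    """Hypothetical helper: call func once per element-wise tuple of the positional data."""
    # Positional iterables are walked in lockstep; keyword arguments are
    # passed unchanged to every call, i.e. they act as constants.
    return [func(*items, **constants) for items in zip(*distributed)]


def scale_and_shift(x: int, y: int, factor: int = 1) -> int:
    return (x + y) * factor


results = sequential_equivalent(scale_and_shift, range(3), range(10, 13), factor=2)
print(results)  # [20, 24, 28]
```

A distributed call is expected to return the same values, only computed concurrently instead of one after another.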
  
#### CPU intensive tasks
  
Instead of:  
  
```python
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
    ...

# slow, probably need to rewrite it ;(
results = []
for const1, const2 in zip(range(1000), range(4000, 2000, -2)):
    results.append(
        some_existing_cpu_intensive_function(const1, const2)    
    )
```
  
Do:  
  
```python
from just_distribute import distribute


@distribute(job='compute', workers=8)
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
    ...

# <happy CPU fan noises>
results = some_existing_cpu_intensive_function(range(1000), range(4000, 2000, -2))
```  
   
<details>
<summary>Curious what happens in the background?</summary>
For the compute / processes type workload, ProcessPool from the pathos package is used to spawn the given number of processes.  
The pathos package uses dill, an enhanced variant of pickle, as its object serialization protocol.  
  
Roughly what happens when using ProcessPool:  
- ~~child~~ of age worker processes with independent memory are spawned  
- dill is used to serialize chunks of data and the execution code  
- serialized stuff is sent to workers for execution  
- after execution partial results are collected and aggregated in the parent process  
  
Why am I not using the standard library, e.g. ProcessPoolExecutor from concurrent.futures? 
Because the default serialization protocol - pickle - is very... picky about what can be serialized 
and therefore sent to workers, while dill is more forgiving.  
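
To see the pickiness in practice, here is a small standard-library-only sketch. It only shows that pickle rejects a lambda; that dill accepts the same object is the claim made above and is not exercised here. `can_pickle` is a hypothetical helper introduced for illustration.

```python
import pickle


def can_pickle(obj: object) -> bool:
    """Hypothetical helper: report whether the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # pickle stores functions by reference (module + qualified name),
        # so objects without an importable name cannot be serialized.
        return False


square = lambda x: x * x  # anonymous function, no importable name

print(can_pickle(len))     # True: built-in function, pickled by reference
print(can_pickle(square))  # False: a lambda cannot be looked up by name
```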
  
To read more visit e.g.:  
- [multiprocessing @ geeksforgeeks](https://www.geeksforgeeks.org/multiprocessing-python-set-1/)
- [multiprocessing @ superfastpython](https://superfastpython.com/multiprocessing-in-python/)
</details>
  
#### I/O intensive tasks
  
Instead of:  
  
```python
def some_existing_io_intensive_function(data_to_write: bytes, filename: str, verbose: bool = True):
    ...

# slow, probably need to rewrite it ;(
data_store: dict = ...  # some processed data to save
for name, data in data_store.items():
    some_existing_io_intensive_function(data, name)
```
  
Do:  
  
```python
from just_distribute import distribute


@distribute(job='io', workers=8)
def some_existing_io_intensive_function(data_to_write: bytes, filename: str, verbose: bool = True):
    ...

data_store: dict = ...  # some processed data to save
# <happy HDD noises???>
# any keyword arguments are not distributed :)
some_existing_io_intensive_function(data_store.values(), data_store.keys(), verbose=False)
```    
   
<details>
<summary>Curious what happens in the background?</summary>
For the io / threads type workload, ThreadPoolExecutor from the standard library is used.  
Intuitively, when some tasks can be executed independently of the Python interpreter (e.g. by an external C++ library or by the OS), 
the main program can just be busy overseeing them like the boss.   
  
Roughly what happens when using ThreadPoolExecutor:  
- within a shared memory, a number of threads is created  
- each thread (ideally) has an exclusive subset of data delegated  
- while threads are running, the parent process is constantly switching context between them, asking "already done?"  
- when all threads are done, the main process can continue with whatever is left to be done  
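
The steps above can be sketched with the standard library alone. This is a minimal illustration of the thread pool pattern, not the code just_distribute runs internally; `describe` is a hypothetical stand-in for a real I/O bound function.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def describe(data: bytes, filename: str, verbose: bool = True) -> str:
    """Stand-in for an I/O bound function; a real one would write to disk."""
    return f"{filename}: {len(data)} bytes"


data_store = {"a.bin": b"abc", "b.bin": b"hello"}

# Keyword arguments are bound once, so every call sees the same constant.
task = partial(describe, verbose=False)

with ThreadPoolExecutor(max_workers=8) as pool:
    # map() walks the iterables in lockstep and returns results in input order.
    summaries = list(pool.map(task, data_store.values(), data_store.keys()))

print(summaries)  # ['a.bin: 3 bytes', 'b.bin: 5 bytes']
```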
  
To read more visit e.g.:  
- [multithreading @ geeksforgeeks](https://www.geeksforgeeks.org/multithreading-python-set-1/)  
- [what is multithreading @ clouddevs](https://clouddevs.com/python/multithreading/#11-What-is-Multithreading)
</details>
  
#### Somewhere over the network :guitar:
  
Instead of:

```python
def some_existing_web_requesting_function(data_to_send: dict, url: str, api_key: str):
    ...

# slow, probably need to rewrite it ;(
data_store: list[dict] = ...  # some data to process on a remote service
for data in data_store:
    some_existing_web_requesting_function(data, url="https://some_web_api.com/process", api_key="***")
```
  
Do:  
  
```python
from just_distribute import distribute


@distribute(job='web', workers=8)
def some_existing_web_requesting_function(data_to_send: dict, url: str, api_key: str):
    ...

data_store: list[dict] = ...  # some data to process on a remote service
# <happy router blinking>
some_existing_web_requesting_function(data_store, url="https://some_web_api.com/process", api_key="***")
```  
   
<details>
<summary>Curious what happens in the background?</summary>
For the web / coroutines type workload, asyncio from the standard library is used for concurrency.  
An async function is similar to a generator - it is an object whose execution flow can be paused and resumed.  
  
Roughly what happens when using the distribute decorator with the web job:  
- a queue is fed with data elements  
- a number of consumers is created, so the user controls how many concurrent requests may be sent  
- a single-threaded event loop is spawned  
- a consumer is paused by the loop while it waits idle for a response and resumed when the next piece of data can be consumed  
- when the whole queue is consumed, the aggregated data is returned  
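
The queue-and-consumers pattern can be sketched with asyncio alone. This is an assumption-laden illustration rather than the library's code: `fake_request`, `consumer` and `run_all` are hypothetical names, and the network call is replaced by a short sleep.

```python
import asyncio


async def fake_request(payload: dict) -> dict:
    """Stand-in for a network call; sleeping yields control to the event loop."""
    await asyncio.sleep(0.01)
    return {"echo": payload["id"]}


async def consumer(queue: asyncio.Queue, results: list) -> None:
    # Each consumer keeps pulling items until the queue is empty.
    while True:
        try:
            index, payload = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        results[index] = await fake_request(payload)


async def run_all(payloads: list[dict], workers: int) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for item in enumerate(payloads):
        queue.put_nowait(item)
    results: list = [None] * len(payloads)
    # The number of consumers caps how many requests are in flight at once.
    await asyncio.gather(*(consumer(queue, results) for _ in range(workers)))
    return results


responses = asyncio.run(run_all([{"id": i} for i in range(5)], workers=2))
print(responses)  # [{'echo': 0}, {'echo': 1}, {'echo': 2}, {'echo': 3}, {'echo': 4}]
```

Storing each result at the index of its input keeps the output order stable even though requests may finish out of order.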
  
To read more visit e.g.:  
- [asyncio @ docs.python](https://docs.python.org/3/library/asyncio.html)  
- [asyncio @ superfastpython](https://superfastpython.com/python-asyncio/)
</details>
  
#### Or in the existing Ray cluster  
  
Instead of:
  
```python
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
    ...

# slow, probably need to rewrite it ;(
results = []
for const1, const2 in zip(range(1000), range(4000, 2000, -2)):
    results.append(
        some_existing_cpu_intensive_function(const1, const2)    
    )
```
  
Do:  
  
```python
from just_distribute import distribute


@distribute(job='ray')
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
    ...

# <happy CPU fan noises on the cluster's host>
results = some_existing_cpu_intensive_function(range(1000), range(4000, 2000, -2))
```  
  
For instructions on how to set up a Ray cluster on bare metal or in the cloud, see the [Ray documentation](https://docs.ray.io/en/latest/cluster/vms/getting-started.html).  
   
<details>
<summary>Curious what happens in the background?</summary>
For the ray / cluster type workload, the Ray Python library is used, which abstracts multiprocessing to a more general scenario.  
Instead of being constrained to the capabilities of a single machine, Ray can be scaled up to many of them.  
  
Roughly what happens when using Ray (on an already existing cluster):  
- tasks, a Ray flavor of futures (objects with a promise of having a value at some point), are sent to the cluster  
- Ray automatically spawns the required number of workers (the number of workers is defined implicitly)  
- results are moved from the workers to a common object store (shared memory)  
- results can be pulled back and aggregated    
  
To read more visit e.g.:  
- [key concepts @ docs.ray](https://docs.ray.io/en/latest/ray-core/key-concepts.html)  
</details>
  
### More advanced cases  
  
When the wrapped function takes an iterable by default, autobatch takes care of it:  
  
```python
from just_distribute import distribute

@distribute(job='compute', workers=8, autobatch=True) # default autobatch is True, so you can just omit this parameter
def intensive_computation(numbers: list[int]):
    ...

a: list[int] = ...
intensive_computation(a)  # works fine
```
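
What batching means here can be illustrated with a small sketch. How autobatch actually sizes its chunks is not stated in this README, so the even split below is an assumption, and `make_batches` is a hypothetical helper.

```python
def make_batches(items: list[int], n_batches: int) -> list[list[int]]:
    """Hypothetical helper: split items into at most n_batches contiguous chunks."""
    size = max(1, -(-len(items) // n_batches))  # ceiling division, at least 1
    return [items[i:i + size] for i in range(0, len(items), size)]


def intensive_computation(numbers: list[int]) -> int:
    return sum(n * n for n in numbers)


batches = make_batches(list(range(10)), n_batches=4)
print(batches)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

# Each batch would go to a separate worker; here they are processed in a loop.
partial_results = [intensive_computation(batch) for batch in batches]
print(partial_results)  # [5, 50, 149, 81]
```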
  
When the wrapped function takes equal-length iterables by default:  

```python
from just_distribute import distribute

@distribute(job='compute', workers=8, autobatch=False)  # default autobatch is True
def intensive_computation(numbers1: list[int], numbers2: list[int]):
    for n1, n2 in zip(numbers1, numbers2):
        ...

a: list[int] = ...
b: list[int] = ...
intensive_computation(a, b)  # TypeError: 'int' object is not iterable -> because autobatch is off 
# and wrapped function takes iterables as an input

# manually batched
a: list[list[int]] = ...
b: list[list[int]] = ...
assert len(a) == len(b)  # True
assert all([len(_a) == len(_b) for _a, _b in zip(a, b)])  # True -> properly, manually batched data
intensive_computation(a, b)  # works fine

# or just use default autobatch=True
a: list[int] = ...
b: list[int] = ...
intensive_computation(a, b)  # works fine
```  
  
When the wrapped function takes iterables of possibly different lengths by default:  
  
```python
from just_distribute import distribute
from itertools import product

@distribute(job='compute', workers=8, autobatch=False)  # default autobatch is True
def intensive_computation(numbers1: list[int], numbers2: list[int]):
    for n1, n2 in product(numbers1, numbers2):
        ...

# manually batched    
a: list[list[int]] = ...
b: list[list[int]] = ...
intensive_computation(a, b)  # works fine

# or autobatch=True
a: list[int] = ...
b: list[int] = ...
intensive_computation(a, numbers2=b)  # works fine in this particular example, because autobatch takes care of numbers1 
# and numbers2 is treated as a constant
```  
  
When the wrapped function has mixed-type, non-constant (in the distribute sense) parameters:  
  
```python
from just_distribute import distribute
from collections.abc import Iterable

@distribute(job='compute', workers=8)
def intensive_computation(numbers: list[int], power: int, verbose: bool = True):
    ...    

a = list(range(1000)) * 100
b = range(100)
assert len(a) > len(b)
assert len(a) % len(b) == 0  # for every element in b there are N elements in a
intensive_computation(a, b, verbose=False)  # works fine

# or autobatch=False and data manually batched
a: list[list[int]] = ...
b: list[int] = ...
intensive_computation(a, b, verbose=False)  # works fine
```
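
The divisibility requirement in the example above suggests that each element of `b` is paired with an equal-sized slice of `a`. The sketch below spells that pairing out under this assumption; `pair_batches` is a hypothetical helper, not part of just_distribute.

```python
def pair_batches(numbers: list[int], powers: list[int]) -> list[tuple[list[int], int]]:
    """Hypothetical helper: give every element of powers an equal-sized slice of numbers."""
    if len(numbers) % len(powers) != 0:
        raise ValueError("len(numbers) must be a multiple of len(powers)")
    size = len(numbers) // len(powers)
    return [(numbers[i * size:(i + 1) * size], p) for i, p in enumerate(powers)]


def intensive_computation(numbers: list[int], power: int) -> int:
    return sum(n ** power for n in numbers)


pairs = pair_batches([1, 2, 3, 4, 5, 6], [1, 2, 3])
print(pairs)  # [([1, 2], 1), ([3, 4], 2), ([5, 6], 3)]
print([intensive_computation(batch, p) for batch, p in pairs])  # [3, 25, 341]
```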
            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/jakubgajski/just_distribute",
    "name": "just-distribute",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.10",
    "maintainer_email": null,
    "keywords": "concurrency, parallelism, concurrent, parallel, decorator, distributed",
    "author": "Jakub Gajski",
    "author_email": "jakub.gajski@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/cb/9b/18f0b8bdaf3990b857f411c054b4a9d7a29a16321ec7a2a096164a368e97/just_distribute-0.1.2.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Provides a @distribute decorator that enables concurrent execution of functions without boilerplate code.",
    "version": "0.1.2",
    "project_urls": {
        "Homepage": "https://github.com/jakubgajski/just_distribute",
        "Repository": "https://github.com/jakubgajski/just_distribute"
    },
    "split_keywords": [
        "concurrency",
        " parallelism",
        " concurrent",
        " parallel",
        " decorator",
        " distributed"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "600ae53e83c282b1fae9fd2ba10087772a8ef4c6bc77402815558b3a83620bc9",
                "md5": "76dcd0c20151e673c5377b48f520f26f",
                "sha256": "203353be3cb15e315d91650164a51a6234318115ba9724c940bb88e6092885fe"
            },
            "downloads": -1,
            "filename": "just_distribute-0.1.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "76dcd0c20151e673c5377b48f520f26f",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.10",
            "size": 7792,
            "upload_time": "2024-07-14T08:27:35",
            "upload_time_iso_8601": "2024-07-14T08:27:35.608123Z",
            "url": "https://files.pythonhosted.org/packages/60/0a/e53e83c282b1fae9fd2ba10087772a8ef4c6bc77402815558b3a83620bc9/just_distribute-0.1.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "cb9b18f0b8bdaf3990b857f411c054b4a9d7a29a16321ec7a2a096164a368e97",
                "md5": "764f547ffd5209cf3b7fb3062b24808d",
                "sha256": "962854d1cc6b3f3798fc635129b78c59a01fc39208caacf21a3edb930b8a1134"
            },
            "downloads": -1,
            "filename": "just_distribute-0.1.2.tar.gz",
            "has_sig": false,
            "md5_digest": "764f547ffd5209cf3b7fb3062b24808d",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.10",
            "size": 7296,
            "upload_time": "2024-07-14T08:27:37",
            "upload_time_iso_8601": "2024-07-14T08:27:37.165373Z",
            "url": "https://files.pythonhosted.org/packages/cb/9b/18f0b8bdaf3990b857f411c054b4a9d7a29a16321ec7a2a096164a368e97/just_distribute-0.1.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-07-14 08:27:37",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "jakubgajski",
    "github_project": "just_distribute",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "just-distribute"
}
        