just-distribute

Name: just-distribute
Version: 0.1.1
Home page: https://github.com/jakubgajski/just_distribute
Summary: Provides a @distribute decorator that enables concurrent execution of functions without boilerplate code.
Author: Jakub Gajski
Upload time: 2024-06-13 09:33:28
Requires Python: <4.0,>=3.10
License: MIT
Keywords: concurrency, parallelism, concurrent, parallel, decorator, distributed
### Overview

Haven't designed your functions for large workloads, or don't feel comfortable with the various options for concurrency?
Please, don't be sad - just distribute.

### Installation  
  
```bash
pip install just-distribute
```
   
### TL;DR  
   
```python
from just_distribute import distribute

@distribute(job='compute', workers=8)  # job in ('compute', 'io', 'web', 'ray')
def your_time_consuming_func(*args):
    ...
```
  
### Getting Started  
   
Always make sure the function you want to distribute has proper type hints, because just_distribute makes some 
assumptions based on type annotations. Also, data to be distributed must be passed as positional arguments; 
keyword arguments are treated as constants.
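Conceptually, positional data is fanned out across workers while keyword arguments ride along unchanged with every call. A rough plain-`concurrent.futures` sketch of that semantics (illustration only; none of these names are part of just_distribute's API):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def scale(value: int, factor: int = 1) -> int:
    return value * factor


data = range(5)
# Positional data is split across workers, one element per call;
# the keyword argument acts as a constant for every call.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(partial(scale, factor=10), data))

print(results)  # [0, 10, 20, 30, 40]
```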
  
#### CPU intensive tasks
  
Instead of:  
  
```python
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
    ...

# slow, probably need to rewrite it ;(
results = []
for const1, const2 in zip(range(1000), range(4000, 2000, -2)):
    results.append(
        some_existing_cpu_intensive_function(const1, const2)    
    )
```
  
Do:  
  
```python
from just_distribute import distribute


@distribute(job='compute', workers=8)
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
    ...

# <happy CPU fan noises>
results = some_existing_cpu_intensive_function(range(1000), range(4000, 2000, -2))
```
  
#### I/O intensive tasks
  
Instead of:  
  
```python
def some_existing_io_intensive_function(data_to_write: bytes, filename: str, verbose: bool = True):
    ...

# slow, probably need to rewrite it ;(
data_store: dict = ...  # some processed data to save
for name, data in data_store.items():
    some_existing_io_intensive_function(data, name)
```
  
Do:  
  
```python
from just_distribute import distribute


@distribute(job='io', workers=8)
def some_existing_io_intensive_function(data_to_write: bytes, filename: str, verbose: bool = True):
    ...

data_store: dict = ...  # some processed data to save
# <happy HDD noises???>
# any keyword arguments are not distributed :)
some_existing_io_intensive_function(data_store.values(), data_store.keys(), verbose=False)
```  
  
#### Somewhere over the network :guitar:
  
Instead of:

```python
def some_existing_web_requesting_function(data_to_send: dict, url: str, api_key: str):
    ...

# slow, probably need to rewrite it ;(
data_store: list[dict] = ...  # some data to process on a remote service
for data in data_store:
    some_existing_web_requesting_function(data, url="https://some_web_api.com/process", api_key="***")
```
  
Do:  
  
```python
from just_distribute import distribute


@distribute(job='web', workers=8)
def some_existing_web_requesting_function(data_to_send: dict, url: str, api_key: str):
    ...

data_store: list[dict] = ...  # some data to process on a remote service
# <happy router blinking>
some_existing_web_requesting_function(data_store, url="https://some_web_api.com/process", api_key="***")
```  
  
#### Or in the existing Ray cluster  
  
Instead of:
  
```python
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
    ...

# slow, probably need to rewrite it ;(
results = []
for const1, const2 in zip(range(1000), range(4000, 2000, -2)):
    results.append(
        some_existing_cpu_intensive_function(const1, const2)    
    )
```
  
Do:  
  
```python
from just_distribute import distribute


@distribute(job='ray')
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
    ...

# <happy CPU fan noises on the cluster's host>
results = some_existing_cpu_intensive_function(range(1000), range(4000, 2000, -2))
```  
  
For instructions on how to set up a Ray cluster on bare metal or in the cloud, see the [Ray documentation](https://docs.ray.io/en/latest/cluster/vms/getting-started.html)  
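The linked docs cover this in depth; as a minimal bare-metal sketch (assuming Ray is already installed on every node, with `<head-node-ip>` standing in for your actual head address):

```bash
# On the head node:
ray start --head --port=6379

# On every worker node, pointing at the head:
ray start --address=<head-node-ip>:6379
```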
  
### More advanced cases  
  
When the wrapped function already takes an iterable, autobatch takes care of it:  
  
```python
from just_distribute import distribute

@distribute(job='compute', workers=8, autobatch=True) # default autobatch is True, so you can just omit this parameter
def intensive_computation(numbers: list[int]):
    ...

a: list[int] = ...
intensive_computation(a)  # works fine
```
  
When the wrapped function takes equal-length iterables:  

```python
from just_distribute import distribute

@distribute(job='compute', workers=8, autobatch=False)  # default autobatch is True
def intensive_computation(numbers1: list[int], numbers2: list[int]):
    for n1, n2 in zip(numbers1, numbers2):
        ...

a: list[int] = ...
b: list[int] = ...
intensive_computation(a, b)  # TypeError: 'int' object is not iterable -> because autobatch is off 
# and wrapped function takes iterables as an input

# manually batched
a: list[list[int]] = ...
b: list[list[int]] = ...
assert len(a) == len(b)  # True
assert all([len(_a) == len(_b) for _a, _b in zip(a, b)])  # True -> properly, manually batched data
intensive_computation(a, b)  # works fine

# or just use default autobatch=True
a: list[int] = ...
b: list[int] = ...
intensive_computation(a, b)  # works fine
```  
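If you go the `autobatch=False` route, equal-length batches like the ones above can be produced with a small helper along these lines (`make_batches` is a hypothetical name, not part of just_distribute):

```python
def make_batches(items: list, n_batches: int) -> list[list]:
    """Split items into n_batches contiguous, roughly equal-sized chunks."""
    size, rem = divmod(len(items), n_batches)
    batches, start = [], 0
    for i in range(n_batches):
        # The first `rem` batches absorb one extra element each.
        end = start + size + (1 if i < rem else 0)
        batches.append(items[start:end])
        start = end
    return batches


a = list(range(10))
b = list(range(100, 110))
batched_a = make_batches(a, 4)
batched_b = make_batches(b, 4)
# Same number of batches, pairwise equal lengths -> safe to zip inside the worker.
assert len(batched_a) == len(batched_b)
assert all(len(x) == len(y) for x, y in zip(batched_a, batched_b))
```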
  
When the wrapped function takes iterables of possibly different lengths:  
  
```python
from just_distribute import distribute
from itertools import product

@distribute(job='compute', workers=8, autobatch=False)  # default autobatch is True
def intensive_computation(numbers1: list[int], numbers2: list[int]):
    for n1, n2 in product(numbers1, numbers2):
        ...

# manually batched    
a: list[list[int]] = ...
b: list[list[int]] = ...
intensive_computation(a, b)  # works fine

# or autobatch=True
a: list[int] = ...
b: list[int] = ...
intensive_computation(a, numbers2=b)  # works fine in this particular example, because autobatch takes care of numbers1 
# and numbers2 is treated as a constant
```  
  
When the wrapped function has mixed-type, non-constant (in the distribute sense) parameters:  
  
```python
from just_distribute import distribute

@distribute(job='compute', workers=8)
def intensive_computation(numbers: list[int], power: int, verbose: bool = True):
    ...    

a = list(range(1000)) * 100
b = range(100)
assert len(a) > len(b)
assert len(a) % len(b) == 0  # for every element in b there are N elements in a
intensive_computation(a, b, verbose=False)  # works fine

# or autobatch=False and data manually batched
a: list[list[int]] = ...
b: list[int] = ...
intensive_computation(a, b, verbose=False)  # works fine
```
            
