Moethread
=======================================
## Table of Contents
* [Overview](#overview)
* [Library Installation](#library-installation)
* [Library Usage](#library-usage)
## Overview
Moethread is a Python wrapper around **ThreadPoolExecutor** that makes it easy to multithread resource-bound tasks. The library offers a decorator style of parallelizing function calls.

**NOTE:** this only helps with resource-bound operations (API calls, network requests, disk read/write, etc.). If your task is **CPU**-bound, this library may not offer much benefit and you're better off exploring other options such as **multiprocessing**.
## Library Installation
To install the library, run the following command in a terminal (cmd, shell, etc.):
```bash
# Windows
pip install moethread
# Linux
pip3 install moethread
```
## Library Usage
To start, import the library:
```python
from moethread import parallel_call
```
If you need to read results back from the parallelized function, define the shared variables/objects globally so you can access them outside of that function. The function to parallelize accepts positional and keyword arguments.
Positional arguments are primitives/constants/variables that you'd like to pass through to your function. If you'd like to keep **counters** inside the parallelized function, define them globally as shown in the following snippet.
```python
global counter
counter = 0
```
The data to parallelize over must be passed as a keyword argument; the keyword **data** is reserved for it. The input data is a dictionary whose values are the collections of whatever needs to run in parallel.
For example, if you have a dataset of labeled images and you would like to read those images in parallel, create a dictionary holding the image paths and their corresponding labels. You have to make sure the two lists are aligned.
```python
image_paths = ["image_0.jpg", "image_1.jpg", ...] # some dummy paths
image_labels = [0, 1, ...] # some dummy labels
assert len(image_paths) == len(image_labels)
# It's your responsibility to ensure that elements align, e.g. image_labels[0] is the label for image_paths[0]
data = {"image_path": image_paths, "image_label": image_labels}
```
The next step is to write the building block of your function. Add the **@parallel_call** decorator on top of the function and declare **\*args and \*\*kwargs**
as its parameters. Inside the function, read the data dictionary; each call receives a single image path and its corresponding label.
```python
import cv2  # needed to read images

@parallel_call # decorator
def function_to_parallelize(*args, **kwargs):
    # Declare globals...
    global counter
    # Read data in...
    image_path = kwargs.get('data').get('image_path')
    image_label = kwargs.get('data').get('image_label')
    # Read image
    image = cv2.imread(image_path)
    if image_label == 1:
        counter += 1  # assume images with label == 1 are valid images
    ## Do whatever you like to do below...
```
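One caveat worth noting: `counter += 1` is a read-modify-write, which is not atomic across threads, so concurrent increments can race. If exact counts matter, you may want to guard the counter with a lock. A minimal sketch assuming the same setup as above (`threading.Lock` is standard-library Python, not part of moethread's API):

```python
import threading
from moethread import parallel_call

counter = 0
counter_lock = threading.Lock()  # guards the shared counter across threads

@parallel_call
def count_valid(*args, **kwargs):
    global counter
    image_label = kwargs.get('data').get('image_label')
    if image_label == 1:
        with counter_lock:  # serialize the read-modify-write
            counter += 1

count_valid(data={"image_label": [0, 1, 1]}, threads=-1)
```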
Lastly, call the function and specify the number of threads. If you set threads = -1, the library will figure out a suitable number of threads for the task.
```python
function_to_parallelize(data=data, threads=-1) # automatically assigns the needed number of threads...
```
Putting it all together.
```python
from moethread import parallel_call
import cv2

image_paths = ["image_0.jpg", "image_1.jpg", ...] # some dummy paths
image_labels = [0, 1, ...] # some dummy labels
assert len(image_paths) == len(image_labels)

# It's your responsibility to ensure that elements align, e.g. image_labels[0] is the label for image_paths[0]
data = {"image_path": image_paths, "image_label": image_labels}
counter = 0

@parallel_call # decorator
def function_to_parallelize(*args, **kwargs):
    # Declare globals...
    global counter
    # Read data in...
    image_path = kwargs.get('data').get('image_path')
    image_label = kwargs.get('data').get('image_label')
    # Read image
    image = cv2.imread(image_path)
    if image_label == 1:
        counter += 1  # assume images with label == 1 are valid images
    ## Do whatever you like to do below...

function_to_parallelize(data=data, threads=-1) # automatically assigns the needed number of threads...
```
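Counters aren't the only way to read results back. A common variant is appending to a global list; this sketch (my own, not from the library docs) collects successfully loaded images, relying on `list.append` being atomic in CPython so no lock is needed:

```python
from moethread import parallel_call
import cv2

data = {"image_path": ["image_0.jpg", "image_1.jpg"]}  # hypothetical paths
loaded = []  # list.append is atomic in CPython, so no lock is needed here

@parallel_call
def load_images(*args, **kwargs):
    image_path = kwargs.get('data').get('image_path')
    image = cv2.imread(image_path)  # returns None if the file can't be read
    if image is not None:
        loaded.append((image_path, image))

load_images(data=data, threads=-1)
print(f"loaded {len(loaded)} images")
```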
### Another example: pull-request processing
This example shows how to read GitHub pull requests, parse their body content, and return a list of GitHub users who produced failing pull requests.
```python
import json

from moethread import parallel_call

# Note: self._iter(), self._get(), url, and repo belong to the surrounding
# GitHub client class this snippet was excerpted from (not shown here).
github_users = []
invalid_pulls = 0
github_token = "ghx_test124"  # dummy token
etag = None
params = {'state': 'open'}
pulls = list(self._iter(int(-1), url, repo.pulls.ShortPullRequest, params, etag))

@parallel_call
def process_pulls(*args, **kwargs):
    global invalid_pulls
    pull = kwargs.get('data').get('pulls')
    response = self._get(f'{url}/{pull.number}/reviews', auth=('', github_token))
    if response.ok:
        reviews = json.loads(response.text)
        for review in reviews:
            body = review.get('body', '').lower()
            err = "failure"
            if err in body:
                res = self._get(pull.user.url, auth=('', github_token))
                if res.ok:
                    github_user = json.loads(res.text)
                    github_users.append(github_user.get('login', ''))
                invalid_pulls += 1
                break
    elif response.status_code != 404:
        pass  # non-404 errors are silently ignored here

process_pulls(data={"pulls": pulls}, threads=-1)
```
## Ready-to-go functions
The library ships with ready-to-go functions that perform common data operations via `parallel_call` without you having to write any threading code. All you have to do is call them:
- mtdo()
- mtdo_from_json()
- mtdo_from_csv()
```python
def mtdo(...):
    """
    Performs a multithreaded data operation.

    Args:
        src_dir (str): source directory containing data to copy.
        dst_dir (str): destination directory to copy data to.
        op (str): operation type [cp: copy, mv: move, rm: delete, ren: rename].
        file_type (str, optional): type of data to copy, e.g. '*.json' copies json files only. Defaults to all data types '*.*'.
        sep_folder (str, optional): separation folder; the directory structure to its right is appended to the destination directory,
                                    e.g. for app/data/src/files with sep_folder='data', dest path -> os.path.join(dst_dir, 'src/files'). Defaults to ''.
        overwrite (bool, optional): whether to overwrite data in destination or skip already copied data on later trials. Defaults to False.
        prefix (str): prefix for image renaming, e.g. prefix=data and image_name=im.jpg --> data_im.jpg
        threads (int, optional): number of threads to launch. Defaults to 8.
        **kwargs: extra keywords such as chunk_size (split data into equal-sized chunks) and verbose (suppress moethread stdout); defaults to chunk_size=5000, verbose=True.
    """
```
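A usage sketch for `mtdo` (the argument values are hypothetical; the call follows the docstring above):

```python
from moethread import mtdo

# Copy only the JSON files under ./raw into ./backup using 16 threads.
mtdo(src_dir="./raw", dst_dir="./backup", op="cp",
     file_type="*.json", overwrite=False, threads=16)
```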
```python
def mtdo_from_json(...):
    """Performs a multithreaded data operation for paths in a json file.

    Args:
        file_path (str): input json file containing paths.
        data_key (str): dictionary key holding file paths.
        label_key (str, optional): dictionary key holding labels, used as folder names to copy/move data to (classifying copied/moved data based on labels).
        op (str): operation type [cp: copy, mv: move].
        threads (int, optional): number of threads to launch. Defaults to 8.
        **kwargs: extra keywords such as chunk_size (split data into equal-sized chunks) and verbose (suppress moethread stdout); defaults to chunk_size=5000, verbose=True.
    """
```
```python
def mtdo_from_csv(...):
    """Performs a multithreaded data operation for paths in a csv file.

    Args:
        file_path (str): input csv file containing paths.
        data_key (str): column key holding file paths.
        label_key (str, optional): column key holding labels, used as folder names to copy/move data to (classifying copied/moved data based on labels).
        op (str): operation type [cp: copy, mv: move].
        threads (int, optional): number of threads to launch. Defaults to 8.
        **kwargs: extra keywords such as chunk_size (split data into equal-sized chunks) and verbose (suppress moethread stdout); defaults to chunk_size=5000, verbose=True.
    """
```
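And a similar sketch for the file-driven variants (file names and keys are hypothetical):

```python
from moethread import mtdo_from_json, mtdo_from_csv

# Copy files listed in a manifest, sorting them into per-label folders.
mtdo_from_json(file_path="manifest.json", data_key="image_path",
               label_key="image_label", op="cp")
mtdo_from_csv(file_path="manifest.csv", data_key="image_path",
              label_key="image_label", op="mv", threads=8)
```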
----------------------------------------
Author: Hamdan, Muhammad (@mhamdan91 - ©)