ThreadPoolExecutorPlus

Name: ThreadPoolExecutorPlus
Version: 0.2.2
Home page: https://github.com/GoodManWEN/ThreadPoolExecutorPlus
Summary: A fully replaceable executor that makes it possible to reuse idle threads and shrink the thread list when there's no heavy load.
Upload time: 2021-08-29 14:29:54
Author: WEN
Requires Python: >=3.5
Keywords: concurrent.futures, threading, multi-threads, ThreadPoolExecutor
# ThreadPoolExecutorPlus
[![fury](https://badge.fury.io/py/ThreadPoolExecutorPlus.svg)](https://badge.fury.io/py/ThreadPoolExecutorPlus)
[![licence](https://img.shields.io/github/license/GoodManWEN/ThreadPoolExecutorPlus)](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/blob/master/LICENSE)
[![pyversions](https://img.shields.io/pypi/pyversions/ThreadPoolExecutorPlus.svg)](https://pypi.org/project/ThreadPoolExecutorPlus/)
[![Publish](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/workflows/Publish/badge.svg)](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/actions?query=workflow:Publish)
[![Build](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/workflows/Build/badge.svg)](https://github.com/GoodManWEN/ThreadPoolExecutorPlus/actions?query=workflow:Build)

This package provides a duck-typed replacement for concurrent.futures.ThreadPoolExecutor. It has a near-identical API and can fully replace ThreadPoolExecutor in your code.

This package exists to solve several specific memory-control pain points in the native Python executor.

## Features
- Fully interchangeable with concurrent.futures.ThreadPoolExecutor, for example in asyncio.
- Whenever a new task is submitted, the executor prefers an existing idle thread over creating a new one.
- The executor automatically shrinks itself during idle periods, for higher efficiency and lower memory usage.
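As a point of comparison for the shrink feature above, the standard library's ThreadPoolExecutor never reclaims idle threads: once a worker thread has been spawned, it lives until shutdown. A stdlib-only sketch of that baseline behaviour (it peeks at the private `_threads` set, just as the feature demo below does):

```python
# stdlib_baseline.py -- illustrates the memory issue ThreadPoolExecutorPlus
# addresses: the standard library executor never shrinks its thread pool.
import concurrent.futures
import time

def block(seconds):
    time.sleep(seconds)
    return seconds

executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# Four concurrently blocking tasks force the pool to spawn all four threads.
futures = [executor.submit(block, 0.5) for _ in range(4)]
concurrent.futures.wait(futures)

print(len(executor._threads))  # 4 -- all worker threads were created
time.sleep(1)                  # the pool is now completely idle
print(len(executor._threads))  # still 4 -- idle threads are never reclaimed
executor.shutdown()
```

With ThreadPoolExecutorPlus, the second count would drop back toward min_workers once keep_alive_time expires, as the feature demo below shows.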

## Install

    pip install ThreadPoolExecutorPlus

## Usage
The API is the same as concurrent.futures.ThreadPoolExecutor, with some additional control functions:

##### set_daemon_opts(min_workers=None, max_workers=None, keep_alive_time=None)

    To keep the interface identical, these options are modified after the object is created.
    They change the minimum/maximum number of active workers, and set how many seconds an
    idle thread waits before it is terminated.
    By default, min_workers = 4, max_workers = 16 × CPU core count on Windows and 32 × on
    Linux, and keep_alive_time = 100s.

## Example

The same code as the official docs' [#threadpoolexecutor-example](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example), with the executor replaced:
```Python3
# requests_test.py
import concurrent.futures
import ThreadPoolExecutorPlus
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutorPlus.ThreadPoolExecutor(max_workers=5) as executor:
    # Try modifying daemon options
    executor.set_daemon_opts(min_workers = 2 , max_workers = 10 , keep_alive_time = 60)
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
```

The same code as the official docs' [#executing-code-in-thread-or-process-pools](https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools), with the executor replaced:
```Python3
# Requires Python 3.7+
import asyncio
import ThreadPoolExecutorPlus

def blocking_io():
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutorPlus.ThreadPoolExecutor() as pool:
        result1 = await loop.run_in_executor(
            pool, blocking_io)
        result2 = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom thread pool', result1)
        print('custom thread pool', result2)

asyncio.run(main())
```

Feature demo:
```Python3
# feature_demo.py
from ThreadPoolExecutorPlus import ThreadPoolExecutor
import time , datetime

def log(stmt , name = 'MAIN THREAD'):
    print(f"[{datetime.datetime.strftime(datetime.datetime.now() , '%Y-%m-%d %H:%M:%S')}][{name}] {stmt}")

def some_func(arg):
    # does some heavy lifting
    # outputs some results
    log(f"New task triggered in sub thread , sleep {arg} seconds." , 'SUB THREAD ')
    time.sleep(arg)
    log(f"Terminated." , 'SUB THREAD ') 
    return arg

with ThreadPoolExecutor() as executor:
    log(f"max_workers = {executor._max_workers}")
    log(f"min_workers = {executor._min_workers}")
    log("====================================================")

    # Continuously submit a task that blocks for 0.5s, one every second,
    # and observe the thread-control behaviour:
    # the executor prefers to reuse existing threads.
    log("Reuse test:")
    for _ in range(10):
        executor.submit(some_func , 0.5)
        time.sleep(1)
        log(f"Current pool size = {len(executor._threads)}")

    log("====================================================")

    # Observe the behaviour after all tasks are done.
    # The controller reacts quickly once new options are set,
    # and automatically shrinks unused threads.
    log("Shrink test:")
    log("Adjust timeout time to 10 seconds.")
    executor.set_daemon_opts(min_workers = 2 , max_workers = 10 , keep_alive_time = 10)
    for _ in range(10):
        executor.submit(some_func , 3)
        time.sleep(0.01)
    log("10 new tasks created.")


    time.sleep(3)
    log("All tasks done.")

    for _ in range(15):
        log(f"Current pool size = {len(executor._threads)} , {_ + 1}s passed.")
        time.sleep(1)
```



            
