progress-executor

- Name: progress-executor
- Version: 0.5
- Home page: https://github.com/JulienBrn/progress_executor
- Summary: Subclasses of concurrent.future.Executor that correctly handle cancelling and progress
- Upload time: 2023-09-15 08:31:32
- Author: Julien Braine (julienbraine@yahoo.fr)
- Requires Python: >=3.11
- License: MIT
- Keywords: python
- Requirements: none recorded

# Known Issues

We believe the current version of tqdm has a bug when the number of bar instances decreases.
If you use `future.add_tqdm_callback` for progress, you should replace the `_decr_instances` method in `tqdm/std.py` (line 690) with the following (as described in [this issue](https://github.com/tqdm/tqdm/issues/1496)):


```python
@classmethod
def _decr_instances(cls, instance):
    """
    Remove from list and reposition another unfixed bar
    to fill the new gap.

    This means that by default (where all nested bars are unfixed),
    order is not maintained but screen flicker/blank space is minimised.
    (tqdm<=4.44.1 moved ALL subsequent unfixed bars up.)
    """
    with cls._lock:
        try:
            cls._instances.remove(instance)
        except KeyError:
            # if not instance.gui:  # pragma: no cover
            #     raise
            pass  # py2: maybe magically removed already
        # else:
        if not instance.gui:
            #CUSTOM Addition
            for inst in cls._instances:
                pos = getattr(inst, "pos", 0)
                adjust = 1 if pos < 0 else -1
                if pos and abs(pos) > abs(instance.pos):
                    inst.pos += adjust
            #END of CUSTOM Addition
            last = (instance.nrows or 20) - 1
            # find unfixed (`pos >= 0`) overflow (`pos >= nrows - 1`)
            instances = list(filter(
                lambda i: hasattr(i, "pos") and last <= i.pos,
                cls._instances))
            # set first found to current `pos`
            if instances:
                inst = min(instances, key=lambda i: i.pos)
                inst.clear(nolock=True)
                inst.pos = abs(instance.pos)
```

# Motivation

While `concurrent.futures` lets one seamlessly run the same code with different executors ([thread pool and process pool](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor)),
it is non-trivial to add the following two features:
1. Handling cancellation of a task that is currently executing
2. Handling the progress of a task from elsewhere (e.g. one can print a tqdm bar within the task, but what if one wants to aggregate progress from different tasks?)

We also add a new executor that runs synchronously (no threads, no processes), for testing purposes.
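To illustrate what a synchronous executor means, here is a minimal sketch built only on the standard `concurrent.futures` API. `InlineExecutor` is a hypothetical name and this is not the package's `SyncProgressExecutor`: it simply runs each call immediately in the calling thread and returns an already-completed `Future`, which keeps results and exceptions deterministic in tests.

```python
from concurrent.futures import Executor, Future


class InlineExecutor(Executor):
    """Hypothetical executor that runs each call immediately in the caller's thread."""

    def submit(self, fn, /, *args, **kwargs) -> Future:
        future: Future = Future()
        # Move the future to RUNNING, as the stock executors do before calling fn.
        future.set_running_or_notify_cancel()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Store the exception on the future instead of raising in submit().
            future.set_exception(exc)
        return future


with InlineExecutor() as ex:
    ok = ex.submit(pow, 2, 10)
    bad = ex.submit(int, "not a number")

print(ok.result())            # 1024
print(type(bad.exception()))  # <class 'ValueError'>
```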

A full example is provided at the bottom of this page.



# Problem

Correctly handling cancellation is more complicated than one might expect:
a running thread cannot be stopped from the outside without harm.
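The standard library shows the limitation directly: `concurrent.futures.Future.cancel()` only succeeds while a call is still pending. Once a worker has started it, `cancel()` returns `False` and the call runs to completion. A small sketch with the stock `ThreadPoolExecutor` (`slow_task` is a hypothetical stand-in for real work):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()


def slow_task() -> str:
    # Hypothetical long-running task: signal that it started, then "work".
    started.set()
    time.sleep(0.5)
    return "finished"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(slow_task)
    queued = pool.submit(slow_task)  # waits behind `running` (only one worker)
    started.wait()                   # make sure the first call is executing

    cancelled_running = running.cancel()  # too late: the call already started
    cancelled_queued = queued.cancel()    # still pending, so this succeeds

print(cancelled_running, cancelled_queued)  # False True
print(running.result())                     # finished
```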

As for progress, as soon as one wants to compute anything from the progress of several tasks, one has to retrieve that data from the different threads or processes.
The synchronization involved is not extremely difficult, but it is error-prone.
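As an illustration of that synchronization, the sketch below routes progress from worker threads to a single owner (the main thread) through a `queue.Queue`, then aggregates it. It assumes threads and a hypothetical `work` function that pushes `(task_id, done, total)` tuples; worker processes would need a `multiprocessing` queue instead.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def work(task_id: int, steps: int, updates: queue.Queue) -> int:
    # Hypothetical worker: report (task_id, done, total) after each step.
    for done in range(1, steps + 1):
        updates.put((task_id, done, steps))
    return task_id


updates: queue.Queue = queue.Queue()
latest: dict[int, tuple[int, int]] = {}  # task_id -> (done, total); only the main thread writes it

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(work, i, steps, updates) for i, steps in enumerate([4, 6, 8])]
    # Drain messages until every task has finished and nothing is left in the queue.
    while not (all(f.done() for f in futures) and updates.empty()):
        try:
            task_id, done, total = updates.get(timeout=0.05)
        except queue.Empty:
            continue
        latest[task_id] = (done, total)

overall_done = sum(d for d, _ in latest.values())
overall_total = sum(t for _, t in latest.values())
print(f"{overall_done}/{overall_total}")  # 18/18
```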

# Solution

## The big picture

While cancellation of processes could probably (see alternatives) be handled by reimplementing [concurrent.futures.ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor), we do not see a good way to do so for ThreadPoolExecutor. As we want a uniform API with similar semantics, we opted for a different solution.

The drawback of this solution is that the programmer must report progress regularly, but we believe this is a minor cost, as programmers should already be doing so.
We then use these regular progress calls to check for cancellation and, if necessary, cancel the task from within.
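The idea of cancelling from within can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `CancellableProgress`, `TaskCancelled` and `job` are hypothetical names, and the cancellation flag is a `threading.Event` checked on every progress update. With worker processes, the flag would have to be shared across processes (for example a `multiprocessing.Manager().Event()`).

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TaskCancelled(Exception):
    """Hypothetical exception raised inside a task once cancellation is requested."""


class CancellableProgress:
    """Hypothetical progress handle: every update doubles as a cancellation checkpoint."""

    def __init__(self) -> None:
        self._cancel_requested = threading.Event()
        self.n = 0

    def cancel(self) -> None:
        # Called from outside the task (e.g. the main thread).
        self._cancel_requested.set()

    def update(self, k: int = 1) -> None:
        # Called regularly from inside the task.
        if self._cancel_requested.is_set():
            raise TaskCancelled()
        self.n += k


def job(progress: CancellableProgress) -> str:
    for _ in range(1000):
        time.sleep(0.01)   # one unit of work
        progress.update()  # report progress, and stop here if cancelled
    return "done"


progress = CancellableProgress()
with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(job, progress)
    time.sleep(0.1)
    progress.cancel()

try:
    fut.result()
except TaskCancelled:
    print(f"stopped cleanly after {progress.n} steps")
```

The task stops at its next progress call, so it can release resources normally instead of being killed mid-operation.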


## A limitation for progress calls

As we want progress updates to be processed in the main thread without blocking whatever else the main thread is doing, we implemented this as an async coroutine.
This means that the main thread must be running an asyncio event loop (which is probably already the case if you want to use concurrent.futures) in which fairness is respected, i.e. no coroutine blocks the loop for long stretches.
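A minimal sketch of the pattern, assuming threads: a hypothetical `watch` coroutine polls a lock-protected counter, yields to the event loop with `asyncio.sleep` between polls, and finally awaits the result through `asyncio.wrap_future`. It stays responsive only if no other coroutine blocks the loop, which is the fairness requirement above. This is not the library's `check_for_progress`; `SharedProgress`, `work` and `watch` are illustrative names.

```python
import asyncio
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor


class SharedProgress:
    """Hypothetical counter written by a worker thread and read by the event loop."""

    def __init__(self, total: int) -> None:
        self.total = total
        self.n = 0
        self._lock = threading.Lock()

    def update(self) -> None:
        with self._lock:
            self.n += 1

    def snapshot(self) -> int:
        with self._lock:
            return self.n


def work(progress: SharedProgress) -> str:
    for _ in range(progress.total):
        time.sleep(0.01)
        progress.update()
    return "done"


async def watch(fut: Future, progress: SharedProgress, seen: list[int]) -> str:
    # Poll without blocking: awaiting asyncio.sleep hands control back to the loop.
    while not fut.done():
        seen.append(progress.snapshot())
        await asyncio.sleep(0.02)
    seen.append(progress.snapshot())
    return await asyncio.wrap_future(fut)


async def main() -> tuple[str, list[int]]:
    progress = SharedProgress(total=20)
    seen: list[int] = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(work, progress)
        result = await watch(fut, progress, seen)
    return result, seen


result, seen = asyncio.run(main())
print(result, seen[-1])  # done 20
```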


# API



## Using ProgressExecutors 

Our executors inherit from `concurrent.futures.Executor`. We provide three executors, which can be created, for example, as follows:

```python
from progress_executor import ThreadPoolProgressExecutor, ProcessPoolProgressExecutor, SyncProgressExecutor

tp = ThreadPoolProgressExecutor(max_workers=3)
pp = ProcessPoolProgressExecutor(max_workers=3)
se = SyncProgressExecutor()
```


To create tasks, use the `submit` method of `concurrent.futures.Executor`; we strongly recommend creating an asyncio task from the returned future (as modern asyncio suggests).
Using `asyncio.TaskGroup` in an `async with` block is highly recommended (see the asyncio docs).

```python
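import asyncio

# executor, f1, f2, args and kwargs are placeholders for your own objects
async def main():
    with executor:
        async with asyncio.TaskGroup() as tg:
            future1 = executor.submit(f1, *args, **kwargs)
            task1 = tg.create_task(future1)

            future2 = executor.submit(f2, *args, **kwargs)
            task2 = tg.create_task(future2)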
```

The lines above run the tasks, but behave no differently from the executors of concurrent.futures (except for cancellation). This is because they do not take advantage of the fact that the futures returned by the `submit` method of a ProgressExecutor are `ProgressFuture`s.

## ProgressFuture Method

To make use of `ProgressFuture`, three modifications are required:

1. Use `tg.create_task(future.check_for_progress())` instead of `tg.create_task(future)`. This adds the progress callbacks to the asyncio loop.
2. Add callbacks on progress. You can either use `future.add_progress_callback(old_state, new_state)`, where each state is a dictionary with keys `n`, `total` and `status`,
or use `future.add_tqdm_callback(tqdm_cls=tqdm.tqdm, init_kwargs = {}, trigger: Set[Literal["now", "running", "cancelled"]] = {"now"} )`, where `tqdm_cls` should be a class or function (not an instance)
with an API similar to `tqdm.tqdm`, `init_kwargs` holds the keyword arguments passed to `tqdm_cls`, and `trigger` specifies when `tqdm_cls(**init_kwargs)` is called.
3. Add a `progress` parameter to your function, as `f` does in the full example below.
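The `(old_state, new_state)` shape is convenient for aggregating progress across tasks, the use case from the motivation. A minimal sketch, assuming each state is a dict with keys `n`, `total` and `status` as described above; `ProgressAggregator` is hypothetical, the status strings are placeholders, and the updates are simulated by hand rather than produced by the library.

```python
from typing import Any

State = dict[str, Any]  # assumed keys: "n", "total", "status"


class ProgressAggregator:
    """Hypothetical callback target that sums progress over many tasks."""

    def __init__(self) -> None:
        self.done = 0
        self.total = 0

    def __call__(self, old_state: State, new_state: State) -> None:
        # Apply only the difference between the two states, so each task
        # contributes its latest values exactly once.
        self.done += new_state["n"] - old_state["n"]
        self.total += new_state["total"] - old_state["total"]

    @property
    def fraction(self) -> float:
        return self.done / self.total if self.total else 0.0


agg = ProgressAggregator()
# Simulated updates from two tasks (placeholder status strings).
agg({"n": 0, "total": 0, "status": "pending"}, {"n": 0, "total": 10, "status": "running"})
agg({"n": 0, "total": 0, "status": "pending"}, {"n": 0, "total": 30, "status": "running"})
agg({"n": 0, "total": 10, "status": "running"}, {"n": 5, "total": 10, "status": "running"})
agg({"n": 0, "total": 30, "status": "running"}, {"n": 15, "total": 30, "status": "running"})
print(f"{agg.done}/{agg.total} = {agg.fraction:.0%}")  # 20/40 = 50%
```

Because only deltas are applied, the same aggregator instance could in principle be shared by every future; check the library's actual callback signature before relying on this.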

## Full Example

```python
import logging, beautifullogger
import time, asyncio, tqdm
from progress_executor import ThreadPoolProgressExecutor, ProcessPoolProgressExecutor, SyncProgressExecutor, Updater

logger = logging.getLogger(__name__)


def long_compute(n):
    tot = 17
    for i in range(int(n*25000000)):
        tot = tot//2 if tot % 2 ==0 else 3*tot+1
    return tot


def f(n, progress: Updater):
    progress.total = 2*n  # the loop below runs 2*n iterations
    for i in progress(range(2*n)): #you can use progress directly on an iterator
        if i %2 ==0:
            long_compute(0.1)
        else:
            time.sleep(0.1)
    return n

tp = ThreadPoolProgressExecutor(max_workers=3)
pp = ProcessPoolProgressExecutor(max_workers=3)
se = SyncProgressExecutor()
executor = tp #Change here to see the differences


async def main():
    vals = [30, 40, 35, 60, 20, 50, 38, 27]*2
    tasks = []  # defined before `try` so the `finally` block can always access it
    try:
        with executor:
            async with asyncio.TaskGroup() as tg:
                for i, val in enumerate(vals):
                    fut = executor.submit(f, val)
                    fut.add_tqdm_callback(tqdm.tqdm, dict(desc=f"Task {i}"), triggers=["now", "running", "cancelled"])
                    tasks.append(tg.create_task(fut.check_for_progress()))
                #See what happens if you uncomment these two lines
                # await asyncio.sleep(2) 
                # tasks[0].cancel()
    finally:
        print("FINISHED")
        for i, (val,task) in enumerate(zip(vals, tasks)):
            print(f"Task {i} with val={val} has result {'cancelled' if task.cancelled() else task.result()}")
    

if __name__ == "__main__": #Necessary due to multiprocessing
    beautifullogger.setup(warning_level_modules=["asyncio"]) #Just for pretty printing
    logger.info("Running start")
    asyncio.run(main())
    logger.info("Running end")
```

            
