| Field | Value |
|---|---|
| Name | mplite |
| Version | 1.3.0 |
| Home page | https://github.com/root-11/mplite |
| Summary | A module that makes multiprocessing easy. |
| Upload time | 2024-03-27 10:09:47 |
| Author | root-11 |
| License | MIT |
| Keywords | multiprocessing, tasks |
| Requirements | No requirements were recorded. |

# mplite

![Build status](https://github.com/root-11/mplite/actions/workflows/python-test.yml/badge.svg)
[![codecov](https://codecov.io/gh/root-11/mplite/branch/main/graph/badge.svg?token=QRBR8W5AB3)](https://codecov.io/gh/root-11/mplite)
[![Downloads](https://pepy.tech/badge/mplite)](https://pepy.tech/project/mplite)
[![Downloads](https://pepy.tech/badge/mplite/month)](https://pepy.tech/project/mplite/month)
[![PyPI version](https://badge.fury.io/py/mplite.svg)](https://badge.fury.io/py/mplite)

A lightweight wrapper for Python's multiprocessing module that makes multiprocessing easy.

If you are looking for a very easy way to use multiprocessing with args and kwargs, [mplite](https://pypi.org/project/mplite/) is a neat wrapper for that.

The [test](https://github.com/root-11/mplite/blob/main/tests/test_basics.py) is also the showcase:

*1. Get the imports.*

```
from mplite import TaskManager, Task
import time
```

*2. Create the function that each CPU should work on individually.*

```
def f(*args, **kwargs):
    time.sleep(args[0])
    return args[0]/kwargs['hello']
```

*2.1. I also add a function that always fails, to illustrate that the TaskManager doesn't crash.*
```
def broken(*args, **kwargs):
    raise NotImplementedError("this task must fail!")
```


*3. Create the main function you'd like to run everything from:*

```
def main():
    args = list(range(10)) * 5
    start = time.time()
    
    with TaskManager() as tm:
        # add the first tasks
        tasks = [Task(f, *(arg/10,), **{'hello': arg}) for arg in args]

        print("an example of a tasks is available as string:\n\t", str(tasks[0]), '\n\t', repr(tasks[0]))

        results = tm.execute(tasks)   # this will contain results and tracebacks!
        
        end = time.time()
        print(f"did nothing for {end-start} seconds, producing {len(results)} results")
        print(f"hereof {len([result for result in results if isinstance(result, str) ])} had errors.")
        print(f"the rest where results: {[i for i in results if not isinstance(i,str)]}")
        
        # add more tasks to the SAME pool of workers:
        tasks = [Task(broken, *(i,)) for i in range(3)]
        results = tm.execute(tasks)
        print("More expected errors:")
        for result in results:
            print("expected -->", result)  

if __name__ == "__main__":
    main()
```

*Expected outputs*

```
an example of a tasks is available as string:
	 Task(f=f, *(0.0,), **{'hello': 0}) 
	 Task(f=f, *(0.0,), **{'hello': 0})

  0%|          | 0/50 [00:00<?, ?tasks/s]
  2%|▏         | 1/50 [00:00<00:07,  6.96tasks/s]
  4%|▍         | 2/50 [00:00<00:06,  7.75tasks/s]
  6%|▌         | 3/50 [00:00<00:05,  8.15tasks/s]
 14%|█▍        | 7/50 [00:00<00:03, 14.16tasks/s]
 18%|█▊        | 9/50 [00:00<00:02, 14.36tasks/s]
 24%|██▍       | 12/50 [00:00<00:02, 14.13tasks/s]
 32%|███▏      | 16/50 [00:01<00:01, 17.34tasks/s]
 38%|███▊      | 19/50 [00:01<00:01, 18.03tasks/s]
 42%|████▏     | 21/50 [00:01<00:01, 16.66tasks/s]
 46%|████▌     | 23/50 [00:01<00:01, 15.06tasks/s]
 52%|█████▏    | 26/50 [00:01<00:01, 17.60tasks/s]
 56%|█████▌    | 28/50 [00:01<00:01, 16.86tasks/s]
 62%|██████▏   | 31/50 [00:02<00:01, 16.72tasks/s]
 66%|██████▌   | 33/50 [00:02<00:00, 17.37tasks/s]
 70%|███████   | 35/50 [00:02<00:00, 17.72tasks/s]
 74%|███████▍  | 37/50 [00:02<00:00, 17.52tasks/s]
 80%|████████  | 40/50 [00:02<00:00, 19.88tasks/s]
 86%|████████▌ | 43/50 [00:02<00:00, 15.19tasks/s]
 90%|█████████ | 45/50 [00:02<00:00, 13.69tasks/s]
 94%|█████████▍| 47/50 [00:03<00:00, 14.46tasks/s]
 98%|█████████▊| 49/50 [00:03<00:00, 10.98tasks/s]
100%|██████████| 50/50 [00:03<00:00, 14.40tasks/s]

did nothing for 3.601374387741089 seconds, producing 50 results
hereof 5 had errors.
the rest where results: [0.1, 0.1, 0.0999..., 0.1, 0.1, 0.1, 0.1, 0.0999..., 0.0999..., 0.0999..., 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.0999..., 0.0999..., 0.0999..., 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.0999..., 0.0999..., 0.0999..., 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.0999..., 0.0999..., 0.0999..., 0.1, 0.1, 0.1, 0.1, 0.0999..., 0.0999..., 0.1, 0.1]

  0%|          | 0/3 [00:00<?, ?tasks/s]
100%|██████████| 3/3 [00:00<00:00, 80.66tasks/s]

More expected errors:

expected --> Traceback (most recent call last):
  File "d:\github\mplite\mplite\__init__.py", line 97, in execute
    return self.f(*self.args,**self.kwargs)
  File "d:\github\mplite\tests\test_basics.py", line 36, in broken
    raise NotImplementedError("this task must fail!")
NotImplementedError: this task must fail!

expected --> Traceback (most recent call last):
  File "d:\github\mplite\mplite\__init__.py", line 97, in execute
    return self.f(*self.args,**self.kwargs)
  File "d:\github\mplite\tests\test_basics.py", line 36, in broken
    raise NotImplementedError("this task must fail!")
NotImplementedError: this task must fail!

expected --> Traceback (most recent call last):
  File "d:\github\mplite\mplite\__init__.py", line 97, in execute
    return self.f(*self.args,**self.kwargs)
  File "d:\github\mplite\tests\test_basics.py", line 36, in broken
    raise NotImplementedError("this task must fail!")
NotImplementedError: this task must fail!

```

Note that a failing task **can't crash** the TaskManager! If an exception
is raised during task execution, the traceback is captured and returned
as a string in the results, and the compute core continues with the next task.
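
The traceback in the expected output shows the task calling `self.f(*self.args, **self.kwargs)` inside an `execute` method. A minimal single-process sketch of that capture pattern could look like the block below. `SketchTask` is a hypothetical stand-in, not mplite's actual implementation. It also illustrates why 5 of the 50 results above are errors: the five tasks with `arg == 0` divide by zero.

```python
import traceback


class SketchTask:
    """Hypothetical stand-in for a task; not mplite's real Task class."""

    def __init__(self, f, *args, **kwargs):
        self.f = f
        self.args = args
        self.kwargs = kwargs

    def execute(self):
        # Return the function's result, or the formatted traceback as a
        # string if the function raises.
        try:
            return self.f(*self.args, **self.kwargs)
        except Exception:
            return traceback.format_exc()


def f(*args, **kwargs):
    # Same arithmetic as the worker function above, without the sleep.
    return args[0] / kwargs["hello"]


args = list(range(10)) * 5
results = [SketchTask(f, arg / 10, hello=arg).execute() for arg in args]
errors = [r for r in results if isinstance(r, str)]
print(len(results), "results,", len(errors), "errors")
print(errors[0].splitlines()[-1])  # last line of the first captured traceback
```

Because errors come back as strings, this pattern only separates results from errors cleanly when the worker function itself does not return strings.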

### How to test worker functions

To check that the inputs to a task are formed correctly,
call `.execute()` on the task directly from the interpreter:

```
>>> t = Task(f, *(1,2,3), **{"this":42})
>>> t.execute()
```
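
If the worker function is slow or has side effects, the arguments can also be checked without running it. The sketch below uses the standard library's `inspect.signature(...).bind(...)`, which raises `TypeError` when the arguments don't fit the function's signature. `check_call` and this two-argument `adder` are hypothetical helpers for illustration and are not part of mplite.

```python
import inspect


def check_call(f, *args, **kwargs):
    """Return None if f accepts these arguments, else the error message."""
    try:
        inspect.signature(f).bind(*args, **kwargs)
    except TypeError as e:
        return str(e)
    return None


def adder(a, b):
    return a + b


print(check_call(adder, 1, 2))           # None: the arguments fit
print(check_call(adder, 1, 2, 3))        # an error message: arguments do not fit
print(check_call(adder, 1, 2, this=42))  # an error message: arguments do not fit
```

This only helps for functions with explicit parameters. A function declared as `f(*args, **kwargs)` accepts any call, so running the task with `.execute()` remains the more thorough check.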

### How to handle incremental tasks

From version 1.1.0 it is possible to add tasks incrementally.

Let's say I'd like to solve the pyramid task, where I add up all the numbers pairwise:

```
1+2  3+4  5+6  7+8  9+10
 =    =    =    =    = 
 3 +  7    11 + 15   19
   =         =       =
   10        26  +  19
   =             =
   10      +     45
           = 
          55
```

This requires that I:

1. create a queue with 1,2,3,...,10
2. add tasks for the numbers to be added pairwise
3. receive the result
4. whenever I hold a pair of results, submit them AGAIN as a new task.

Here is an example of what the code can look like:
```

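from mplite import TaskManager, Task


def adder(a, b):
    # Not shown in the original snippet; a plain two-argument sum is assumed.
    return a + b


def test_incremental_workload():
    with TaskManager() as tm:
        # 1. create initial workload
        checksum = 55
        for a in range(1,10,2):
            t = Task(adder, a, a+1)
            print(t)
            tm.submit(t)

        # 2. create incremental workload
        a,b = None,None
        while True:
            result = tm.take()
            if result is None:
                # nothing to collect right now: stop once no tasks remain open
                if tm.open_tasks == 0:
                    break
                else:
                    continue

            # collect results until a pair is available
            if a is None:
                a = result
            else:
                b = result

            # `is not None` rather than truthiness, so a result of 0 still counts
            if a is not None and b is not None:
                t = Task(adder, a,b)
                print(t)
                tm.submit(t)
                a,b = None,None

        print(a,b,flush=True)
        assert a == checksum or b == checksum,(a,b,checksum)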


```

Output:
```
Task(f=adder, *(1, 2), **{})
Task(f=adder, *(3, 4), **{})
Task(f=adder, *(5, 6), **{})
Task(f=adder, *(7, 8), **{})
Task(f=adder, *(9, 10), **{})
Task(f=adder, *(3, 7), **{})
Task(f=adder, *(11, 15), **{})
Task(f=adder, *(19, 10), **{})
Task(f=adder, *(26, 29), **{})
55 None

```
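
The pairing logic itself can be checked without any worker processes. The following is a sequential sketch, assuming a plain two-argument `adder` and a hypothetical `pairwise_sum` helper: partial sums are pushed back onto a queue until a single total remains.

```python
from collections import deque


def adder(a, b):
    return a + b


def pairwise_sum(numbers):
    """Add numbers two at a time until a single total remains."""
    queue = deque(numbers)
    while len(queue) > 1:
        a = queue.popleft()
        b = queue.popleft()
        queue.append(adder(a, b))  # the partial sum re-enters the queue
    return queue[0]


print(pairwise_sum(range(1, 11)))  # 55
```

With multiple workers the order in which partial sums arrive can differ between runs, but the final total stays the same because addition is associative and commutative.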

Use mplite wisely. Executing each task has a certain overhead associated with it.
The fewer the tasks and the heavier (computationally) each of them is, the better.
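
One common way to follow this advice is to batch many small work items into a few larger units. The sketch below is plain Python and does not call mplite. `chunked` and `sum_of_squares` are hypothetical helpers, and the chunk size of 500 is an arbitrary choice. Each chunk could then be handed to one task instead of creating one task per item.

```python
def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def sum_of_squares(chunk):
    # One heavier unit of work: processes a whole chunk in a single call.
    return sum(x * x for x in chunk)


items = list(range(2000))
chunks = chunked(items, 500)  # 4 chunks instead of 2000 single items
partials = [sum_of_squares(c) for c in chunks]
total = sum(partials)
print(len(chunks), total == sum(x * x for x in items))
```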

Example comparing a few heavy calls against many light calls (number of calls versus iterations per call):
```
import multiprocessing
import time
from mplite import TaskManager, Task


def run_calcs_calls(mp_enabled=True, rng=50_000_000, calls=20, cpus=1):
    start = time.perf_counter()
    L = []
    if mp_enabled:
        with TaskManager(cpu_count=cpus) as tm:
            tasks = []
            for call in range(1, calls+1):
                tasks.append(Task(fun, *(call, rng)))
            L = tm.execute(tasks)
    else:
        for call in range(1, calls+1):
            res = fun(call, rng)
            L.append(res)

    task_times = [elapsed for _, elapsed in L]  # each result is (value, seconds)
    cpu_count = cpus if mp_enabled else 1
    cpu_task_time = sum(task_times)/cpu_count

    if mp_enabled:
        print('mplite - enabled')
    else:
        print('mplite - disabled')

    print('cpu_count: ', cpu_count)
    print('avg. time taken per cpu: ', cpu_task_time)
    end = time.perf_counter()
    total_time = end - start
    print('total time taken: ', total_time)
    print()
    return total_time, cpu_task_time, cpu_count


def fun(call_id, rng):
    # burn some time iterating thru
    start = time.perf_counter()
    t = 0
    for i in range(rng):
        t = i/call_id
    end = time.perf_counter()
    return t, end - start


def test_mplite_performance():    
    # change calls and range to see the knock on effect on performance
    print('========CALLS TEST===========')
    for cpus in [1, multiprocessing.cpu_count()]:
        for ix, (calls, rng) in enumerate([(10, 50_000_000), (2000, 50)], start=1):
            print('calls: ', calls, ', range: ', rng)
            total_time_mp_e, cpu_task_time_mp_e, cpu_count_mp_e = run_calcs_calls(True, rng, calls, cpus)
            total_time_mp_d, cpu_task_time_mp_d, cpu_count_mp_d = run_calcs_calls(False, rng, calls, cpus)
            artifacts = [cpus, calls, rng, total_time_mp_e, cpu_task_time_mp_e, cpu_count_mp_e, total_time_mp_d, cpu_task_time_mp_d, cpu_count_mp_d]
            if cpu_count_mp_e > cpu_count_mp_d:
                if ix == 1: # assert mplite is faster for less calls and heavier process
                    assert total_time_mp_e < total_time_mp_d, artifacts
            else:
                assert True
```

Output:
```
========CALLS TEST===========
calls:  10 , range:  50000000
mplite - enabled
cpu_count:  1
avg. time taken per cpu:  18.5264333
total time taken:  18.8809622

mplite - disabled
cpu_count:  1
avg. time taken per cpu:  18.912037
total time taken:  18.9126078

calls:  2000 , range:  50
mplite - enabled
cpu_count:  1
avg. time taken per cpu:  0.005216900000000357
total time taken:  0.490177800000005

mplite - disabled
cpu_count:  1
avg. time taken per cpu:  0.003248700000142435
total time taken:  0.003983699999999146

calls:  10 , range:  50000000
mplite - enabled
cpu_count:  12
avg. time taken per cpu:  3.410191883333333
total time taken:  4.978601699999999

mplite - disabled
cpu_count:  1
avg. time taken per cpu:  19.312383399999995
total time taken:  19.312710600000003

calls:  2000 , range:  50
mplite - enabled
cpu_count:  12
avg. time taken per cpu:  0.0005722500000000056
total time taken:  0.9079466999999966

mplite - disabled
cpu_count:  1
avg. time taken per cpu:  0.0038669999999427773
total time taken:  0.004872100000000046

```
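
To make the comparison concrete, the ratios can be computed from the 12-CPU totals printed above. This is only arithmetic on the reported numbers. The variable names are chosen for illustration.

```python
# Total times in seconds, copied from the 12-CPU runs in the output above.
heavy_enabled, heavy_disabled = 4.978601699999999, 19.312710600000003
light_enabled, light_disabled = 0.9079466999999966, 0.004872100000000046

heavy_speedup = heavy_disabled / heavy_enabled   # > 1: mplite is faster
light_slowdown = light_enabled / light_disabled  # > 1: mplite is slower

print(f"10 heavy calls: {heavy_speedup:.1f}x faster with mplite")
print(f"2000 light calls: {light_slowdown:.0f}x slower with mplite")
```

For the 10 heavy calls mplite is close to four times faster, while for the 2000 light calls the per-task overhead makes it well over a hundred times slower than a plain loop.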

Example with a sleep in each call to the adder function:
```
import multiprocessing
import time
from mplite import TaskManager, Task


def run_calcs_sleep(mp_enabled, sleep=2, cpus=1):
    args = list(range(20))
    start = time.perf_counter()
    prev_mem = 0
    L = []

    if mp_enabled:
        with TaskManager(cpus) as tm:
            tasks = []
            for arg in args:
                tasks.append(Task(adder, *(prev_mem, arg, sleep)))
                prev_mem = arg
            L = tm.execute(tasks)
    else:
        for arg in args:
            res = adder(prev_mem, arg, sleep)
            L.append(res)
            prev_mem = arg

    end = time.perf_counter()

    cpu_count = cpus if mp_enabled else 1

    if mp_enabled:
        print('mplite - enabled')
    else:
        print('mplite - disabled')

    total_time = end - start
    print('cpu_count: ', cpu_count)
    print('total time taken: ', total_time)
    print()
    return total_time, cpu_count


def adder(a, b, sleep):
    time.sleep(sleep)
    return a+b


def test_mplite_performance():
    # change sleep times to see the knock on effect on performance
    print('========SLEEP TEST===========')
    for cpus in [1, multiprocessing.cpu_count()]:
        for ix, sleep in enumerate([2, 0.02, 0.01], start=1):
            print('sleep timer value: ', sleep)
            total_time_mp_e, cpu_count_mp_e = run_calcs_sleep(True, sleep, cpus)
            total_time_mp_d, cpu_count_mp_d = run_calcs_sleep(False, sleep, cpus)
            artifacts = [cpus, total_time_mp_e, cpu_count_mp_e, total_time_mp_d, cpu_count_mp_d]
            if cpu_count_mp_e > cpu_count_mp_d:
                if ix == 1:  # assert mplite is faster for longer sleep
                    assert total_time_mp_e < total_time_mp_d, artifacts
            else:
                assert True
```

Output:
```
========SLEEP TEST===========
sleep timer value:  2
mplite - enabled
cpu_count:  1
total time taken:  40.4222287

mplite - disabled
cpu_count:  1
total time taken:  40.006973200000004

sleep timer value:  0.02
mplite - enabled
cpu_count:  1
total time taken:  0.7628226999999868

mplite - disabled
cpu_count:  1
total time taken:  0.4116598999999894

sleep timer value:  0.01
mplite - enabled
cpu_count:  1
total time taken:  0.5629501999999889

mplite - disabled
cpu_count:  1
total time taken:  0.21054430000000934

sleep timer value:  2
mplite - enabled
cpu_count:  12
total time taken:  4.821827799999994

mplite - disabled
cpu_count:  1
total time taken:  40.011519899999996

sleep timer value:  0.02
mplite - enabled
cpu_count:  12
total time taken:  0.713870500000013

mplite - disabled
cpu_count:  1
total time taken:  0.41133019999998055

sleep timer value:  0.01
mplite - enabled
cpu_count:  12
total time taken:  0.6938743000000045

mplite - disabled
cpu_count:  1
total time taken:  0.20631170000001475

Ran 1 test in 192.739s
```
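
The 12-CPU result for the 2-second sleep can be compared with a simple lower bound. The sketch below assumes each worker handles one task at a time and that all workers start together. It is an estimate for illustration, not a description of mplite's scheduling.

```python
import math

tasks, sleep, cpus = 20, 2, 12  # values from the sleep test above

sequential = tasks * sleep          # one task after another
rounds = math.ceil(tasks / cpus)    # 12 tasks first, then the remaining 8
ideal_parallel = rounds * sleep     # lower bound with 12 workers

measured_parallel = 4.821827799999994  # copied from the output above
overhead = measured_parallel - ideal_parallel

print(sequential, ideal_parallel, round(overhead, 2))
```

Under these assumptions the fixed cost is a bit under one second. That is small next to 40 seconds of sequential sleeping, but it dominates when each task only sleeps for 0.01 or 0.02 seconds, which matches the measurements above.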

            
