# distwq

- Name: distwq
- Version: 1.1.0
- Home page: https://github.com/iraikov/distwq
- Summary: Distributed queue operations with mpi4py
- Upload time: 2023-02-03 22:32:54
- Author: Ivan Raikov
- Requires Python: >=3.8,<4.0
- License: GPL-3.0-or-later
- Keywords: mpi, mpi4py, distributed computing, distributed queue

Distributed work queue operations using mpi4py.

Allows for easy parallelization in controller/worker mode with one
controller submitting function or method calls to workers.  Supports
multiple ranks per worker (collective workers). Uses mpi4py if
available, otherwise processes calls sequentially in one process.

Based on mpi.py from the pyunicorn project.


## EXAMPLE

```python

# Example of using the distributed work queue distwq.
# PYTHONPATH must include the directories in which distwq and this file are located.
# This file is assumed to be saved as example_distwq.py, matching module_name below.

import distwq
import numpy as np
from scipy import signal


def do_work(freq):
    """Compute the periodogram of a noisy sine wave with the given frequency."""
    fs = 10e3  # sampling frequency
    N = 1e5  # number of samples
    amp = 2 * np.sqrt(2)
    freq = float(freq)
    noise_power = 0.001 * fs / 2
    time = np.arange(N) / fs
    x = amp * np.sin(2 * np.pi * freq * time)
    x += np.random.normal(scale=np.sqrt(noise_power), size=time.shape)
    f, pdens = signal.periodogram(x, fs)
    return f, pdens


def main(controller):
    n = 150
    # Submit all calls first so that the workers can process them in parallel.
    for i in range(0, n):
        controller.submit_call("do_work", (i + 1,), module_name="example_distwq")
    # Collect one result per submitted call.
    s = []
    for i in range(0, n):
        s.append(controller.get_next_result())
    print("results length : %d" % len(s))
    print(s)
    controller.info()


if __name__ == '__main__':
    if distwq.is_controller:
        distwq.run(fun_name="main", verbose=True, nprocs_per_worker=3)
    else:
        distwq.run(fun_name=None, verbose=True, nprocs_per_worker=3)

```

## API

### MPIController

#### submit\_call

```python
 submit_call(name_to_call, args=(), kwargs={}, module_name="__main__", time_est=1, task_id=None)
```

Submit a call for parallel execution.

If called by the controller and workers are available, the call is submitted
to a worker for asynchronous execution.

If called by a worker or if no workers are available, the call is instead
executed synchronously on this MPI node.

**Examples:**

1. Provide ids and time estimate explicitly:

```python
# A is assumed to be an existing sequence of inputs.
result = {}
for n in range(0, 10):
    controller.submit_call("doit", (n, A[n]), task_id=n, time_est=n**2)

for n in range(0, 10):
    result[n] = controller.get_result(n)
```

2. Use generated ids stored in a list:

```python
ids = []
for n in range(0, 10):
    ids.append(controller.submit_call("doit", (n, A[n])))

# Collect the results in the order in which the calls were submitted.
results = []
for task_id in ids:
    results.append(controller.get_result(task_id))
```

3. Ignore ids altogether:

```python
for n in range(0, 10):
    controller.submit_call("doit", (n, A[n]))

results = []
for n in range(0, 10):
    results.append(controller.get_next_result())
```

4. Call a module function and use keyword arguments:

```python

controller.submit_call("solve", (), {"a": a, "b": b},
                       module_name="numpy.linalg")
```
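Example 4 relies on the callable being looked up by name in a module that has already been imported. The following is a minimal sketch of such a lookup, not distwq's actual code: `resolve_callable` is a hypothetical helper, and resolving dotted names such as `"MyClass.my_static_method"` attribute by attribute is an assumption made for illustration.

```python
import sys
import math  # importing a module makes its name a key of sys.modules


def resolve_callable(name_to_call, module_name="__main__"):
    """Look up a callable by name in an already imported module (hypothetical helper)."""
    if module_name not in sys.modules:
        raise KeyError(f"module {module_name!r} has not been imported")
    target = sys.modules[module_name]
    # Assumption: dotted names are resolved one attribute at a time.
    for part in name_to_call.split("."):
        target = getattr(target, part)
    return target


sqrt = resolve_callable("sqrt", module_name="math")
print(sqrt(16.0))  # 4.0
```

If the lookup fails with a `KeyError`, checking `sys.modules` after the import shows the exact key to pass as the module name.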

- name\_to\_call (str): name of the callable object (usually a function or
static method of a class) as contained in the namespace specified
by module\_name.
- args (tuple): the positional arguments to provide to the callable
object.  Tuples of length 1 must be written (arg,).  Default: ()
- kwargs (dict): the keyword arguments to provide to the callable
object.  Default: {}
- module\_name (str): optional name of the imported module or submodule in
whose namespace the callable object is contained. For objects
defined at the script level, this is "\_\_main\_\_"; for objects
defined in an imported package, this is the package name. Must be a
key of the dictionary sys.modules (check there after import if in
doubt).  Default: "\_\_main\_\_"
- time\_est (float): estimated relative completion time for this call;
used to find a suitable worker. Default: 1
- task\_id (int or None): unique id for this call. Must be a possible dictionary key.
If None, a random id is assigned and returned. Can be reused after
get\_result() has been called for this id. Default: None
- worker (int or None): optional number of the worker to assign the call to;
must satisfy 0 < worker < comm.size. If None, the
call is assigned to the worker with the smallest current total time
estimate. Default: None

**Returns**:

id of call, to be used in get_result().
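The worker selection and id handling described above can be sketched as follows. This is a toy model under stated assumptions, not distwq's implementation: the `Scheduler` class, its attribute names, and the use of `uuid` for generated ids are all hypothetical.

```python
import uuid


class Scheduler:
    """Toy bookkeeping for assigning calls to workers (not distwq's implementation)."""

    def __init__(self, n_workers):
        # Accumulated time estimate of unfinished calls, per worker number 1..n_workers.
        self.total_time_est = {w: 0.0 for w in range(1, n_workers + 1)}
        self.assigned = {}  # task_id -> (worker, time_est)

    def submit(self, time_est=1, task_id=None):
        if task_id is None:
            task_id = uuid.uuid4().hex  # generated id; any hashable value would do
        if task_id in self.assigned:
            raise ValueError(f"task id {task_id!r} is still in use")
        # Pick the worker with the smallest current total time estimate.
        worker = min(self.total_time_est, key=self.total_time_est.get)
        self.total_time_est[worker] += time_est
        self.assigned[task_id] = (worker, time_est)
        return task_id

    def finish(self, task_id):
        # Once the result has been collected, the id may be reused.
        worker, time_est = self.assigned.pop(task_id)
        self.total_time_est[worker] -= time_est
        return worker


sched = Scheduler(n_workers=2)
for n in range(4):
    sched.submit(time_est=n + 1, task_id=n)
print({tid: w for tid, (w, _) in sched.assigned.items()})  # {0: 1, 1: 2, 2: 1, 3: 2}
```

Larger values of `time_est` push later calls toward the other workers, which is why a rough relative estimate is enough for balancing.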

#### get\_result

```python
 get_result(task_id)
```

Returns the result of an earlier submitted call.

Can only be called by the controller.

If the call is not yet finished, waits for it to finish.
Results should be collected in the same order as calls were submitted.
For each worker, the results of calls assigned to that worker must be
collected in the same order as those calls were submitted.
Can only be called once per submitted call.

- task\_id (int): id of an earlier submitted call, as provided to or returned by submit\_call().

**Returns**:

return value of call.

#### get\_next\_result

```python
 get_next_result()
```

Returns the result of the next previously submitted call whose result
has not yet been obtained.

Can only be called by the controller.

If the call is not yet finished, waits for it to finish.

**Returns**:

id, return value of call, or None if there are no more calls in the queue.
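Because None signals an empty queue, results can be drained in a loop without counting submissions. The sketch below illustrates this with `SequentialController`, a hypothetical single-process stand-in that only mimics the documented interface; it is not part of distwq, and its sequential integer ids are an assumption.

```python
import sys
import math  # the stand-in resolves callables through sys.modules
from collections import deque


class SequentialController:
    """Hypothetical single-process stand-in with a distwq-like interface."""

    def __init__(self):
        self._queue = deque()
        self._next_id = 0

    def submit_call(self, name_to_call, args=(), kwargs=None,
                    module_name="__main__", time_est=1, task_id=None):
        if task_id is None:
            task_id = self._next_id
            self._next_id += 1
        fun = getattr(sys.modules[module_name], name_to_call)
        # No workers here, so the call runs immediately and its result is queued.
        self._queue.append((task_id, fun(*args, **(kwargs or {}))))
        return task_id

    def get_next_result(self):
        # Mirrors the documented contract: (id, value), or None when the queue is empty.
        return self._queue.popleft() if self._queue else None


controller = SequentialController()
for n in range(1, 4):
    controller.submit_call("factorial", (n,), module_name="math")

results = {}
while (item := controller.get_next_result()) is not None:
    task_id, value = item
    results[task_id] = value
print(results)  # {0: 1, 1: 2, 2: 6}
```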

#### info

```python
 info()
```

Print processing statistics.

Can only be called by the controller.

#### exit

```python
 exit()
```

Tells all workers to exit.

Can only be called by the controller.

#### abort

```python
 abort()
```

Abort execution on all MPI nodes immediately.

Can be called by controller and workers.

### MPIWorker

#### serve

```python
 serve()
```

Serves submitted calls until told to finish.

Call this function if workers need to perform initialization
different from the controller, like this:

```python
def workerfun(worker):
    # Perform whatever worker-specific initialization is necessary.
    worker.serve()
    # Perform whatever cleanup is necessary.
```

If workerfun() is not defined, serve() will be called automatically by run().


### MPICollectiveBroker

#### serve

```python
 serve()
```

Broker and serve submitted calls until told to finish. A task
is received from the controller and sent to all collective
workers associated with this broker via scatter.

Call this function if workers need to perform initialization
different from the controller, like this:

```python
def workerfun(worker):
    # Perform whatever worker-specific initialization is necessary.
    worker.serve()
    # Perform whatever cleanup is necessary.
```

If workerfun() is not defined, serve() will be called automatically by
run().

### Procedures

#### run

```python
run(fun_name=None, module_name='__main__', verbose=False, worker_grouping_method=None, nprocs_per_worker=1, broker_is_worker=False, args=())
```

Runs in controller/worker mode until the function named by fun\_name finishes.

Must be called on all MPI nodes.

On the controller, run() calls fun\_name(controller) and returns when that call returns.

On each worker, run() calls fun\_name(worker) if fun\_name is defined, or
serve() otherwise, and returns when that call returns, when fun\_name()
returns on the controller, or when the controller calls exit().

- fun\_name (str or None): name of the function to call with the controller or worker object as its argument
- module\_name (str): module where fun\_name is located
- verbose (bool): whether processing information should be printed
- worker\_grouping\_method (str): whether to separate worker processes into groups via MPI\_Comm\_spawn ("spawn") or MPI\_Comm\_split ("split")
- nprocs\_per\_worker (int): how many processes per worker
- broker\_is\_worker (bool): when worker\_grouping\_method is "spawn" or "split" and nprocs\_per\_worker > 1, MPI\_Comm\_spawn or MPI\_Comm\_split is used to create the workers, and a CollectiveBroker object relays tasks and results between the controller and the workers. When broker\_is\_worker is true, the broker also participates in serving tasks; otherwise it only relays calls.
- args (tuple): additional args to pass to the function named by fun\_name
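To make the effect of nprocs\_per\_worker more concrete, here is a minimal sketch of how ranks could be partitioned into worker groups. It assumes that rank 0 is the controller and that the remaining ranks are grouped in consecutive blocks; `worker_group_of` is a hypothetical helper, and distwq's real assignment may differ. With mpi4py, such a group index could serve as the `color` argument of `Comm.Split`.

```python
def worker_group_of(rank, nprocs_per_worker):
    """Return the worker-group index for an MPI rank, or None for the controller.

    Assumption: rank 0 is the controller and the other ranks form
    consecutive blocks of nprocs_per_worker processes each.
    """
    if rank == 0:
        return None
    return (rank - 1) // nprocs_per_worker


# 7 ranks with nprocs_per_worker=3: one controller and two groups of three.
groups = {}
for rank in range(7):
    groups.setdefault(worker_group_of(rank, 3), []).append(rank)
print(groups)  # {None: [0], 0: [1, 2, 3], 1: [4, 5, 6]}
```

Under this assumption, a run with nprocs\_per\_worker=3 needs 1 + 3k processes to form k complete worker groups.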

            
