# distwq
Distributed work queue operations using mpi4py.
Allows for easy parallelization in controller/worker mode with one
controller submitting function or method calls to workers. Supports
multiple ranks per worker (collective workers). Uses mpi4py if
available, otherwise processes calls sequentially in one process.
Based on mpi.py from the pyunicorn project.
## Global Variables
- `is_controller` (bool): True if current process is the controller
- `is_worker` (bool): True if current process is a worker
- `spawned` (bool): True if current process was spawned via MPI_Comm_spawn
- `workers_available` (bool): True if workers are available
- `size` (int): Total number of MPI processes
- `rank` (int): Rank of current MPI process
- `n_workers` (int): Number of worker processes
## Enums
### CollectiveMode
- `Gather = 1`: Use MPI gather/scatter for collective operations
- `SendRecv = 2`: Use MPI send/receive for collective operations
### MessageTag
- `READY = 0`: Worker ready message
- `DONE = 1`: Task completion message
- `TASK = 2`: Task assignment message
- `EXIT = 3`: Worker exit message
### GroupingMethod
- `NoGrouping = 0`: No worker grouping
- `GroupSpawn = 1`: Group workers via MPI_Comm_spawn
- `GroupSplit = 2`: Group workers via MPI_Comm_split
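These enum values are consumed by `run()` (documented below). A minimal sketch of passing them explicitly, assuming a controller function `main()` defined in the launching script:

```python
import distwq

# Sketch only: group workers via MPI_Comm_spawn into collective workers of
# 4 ranks each, and use gather/scatter for task distribution.
distwq.run(
    fun_name="main" if distwq.is_controller else None,
    worker_grouping_method=distwq.GroupingMethod.GroupSpawn,  # or the string "spawn"
    nprocs_per_worker=4,
    collective_mode="gather",  # corresponds to CollectiveMode.Gather
    verbose=True,
)
```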
## EXAMPLE
```python
# Example of using distributed work queue distwq
# PYTHONPATH must include the directories in which distwq and this file are located.
import distwq
import numpy as np
import scipy
from scipy import signal

def do_work(freq):
    fs = 10e3
    N = 1e5
    amp = 2 * np.sqrt(2)
    freq = float(freq)
    noise_power = 0.001 * fs / 2
    time = np.arange(N) / fs
    x = amp * np.sin(2 * np.pi * freq * time)
    x += np.random.normal(scale=np.sqrt(noise_power), size=time.shape)
    f, pdens = signal.periodogram(x, fs)
    return f, pdens

def main(controller):
    n = 150
    for i in range(0, n):
        controller.submit_call("do_work", (i + 1,), module_name="example_distwq")
    s = []
    for i in range(0, n):
        s.append(controller.get_next_result())
    print("results length : %d" % len(s))
    print(s)
    controller.info()

if __name__ == '__main__':
    if distwq.is_controller:
        distwq.run(fun_name="main", verbose=True, nprocs_per_worker=3)
    else:
        distwq.run(fun_name=None, verbose=True, nprocs_per_worker=3)
```
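The script above is written to be saved as `example_distwq.py` (matching `module_name="example_distwq"`) and launched on all ranks under MPI, for example with `mpiexec -n <nranks> python example_distwq.py`; distwq then assigns controller and worker roles across the ranks.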
## API
### MPIController
#### submit_call
```python
submit_call(name_to_call, args=(), kwargs={}, module_name="__main__", time_est=1, task_id=None, worker=None)
```
Submit a call for parallel execution.
If called by the controller and workers are available, the call is submitted
to a worker for asynchronous execution.
If called by a worker or if no workers are available, the call is instead
executed synchronously on this MPI node.
**Examples:**
1. Provide ids and time estimate explicitly:
```python
for n in range(0, 10):
    controller.submit_call("doit", (n, A[n]), task_id=n, time_est=n**2)

for n in range(0, 10):
    result[n] = controller.get_result(n)
```
2. Use generated ids stored in a list:
```python
for n in range(0, 10):
    ids.append(controller.submit_call("doit", (n, A[n])))

for n in range(0, 10):
    results.append(controller.get_result(ids.pop()))
```
3. Ignore ids altogether:
```python
for n in range(0, 10):
    controller.submit_call("doit", (n, A[n]))

for n in range(0, 10):
    results.append(controller.get_next_result())
```
4. Call a module function and use keyword arguments:
```python
controller.submit_call("solve", (), {"a":a, "b":b}, module_name="numpy.linalg")
```
**Parameters:**
- `name_to_call` (str): name of the callable object (usually a function or static method of a class) as contained in the namespace specified by `module_name`.
- `args` (tuple): the positional arguments to provide to the callable object. Tuples of length 1 must be written (arg,). Default: ()
- `kwargs` (dict): the keyword arguments to provide to the callable object. Default: {}
- `module_name` (str): optional name of the imported module or submodule in whose namespace the callable object is contained. For objects defined on the script level, this is "__main__", for objects defined in an imported package, this is the package name. Must be a key of the dictionary sys.modules (check there after import if in doubt). Default: "__main__"
- `time_est` (int): estimated relative completion time for this call; used to find a suitable worker. Default: 1
- `task_id` (int or None): unique id for this call. Must be a possible dictionary key. If None, a random id is assigned and returned. Can be re-used after get_result() for this call. Default: None
- `worker` (int > 0 and < comm.size, or None): optional number of the worker to assign the call to. If None, the call is assigned to the worker with the smallest current total time estimate. Default: None
**Returns:**
id of call, to be used in get_result().
#### queue_call
```python
queue_call(name_to_call, args=(), kwargs={}, module_name="__main__", time_est=1, task_id=None, requested_worker=None)
```
Submit a call for later execution.
If called by the controller and workers are available, the call is placed on the wait queue and submitted to a worker when one becomes available. The process() method checks the wait queue and submits queued calls to workers.
**Parameters:**
Same as submit_call(), except:
- `requested_worker` (int > 0 and < comm.size, or None): optional number of the worker to assign the call to. If None, or if the requested worker is not available, the call is assigned to the worker with the smallest current total time estimate. Default: None
**Returns:**
id of call, to be used in get_result().
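For instance (a sketch reusing `do_work` from the EXAMPLE above), a batch of calls can be queued up front and dispatched as workers free up:

```python
# Queue calls for later execution; they are submitted from the wait queue as
# workers become available, and results are collected in completion order.
task_ids = [
    controller.queue_call("do_work", (freq,), module_name="example_distwq")
    for freq in range(1, 11)
]
results = [controller.get_next_result() for _ in task_ids]
```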
#### submit_multiple
```python
submit_multiple(name_to_call, args=[], kwargs=[], module_name="__main__", time_est=1, task_ids=None, workers=None)
```
Submit multiple calls for parallel execution.
Analogous to submit_call, but accepts lists of arguments and submits to multiple workers for asynchronous execution.
**Parameters:**
- `name_to_call` (str): name of callable object
- `args` (list): the positional arguments to provide to the callable object for each task, as a list of tuples. Default: []
- `kwargs` (list): the keyword arguments to provide to the callable object for each task, as a list of dictionaries. Default: []
- `module_name` (str): optional name of the imported module or submodule. Default: "__main__"
- `time_est` (int): estimated relative completion time for this call. Default: 1
- `task_ids` (list or None): unique ids for each call. If None, random ids are assigned. Default: None
- `workers` (list of int > 0 and < comm.size, or None): optional worker ids to assign the tasks to. If None, the tasks are assigned in order to the workers with the smallest current total time estimate. Default: None
**Returns:**
List of task ids for the submitted calls.
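For example (a sketch using the same placeholder `doit` and array `A` as in the submit_call examples above):

```python
# Submit ten tasks in one call; args is a list of per-task argument tuples.
task_ids = controller.submit_multiple(
    "doit",
    args=[(n, A[n]) for n in range(10)],
)
results = [controller.get_result(tid) for tid in task_ids]  # (task_id, value) tuples
```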
#### get_result
```python
get_result(task_id)
```
Return result of earlier submitted call.
Can only be called by the controller.
If the call is not yet finished, waits for it to finish.
Results should be collected in the same order as calls were submitted.
For each worker, the results of calls assigned to that worker must be
collected in the same order as those calls were submitted.
Can only be called once per call.
**Parameters:**
- `task_id` (int): id of an earlier submitted call, as provided to or returned by submit_call().
**Returns:**
Tuple of (task_id, return value of call).
#### get_next_result
```python
get_next_result()
```
Return result of next earlier submitted call whose result has not yet
been obtained.
Can only be called by the controller.
If the call is not yet finished, waits for it to finish.
**Returns:**
Tuple of (id, return value of call), or None if there are no more calls in the queue.
#### probe_next_result
```python
probe_next_result()
```
Return result of next earlier submitted call whose result has not yet
been obtained.
Can only be called by the controller.
If no result is available, returns None.
**Returns:**
Tuple of (id, return value of call), or None if there are no results ready.
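This allows the controller to interleave result collection with other work, e.g. (a sketch; `handle()` and `do_other_work()` are placeholders):

```python
pending = len(task_ids)
while pending > 0:
    item = controller.probe_next_result()
    if item is not None:
        task_id, value = item
        handle(task_id, value)   # placeholder for user code
        pending -= 1
    else:
        do_other_work()          # placeholder: useful controller-side work
```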
#### probe_all_next_results
```python
probe_all_next_results()
```
Return all available results of earlier submitted calls whose result has not yet
been obtained.
Can only be called by the controller.
If no result is available, returns empty list.
**Returns:**
List of tuples (id, return value of call).
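Similarly, everything that has completed so far can be drained in one call (a sketch; `handle()` is a placeholder):

```python
# The returned list may be empty if nothing has finished yet.
for task_id, value in controller.probe_all_next_results():
    handle(task_id, value)
```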
#### get_ready_worker
```python
get_ready_worker()
```
Returns the id and data of a ready worker.
**Returns:**
Tuple of (worker_id, worker_data) if a worker is available, or (None, None) if no workers are available or ready.
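One possible use, sketched below, is to pair this with the `worker` argument of submit_call to target an idle worker explicitly:

```python
# Sketch only: submit directly to a worker that has signalled readiness.
worker_id, worker_data = controller.get_ready_worker()
if worker_id is not None:
    controller.submit_call("do_work", (42,), module_name="example_distwq", worker=worker_id)
```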
#### info
```python
info()
```
Print processing statistics.
Can only be called by the controller.
#### exit
```python
exit()
```
Tell all workers to exit.
Can only be called by the controller.
#### abort
```python
abort()
```
Abort execution on all MPI nodes immediately.
Can be called by controller and workers.
### MPIWorker
#### serve
```python
serve()
```
Serve submitted calls until told to finish.
Call this function if workers need to perform initialization
different from the controller, like this:
```python
def workerfun(worker):
    do = whatever + initialization - is * necessary
    worker.serve()
    do = whatever + cleanup - is * necessary
```
If workerfun() is not defined, serve() will be called automatically by run().
#### abort
```python
abort()
```
Abort execution on all MPI nodes immediately.
### MPICollectiveWorker
Used for collective operations where multiple processes work together on tasks.
#### serve
```python
serve()
```
Serve submitted calls until told to finish. Tasks are obtained via scatter and results are returned via gather, i.e. all collective workers spawned by a CollectiveBroker will participate in these collective calls.
#### publish_service
```python
publish_service()
```
Publish the worker service for discovery by other processes.
#### connect_service
```python
connect_service(n_lookup_attempts=5)
```
Connect to the worker service.
**Parameters:**
- `n_lookup_attempts` (int): Number of attempts to lookup the service. Default: 5
#### abort
```python
abort()
```
Abort execution on all MPI nodes immediately.
### MPICollectiveBroker
#### serve
```python
serve()
```
Broker and serve submitted calls until told to finish. A task
is received from the controller and sent to all collective
workers associated with this broker via scatter.
Call this function if workers need to perform initialization
different from the controller, like this:
```python
def workerfun(worker):
    do = whatever + initialization - is * necessary
    worker.serve()
    do = whatever + cleanup - is * necessary
```
If workerfun() is not defined, serve() will be called automatically by run().
#### abort
```python
abort()
```
Abort execution on all MPI nodes immediately.
## Procedures
### run
```python
run(fun_name=None, module_name="__main__", broker_fun_name=None, broker_module_name="__main__",
    max_workers=-1, worker_grouping_method=GroupingMethod.NoGrouping, sequential_spawn=False,
    spawn_startup_wait=None, spawn_executable=None, spawn_args=[], nprocs_per_worker=1,
    collective_mode="gather", broker_is_worker=False, worker_service_name="distwq.init",
    enable_worker_service=False, time_limit=None, verbose=False, args=())
```
Run in controller/worker mode until the controller/worker function finishes.
Must be called on all MPI nodes.
On the controller, run() calls the function named by fun_name and returns when that function returns.
On each worker, run() calls that function if it is defined, or serve() otherwise, and returns when the function returns, when the controller's function returns, or when the controller calls exit().
**Parameters:**
- `fun_name` (str or None): name of function to call on controller. Default: None
- `module_name` (str): module where fun_name is located. Default: "__main__"
- `broker_fun_name` (str or None): name of function to call on brokers. Default: None
- `broker_module_name` (str): module where broker_fun_name is located. Default: "__main__"
- `max_workers` (int): maximum number of workers to use. -1 means use all available. Default: -1
- `worker_grouping_method` (str or GroupingMethod): specifies grouping method for workers: "spawn", "split", or GroupingMethod enum values. Default: GroupingMethod.NoGrouping
- `sequential_spawn` (bool): whether to spawn processes in sequence. Default: False
- `spawn_startup_wait` (int or None): optional startup wait time for spawned processes. Default: None
- `spawn_executable` (str or None): optional executable name for call to spawn (default is sys.executable). Default: None
- `spawn_args` (list): optional arguments to prepend to list of arguments in call to spawn; or a callable that takes the list of arguments that distwq needs to pass to the python interpreter, and returns a new list of arguments. Default: []
- `nprocs_per_worker` (int): how many processes per worker. Default: 1
- `collective_mode` (str): collective communication mode ("gather" or "sendrecv"). Default: "gather"
- `broker_is_worker` (bool): when worker_grouping_method is GroupSpawn or GroupSplit and nprocs_per_worker > 1, MPI_Comm_spawn or MPI_Comm_split will be used to create workers, and a CollectiveBroker object is used to relay tasks and results between controller and worker. When broker_is_worker is true, the broker also participates in serving tasks, otherwise it only relays calls. Default: False
- `worker_service_name` (str): name for worker service discovery. Default: "distwq.init"
- `enable_worker_service` (bool): whether to enable worker service for discovery. Default: False
- `time_limit` (int or None): maximum wall clock time, in seconds. Default: None
- `verbose` (bool): whether processing information should be printed. Default: False
- `args` (tuple): additional args to pass to fun. Default: ()
**Returns:**
Return value of the controller function, or None for workers.
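As a closing sketch (parameter values are illustrative, not prescriptive), the same script can be run on every MPI rank and configured for split-based collective workers whose broker also serves tasks:

```python
import distwq

def main(controller):
    # Controller-side driver: submit tasks and collect results (see EXAMPLE above).
    ...

if __name__ == "__main__":
    distwq.run(
        fun_name="main" if distwq.is_controller else None,
        module_name="__main__",
        worker_grouping_method="split",  # GroupingMethod.GroupSplit
        nprocs_per_worker=2,
        broker_is_worker=True,           # broker rank also executes tasks
        time_limit=3600,                 # stop after one hour of wall-clock time
        verbose=True,
    )
```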