asyncpal


- Name: asyncpal
- Version: 0.0.5
- Home page: https://github.com/pyrustic/asyncpal
- Summary: Preemptive concurrency and parallelism for sporadic workloads
- Upload time: 2024-09-08 14:05:14
- Maintainer: Pyrustic Evangelist
- Author: Pyrustic Evangelist
- Requires Python: >=3.5
- License: MIT
- Keywords: application, pyrustic

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![PyPI package version](https://img.shields.io/pypi/v/asyncpal)](https://pypi.org/project/asyncpal)
[![Downloads](https://static.pepy.tech/badge/asyncpal)](https://pepy.tech/project/asyncpal)


<!-- Cover -->
<div align="center">
    <img src="https://raw.githubusercontent.com/pyrustic/misc/master/assets/asyncpal/cover.jpg" alt="Cover image" width="640">
    <p align="center">
        <a href="https://commons.wikimedia.org/wiki/File:Carrera_de_carros_romanos.jpg">Poniol</a>, <a href="https://creativecommons.org/licenses/by-sa/3.0">CC BY-SA 3.0</a>, via Wikimedia Commons
    </p>
</div>

<!-- Intro Text -->
# Asyncpal
<b>Preemptive concurrency and parallelism for sporadic workloads</b>

## Table of contents
- [Overview](#overview)
    - [Designed for sporadic workloads](#designed-for-sporadic-workloads)
    - [Supplied with advanced capabilities](#supplied-with-advanced-capabilities)
    - [Featuring a familiar interface](#featuring-a-familiar-interface)
- [Examples](#examples)
- [Embarrassingly parallel workloads](#embarrassingly-parallel-workloads)
- [Initializers, finalizers, and the BrokenPoolError exception](#initializers-finalizers-and-the-brokenpoolerror-exception)
- [The peculiar cases of daemons and remote exceptions](#the-peculiar-cases-of-daemons-and-remote-exceptions)
- [Application programming interface](#application-programming-interface)
    - [ThreadPool class](#threadpool-class)
    - [ProcessPool class](#processpool-class)
    - [Future class](#future-class)
    - [Miscellaneous functions and classes](#miscellaneous-functions-and-classes)
    - [Exception classes](#exception-classes)
- [Testing and contributing](#testing-and-contributing)
- [Installation](#installation)


# Overview
**Asyncpal** is a [Python](https://www.python.org/) library designed for preemptive [concurrency](https://en.wikipedia.org/wiki/Concurrent_computing) and [parallelism](https://en.wikipedia.org/wiki/Parallel_computing). It achieves concurrency using the [thread pool](https://en.wikipedia.org/wiki/Thread_pool) design pattern that it extends with processes to enable parallelism.

## Designed for sporadic workloads
Although a thread pool is the right tool for the problems it solves, its creation and usage involve the allocation of resources that must be properly released. For this reason, it is recommended to use a thread pool with a context manager to ensure that resources are properly released once the pool executor has finished the tasks.

However, this strategy can introduce overhead in programs that sporadically submit tasks to a thread pool, as multiple pools may be created and destroyed throughout the execution of these programs.

Maintaining one or a few thread pools for the duration of a program can be an effective solution, assuming these thread pools can automatically **shrink** after workers have been idle for a short period defined by the programmer.

Asyncpal offers the ability to set an idle timeout for workers, allowing the pool to which they belong to shrink when they are not in use.
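The idea behind an idle timeout can be sketched with the standard library alone. The snippet below is not Asyncpal's implementation; it is a minimal illustration, assuming a worker that pulls tasks from a `queue.Queue` and exits once no task arrives within `idle_timeout` seconds. The names `worker`, `tasks`, and `exited` are hypothetical.

```python
import queue
import threading


def worker(tasks, idle_timeout, exited):
    """Run queued tasks until none arrives for `idle_timeout` seconds."""
    while True:
        try:
            func, args = tasks.get(timeout=idle_timeout)
        except queue.Empty:
            break  # idle for too long: let this worker exit
        func(*args)
    exited.set()  # signal that the worker has shut itself down


tasks = queue.Queue()
results = []
exited = threading.Event()

# Queue one task, then start a worker with a short idle timeout
tasks.put((results.append, (42,)))
thread = threading.Thread(target=worker, args=(tasks, 0.2, exited))
thread.start()

# The worker runs the task, stays idle for 0.2 seconds, then exits
thread.join(timeout=5)
```

A pool built on this principle shrinks on its own: each idle worker leaves after its timeout, and new workers are spawned only when tasks arrive again.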

> Learn how Asyncpal ensures a [graceful shutdown](#the-peculiar-case-of-daemons) of open pools when an uncaught exception occurs.

## Supplied with advanced capabilities
Asyncpal pools provide methods to manage [embarrassingly parallel workloads](https://en.wikipedia.org/wiki/Embarrassingly_parallel), allowing for lazy or eager execution and optional workload splitting into large chunks, with or without preserving their original order.

Some level of introspection is achievable directly from the pool interface, such as counting busy workers or pending tasks. Additionally, a `Future` class (never directly instantiated by the user) is provided, whose objects allow a task to be cancelled or its result to be collected. Furthermore, the pending time and running duration of a task can be obtained directly from a `Future` object.

Overall, the characteristics of Asyncpal make it suitable for both implicit use in the background through higher-level abstractions provided by frameworks or libraries, and for explicit use with or without a context manager.

## Featuring a familiar interface
Asyncpal is inspired by the great preemptive concurrency and parallelism packages provided in Python and Java.

For instance, the `chunk_size` option for map operations is borrowed from Python's [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) and [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) packages, while the fixed-size pools, such as the `SingleThreadPool` class, are inspired by Java's [java.util.concurrent.Executors.newSingleThreadExecutor](https://docs.oracle.com/en%2Fjava%2Fjavase%2F22%2Fdocs%2Fapi%2F%2F/java.base/java/util/concurrent/Executors.html#newSingleThreadExecutor()) static method.


# Examples
The following code snippets are adapted from examples provided by Python's `concurrent.futures` documentation page.

## Thread pool example
The following code snippet is adapted from the [ThreadPoolExecutor example](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example) provided by Python's concurrent.futures documentation [page](https://docs.python.org/3/library/concurrent.futures.html).

```python
import urllib.request
from asyncpal import ThreadPool, as_done

URLS = ["https://ubuntu.com/",
        "https://github.com/pyrustic/asyncpal/",
        "https://youtu.be/xLi83prR5fg",
        "https://news.ycombinator.com/",
        "https://nonexistant-subdomain.python.org/"]

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# Thread pool with a context manager (not mandatory, though)
with ThreadPool(max_workers=5) as pool:
    # Start the load operations and mark each future with its URL
    future_to_url = {pool.submit(load_url, url, 60): url for url in URLS}
    for future in as_done(future_to_url):
        url = future_to_url[future]
        try:
            data = future.collect()  # collect the result or raise the exception
        except Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
        else:
            print("%r page is %d bytes" % (url, len(data)))
```

> The function `as_done` accepts a list of Future objects and also the `keep_order` and `timeout` keyword arguments.

## Process pool example
The following code snippet is adapted from the [ProcessPoolExecutor example](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example) provided by Python's concurrent.futures documentation [page](https://docs.python.org/3/library/concurrent.futures.html).

```python
import math
from asyncpal import ProcessPool

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with ProcessPool() as pool:
        # The 'map' method is lazy and slower than 'map_all'.
        # For very long iterables, 'map_all' may cause high memory usage.
        for number, prime in zip(PRIMES, pool.map(is_prime, PRIMES)):
            print("%d is prime: %s" % (number, prime))

if __name__ == "__main__":
    main()
```

> The method `map` also accepts these keyword arguments: `chunk_size`, `buffer_size`, `keep_order`, and `timeout`.


# Embarrassingly parallel workloads
Asyncpal pool classes provide four methods to perform [Map](https://en.wikipedia.org/wiki/Map_(parallel_pattern)) operations. For cases where control is more important than convenience, there are also public functions for manually splitting a task into subtasks to submit to the pool.

## Pool class methods to perform Map operations
Pool class methods to perform Map operations are `map`, `map_all`, `starmap`, and `starmap_all`.

```python
from itertools import starmap
from asyncpal import ThreadPool

def add(x, y):
    return x + y

with ThreadPool(4) as pool:
    # numbers go from 0 to 99
    numbers = range(100)

    # The 'map' method is lazy and slower than 'map_all'.
    # Keyword arguments: chunk_size, buffer_size, keep_order, timeout
    iterator = pool.map(add, numbers, numbers, chunk_size=25)
    assert tuple(iterator) == tuple(map(add, numbers, numbers))

    # For very long iterables, 'map_all' may cause high memory usage.
    # Keyword arguments: chunk_size, keep_order, timeout
    iterator = pool.map_all(add, numbers, numbers, chunk_size=25)
    assert tuple(iterator) == tuple(map(add, numbers, numbers))

    # The 'starmap' method is lazy and slower than 'starmap_all'.
    # Keyword arguments: chunk_size, buffer_size, keep_order, timeout
    iterator = pool.starmap(add, zip(numbers, numbers), chunk_size=25)
    assert tuple(iterator) == tuple(starmap(add, zip(numbers, numbers)))

    # For very long iterables, 'starmap_all' may cause high memory usage.
    # Keyword arguments: chunk_size, keep_order, timeout
    iterator = pool.starmap_all(add, zip(numbers, numbers), chunk_size=25)
    assert tuple(iterator) == tuple(starmap(add, zip(numbers, numbers)))
```

> For convenience, there are also `map_unordered`, `map_all_unordered`, `starmap_unordered`, `starmap_all_unordered`.
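To make the effect of `chunk_size=25` on 100 items concrete, here is a minimal standard-library sketch of chunking. It is not Asyncpal's code: `split_into_chunks` is a hypothetical helper that only illustrates how a workload can be cut into 4 chunks that are processed separately and then merged back in order.

```python
from itertools import islice


def split_into_chunks(iterable, chunk_size):
    """Yield successive lists of at most `chunk_size` items."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def add(x, y):
    return x + y


numbers = range(100)

# 100 argument pairs split into chunks of 25 give 4 chunks
chunks = list(split_into_chunks(zip(numbers, numbers), 25))

# Each chunk could be handed to a different worker; here they run in sequence
partial_results = [[add(x, y) for x, y in chunk] for chunk in chunks]

# Merging the partial results in chunk order restores the original order
flattened = [value for part in partial_results for value in part]
```

Larger chunks mean fewer tasks to schedule, at the cost of coarser load balancing between workers.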

## Useful functions
The `split_map_task` and `split_starmap_task` functions let you manually split a task into subtasks. There are also the `wait`, `collect`, and `as_done` functions, which are intended to be applied to sequences of Future objects.

```python
from asyncpal import (ThreadPool, split_map_task, split_starmap_task,
                      wait, collect, as_done)

def add(x, y):
    return x + y

with ThreadPool(4) as pool:
    # numbers go from 0 to 99
    numbers = range(100)

    # Manually split a 'map' task into 4 subtasks
    futures = list()
    for subtask in split_map_task(add, numbers, numbers, chunk_size=25):
        future = pool.submit(subtask)
        futures.append(future)

    # We could've used 'split_starmap_task'
    for subtask in split_starmap_task(add, zip(numbers, numbers)):
        pass

    # We can block the current thread, waiting for the results to be available
    wait(futures, timeout=42)  # 42 seconds !

    # Or we can just collect results (beware, an exception may be raised)
    result = list()
    for sub_result in collect(futures, timeout=42):
        result.extend(sub_result)
    assert tuple(result) == tuple(map(add, numbers, numbers))

    # We could've used 'as_done' to filter out futures as they are done.
    # Note that by default, the keyword argument 'keep_order' is False !
    for future in as_done(futures, timeout=42):
        pass
```

# Initializers, finalizers, and the BrokenPoolError exception
At the creation of a pool, the programmer can provide an initializer and/or a finalizer. Consequently, each time the pool spawns a worker (whether it is a thread or a process), the initializer is executed at startup, and the finalizer is executed right before the worker shuts down.

Any exception raised during initialization, finalization, or in between will be caught by the pool, which will then enter a "broken" state. Once the pool is broken, it will shut down other workers, cancel pending tasks, and make them available via the `cancelled_tasks` property. It will also raise a `BrokenPoolError` exception whenever the programmer attempts to submit new tasks. 

Asyncpal offers a way to reduce the risk of encountering a `BrokenPoolError` exception at an inconvenient time by testing the pool beforehand. All pool classes provide a `test` method that replicates the pool with its configuration, performs some computation on it, then closes it, letting any exception propagate to the top.

# The peculiar cases of daemons and remote exceptions
This section discusses the peculiar cases of daemons and remote exceptions.

## The peculiar case of daemons
In Python, a thread can be flagged as a [daemon thread](https://docs.python.org/3/library/threading.html#thread-objects). The significance of this flag is that the entire Python program exits when only daemon threads are left.

Prior to **Python 3.9**, `concurrent.futures` used daemon threads as workers for its thread pool and relied on [atexit](https://docs.python.org/3/library/atexit.html) hooks to gracefully shut down the pools that had not been explicitly closed. For compatibility with subinterpreters, which do not support daemon threads, it was decided to [remove the daemon flag](https://docs.python.org/3/whatsnew/3.9.html#concurrent-futures). However, simply removing the daemon flag would have been [problematic](https://bugs.python.org/issue37266#msg362890). 

The fix for this issue involved stopping the use of atexit hooks and instead relying on an [internal threading atexit hook](https://bugs.python.org/issue37266#msg362960). Asyncpal does not use the daemon flag either. Instead of relying on some internal Python function that might disappear without warning, it implements its own workaround. This workaround involves a single thread for the entire program, started by `asyncpal.pool.GlobalShutdown`, whose job is to join the main thread and, once joined, run the shutdown handlers registered by the pools. 

> Feel free to open an issue to criticize this workaround or to suggest a better idea.
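The principle of this workaround can be sketched with the standard library. The snippet below is not the code of `asyncpal.pool.GlobalShutdown`; `ShutdownWatcher` is a hypothetical class that joins a watched thread and then runs the registered handlers. So that the demo terminates on its own, it watches a short-lived stand-in thread instead of the real main thread.

```python
import threading


class ShutdownWatcher:
    """Run registered handlers once a watched thread has finished."""

    def __init__(self, watched_thread):
        self._watched = watched_thread
        self._handlers = []
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self._run,
                                        name="shutdown-watcher")

    def register(self, handler):
        with self._lock:
            self._handlers.append(handler)

    def start(self):
        self._thread.start()

    def wait(self, timeout=None):
        self._thread.join(timeout)

    def _run(self):
        self._watched.join()  # blocks until the watched thread ends
        with self._lock:
            handlers = list(self._handlers)
        for handler in handlers:
            handler()  # e.g. shut down a pool that is still open


# Demo with a stand-in for the main thread
events = []
stand_in = threading.Thread(
    target=lambda: events.append("main thread finished"))
watcher = ShutdownWatcher(stand_in)
watcher.register(lambda: events.append("pool shut down"))

stand_in.start()
watcher.start()
watcher.wait(timeout=5)
```

In a real program, the watched thread would be the one returned by `threading.main_thread()`, and each pool would register its own shutdown handler when it is created.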

## The peculiar case of remote exceptions
An exception raised in a worker of a `ProcessPool` must go through a multiprocessing queue to reach the pool instance. To be placed in a multiprocessing queue, a Python object must be **picklable**, that is, it must be possible to [serialize](https://en.wikipedia.org/wiki/Serialization) it with Python's [pickle](https://docs.python.org/3/library/pickle.html) mechanism.

An exception instance typically contains a traceback object that is not picklable and thus nullified by the `pickle` mechanism. Python's `concurrent.futures` and `multiprocessing.pool.Pool` use a hack that stringifies traceback to move it from the worker process to the main pool process.

Although this hack is interesting and useful for debugging a `ProcessPool`, it does not preserve the chain of exceptions, because the pickling process nullifies not only the `__traceback__` attribute of the exception object but also its `__cause__` and `__context__` attributes.

Asyncpal also stringifies the traceback, as it is a simpler solution than recreating the traceback object in the main process. Additionally, Asyncpal replicates the exception chain, so the programmer can navigate through the `__cause__` and `__context__` attributes of a remote exception.

> The `get_remote_traceback` function is exposed to quickly extract the traceback string of a remote exception.
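The loss described in this section can be reproduced with the standard library alone. The following sketch does not use Asyncpal; it pickles a chained exception and shows that the copy keeps its arguments but loses its traceback and its chain, while a stringified traceback captured beforehand still carries that information. The names `fail`, `original`, `clone`, and `tb_text` are illustrative.

```python
import pickle
import traceback


def fail():
    try:
        1 / 0
    except ZeroDivisionError as exc:
        # Chain the exceptions explicitly: sets __cause__ and __context__
        raise ValueError("bad input") from exc


try:
    fail()
except ValueError as exc:
    original = exc
    # Stringify the traceback while it is still available
    tb_text = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__))

# Simulate the trip through a multiprocessing queue
clone = pickle.loads(pickle.dumps(original))

print(type(clone).__name__, clone.args)
print(clone.__traceback__, clone.__cause__, clone.__context__)
```

The string in `tb_text` is plain text, so it survives pickling; this is why stringifying the traceback in the worker process is a practical way to keep debugging information.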


# Application programming interface
> This section describes the API and refers to the API reference for more details.

Asyncpal consists of three key components: the **Pool**, the **Worker**, and the **Future**. From the programmer's perspective, the pool represents the main interface of a system that spawns workers as needed and returns Future objects.

Preemptive concurrency is achieved with the `ThreadPool` class while parallelism is handled by the `ProcessPool` class. Under the hood, the thread pool spawns Python's `threading.Thread` as workers and the process pool spawns Python's `multiprocessing.Process` as workers.


## ThreadPool class
Preemptive concurrency is achieved with the `ThreadPool` class. Under the hood, the thread pool spawns Python's `threading.Thread` as workers.

For convenience, the following four derived classes are provided:

- `SingleThreadPool`: spawns only 1 worker
- `DualThreadPool`: spawns up to 2 workers
- `TripleThreadPool`: spawns up to 3 workers
- `QuadThreadPool`: spawns up to 4 workers

```python
from asyncpal import ThreadPool
from asyncpal.errors import BrokenPoolError, InitializerError, FinalizerError


def add(x, y):
  return x + y


def initializer(*args, **kwargs):
  pass


def finalizer(*args, **kwargs):
  pass


# all these arguments are optional
pool = ThreadPool(max_workers=4, name="my-pool", idle_timeout=60,
                  initializer=initializer, init_args=(1, 2),
                  init_kwargs={"arg": 1}, finalizer=finalizer,
                  final_args=(3, 4), max_tasks_per_worker=None)
# submit a task
future = pool.submit(add, 10, 2)

# test the pool
try:
  pool.test()
# exception coming from the initializer
except InitializerError as e:
  e.__cause__  # the cause
# exception coming from the finalizer
except FinalizerError:
  pass
# exception coming from the initializer
# or the finalizer
except BrokenPoolError:
  pass

# calling this will raise RuntimeError if the pool is closed
# or BrokenPoolError (or its subclass)
pool.check()

# retrieve useful data
pool.count_workers()
pool.count_busy_workers()
pool.count_free_workers()
pool.count_pending_tasks()

# manually spawn workers
pool.spawn_workers(2)  # 2 extra workers
# join all workers
pool.join(timeout=42)

# gracefully shut down the pool
pool.shutdown()
assert pool.is_terminated
# list of cancelled tasks
pool.cancelled_tasks
```

> Check out the API reference for [asyncpal.ThreadPool](https://github.com/pyrustic/asyncpal/blob/master/docs/api/modules/asyncpal/__init__/class-ThreadPool.md).

## ProcessPool class
Parallelism is achieved with the `ProcessPool` class. Under the hood, the process pool spawns Python's `multiprocessing.Process` as workers with the `spawn` context. 

The `ProcessPool` class is similar to the `ThreadPool` class. 

For convenience, the following four derived classes are provided:

- `SingleProcessPool`: spawns only 1 worker
- `DualProcessPool`: spawns up to 2 workers
- `TripleProcessPool`: spawns up to 3 workers
- `QuadProcessPool`: spawns up to 4 workers


> Note that you must guard your process pool with `if __name__ == '__main__'` and also avoid writing multiprocessing code directly in the `__main__` module of your projects.

> Check out the API reference for [asyncpal.ProcessPool](https://github.com/pyrustic/asyncpal/blob/master/docs/api/modules/asyncpal/__init__/class-ProcessPool.md).

## Future class

A [Future](https://en.wikipedia.org/wiki/Futures_and_promises) object is not meant to be instantiated by the programmer but rather returned by the `submit` method of pools.

```python
from asyncpal import ThreadPool


def divide(x, y):
  return x // y


with ThreadPool(4) as pool:
  # submit a task
  future = pool.submit(divide, 10, 2)

  # add a callback that accepts the future as argument
  # and that will be called when the future is done
  future.add_callback(lambda f: None)

  # safely collect the result (by default, it blocks)
  try:
    # blocks (max 42s) until the Future is done
    result = future.collect(timeout=42)
  except ZeroDivisionError as e:
    pass
  else:
    assert result == 5

  # get duration (in seconds)
  pending_time, running_time = future.duration

  # cancel the future (it is a bit too late, but ok)
  future.cancel()

  # we could've waited for the Future to be done (it blocks)
  future.wait(timeout=42)  # 42s !

  # get the result (returns None if the Future isn't done)
  result = future.result
  # get the exception (returns None if the Future isn't done)
  exc = future.exception

  # some useful properties
  future.cancel_flag  # boolean set to True after cancel() is called
  future.is_cancelled # boolean that confirms cancellation
  future.is_done      # True when Completed, Cancelled, or Failed
  future.is_pending   # True while task is pending
  future.is_running   # True while task is running
  # etc...
```


> Check out the API reference for [asyncpal.Future](https://github.com/pyrustic/asyncpal/blob/master/docs/api/modules/asyncpal/__init__/class-Future.md).

## Miscellaneous functions and classes

> Check out the API reference for [asyncpal](https://github.com/pyrustic/asyncpal/blob/master/docs/api/modules/asyncpal/__init__/funcs.md).


# Testing and contributing
Feel free to **open an issue** to report a bug, suggest some changes, show some useful code snippets, or discuss anything related to this project. You can also directly email [me](https://pyrustic.github.io/#contact).

## Set up your development environment
The following instructions set up your development environment:

```bash
# create and activate a virtual environment
python -m venv venv
source venv/bin/activate

# clone the project then change into its directory
git clone https://github.com/pyrustic/asyncpal.git
cd asyncpal

# install the package locally (editable mode)
pip install -e .

# run tests
python -m tests

# deactivate the virtual environment
deactivate
```

<p align="right"><a href="#readme">Back to top</a></p>

# Installation
**Asyncpal** is **cross-platform**. It is built on [Ubuntu](https://ubuntu.com/download/desktop) and should work on **Python 3.8** or **newer**.

## Create and activate a virtual environment
```bash
python -m venv venv
source venv/bin/activate
```

## Install for the first time

```bash
pip install asyncpal
```

## Upgrade the package
```bash
pip install asyncpal --upgrade --upgrade-strategy eager
```

## Deactivate the virtual environment
```bash
deactivate
```

<p align="right"><a href="#readme">Back to top</a></p>

# About the author
Hello world, I'm Alex, a tech enthusiast ! Feel free to get in touch with [me](https://pyrustic.github.io/#contact) !

<br>
<br>
<br>

[Back to top](#readme)





            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/pyrustic/asyncpal",
    "name": "asyncpal",
    "maintainer": "Pyrustic Evangelist",
    "docs_url": null,
    "requires_python": ">=3.5",
    "maintainer_email": "rusticalex@yahoo.com",
    "keywords": "application, pyrustic",
    "author": "Pyrustic Evangelist",
    "author_email": "rusticalex@yahoo.com",
    "download_url": "https://files.pythonhosted.org/packages/ad/cc/a761c95f3ecd1af71911e4f34512e0312227b81e17af4dd292db3e9d5247/asyncpal-0.0.5.tar.gz",
    "platform": null,
    "description": "[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)\n[![PyPI package version](https://img.shields.io/pypi/v/asyncpal)](https://pypi.org/project/asyncpal)\n[![Downloads](https://static.pepy.tech/badge/asyncpal)](https://pepy.tech/project/asyncpal)\n\n\n<!-- Cover -->\n<div align=\"center\">\n    <img src=\"https://raw.githubusercontent.com/pyrustic/misc/master/assets/asyncpal/cover.jpg\" alt=\"Cover image\" width=\"640\">\n    <p align=\"center\">\n        <a href=\"https://commons.wikimedia.org/wiki/File:Carrera_de_carros_romanos.jpg\">Poniol</a>, <a href=\"https://creativecommons.org/licenses/by-sa/3.0\">CC BY-SA 3.0</a>, via Wikimedia Commons\n    </p>\n</div>\n\n<!-- Intro Text -->\n# Asyncpal\n<b>Preemptive concurrency and parallelism for sporadic workloads</b>\n\n## Table of contents\n- [Overview](#overview)\n    - [Designed for sporadic workloads](#designed-for-sporadic-workloads)\n    - [Supplied with advanced capabilities](#supplied-with-advanced-capabilities)\n    - [Featuring a familiar interface](#featuring-a-familiar-interface)\n- [Examples](#examples)\n- [Embarrassingly parallel workloads](#embarrassingly-parallel-workloads)\n- [Initializers, finalizers, and the BrokenPoolError exception](#initializers-finalizers-and-the-brokenpoolerror-exception)\n- [The peculiar cases of daemons and remote exceptions](#the-peculiar-cases-of-daemons-and-remote-exceptions)\n- [Application programming interface](#application-programming-interface)\n    - [ThreadPool class](#threadpool-class)\n    - [ProcessPool class](#processpool-class)\n    - [Future class](#future-class)\n    - [Miscellaneous functions and classes](#miscellaneous-functions-and-classes)\n    - [Exception classes](#exception-classes)\n- [Testing and contributing](#testing-and-contributing)\n- [Installation](#installation)\n\n\n# Overview\n**Asyncpal** is a [Python](https://www.python.org/) library designed for preemptive 
[concurrency](https://en.wikipedia.org/wiki/Concurrent_computing) and [parallelism](https://en.wikipedia.org/wiki/Parallel_computing). It achieves concurrency using the [thread pool](https://en.wikipedia.org/wiki/Thread_pool) design pattern that it extends with processes to enable parallelism.\n\n## Designed for sporadic workloads\nAlthough a thread pool is the right tool for the problems it solves, its creation and usage involve the allocation of resources that must be properly released. For this reason, it is recommended to use a thread pool with a context manager to ensure that resources are properly released once the pool executor has finished the tasks.\n\nHowever, this strategy can introduce overhead in programs that sporadically submit tasks to a thread pool, as multiple pools may be created and destroyed throughout the execution of these programs.\n\nMaintaining one or a few thread pools for the duration of a program can be an effective solution, assuming these thread pools can automatically **shrink** after workers have been idle for a short period defined by the programmer.\n\nAsyncpal offers the ability to set an idle timeout for workers, allowing the pool to which they belong to shrink when they are not in use.\n\n> Learn how Asyncpal ensures a [graceful shutdown](#the-peculiar-case-of-daemons) of open pools when an uncaught exception occurs.\n\n## Supplied with advanced capabilities\nAsyncpal pools provide methods to manage [embarrassingly parallel workloads](https://en.wikipedia.org/wiki/Embarrassingly_parallel), allowing for lazy or eager execution and optional workload splitting into large chunks, with or without preserving their original order.\n\nSome level of introspection is achievable directly from the pool interface, such as counting busy workers or pending tasks. Additionally, a `Future` class (never directly instantiated by the user) is provided, whose objects allow a task to be cancelled or its result to be collected. 
Furthermore, the pending time and running duration of a task can be obtained directly from a `Future` object.\n\nOverall, the characteristics of Asyncpal make it suitable for both implicit use in the background through higher-level abstractions provided by frameworks or libraries, and for explicit use with or without a context manager.\n\n## Featuring a familiar interface\nAsyncpal is inspired by the great preemptive concurrency and parallelism packages provided in Python and Java.\n\nFor instance, the `chunk_size` option for map operations is borrowed from Python's [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) and [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) packages, while the fixed-size pools, such as the `SingleThreadPool` class, are inspired by Java's [java.util.concurrent.Executors.newSingleThreadExecutor](https://docs.oracle.com/en%2Fjava%2Fjavase%2F22%2Fdocs%2Fapi%2F%2F/java.base/java/util/concurrent/Executors.html#newSingleThreadExecutor()) static method.\n\n\n# Examples\nThe following code snippets are adapted from examples provided by Python's `concurrent.futures` documentation page.\n\n## Thread pool example\nThe following code snippet is adapted from [ThreadPoolExecutor example](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example) provided by Python's concurrent.futures documentation [page](https://docs.python.org/3/library/concurrent.futures.html).\n\n```python\nimport urllib.request\nfrom asyncpal import ThreadPool, as_done\n\nURLS = [\"https://ubuntu.com/\",\n        \"https://github.com/pyrustic/asyncpal/\",\n        \"https://youtu.be/xLi83prR5fg\",\n        \"https://news.ycombinator.com/\",\n        \"https://nonexistant-subdomain.python.org/\"]\n\n# Retrieve a single page and report the URL and contents\ndef load_url(url, timeout):\n    with urllib.request.urlopen(url, timeout=timeout) as conn:\n        return conn.read()\n\n# Thread pool with a 
context manager (not mandatory, tho)\nwith ThreadPool(max_workers=5) as pool:\n    # Start the load operations and mark each future with its URL\n    future_to_url = {pool.submit(load_url, url, 60): url for url in URLS}\n    for future in as_done(future_to_url):\n        url = future_to_url[future]\n        try:\n            data = future.collect()  # collect the result or raise the exception\n        except Exception as exc:\n            print(\"%r generated an exception: %s\" % (url, exc))\n        else:\n            print(\"%r page is %d bytes\" % (url, len(data)))\n```\n\n> The function `as_done` accepts a list of Future objects and also the `keep_order` and `timeout` keyword arguments.\n\n## Process pool example\nThe following code snippet is adapted from [ProcessPoolExecutor example](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example) provided by Python's concurrent.futures documentation [page](https://docs.python.org/3/library/concurrent.futures.html).\n\n```python\nimport math\nfrom asyncpal import ProcessPool\n\nPRIMES = [\n    112272535095293,\n    112582705942171,\n    112272535095293,\n    115280095190773,\n    115797848077099,\n    1099726899285419]\n\ndef is_prime(n):\n    if n < 2:\n        return False\n    if n == 2:\n        return True\n    if n % 2 == 0:\n        return False\n    sqrt_n = int(math.floor(math.sqrt(n)))\n    for i in range(3, sqrt_n + 1, 2):\n        if n % i == 0:\n            return False\n    return True\n\ndef main():\n    with ProcessPool() as pool:\n        # The 'map' method is lazy and slower than 'map_all'.\n        # For very long iterables, 'map_all' may cause high memory usage.\n        for number, prime in zip(PRIMES, pool.map(is_prime, PRIMES)):\n            print(\"%d is prime: %s\" % (number, prime))\n\nif __name__ == \"__main__\":\n    main()\n```\n\n> The method `map` also accepts these keyword arguments: `chunk_size`, `buffer_size`, `keep_order`, and `timeout`.\n\n\n# 
Embarrassingly parallel workloads\nAsyncpal pool classes provide four methods to perform [Map](https://en.wikipedia.org/wiki/Map_(parallel_pattern)) operations and for cases where control is more important than convenience, there are public functions for manually splitting a task into subtasks to submit to the pool.\n\n## Pool class methods to perform Map operations\nPool class methods to perform Map operations are `map`, `map_all`, `starmap`, and `starmap_all`.\n\n```python\nfrom itertools import starmap\nfrom asyncpal import ThreadPool\n\ndef add(x, y):\n    return x + y\n\nwith ThreadPool(4) as pool:\n    # numbers go from 0 to 99\n    numbers = range(100)\n\n    # The 'map' method is lazy and slower than 'map_all'.\n    # Keyword arguments: chunk_size, buffer_size, keep_order, timeout\n    iterator = pool.map(add, numbers, numbers, chunk_size=25)\n    assert tuple(iterator) == tuple(map(add, numbers, numbers))\n\n    # For very long iterables, 'map_all' may cause high memory usage.\n    # Keyword arguments: chunk_size, keep_order, timeout\n    iterator = pool.map_all(add, numbers, numbers, chunk_size=25)\n    assert tuple(iterator) == tuple(map(add, numbers, numbers))\n\n    # The 'starmap' method is lazy and slower than 'starmap_all'.\n    # Keyword arguments: chunk_size, buffer_size, keep_order, timeout\n    iterator = pool.starmap(add, zip(numbers, numbers), chunk_size=25)\n    assert tuple(iterator) == tuple(starmap(add, zip(numbers, numbers)))\n\n    # For very long iterables, 'starmap_all' may cause high memory usage.\n    # Keyword arguments: chunk_size, keep_order, timeout\n    iterator = pool.starmap_all(add, zip(numbers, numbers), chunk_size=25)\n    assert tuple(iterator) == tuple(starmap(add, zip(numbers, numbers)))\n```\n\n> For convenience, there are also `map_unordered`, `map_all_unordered`, `starmap_unordered`, `starmap_all_unordered`.\n\n## Useful functions\nThe `split_map_task` and `split_starmap_task` functions allow to manually split a task 
into subtasks. There are also the `wait`, `collect` and `as_done` functions which are intended to be applied to sequences of Future objects.\n\n```python\nfrom asyncpal import (ThreadPool, split_map_task, split_starmap_task,\n                      wait, collect, as_done)\n\ndef add(x, y):\n    return x + y\n\nwith ThreadPool(4) as pool:\n    # numbers go from 0 to 99\n    numbers = range(100)\n\n    # Manually split a 'map' task into 4 subtasks\n    futures = list()\n    for subtask in split_map_task(add, numbers, numbers, chunk_size=25):\n        future = pool.submit(subtask)\n        futures.append(future)\n\n    # We could've used 'split_starmap_task'\n    for subtask in split_starmap_task(add, zip(numbers, numbers)):\n        pass\n\n    # We can block the current thread, waiting for the results to be available\n    wait(futures, timeout=42)  # 42 seconds !\n\n    # Or we can just collect results (beware, an exception may be raised)\n    result = list()\n    for sub_result in collect(futures, timeout=42):\n        result.extend(sub_result)\n    assert tuple(result) == tuple(map(add, numbers, numbers))\n\n    # We could've used 'as_done' to filter out futures as they are done.\n    # Note that by default, the keyword argument 'keep_order' is False !\n    for future in as_done(futures, timeout=42):\n        pass\n```\n\n# Initializers, finalizers, and the BrokenPoolError exception\nAt the creation of a pool, the programmer can provide an initializer and/or a finalizer. Consequently, each time the pool spawns a worker (whether it is a thread or a process), the initializer is executed at startup, and the finalizer is executed right before the worker shuts down.\n\nAny exception raised during initialization, finalization, or in between will be caught by the pool, which will then enter a \"broken\" state. Once the pool is broken, it will shut down other workers, cancel pending tasks, and make them available via the `cancelled_tasks` property. 
It will also raise a `BrokenPoolError` exception whenever the programmer attempts to submit new tasks. \n\nAsyncpal offers a way to reduce the risk of encountering a `BrokenPoolError` exception at an inconvenient time by testing the pool beforehand. All pool classes provide a `test` method that replicates the pool with its configuration, performs some computation on it, then closes it, letting any exception propagate to the top.\n\n# The peculiar cases of daemons and remote exceptions\nThis section discusses the peculiar cases of daemons and remote exceptions.\n\n## The peculiar case of daemons\nIn Python, a thread can be flagged as a [daemon thread](https://docs.python.org/3/library/threading.html#thread-objects). The significance of this flag is that the entire Python program exits when only daemon threads are left.\n\nPrior to **Python 3.9**, `concurrent.futures` used daemon threads as workers for its thread pool and relied on [atexit](https://docs.python.org/3/library/atexit.html) hooks to gracefully shut down the pools that had not been explicitly closed. For compatibility with subinterpreters, which do not support daemon threads, it was decided to [remove the daemon flag](https://docs.python.org/3/whatsnew/3.9.html#concurrent-futures). However, simply removing the daemon flag would have been [problematic](https://bugs.python.org/issue37266#msg362890). \n\nThe fix for this issue involved stopping the use of atexit hooks and instead relying on an [internal threading atexit hook](https://bugs.python.org/issue37266#msg362960). Asyncpal does not use the daemon flag either. Instead of relying on some internal Python function that might disappear without warning, it implements its own workaround. This workaround involves a single thread for the entire program, started by `asyncpal.pool.GlobalShutdown`, whose job is to join the main thread and, once joined, run the shutdown handlers registered by the pools. 
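
As a rough illustration of the workaround described above, and not Asyncpal's actual implementation, the sketch below uses one non-daemon thread that joins a target thread (the main thread by default) and then runs the registered handlers. The names `ShutdownWatcher`, `register`, and `demo` are hypothetical and are not part of the Asyncpal API.

```python
import threading


class ShutdownWatcher:
    # Hypothetical helper (not part of Asyncpal): runs the registered
    # handlers once the watched thread has finished.

    def __init__(self, target_thread=None):
        # Watch the main thread unless another thread is given
        self._target = target_thread or threading.main_thread()
        self._handlers = []
        self._lock = threading.Lock()
        self._thread = None

    def register(self, handler):
        # Store the handler and lazily start the single watcher thread
        with self._lock:
            self._handlers.append(handler)
            if self._thread is None:
                self._thread = threading.Thread(target=self._run)
                self._thread.start()

    def _run(self):
        # Block until the watched thread ends, then run the handlers
        # in registration order
        self._target.join()
        with self._lock:
            handlers = list(self._handlers)
        for handler in handlers:
            handler()


def demo():
    # Watch a short-lived stand-in thread instead of the real main thread,
    # so the behaviour can be observed without exiting the program
    events = []
    gate = threading.Event()
    stand_in = threading.Thread(target=gate.wait)
    stand_in.start()
    watcher = ShutdownWatcher(stand_in)
    watcher.register(lambda: events.append('pool-1 closed'))
    watcher.register(lambda: events.append('pool-2 closed'))
    gate.set()              # let the stand-in thread finish
    watcher._thread.join()  # wait until the handlers have run
    return events
```

Calling `demo()` returns the handler messages in registration order. In a real program the watcher would be created without arguments so that it joins the main thread.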
\n\n> Feel free to open an issue to criticize this workaround or to suggest a better idea.\n\n## The peculiar case of remote exceptions\nAn exception raised in a worker of a `ProcessPool` must go through a multiprocessing queue to reach the pool instance. To be placed in a multiprocessing queue, a Python object must be **picklable**, that is, it must be possible to [serialize](https://en.wikipedia.org/wiki/Serialization) it with Python's [pickle](https://docs.python.org/3/library/pickle.html) mechanism.\n\nAn exception instance typically contains a traceback object that is not picklable and thus nullified by the `pickle` mechanism. Python's `concurrent.futures` and `multiprocessing.pool.Pool` use a hack that stringifies the traceback to move it from the worker process to the main pool process.\n\nAlthough this hack is interesting and useful for debugging a `ProcessPool`, it does not preserve the chain of exceptions because the pickling process not only nullifies the `__traceback__` attribute of the exception object, but also the `__cause__` and `__context__` attributes.\n\nAsyncpal also stringifies the traceback, as it is a simpler solution than recreating the traceback object in the main process. Additionally, Asyncpal replicates the exception chain, so the programmer can navigate through the `__cause__` and `__context__` attributes of a remote exception.\n\n> The `get_remote_traceback` function is exposed to quickly extract the traceback string of a remote exception.\n\n\n# Application programming interface\n> This section describes the API and refers to the API reference for more details.\n\nAsyncpal consists of three key components: the **Pool**, the **Worker**, and the **Future**. From the programmer's perspective, the pool represents the main interface of a system that spawns workers as needed and returns Future objects.\n\nPreemptive concurrency is achieved with the `ThreadPool` class while parallelism is handled by the `ProcessPool` class. 
Under the hood, the thread pool spawns Python's `threading.Thread` as workers and the process pool spawns Python's `multiprocessing.Process` as workers.\n\n\n## ThreadPool class\nPreemptive concurrency is achieved with the `ThreadPool` class. Under the hood, the thread pool spawns Python's `threading.Thread` as workers.\n\nFor convenience, the following four derived classes are provided:\n\n- `SingleThreadPool`: spawns only 1 worker\n- `DualThreadPool`: spawns up to 2 workers\n- `TripleThreadPool`: spawns up to 3 workers\n- `QuadThreadPool`: spawns up to 4 workers\n\n```python\nfrom asyncpal import ThreadPool\nfrom asyncpal.errors import BrokenPoolError, InitializerError, FinalizerError\n\n\ndef add(x, y):\n  return x + y\n\n\ndef initializer(*args, **kwargs):\n  pass\n\n\ndef finalizer(*args, **kwargs):\n  pass\n\n\n# all these arguments are optional\npool = ThreadPool(max_workers=4, name=\"my-pool\", idle_timeout=60,\n                  initializer=initializer, init_args=(1, 2),\n                  init_kwargs={\"arg\": 1}, finalizer=finalizer,\n                  final_args=(3, 4), max_tasks_per_worker=None)\n# submit a task\nfuture = pool.submit(add, 10, 2)\n\n# test the pool\ntry:\n  pool.test()\n# exception coming from the initializer\nexcept InitializerError as e:\n  e.__cause__  # the cause\n# exception coming from the finalizer\nexcept FinalizerError:\n  pass\n# exception coming from the initializer\n# or the finalizer\nexcept BrokenPoolError:\n  pass\n\n# calling this will raise RuntimeError if the pool is closed\n# or BrokenPoolError (or its subclass)\npool.check()\n\n# retrieve useful data\npool.count_workers()\npool.count_busy_workers()\npool.count_free_workers()\npool.count_pending_tasks()\n\n# manually spawn workers\npool.spawn_workers(2)  # 2 extra workers\n# join all workers\npool.join(timeout=42)\n\n# gracefully shut down the pool\npool.shutdown()\nassert pool.is_terminated\n# list of cancelled tasks\npool.cancelled_tasks\n```\n\n> Check out the API 
reference for [asyncpal.ThreadPool](https://github.com/pyrustic/asyncpal/blob/master/docs/api/modules/asyncpal/__init__/class-ThreadPool.md).\n\n## ProcessPool class\nParallelism is achieved with the `ProcessPool` class. Under the hood, the process pool spawns Python's `multiprocessing.Process` as workers with the `spawn` context. \n\nThe `ProcessPool` class is similar to the `ThreadPool` class. \n\nFor convenience, the following four derived classes are provided:\n\n- `SingleProcessPool`: spawns only 1 worker\n- `DualProcessPool`: spawns up to 2 workers\n- `TripleProcessPool`: spawns up to 3 workers\n- `QuadProcessPool`: spawns up to 4 workers\n\n\n> Note that you must guard your process pool with `if __name__ == '__main__'` and also avoid writing multiprocessing code directly in the `__main__` module of your projects.\n\n> Check out the API reference for [asyncpal.ProcessPool](https://github.com/pyrustic/asyncpal/blob/master/docs/api/modules/asyncpal/__init__/class-ProcessPool.md).\n\n## Future class\n\nA [Future](https://en.wikipedia.org/wiki/Futures_and_promises) object is not meant to be instantiated by the programmer but rather returned by the `submit` method of pools.\n\n```python\nfrom asyncpal import ThreadPool\n\n\ndef divide(x, y):\n  return x // y\n\n\nwith ThreadPool(4) as pool:\n  # submit a task\n  future = pool.submit(divide, 10, 2)\n\n  # add a callback that accepts the future as argument\n  # and that will be called when the future is done\n  future.add_callback(lambda f: None)\n\n  # safely collect the result (by default, it blocks)\n  try:\n    # blocks (max 42s) until the Future is done\n    result = future.collect(timeout=42)\n  except ZeroDivisionError as e:\n    pass\n  else:\n    assert result == 5\n\n  # get duration (in seconds)\n  pending_time, running_time = future.duration\n\n  # cancel the future (it is a bit too late, but ok)\n  future.cancel()\n\n  # we could've waited for the Future to be done (it blocks)\n  
future.wait(timeout=42)  # 42s !\n\n  # get the result (returns None if the Future isn't done)\n  result = future.result\n  # get the exception (returns None if the Future isn't done)\n  exc = future.exception\n\n  # some useful properties\n  future.cancel_flag  # boolean set to True after cancel() is called\n  future.is_cancelled # boolean that confirms cancellation\n  future.is_done      # True when Completed, Cancelled, or Failed\n  future.is_pending   # True while task is pending\n  future.is_running   # True while task is running\n  # etc...\n```\n\n\n> Check out the API reference for [asyncpal.Future](https://github.com/pyrustic/asyncpal/blob/master/docs/api/modules/asyncpal/__init__/class-Future.md).\n\n## Miscellaneous functions and classes\n\n> Check out the API reference for [asyncpal](https://github.com/pyrustic/asyncpal/blob/master/docs/api/modules/asyncpal/__init__/funcs.md).\n\n\n# Testing and contributing\nFeel free to **open an issue** to report a bug, suggest some changes, show some useful code snippets, or discuss anything related to this project. You can also directly email [me](https://pyrustic.github.io/#contact).\n\n## Setup your development environment\nThe following instructions set up your development environment:\n\n```bash\n# create and activate a virtual environment\npython -m venv venv\nsource venv/bin/activate\n\n# clone the project then change into its directory\ngit clone https://github.com/pyrustic/asyncpal.git\ncd asyncpal\n\n# install the package locally (editable mode)\npip install -e .\n\n# run tests\npython -m tests\n\n# deactivate the virtual environment\ndeactivate\n```\n\n<p align=\"right\"><a href=\"#readme\">Back to top</a></p>\n\n# Installation\n**Asyncpal** is **cross-platform**. 
It is built on [Ubuntu](https://ubuntu.com/download/desktop) and should work on **Python 3.8** or **newer**.\n\n## Create and activate a virtual environment\n```bash\npython -m venv venv\nsource venv/bin/activate\n```\n\n## Install for the first time\n\n```bash\npip install asyncpal\n```\n\n## Upgrade the package\n```bash\npip install asyncpal --upgrade --upgrade-strategy eager\n```\n\n## Deactivate the virtual environment\n```bash\ndeactivate\n```\n\n<p align=\"right\"><a href=\"#readme\">Back to top</a></p>\n\n# About the author\nHello world, I'm Alex, a tech enthusiast ! Feel free to get in touch with [me](https://pyrustic.github.io/#contact) !\n\n<br>\n<br>\n<br>\n\n[Back to top](#readme)\n\n\n\n\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Preemptive concurrency and parallelism for sporadic workloads",
    "version": "0.0.5",
    "project_urls": {
        "Homepage": "https://github.com/pyrustic/asyncpal"
    },
    "split_keywords": [
        "application",
        " pyrustic"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "5d4e486a24c329174526777977bf5edfde3bd4ac2dcbf64aed473ceb07dd98f1",
                "md5": "3a18c2ac0723f11520e18593893c944b",
                "sha256": "05b31b18181de57bf9b108cbafd43fdd43a05de5b2530418f96b7afacb290888"
            },
            "downloads": -1,
            "filename": "asyncpal-0.0.5-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "3a18c2ac0723f11520e18593893c944b",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.5",
            "size": 39285,
            "upload_time": "2024-09-08T14:05:13",
            "upload_time_iso_8601": "2024-09-08T14:05:13.001257Z",
            "url": "https://files.pythonhosted.org/packages/5d/4e/486a24c329174526777977bf5edfde3bd4ac2dcbf64aed473ceb07dd98f1/asyncpal-0.0.5-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "adcca761c95f3ecd1af71911e4f34512e0312227b81e17af4dd292db3e9d5247",
                "md5": "9c8721a2d4c33dca6b545ce141051477",
                "sha256": "bf628e120d1a9712a84e27acfe1485a33a3d54ae146171f590b05aa045c47d4c"
            },
            "downloads": -1,
            "filename": "asyncpal-0.0.5.tar.gz",
            "has_sig": false,
            "md5_digest": "9c8721a2d4c33dca6b545ce141051477",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.5",
            "size": 38299,
            "upload_time": "2024-09-08T14:05:14",
            "upload_time_iso_8601": "2024-09-08T14:05:14.958327Z",
            "url": "https://files.pythonhosted.org/packages/ad/cc/a761c95f3ecd1af71911e4f34512e0312227b81e17af4dd292db3e9d5247/asyncpal-0.0.5.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-08 14:05:14",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "pyrustic",
    "github_project": "asyncpal",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "asyncpal"
}
        