MultiTasking: Non-blocking Python methods using decorators
==========================================================
|Python version| |PyPi version| |PyPi status| |PyPi downloads|
|CodeFactor| |Star this repo| |Follow me on twitter|
--------------
**MultiTasking** is a lightweight Python library that lets you convert
your Python methods into asynchronous, non-blocking methods simply by
using a decorator. Perfect for I/O-bound tasks, API calls, web scraping,
and any scenario where you want to run multiple operations concurrently
without the complexity of manual thread or process management.
✨ **What's New in v0.0.12**
----------------------------
- 🎯 **Full Type Hint Support**: Complete type annotations for better
  IDE support and code safety
- 📚 **Enhanced Documentation**: Comprehensive docstrings and inline
  comments for better maintainability
- 🔧 **Improved Error Handling**: More robust exception handling with
  specific error types
- 🚀 **Better Performance**: Optimized task creation and management
  logic
- 🛡️ **Code Quality**: PEP8 compliant, linter-friendly codebase
Quick Start
-----------
.. code:: python

    import multitasking
    import time

    @multitasking.task
    def fetch_data(url_id):
        # Simulate an API call or I/O operation
        time.sleep(1)
        return f"Data from {url_id}"

    # These run concurrently, not sequentially!
    for i in range(5):
        fetch_data(i)

    # Wait for all tasks to complete
    multitasking.wait_for_tasks()
    print("All data fetched!")
Basic Example
-------------
.. code:: python

    # example.py
    import multitasking
    import time
    import random
    import signal

    # Kill all tasks on ctrl-c (recommended for development)
    signal.signal(signal.SIGINT, multitasking.killall)

    # Or, wait for tasks to finish gracefully on ctrl-c:
    # signal.signal(signal.SIGINT, multitasking.wait_for_tasks)

    @multitasking.task  # <== this is all it takes! 🎉
    def hello(count):
        sleep_time = random.randint(1, 10) / 2
        print(f"Hello {count} (sleeping for {sleep_time}s)")
        time.sleep(sleep_time)
        print(f"Goodbye {count} (slept for {sleep_time}s)")

    if __name__ == "__main__":
        # Launch 10 concurrent tasks
        for i in range(10):
            hello(i + 1)

        # Wait for all tasks to complete
        multitasking.wait_for_tasks()
        print("All tasks completed!")
**Output:**
.. code:: bash

    $ python example.py

    Hello 1 (sleeping for 0.5s)
    Hello 2 (sleeping for 1.0s)
    Hello 3 (sleeping for 5.0s)
    Hello 4 (sleeping for 0.5s)
    Hello 5 (sleeping for 2.5s)
    Hello 6 (sleeping for 3.0s)
    Hello 7 (sleeping for 0.5s)
    Hello 8 (sleeping for 4.0s)
    Hello 9 (sleeping for 3.0s)
    Hello 10 (sleeping for 1.0s)
    Goodbye 1 (slept for 0.5s)
    Goodbye 4 (slept for 0.5s)
    Goodbye 7 (slept for 0.5s)
    Goodbye 2 (slept for 1.0s)
    Goodbye 10 (slept for 1.0s)
    Goodbye 5 (slept for 2.5s)
    Goodbye 6 (slept for 3.0s)
    Goodbye 9 (slept for 3.0s)
    Goodbye 8 (slept for 4.0s)
    Goodbye 3 (slept for 5.0s)
    All tasks completed!
Advanced Usage
==============
Real-World Examples
-------------------
**Web Scraping with Concurrent Requests:**
.. code:: python

    import multitasking
    import requests
    import signal

    signal.signal(signal.SIGINT, multitasking.killall)

    @multitasking.task
    def fetch_url(url):
        try:
            response = requests.get(url, timeout=10)
            print(f"✅ {url}: {response.status_code}")
            return response.text
        except Exception as e:
            print(f"❌ {url}: {str(e)}")
            return None

    # Fetch multiple URLs concurrently
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/json"
    ]

    for url in urls:
        fetch_url(url)

    multitasking.wait_for_tasks()
    print(f"Processed {len(urls)} URLs concurrently!")
**Database Operations:**
.. code:: python

    import multitasking
    import sqlite3
    import time

    @multitasking.task
    def process_batch(batch_id, data_batch):
        # Simulate database processing
        conn = sqlite3.connect(f'batch_{batch_id}.db')
        # ... database operations ...
        conn.close()
        print(f"Processed batch {batch_id} with {len(data_batch)} records")

    # Process multiple data batches concurrently
    large_dataset = list(range(1000))
    batch_size = 100

    for i in range(0, len(large_dataset), batch_size):
        batch = large_dataset[i:i + batch_size]
        process_batch(i // batch_size, batch)

    multitasking.wait_for_tasks()
Pool Management
---------------
MultiTasking uses execution pools to manage concurrent tasks. You can
create and configure multiple pools for different types of operations:
.. code:: python

    import multitasking

    # Create a pool for API calls (higher concurrency)
    multitasking.createPool("api_pool", threads=20, engine="thread")

    # Create a pool for CPU-intensive tasks (lower concurrency)
    multitasking.createPool("cpu_pool", threads=4, engine="process")

    # Switch between pools
    multitasking.use_tag("api_pool")  # Future tasks use this pool

    @multitasking.task
    def api_call(endpoint):
        # This will use the api_pool
        pass

    # Get pool information
    pool_info = multitasking.getPool("api_pool")
    print(f"Pool: {pool_info}")  # {'engine': 'thread', 'name': 'api_pool', 'threads': 20}
Task Monitoring
---------------
Monitor and control your tasks with built-in functions:
.. code:: python

    import multitasking
    import time

    @multitasking.task
    def long_running_task(task_id):
        time.sleep(2)
        print(f"Task {task_id} completed")

    # Start some tasks
    for i in range(5):
        long_running_task(i)

    # Monitor active tasks
    while multitasking.get_active_tasks():
        active_count = len(multitasking.get_active_tasks())
        total_count = len(multitasking.get_list_of_tasks())
        print(f"Progress: {total_count - active_count}/{total_count} completed")
        time.sleep(0.5)

    print("All tasks finished!")
Configuration & Settings
========================
Thread/Process Limits
---------------------
By default, the maximum number of concurrent threads equals the number
of CPU cores. You can customize this:
.. code:: python

    import multitasking

    # Set maximum concurrent tasks
    multitasking.set_max_threads(10)

    # Scale based on CPU cores (good rule of thumb for I/O-bound tasks)
    multitasking.set_max_threads(multitasking.config["CPU_CORES"] * 5)

    # Unlimited concurrent tasks (use carefully!)
    multitasking.set_max_threads(0)
Execution Engine Selection
--------------------------
Choose between threading and multiprocessing based on your use case:
.. code:: python

    import multitasking

    # For I/O-bound tasks (default, recommended for most cases)
    multitasking.set_engine("thread")

    # For CPU-bound tasks (avoids GIL limitations)
    multitasking.set_engine("process")
**When to use threads vs processes:**
- **Threads** (default): Best for I/O-bound tasks like file operations,
network requests, database queries
- **Processes**: Best for CPU-intensive tasks like mathematical
computations, image processing, data analysis
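The reason for this split is the GIL: pure-Python computation holds it, so threads take turns, while blocking I/O releases it, so threads overlap. A stdlib-only sketch (independent of MultiTasking, using ``time.sleep`` as a stand-in for real I/O) makes the difference visible:

.. code:: python

    import time
    from concurrent.futures import ThreadPoolExecutor

    def cpu_bound(n):
        # Pure-Python arithmetic: holds the GIL while it runs
        return sum(i * i for i in range(n))

    def io_bound(delay):
        # time.sleep releases the GIL, like most real I/O
        time.sleep(delay)
        return delay

    with ThreadPoolExecutor(max_workers=4) as pool:
        start = time.time()
        cpu_results = list(pool.map(cpu_bound, [200_000] * 4))
        cpu_elapsed = time.time() - start

        start = time.time()
        io_results = list(pool.map(io_bound, [0.2] * 4))
        io_elapsed = time.time() - start

    # The four 0.2s "I/O" calls overlap and finish in roughly 0.2s;
    # the CPU-bound calls mostly run one at a time under the GIL.
    print(f"CPU-bound on 4 threads: {cpu_elapsed:.2f}s")
    print(f"I/O-bound on 4 threads: {io_elapsed:.2f}s")

Swapping the executor for processes (or ``set_engine("process")`` here) sidesteps the GIL for the CPU-bound case, at the cost of pickling arguments between processes.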
Advanced Pool Configuration
---------------------------
Create specialized pools for different workloads:
.. code:: python

    import multitasking

    # Fast pool for quick API calls
    multitasking.createPool("fast_api", threads=50, engine="thread")

    # CPU pool for heavy computation
    multitasking.createPool("compute", threads=2, engine="process")

    # Unlimited pool for lightweight tasks
    multitasking.createPool("unlimited", threads=0, engine="thread")

    # Get current pool info
    current_pool = multitasking.getPool()
    print(f"Using pool: {current_pool['name']}")
Best Practices
==============
Performance Tips
----------------
1. **Choose the right engine**: Use threads for I/O-bound tasks,
processes for CPU-bound tasks
2. **Tune thread counts**: Start with CPU cores Γ 2-5 for I/O tasks, CPU
cores for CPU tasks
3. **Use pools wisely**: Create separate pools for different types of
operations
4. **Monitor memory usage**: Each thread/process consumes memory
5. **Handle exceptions**: Always wrap risky operations in try/except
   blocks
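Tip 2 can be expressed as a tiny helper. A sketch using only the standard library; the multiplier of 4 is one point inside the rule-of-thumb 2-5 range above, not a library default:

.. code:: python

    import os

    def suggested_threads(io_bound: bool = True) -> int:
        """Rule-of-thumb thread count: cores x 4 for I/O work, cores for CPU work."""
        cores = os.cpu_count() or 1  # cpu_count() can return None
        return cores * 4 if io_bound else cores

    print(suggested_threads(io_bound=True))   # e.g. 32 on an 8-core machine
    print(suggested_threads(io_bound=False))  # e.g. 8 on an 8-core machine

The result can be fed straight to ``multitasking.set_max_threads(...)``; benchmark with your own workload before settling on a multiplier.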
Error Handling
--------------
.. code:: python

    import multitasking
    import requests

    @multitasking.task
    def robust_fetch(url):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            print(f"⏰ Timeout fetching {url}")
        except requests.exceptions.RequestException as e:
            print(f"❌ Error fetching {url}: {e}")
        except Exception as e:
            print(f"💥 Unexpected error: {e}")
        return None
Resource Management
-------------------
.. code:: python

    import multitasking
    import signal

    # Graceful shutdown on interrupt
    def cleanup_handler(signum, frame):
        print("🛑 Shutting down gracefully...")
        multitasking.wait_for_tasks()
        print("✅ All tasks completed")
        exit(0)

    signal.signal(signal.SIGINT, cleanup_handler)

    # Your application code here...
Troubleshooting
===============
Common Issues
-------------
**Tasks not running concurrently?** Check if you’re calling
``wait_for_tasks()`` inside your task loop instead of after it.

**High memory usage?** Reduce the number of concurrent threads or switch
to a process-based engine.

**Tasks hanging?** Ensure your tasks can complete (avoid infinite loops)
and handle exceptions properly.

**Import errors?** Make sure you’re using Python 3.6+ and have installed
the latest version.
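The first pitfall is easiest to see with plain threads. This stdlib sketch (illustrating the mistake, not MultiTasking's internals) shows that waiting inside the loop serializes the work:

.. code:: python

    import threading
    import time

    def work():
        time.sleep(0.1)  # stand-in for a slow task

    # Wrong: waiting inside the loop runs tasks one after another
    start = time.time()
    for _ in range(5):
        t = threading.Thread(target=work)
        t.start()
        t.join()  # <- blocks before the next task even starts
    serial_elapsed = time.time() - start

    # Right: start everything first, then wait once at the end
    start = time.time()
    threads = [threading.Thread(target=work) for _ in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    concurrent_elapsed = time.time() - start

    print(f"wait inside loop: {serial_elapsed:.2f}s")     # ~0.5s
    print(f"wait after loop:  {concurrent_elapsed:.2f}s")  # ~0.1s

The same rule applies here: launch all your decorated tasks first, then call ``wait_for_tasks()`` once at the end.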
Debugging
---------
.. code:: python

    import multitasking

    # Inspect the task lists
    active_tasks = multitasking.get_active_tasks()
    all_tasks = multitasking.get_list_of_tasks()

    print(f"Active: {len(active_tasks)}, Total: {len(all_tasks)}")

    # Get current pool configuration
    pool_info = multitasking.getPool()
    print(f"Current pool: {pool_info}")
Installation
============
**Requirements:**

- Python 3.6 or higher
- No external dependencies!
**Install via pip:**

.. code:: bash

    $ pip install multitasking --upgrade --no-cache-dir

**Development installation:**

.. code:: bash

    $ git clone https://github.com/ranaroussi/multitasking.git
    $ cd multitasking
    $ pip install -e .
Compatibility
=============
- **Python**: 3.6+ (type hints require 3.6+)
- **Operating Systems**: Windows, macOS, Linux
- **Environments**: Works in Jupyter notebooks, scripts, web
applications
- **Frameworks**: Compatible with Flask, Django, FastAPI, and other
Python frameworks
API Reference
=============
Decorators
----------
- ``@multitasking.task`` - Convert function to asynchronous task
Configuration Functions
-----------------------
- ``set_max_threads(count)`` - Set maximum concurrent tasks
- ``set_engine(type)`` - Choose “thread” or “process” engine
- ``createPool(name, threads, engine)`` - Create custom execution pool
Task Management
---------------
- ``wait_for_tasks(sleep=0)`` - Wait for all tasks to complete
- ``get_active_tasks()`` - Get list of running tasks
- ``get_list_of_tasks()`` - Get list of all tasks
- ``killall()`` - Emergency shutdown (force exit)
.. _pool-management-1:
Pool Management
---------------
- ``getPool(name=None)`` - Get pool information
- ``createPool(name, threads=None, engine=None)`` - Create new pool
Performance Benchmarks
======================
Here’s a simple benchmark comparing synchronous and concurrent
execution:
.. code:: python

    import multitasking
    import time
    import requests

    # Synchronous version
    def sync_fetch():
        start = time.time()
        for i in range(10):
            requests.get("https://httpbin.org/delay/1")
        print(f"Synchronous: {time.time() - start:.2f}s")

    # Concurrent version
    @multitasking.task
    def async_fetch():
        requests.get("https://httpbin.org/delay/1")

    def concurrent_fetch():
        start = time.time()
        for i in range(10):
            async_fetch()
        multitasking.wait_for_tasks()
        print(f"Concurrent: {time.time() - start:.2f}s")

    # Results: Synchronous ~10s, Concurrent ~1s (10x speedup!)
Contributing
============
We welcome contributions! Here’s how you can help:
1. **Report bugs**: Open an issue with details and reproduction steps
2. **Suggest features**: Share your ideas for improvements
3. **Submit PRs**: Fork, create a feature branch, and submit a pull
request
4. **Improve docs**: Help make the documentation even better
**Development setup:**
.. code:: bash

    $ git clone https://github.com/ranaroussi/multitasking.git
    $ cd multitasking
    $ pip install -e .
    $ python -m pytest  # Run tests
Legal Stuff
===========
**MultiTasking** is distributed under the **Apache Software License**.
See the `LICENSE.txt <./LICENSE.txt>`__ file in the release for details.
Support
=======
- 📖 **Documentation**: This README and inline code documentation
- 🐛 **Issues**: `GitHub Issues <https://github.com/ranaroussi/multitasking/issues>`__
- 🐦 **Twitter**: `@aroussi <https://twitter.com/aroussi>`__
Changelog
=========
**v0.0.12-rc**

- ✨ Added comprehensive type hints throughout the codebase
- 📚 Enhanced documentation with detailed docstrings and inline comments
- 🔧 Improved error handling with specific exception types
- 🚀 Optimized task creation and pool management logic
- 🛡️ Made codebase fully PEP8 compliant and linter-friendly
- 🧹 Better code organization and maintainability
**v0.0.11** - Previous stable release
--------------
**Happy Multitasking! 🚀**
*Please drop me a note with any feedback you have.*
**Ran Aroussi**
.. |Python version| image:: https://img.shields.io/badge/python-3.6+-blue.svg?style=flat
:target: https://pypi.python.org/pypi/multitasking
.. |PyPi version| image:: https://img.shields.io/pypi/v/multitasking.svg?maxAge=60
:target: https://pypi.python.org/pypi/multitasking
.. |PyPi status| image:: https://img.shields.io/pypi/status/multitasking.svg?maxAge=2592000
:target: https://pypi.python.org/pypi/multitasking
.. |PyPi downloads| image:: https://img.shields.io/pypi/dm/multitasking.svg?maxAge=2592000
:target: https://pypi.python.org/pypi/multitasking
.. |CodeFactor| image:: https://www.codefactor.io/repository/github/ranaroussi/multitasking/badge
:target: https://www.codefactor.io/repository/github/ranaroussi/multitasking
.. |Star this repo| image:: https://img.shields.io/github/stars/ranaroussi/multitasking.svg?style=social&label=Star&maxAge=60
:target: https://github.com/ranaroussi/multitasking
.. |Follow me on twitter| image:: https://img.shields.io/twitter/follow/aroussi.svg?style=social&label=Follow%20Me&maxAge=60
:target: https://twitter.com/aroussi