| Field | Value |
| --- | --- |
| Name | tqueue |
| Version | 0.0.22 |
| Summary | Threading Queue |
| upload_time | 2023-12-26 08:58:40 |
| home_page | |
| maintainer | |
| docs_url | None |
| author | |
| requires_python | >=3.7 |
| license | MIT License |
| keywords | queue, threading |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |
# tqueue package
This library lets you run your tasks in multiple threads easily. This is helpful when you have a lot of data to process.
Assume you have a large list of items to process. You write a producer that puts items into the queue one by one, and workers get data from the queue and process it. Putting data into the queue should be quicker than processing it in a worker.
### Installation
```bash
pip install tqueue
```
### Usage
1. Import the library
```python
from tqueue import ThreadingQueue
```
2. Create a worker
- Create a worker function that takes the data as its first parameter
- The worker can be a normal function or a coroutine function
- The worker will be called in child threads
```python
def worker(data):
    pass


async def worker2(data):
    pass
```
3. Set up threading for a producer
Apply threading to a producer:
- a. Set the number of threads and the worker
- b. Put data into the queue
- Use `ThreadingQueue` as a context manager:
```python
def producer():
    # Start the queue
    with ThreadingQueue(40, worker) as tq:
        ...
        tq.put(data)
```
- You can also use it asynchronously:
```python
async def producer():
    # Start the queue
    async with ThreadingQueue(40, worker) as tq:
        ...
        await tq.put(data)
```
4. Run the producer
* Async producer:
```python
await producer()
```
or
```python
asyncio.run(producer())
```
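* Sync producer: call it like any plain function:
```python
producer()
```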
### Note
1. You can pass extra keyword params to all workers running in threads via `params` (see the sketch after this list).
2. Apart from the number of threads and the worker, you can set `log_dir` to store logs in a file, and `worker_params_builder` to generate per-thread parameters for each worker.
3. `on_close_thread` is an optional function param, useful when you need to close a database connection or release other resources when a thread ends.
4. Apart from all the above params, the rest of the keyword params are passed to the worker.
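Putting these together, a minimal sketch of the options above. The parameter names `worker_params_builder`, `on_close_thread`, and `params` follow the example further down; `build_params`, `close_thread`, `out`, and `uid` are illustrative names, not part of the library:

```python
from tqueue import ThreadingQueue


# Hypothetical per-thread resource: each thread gets its own file handle
def build_params():
    return {"out": open("out.txt", "a")}


# Called when a thread ends; receives the params built above
def close_thread(out):
    out.close()


# Receives the queued item first, then the per-thread and shared params
def worker(data, out, uid, **kwargs):
    out.write(f"{uid}: {data}\n")


with ThreadingQueue(
    4, worker,
    worker_params_builder=build_params,
    on_close_thread=close_thread,
    params={"uid": 123},
) as tq:
    for item in ["a", "b", "c"]:
        tq.put(item)
```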
* If you upgrade the library from version 0.0.14 to a newer release, update your code to avoid this bug:
```python
# 0.0.14
with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    await tq.put(data)
```
```python
# From 0.0.15

# Sync
with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    tq.put(data)

# Async
async with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    await tq.put(data)
```
* In both the sync and async cases, the worker can be an async function.
* The async version performs slightly better because it waits with `asyncio.sleep` when the queue is full, compared to `time.sleep` in the sync version. In most cases the difference is small.
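For instance, a minimal sketch of a sync producer driving an async worker, following the usage shown above (names and sleep duration are illustrative):

```python
import asyncio

from tqueue import ThreadingQueue


async def worker(data):
    await asyncio.sleep(0.01)  # simulate async I/O
    print(data)


def producer():
    # Sync context manager, even though the worker is a coroutine function
    with ThreadingQueue(4, worker) as tq:
        for i in range(10):
            tq.put(i)


producer()
```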
### Example
```python
import json

import pymysql

from tqueue import ThreadingQueue

NUM_OF_THREADS = 40


def get_db_connection():
    return pymysql.connect(host='localhost',
                           user='root',
                           password='123456',
                           database='example',
                           cursorclass=pymysql.cursors.DictCursor)


# Build params for the worker; the params persist for the lifetime of the thread.
# This function is called when a new thread is initialized or on retry.
def worker_params_builder():
    # Each thread uses its own db connection
    conn = get_db_connection()
    conn.autocommit(1)
    cursor = conn.cursor()
    return {"cursor": cursor, "connection": conn}


# To release resources: close the database connection, ...
# This function is called when the thread ends
def on_close_thread(cursor, connection):
    cursor.close()
    connection.close()


def worker(image_info, cursor, uid: int, **kwargs):
    # Update image info in the database
    sql = "UPDATE images SET width = %s, height = %s, uid = %s WHERE id = %s"
    cursor.execute(sql, (image_info["width"], image_info["height"], uid, image_info["id"]))


def producer(source_file: str):
    with ThreadingQueue(
        NUM_OF_THREADS, worker,
        log_dir="logs/update-images",
        worker_params_builder=worker_params_builder,
        on_close_thread=on_close_thread,
        params={"uid": 123},
        retry_count=1
    ) as tq:
        with open(source_file, 'r') as f:
            for line in f:
                if not line:
                    continue
                data = json.loads(line)
                tq.put(data)


if __name__ == "__main__":
    producer("images.jsonl")
```
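For reference, each line of the (hypothetical) `images.jsonl` input is a JSON object with the fields the worker reads, for example:

```json
{"id": 1, "width": 800, "height": 600}
```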
### Development
#### Build project
1. Update the version number in `src/tqueue/__version__.py`
2. Update the changelog
3. Build and publish the changes
```bash
python3 -m build
python3 -m twine upload dist/*
```
## Release Information
### Fixed
* Logging to a file no longer raises an exception
---
[Full changelog](https://github.com/haiz/tqueue/blob/main/CHANGELOG.md)