tqueue

- Name: tqueue
- Version: 0.0.22
- Summary: Threading Queue
- Upload time: 2023-12-26 08:58:40
- Requires Python: >=3.7
- License: MIT License
- Keywords: queue, threading
# tqueue package

This library lets you run your tasks in multiple threads easily.

This is helpful when you have a lot of data to process.

Assume you have a large list of items to process. You write a producer that puts the items into the queue one by one.

Workers then get items from the queue and process them. Putting an item into the queue should be quicker than processing it in a worker.
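The pattern tqueue implements can be sketched with the standard library alone. This is only an illustration of the idea, not tqueue's actual code: a bounded queue, a pool of worker threads, and a producer that keeps the queue fed.

```python
import queue
import threading

def worker(item):
    # Stand-in for real per-item work.
    return item * 2

def run(items, num_threads=4):
    q = queue.Queue(maxsize=100)   # bounded: the producer blocks when workers lag
    results = []

    def thread_loop():
        while True:
            item = q.get()
            if item is None:       # sentinel tells this thread to stop
                break
            results.append(worker(item))

    threads = [threading.Thread(target=thread_loop) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in items:             # the producer: just keeps the queue fed
        q.put(item)
    for _ in threads:              # one sentinel per thread
        q.put(None)
    for t in threads:
        t.join()
    return results

print(sorted(run(range(10))))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

tqueue wraps this boilerplate (thread startup, shutdown, backpressure) behind the `ThreadingQueue` context manager shown below.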

### Installation

```bash
pip install tqueue
```


### Usage
1. Import the library
```python
from tqueue import ThreadingQueue
```
2. Create a worker
- Create a worker function that takes the data as its first parameter
- The worker can be a normal function or a coroutine function
- The worker is called in child threads

```python
def worker(data):
    pass

async def worker2(data):
    pass
```

3. Set up threading for a producer
Apply threading to a producer:
- a. Set the number of threads and the worker
- b. Put data into the queue

Use `ThreadingQueue` as a context manager:

```python
def producer():
    # Start the queue
    with ThreadingQueue(40, worker) as tq:
        ...
        tq.put(data)
```

You can also use it asynchronously:

```python
async def producer():
    # Start the queue
    async with ThreadingQueue(40, worker) as tq:
        ...
        await tq.put(data)
```

4. Run the producer

* Async producer:
```python
await producer()
```
or
```python
asyncio.run(producer())
```


### Note
1. You can pass extra keyword params to all workers running in threads via `worker_params`.
2. Apart from the number of threads and the worker, you can set `log_dir` to store logs in files, and `worker_params_builder` to generate parameters for each worker thread.
3. `on_close_thread` is an optional function param that is helpful when you need to, for example, close a database connection when a thread finishes.
4. Apart from all the above params, the remaining keyword params are passed to the worker.
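Conceptually, the per-thread parameter flow might look like this sketch. It is an illustration, not tqueue's actual implementation; the `resource` key and the merge order are assumptions:

```python
import queue
import threading

def worker_params_builder():
    # Hypothetical per-thread resource; in practice this could open a DB connection.
    return {"resource": f"resource-{threading.get_ident()}"}

def worker(data, resource, uid):
    return (data, uid, resource)

def thread_loop(q, results, worker_params):
    # The builder runs once per thread; its output is merged with the shared params...
    params = {**worker_params, **worker_params_builder()}
    while True:
        data = q.get()
        if data is None:  # sentinel: stop
            break
        # ...and the merged params are passed to the worker on every call.
        results.append(worker(data, **params))

q = queue.Queue()
results = []
t = threading.Thread(target=thread_loop, args=(q, results, {"uid": 123}))
t.start()
for item in range(3):
    q.put(item)
q.put(None)
t.join()
print(results)  # uid 123 appears on every result
```

Because the builder runs once per thread (rather than once per item), it is the right place for expensive per-thread resources such as database connections.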

* If you upgrade from version 0.0.14 to a newer release, update your code to match the fixed API:
```python
# 0.0.14
with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    await tq.put(data)
```

```python
# From 0.0.15

# Sync
with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    tq.put(data)

# Async
async with ThreadingQueue(num_of_threads, worker) as tq:
    ...
    await tq.put(data)
```

* In both sync and async cases, you can provide a worker as an async function.
* The async version performs slightly better because it uses `asyncio.sleep` (instead of `time.sleep` in the sync version) to wait when the queue is full. In most cases the difference is small.
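The reason `asyncio.sleep` helps can be seen in a toy measurement: it suspends only the waiting coroutine, so the event loop keeps running everything else, whereas `time.sleep` would block the whole thread.

```python
import asyncio
import time

async def waiter():
    # asyncio.sleep suspends only this coroutine; the event loop keeps running others.
    await asyncio.sleep(0.1)

async def main():
    start = time.perf_counter()
    await asyncio.gather(waiter(), waiter(), waiter())
    return time.perf_counter() - start

elapsed = asyncio.run(main())
# Three 0.1 s waits overlap: the total is roughly 0.1 s, not 0.3 s.
print(f"{elapsed:.2f}s")
```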

### Example

```python
import json
import pymysql

from tqueue import ThreadingQueue


NUM_OF_THREADS = 40


def get_db_connection():
    return pymysql.connect(host='localhost',
                           user='root',
                           password='123456',
                           database='example',
                           cursorclass=pymysql.cursors.DictCursor)


# Build params for the worker; these params persist for the thread's lifetime
# This function is called when a new thread is initialized or retried
def worker_params_builder():
    # Threads use db connection separately
    conn = get_db_connection()
    conn.autocommit(1)
    cursor = conn.cursor()
    return {"cursor": cursor, "connection": conn}


# To release resources: close the database connection, etc.
# This function is called when the thread ends
def on_close_thread(cursor, connection):
    cursor.close()
    connection.close()


def worker(image_info, cursor, uid: int, **kwargs):
# Update image info in the database
    sql = "UPDATE images SET width = %s, height = %s, uid = %s WHERE id = %s"
    cursor.execute(sql, (image_info["width"], image_info["height"], uid, image_info["id"]))

def producer(source_file: str):
    with ThreadingQueue(
        NUM_OF_THREADS, worker,
        log_dir="logs/update-images",
        worker_params_builder=worker_params_builder,
        on_close_thread=on_close_thread,
        params={"uid": 123},
        retry_count=1
    ) as tq:
        with open(source_file, 'r') as f:
            for line in f:
                if not line.strip():
                    continue
                data = json.loads(line)
                tq.put(data)


if __name__ == "__main__":
    producer("images.jsonl")
```
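For a version you can run without a MySQL server or tqueue installed, the same worker/builder shape can be exercised with the standard-library `sqlite3` module. The direct function calls here stand in for what the queue's worker threads would do:

```python
import sqlite3

def worker_params_builder():
    # One connection per thread in the real setup; a single in-memory DB here.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE images (id INTEGER PRIMARY KEY,"
                 " width INTEGER, height INTEGER, uid INTEGER)")
    conn.execute("INSERT INTO images (id) VALUES (1)")
    return {"cursor": conn.cursor(), "connection": conn}

def on_close_thread(cursor, connection):
    cursor.close()
    connection.close()

def worker(image_info, cursor, uid, **kwargs):
    # sqlite3 uses ? placeholders where pymysql uses %s
    sql = "UPDATE images SET width = ?, height = ?, uid = ? WHERE id = ?"
    cursor.execute(sql, (image_info["width"], image_info["height"], uid, image_info["id"]))

params = worker_params_builder()
worker({"id": 1, "width": 640, "height": 480}, uid=123, **params)
row = params["cursor"].execute(
    "SELECT width, height, uid FROM images WHERE id = 1").fetchone()
print(row)  # (640, 480, 123)
on_close_thread(**params)
```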

### Development

#### Build project

1. Update the version number in file `src/tqueue/__version__.py`
2. Update the changelog
3. Build and publish the changes

```bash
python3 -m build
python3 -m twine upload dist/*
```

## Release Information

### Fixed

* Logging to a file no longer raises an exception


---

[Full changelog](https://github.com/haiz/tqueue/blob/main/CHANGELOG.md)

            
