| Field | Value |
| --- | --- |
| Name | processq |
| Version | 0.0.1 |
| home_page | |
| Summary | Process Queue |
| upload_time | 2024-01-09 08:06:50 |
| maintainer | |
| docs_url | None |
| author | |
| requires_python | >=3.7 |
| license | MIT License |
| keywords | process, queue |
| VCS | |
| bugtrack_url | |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |
# processq package
This library lets you run your tasks across multiple processes with minimal setup.
It is helpful when you have a lot of data to process.
Say you have a large list of items to process: you write a producer that puts items into the queue one by one.
Workers pull items from the queue and process them. Putting an item into the queue should be faster than processing it, so the workers stay busy.
### Installation
```bash
pip install processq
```
### Usage
1. Import the library
```python
from processq import ProcessQueue
```
2. Create a worker
- Create a worker function that receives the data as its first parameter
- The worker can be a normal function or a coroutine function
- The worker is called in the child processes
```python
def worker(data):
    pass


async def worker2(data):
    pass
```
3. Set up the process queue in a producer
In the producer:
- a. Set the number of processes and the worker
- b. Put data into the queue
- You can use `ProcessQueue` as a context manager:
```python
def producer():
    # Start the queue
    with ProcessQueue(40, worker) as pq:
        ...
        pq.put(data)
```
- You can also use it asynchronously:
```python
async def producer():
    # Start the queue
    async with ProcessQueue(40, worker) as pq:
        ...
        await pq.put(data)
```
4. Run the producer
* Async producer:
```python
await producer()
```
or
```python
asyncio.run(producer())
```
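* Sync producer: the producer sketched in step 3 can simply be called directly, e.g.:
```python
producer()
```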
### Note
1. You can pass extra keyword params to all workers running in the processes via `worker_params`.
2. Apart from the number of processes and the worker, you can set `log_dir` to store logs in files and `worker_params_builder` to generate parameters for each worker.
3. `on_process_close` is an optional function param, useful when you need to close a database connection once a process finishes.
4. Apart from the params above, any remaining keyword params are passed through to the worker (see the sketch after this list).
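A compact sketch combining these options, using the keyword names as listed above (`build_params`, `close_process`, and the literal values are purely illustrative; the full example below shows a complete, working setup):
```python
from processq import ProcessQueue

def build_params():
    # hypothetical helper: create per-process resources for the workers
    return {"tag": "demo"}

def close_process(tag=None, **kwargs):
    # hypothetical helper: release per-process resources when a process finishes
    pass

def worker(item, tag=None, source=None, **kwargs):
    # receives the queued item plus the keyword params described above
    print(source, tag, item)

def main():
    with ProcessQueue(
        8, worker,
        log_dir="logs/demo",                 # store logs in files
        worker_params_builder=build_params,  # per-process params for each worker
        on_process_close=close_process,      # cleanup hook
        worker_params={"source": "demo"},    # extra keyword params for all workers
    ) as pq:
        for item in range(10):
            pq.put(item)

if __name__ == "__main__":
    main()
```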
* If you upgrade the library from version 0.0.14 to a newer release, update your code as follows:
```python
# 0.0.14
with ProcessQueue(num_of_processes, worker) as pq:
    ...
    await pq.put(data)
```
```python
# From 0.0.15

# Sync
with ProcessQueue(num_of_processes, worker) as pq:
    ...
    pq.put(data)

# Async
async with ProcessQueue(num_of_processes, worker) as pq:
    ...
    await pq.put(data)
```
* In both the sync and async cases, the worker can be an async function, as sketched below.
* The async version performs slightly better because it uses `asyncio.sleep` instead of `time.sleep` to wait when the queue is full; in most cases the difference is negligible.
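For instance, a coroutine worker can be handed to the sync context manager (a minimal sketch; `fetch_size` and the URLs are purely illustrative):
```python
import asyncio

from processq import ProcessQueue

async def fetch_size(url):
    # coroutine worker: runs inside the child processes
    await asyncio.sleep(0.1)  # stand-in for real async I/O
    print("done:", url)

def producer(urls):
    # sync producer driving an async worker
    with ProcessQueue(4, fetch_size) as pq:
        for url in urls:
            pq.put(url)

if __name__ == "__main__":
    producer(["https://example.com/a", "https://example.com/b"])
```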
### Example
```python
import json
import pymysql
import asyncio

from processq import ProcessQueue

NUM_OF_PROCESSES = 40


def get_db_connection():
    return pymysql.connect(host='localhost',
                           user='root',
                           password='123456',
                           database='example',
                           cursorclass=pymysql.cursors.DictCursor)


# Build params for the worker; these params persist for the lifetime of the process.
# This function is called when a new process is initialized or retried.
def worker_params_builder():
    # Each process uses its own db connection
    conn = get_db_connection()
    conn.autocommit(1)
    cursor = conn.cursor()
    return {"cursor": cursor, "connection": conn}


# Release resources (close the database connection, ...).
# This function is called when the process ends.
def on_close_process(cursor, connection):
    cursor.close()
    connection.close()


def worker(image_info, cursor, uid: int, **kwargs):
    # Update image info in the database
    sql = "UPDATE images SET width = %s, height = %s, uid = %s WHERE id = %s"
    cursor.execute(sql, (image_info["width"], image_info["height"], uid, image_info["id"]))


def producer(source_file: str):
    with ProcessQueue(
            NUM_OF_PROCESSES, worker,
            log_dir="logs/update-images",
            worker_params_builder=worker_params_builder,
            on_close_process=on_close_process,
            params={"uid": 123},
            retry_count=1
    ) as pq:
        with open(source_file, 'r') as f:
            for line in f:
                if not line.strip():
                    continue
                data = json.loads(line)
                pq.put(data)


if __name__ == "__main__":
    producer("images.jsonl")
```
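An async variant of the same producer, reusing the definitions from the example above (a sketch; it assumes the `async with` / `await pq.put` interface shown in Usage):
```python
async def async_producer(source_file: str):
    async with ProcessQueue(
            NUM_OF_PROCESSES, worker,
            log_dir="logs/update-images",
            worker_params_builder=worker_params_builder,
            on_close_process=on_close_process,
            params={"uid": 123},
            retry_count=1
    ) as pq:
        with open(source_file, 'r') as f:
            for line in f:
                if not line.strip():
                    continue
                await pq.put(json.loads(line))


if __name__ == "__main__":
    asyncio.run(async_producer("images.jsonl"))
```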
### Development
#### Build project
1. Update the version number in `src/processq/__version__.py`
2. Update the changelog
3. Build and publish the changes
```bash
python3 -m build
python3 -m twine upload dist/*
```
## Release Information
### Added
* Todo
---
[Full changelog](https://github.com/haiz/processq/blob/main/CHANGELOG.md)
Raw data
{
"_id": null,
"home_page": "",
"name": "processq",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.7",
"maintainer_email": "",
"keywords": "process,queue",
"author": "",
"author_email": "Hai Cao <cthai83@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/3d/f7/b51203c0c21718b99231b820d6d33b236828de220d5ff613a54c3a7698d5/processq-0.0.1.tar.gz",
"platform": null,
"description": "# processq package\n\nThis library allows you to do your tasks in multiple processes easily.\n\nThis is helpful when you have a lot of data to process.\n\nAssume that you have a large list of items to process. You need to write a producer to put items in the queue one by one.\n\nWorkers will get data from the queue and then process it. Putting data into a queue should be quicker than processing it (worker).\n\n### Installation\n\n```bash\npip install processq\n```\n\n\n### Usage\n1. Import library\n\n```python\nfrom processq import ProcessQueue\n```\n2. Create a worker\n- Create a worker function that gets the data as the first parameter\n- Worker can be a normal function or a coroutine function\n- Worker will be called in child processes\n\n```python\ndef worker(data):\n pass\nasync def worker2(data):\n pass\n```\n\n3. Set process for a producer\nApply the process for a producer:\n- a. Set the number of processes and the worker\n- b. Put data into the queue\n\n- You can also use ProcessQueue as a context manager\n\n```python\ndef producer():\n # Start the queue\n with ProcessQueue(40, worker) as pq:\n ...\n pq.put(data)\n```\n\n- You can also use it async\n\n```python\nasync def producer():\n # Start the queue\n async with ProcessQueue(40, worker) as pq:\n ...\n await pq.put(data)\n```\n\n4. Run producer\n\n* Async producer:\n```python\nawait producer()\n```\nor\n```python\nasyncio.run(producer())\n```\n\n\n### Note\n1. You can add more keyword params for all workers running in processes via `worker_params`\n2. Apart from the number of processes and the worker, you can set `log_dir` to store logs to file \n3. and `worker_params_builder` to generate parameters for each worker.\n4. `on_process_close` is an optional param as a function that is helpful when you need to close the database connection when a process done\n5. Apart from all the above params, the rest of the keyword params will be passed to the worker.\n\n* If you change the lib from the 0.0.14 version to the newer, please update the code to fix the bug:\n```python\n# 0.0.14\nwith ProcessQueue(num_of_processes, worker) as pq:\n ...\n await pq.put(data)\n```\n\n```python\n# From 0.0.15\n\n# Sync\nwith ProcessQueue(num_of_processes, worker) as pq:\n ...\n pq.put(data)\n\n# Async\nasync with ProcessQueue(num_of_processes, worker) as pq:\n ...\n await pq.put(data)\n```\n\n* In both sync and async cases, you can provide a worker as an async function.\n* The async version is a little bit better in performance because it uses `asyncio.sleep` to wait when the queue is full compared to `time.sleep` in the sync version. 
In most cases, the difference in performance is not much.\n\n### Example\n\n```python\nimport json\nimport pymysql\nimport asyncio\n\nfrom processq import ProcessQueue\n\nNUM_OF_PROCESSES = 40\n\n\ndef get_db_connection():\n return pymysql.connect(host='localhost',\n user='root',\n password='123456',\n database='example',\n cursorclass=pymysql.cursors.DictCursor)\n\n\n# Build params for the worker, the params will be persistent with the process\n# This function is called when init a new process or retry\ndef worker_params_builder():\n # Processes use db connection separately\n conn = get_db_connection()\n conn.autocommit(1)\n cursor = conn.cursor()\n return {\"cursor\": cursor, \"connection\": conn}\n\n\n# To clear resources: close database connection, ...\n# This function is called when the process ends\ndef on_close_process(cursor, connection):\n cursor.close()\n connection.close()\n\n\ndef worker(image_info, cursor, uid: int, **kwargs):\n # Update image info into database\n\n sql = \"UPDATE images SET width = %s, height = %s, uid = %s WHERE id = %s\"\n cursor.execute(sql, (image_info[\"width\"], image_info[\"height\"], uid, image_info[\"id\"]))\n\n\ndef producer(source_file: str):\n with ProcessQueue(\n NUM_OF_PROCESSES, worker,\n log_dir=f\"logs/update-images\",\n worker_params_builder=worker_params_builder,\n on_close_process=on_close_process,\n params={\"uid\": 123},\n retry_count=1\n ) as pq:\n with open(source_file, 'r') as f:\n for line in f:\n if not line:\n continue\n data = json.loads(line)\n\n pq.put(data)\n\n\nif __name__ == \"__main__\":\n producer(\"images.jsonl\")\n```\n\n### Development\n\n#### Build project\n\n1. Update the version number in file `src/processq/__version__.py`\n2. Update the Change log\n3. Build and publish the changes\n\n```bash\npython3 -m build\npython3 -m twine upload dist/*\n```\n\n## Release Information\n\n### Added\n\n* Todo\n\n\n---\n\n[Full changelog](https://github.com/haiz/processq/blob/main/CHANGELOG.md)\n",
"bugtrack_url": null,
"license": "MIT License",
"summary": "Process Queue",
"version": "0.0.1",
"project_urls": {
"Changelog": "https://github.com/haiz/processq/blob/main/CHANGELOG.md",
"Documentation": "https://github.com/haiz/processq/blob/main/README.md",
"Examples": "https://github.com/haiz/processq/blob/main/examples",
"Homepage": "https://github.com/haiz/processq",
"Source": "https://github.com/haiz/processq"
},
"split_keywords": [
"process",
"queue"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "9bc8c8fbf61d4c3afa9ee2bbbec851e90cd597629d4c6d28fdbfca6adde1701b",
"md5": "316393fa0d47a83179302c8ebf3d33d5",
"sha256": "480994bbe86e769fc12bed15ed52d8c58ac9dd8b978903840b7cbc65ea5125a1"
},
"downloads": -1,
"filename": "processq-0.0.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "316393fa0d47a83179302c8ebf3d33d5",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.7",
"size": 7492,
"upload_time": "2024-01-09T08:06:48",
"upload_time_iso_8601": "2024-01-09T08:06:48.894684Z",
"url": "https://files.pythonhosted.org/packages/9b/c8/c8fbf61d4c3afa9ee2bbbec851e90cd597629d4c6d28fdbfca6adde1701b/processq-0.0.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "3df7b51203c0c21718b99231b820d6d33b236828de220d5ff613a54c3a7698d5",
"md5": "6ee09577b5a861d662c6aca3dfe2abdc",
"sha256": "9692d0ab17e2d608a6cbcdfad075a672e196e2982ebb450244180da1b3d08fba"
},
"downloads": -1,
"filename": "processq-0.0.1.tar.gz",
"has_sig": false,
"md5_digest": "6ee09577b5a861d662c6aca3dfe2abdc",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.7",
"size": 7012,
"upload_time": "2024-01-09T08:06:50",
"upload_time_iso_8601": "2024-01-09T08:06:50.899325Z",
"url": "https://files.pythonhosted.org/packages/3d/f7/b51203c0c21718b99231b820d6d33b236828de220d5ff613a54c3a7698d5/processq-0.0.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-01-09 08:06:50",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "haiz",
"github_project": "processq",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "processq"
}