processq 0.0.1 (PyPI)

- Summary: Process Queue
- License: MIT License
- Requires Python: >=3.7
- Keywords: process, queue
- Homepage: https://github.com/haiz/processq
- Uploaded: 2024-01-09 08:06:50
# processq package

This library lets you run your tasks across multiple processes easily.

This is helpful when you have a lot of data to process.

Assume you have a large list of items to process. You write a producer that puts the items into a queue one by one.

Workers take items from the queue and process them. Putting an item into the queue should be quicker than processing it in a worker.
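
For instance, a minimal end-to-end sketch (the worker body and the data here are placeholders):

```python
from processq import ProcessQueue


def worker(item):
    # Runs in a child process for every item put into the queue
    print(item)


def producer(items):
    # 4 worker processes consume items while the producer enqueues them
    with ProcessQueue(4, worker) as pq:
        for item in items:
            pq.put(item)


if __name__ == "__main__":
    producer(range(100))
```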

### Installation

```bash
pip install processq
```


### Usage
1. Import library

```python
from processq import ProcessQueue
```
2. Create a worker
- Create a worker function that gets the data as the first parameter
- Worker can be a normal function or a coroutine function
- Worker will be called in child processes

```python
def worker(data):
    pass


async def worker2(data):
    pass
```

3. Set up the process queue in a producer
- a. Set the number of processes and the worker
- b. Put data into the queue

- Use `ProcessQueue` as a context manager:

```python
def producer():
    # Start the queue
    with ProcessQueue(40, worker) as pq:
        ...
        pq.put(data)
```

- You can also use it asynchronously:

```python
async def producer():
    # Start the queue
    async with ProcessQueue(40, worker) as pq:
        ...
        await pq.put(data)
```

4. Run the producer

* Async producer:
```python
await producer()
```
or
```python
asyncio.run(producer())
```
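
* Sync producer (call it directly, guarded by `__main__` as in the full example below, since worker processes are spawned):
```python
if __name__ == "__main__":
    producer()
```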


### Note
1. You can pass extra keyword params to all workers running in the processes via `worker_params`
2. Apart from the number of processes and the worker, you can set `log_dir` to write logs to a file, and `worker_params_builder` to generate parameters for each worker
3. `on_close_process` is an optional param: a function that is helpful when you need to close a database connection (or other resources) when a process finishes
4. Apart from all the params above, the remaining keyword params are passed to the worker (see the sketch below)
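
A compact sketch of a constructor call using these options (the values and helper bodies are placeholders; the parameter names follow the full example below):

```python
from processq import ProcessQueue


def build_params():
    # Called when a process starts; the returned dict is passed to every worker call
    return {"session": object()}


def close_process(session):
    # Called when a process ends, so per-process resources can be released
    pass


def worker(data, session, tag: str, **kwargs):
    # `session` comes from build_params, `tag` from the extra keyword params
    pass


with ProcessQueue(
        8, worker,
        log_dir="logs/demo",
        worker_params_builder=build_params,
        on_close_process=close_process,
        params={"tag": "demo"},
) as pq:
    pq.put({"id": 1})
```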

* If you upgrade the library from version 0.0.14 to a newer version, update your code as follows:
```python
# 0.0.14
with ProcessQueue(num_of_processes, worker) as pq:
    ...
    await pq.put(data)
```

```python
# From 0.0.15

# Sync
with ProcessQueue(num_of_processes, worker) as pq:
    ...
    pq.put(data)

# Async
async with ProcessQueue(num_of_processes, worker) as pq:
    ...
    await pq.put(data)
```

* In both the sync and async cases, you can provide the worker as an async function.
* The async version performs slightly better because it uses `asyncio.sleep` to wait when the queue is full, compared to `time.sleep` in the sync version. In most cases the difference is negligible.
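
For instance, a minimal sketch of a coroutine worker driven from a synchronous producer (the worker body and the data are placeholders):

```python
import asyncio

from processq import ProcessQueue


async def worker(data):
    # Simulates non-blocking I/O inside the child process
    await asyncio.sleep(0.1)
    print(data)


def producer(items):
    # A coroutine worker also works with the synchronous context manager
    with ProcessQueue(4, worker) as pq:
        for item in items:
            pq.put(item)


if __name__ == "__main__":
    producer(range(10))
```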

### Example

```python
import json

import pymysql

from processq import ProcessQueue

NUM_OF_PROCESSES = 40


def get_db_connection():
    return pymysql.connect(host='localhost',
                           user='root',
                           password='123456',
                           database='example',
                           cursorclass=pymysql.cursors.DictCursor)


# Build params for the worker; the params persist for the lifetime of the process.
# This function is called when a new process is initialized or when it retries.
def worker_params_builder():
    # Each process uses its own database connection
    conn = get_db_connection()
    conn.autocommit(1)
    cursor = conn.cursor()
    return {"cursor": cursor, "connection": conn}


# Clean up resources: close the database connection, etc.
# This function is called when the process ends
def on_close_process(cursor, connection):
    cursor.close()
    connection.close()


def worker(image_info, cursor, uid: int, **kwargs):
    # Update the image info in the database

    sql = "UPDATE images SET width = %s, height = %s, uid = %s WHERE id = %s"
    cursor.execute(sql, (image_info["width"], image_info["height"], uid, image_info["id"]))


def producer(source_file: str):
    with ProcessQueue(
            NUM_OF_PROCESSES, worker,
            log_dir=f"logs/update-images",
            worker_params_builder=worker_params_builder,
            on_close_process=on_close_process,
            params={"uid": 123},
            retry_count=1
    ) as pq:
        with open(source_file, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                data = json.loads(line)

                pq.put(data)


if __name__ == "__main__":
    producer("images.jsonl")
```

### Development

#### Build project

1. Update the version number in `src/processq/__version__.py` (see the note after the commands below)
2. Update the changelog
3. Build and publish the changes

```bash
python3 -m build
python3 -m twine upload dist/*
```
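
For step 1, the version module holds a single version string, roughly like this (illustrative; the actual file contents may differ):

```python
# src/processq/__version__.py
__version__ = "0.0.1"
```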

## Release Information

### Added

* Todo


---

[Full changelog](https://github.com/haiz/processq/blob/main/CHANGELOG.md)

            
