# mongo_queue
A task queue built on MongoDB, with channels and unique job IDs.
[Website](http://www.shunyeka.com) • [autobotAI Automation Platform](https://autobot.live/)
Inspired by [kapilt/mongoqueue](https://github.com/kapilt/mongoqueue)
### Change Log:
#### v0.2.1
- Added optional locking when finding job by id, so it can be marked completed.
#### v0.1.9
- Added a method to get pending jobs by channel.
#### v0.1.8
- Added a `delay` option when queuing a new job, which postpones the job's execution by the given number of seconds. It works like `sleep`, but applies to freshly queued jobs.
#### v0.1.7
- Added index for repair operation.
#### v0.1.6
- Added utility methods for getting running jobs and pending jobs count.
#### v0.1.5
- Fixed an issue with the next-job method: it was picking up jobs that still had dependencies.
#### v0.1.4
- Improved the job.next method. Removed the lookup and removed double operation.
#### v0.1.3
- Added a dependency index for faster lookup. Updated the `complete` method to retry 3 times while pulling the dependencies.
- Corrected pull dependency query by adding filter. It was updating all the documents.
#### v0.1.2
- Added diskUsage for larger queue dependency resolution.
#### v0.1.1
- Added find_and_update for finding the next job. Added a process that picks the next job, with multiple retries, if the previous one is already locked.
#### v0.1.0
- Added an optional inc_attempt parameter for job.release. This lets the user choose whether to increment the attempt count when releasing a job.
#### v0.0.9
- Added the method find_by_id to find a job by its id.
#### v0.0.7
- Added backward compatibility with older MongoDB versions. The aggregate function was using a lookup that is only available after Mongo 3.6 (not available in DocumentDB). Modified the lookup to use the old syntax.
#### v0.0.6
- Added the sleep and state features when releasing a job. Sleep keeps the job from being picked up until the given number of seconds has passed; state stores data for long-running jobs.
#### v0.0.5
- Added the depends_on feature. You can create dependencies between jobs by supplying depends_on[] with the ids of previously created jobs.
#### v0.0.3
- Added a unique index on job_id and channel, so that the same job cannot be added multiple times. If no job id is provided, a unique id is generated by default.
## Usage
Install the package.
```
pip install mongo-queue-service
```
### Usage Example:
* Create Queue Object
```python
from mongo_queue.queue import Queue
from pymongo import MongoClient
queue = Queue(MongoClient('localhost', 27017).task_queue, consumer_id="consumer-1", timeout=300, max_attempts=3)
```
* Add a task to the default channel
```python
queue.put({"task_id": 1})
```
* Add a task with a priority to the default channel
```python
queue.put({"task_id": 1}, priority=1)
```
* Add task to queue in a specific channel
```python
queue.put({"task_id": 1}, priority=1, channel="channel_1")
```
* Add task to queue with unique job_id
```python
queue.put({"task_id": 1}, priority=1, channel="channel_1", job_id="x_job")
```
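Because `job_id` and channel are covered by a unique index, a stable id derived from the payload can help keep the same task from being queued twice. The sketch below is only an illustration: `make_job_id` is a hypothetical helper, not part of mongo_queue, and how the library reports a duplicate is not described here, so check that before relying on it.

```python
import hashlib
import json


def make_job_id(payload: dict, prefix: str = "job") -> str:
    """Derive a stable job id from the payload contents (hypothetical helper)."""
    # sort_keys makes the serialization independent of dict key order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}-{digest}"


# Usage with the queue created earlier (needs a running MongoDB):
# payload = {"task_id": 1}
# queue.put(payload, channel="channel_1", job_id=make_job_id(payload))
```

Two payloads with the same keys and values produce the same id regardless of key order, so a repeated `put` would target the same `job_id` in that channel.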
* Add task with dependency
```python
job1 = queue.put({"task_id": 1}, priority=1, channel="channel_1", job_id="x_job")
# Use a different job_id: job_id and channel together must be unique.
job2 = queue.put({"task_id": 2}, priority=1, channel="channel_1", job_id="y_job", depends_on=[job1])
```
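For a longer pipeline, the same pattern can be wrapped in a small helper. This is a minimal sketch: `enqueue_chain` and the `step-N` ids are hypothetical names, and, like the example above, it assumes the value returned by `queue.put` is what `depends_on` expects.

```python
def enqueue_chain(queue, payloads, channel="channel_1", id_prefix="step"):
    """Queue payloads so that each job depends on the one queued before it."""
    previous = None
    results = []
    for index, payload in enumerate(payloads):
        # Each job gets its own job_id, since job_id and channel must be unique.
        kwargs = {"channel": channel, "job_id": f"{id_prefix}-{index}"}
        if previous is not None:
            kwargs["depends_on"] = [previous]
        previous = queue.put(payload, **kwargs)
        results.append(previous)
    return results


# Usage (needs a running MongoDB):
# enqueue_chain(queue, [{"task_id": 1}, {"task_id": 2}, {"task_id": 3}])
```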
* Get the next job to be executed from the default channel
```python
job = queue.next()
```
* Get the next job to be executed from a specific channel
```python
job = queue.next(channel="channel_1")
```
* Update job progress for long-running jobs
```python
job.progress(count=10)
```
* Put the job back in the queue so it is picked up again later. This increments the attempt count; once the max attempts are reached, the job is not picked up again.
* You can also set `sleep` and `state` while releasing a job:
  * `sleep`: time in seconds. The job is not picked up again until the sleep time expires.
  * `state`: data to store in the job, useful for long-running jobs.
```python
job.release()
# or
job.release(sleep=10, state={"some": "state"})
```
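One way to use `sleep` is to wait longer after each failed attempt. The following is a sketch under stated assumptions: `backoff_seconds` and `release_with_backoff` are hypothetical helpers, and the caller supplies the attempt number, because this README does not say how a job exposes its attempt count.

```python
def backoff_seconds(attempt: int, base: int = 5, cap: int = 300) -> int:
    """Exponential backoff: base * 2**attempt seconds, limited to cap."""
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    return min(cap, base * (2 ** attempt))


def release_with_backoff(job, attempt: int, state=None):
    """Release a job with a sleep that grows with the attempt number."""
    kwargs = {"sleep": backoff_seconds(attempt)}
    if state is not None:
        # Only pass state when the caller provides one.
        kwargs["state"] = state
    job.release(**kwargs)


# Usage (job obtained from queue.next()):
# release_with_backoff(job, attempt=2, state={"last_page": 7})
```

With the defaults, attempts 0, 1 and 2 sleep for 5, 10 and 20 seconds, and the delay never exceeds 300 seconds.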
* Put the job back in the queue with an error. The job is picked up again later. This increments the attempt count; once the max attempts are reached, the job is not picked up again.
```python
job.error("Some error occurred")
```
* Complete the job. This deletes the job from the database.
```python
job.complete()
```
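The calls above can be combined into a simple consumer loop. This is a minimal sketch, not the library's own worker: `drain` is a hypothetical helper, and it assumes `queue.next()` returns `None` when no job is ready.

```python
def drain(queue, handler, channel=None):
    """Process jobs until the queue has nothing ready; return the success count."""
    processed = 0
    while True:
        job = queue.next(channel=channel) if channel else queue.next()
        if job is None:  # assumption: None means no job is currently available
            return processed
        try:
            handler(job)
        except Exception as exc:
            # Hand the job back with the error message so it can be retried.
            job.error(str(exc))
        else:
            job.complete()
            processed += 1


# Usage (needs a running MongoDB):
# drain(queue, handler=lambda job: print(job), channel="channel_1")
```

The handler receives the job object itself, so the sketch makes no assumption about how the payload is exposed. A failed job is retried on a later `next()` call until its attempts are used up.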
## Build Steps
```bash
# Set up a venv with Python 3.6 or above
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
pip install wheel
pip install --upgrade twine
rm -rf dist
# Go to https://pypi.org/manage/account/token/, copy the `$HOME/.pypirc` config shown there, and create the file.
vim $HOME/.pypirc
python3 setup.py sdist bdist_wheel
python3 -m twine upload -r pypi dist/*
```
## Local Development and Testing
```
# Quote the URI so the shell does not interpret `?` and `&`
export MONGO_URI='mongodb+srv://username:pwd@mongourl/test?retryWrites=true&w=majority'
cd mongo_queue # Root directory of the package
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python3 -m unittest mongo_queue.test
```
Raw data
{
"_id": null,
"home_page": "https://github.com/shunyeka/mongo_queue/",
"name": "mongo-queue-service",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.6",
"maintainer_email": null,
"keywords": "mongo, queue, priority queue, task queue",
"author": "Amit Chotaliya",
"author_email": "amit@shunyeka.com",
"download_url": "https://files.pythonhosted.org/packages/f4/8a/f853045fd67cea0b9660711c2a51c5a66153ec13152b82a6ae4e9d3c802c/mongo_queue_service-0.2.1.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": null,
"summary": "Queue service built on top of mongo.",
"version": "0.2.1",
"project_urls": {
"Download": "https://github.com/shunyeka/mongo_queue/archive/v0.1.8.tar.gz",
"Homepage": "https://github.com/shunyeka/mongo_queue/"
},
"split_keywords": [
"mongo",
" queue",
" priority queue",
" task queue"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "3aee3479d95a2ecbeaeb57f73e950bfaf88dac1fef70eea44a3f7f7716234728",
"md5": "ebc5f1dbc582b5c82533cb3ae0256d06",
"sha256": "58748ec80a0c2bdbcc74593ebd72874b8ac72a1ce741fbb86f6e2793a70f9960"
},
"downloads": -1,
"filename": "mongo_queue_service-0.2.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "ebc5f1dbc582b5c82533cb3ae0256d06",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.6",
"size": 11376,
"upload_time": "2024-05-29T17:39:29",
"upload_time_iso_8601": "2024-05-29T17:39:29.009011Z",
"url": "https://files.pythonhosted.org/packages/3a/ee/3479d95a2ecbeaeb57f73e950bfaf88dac1fef70eea44a3f7f7716234728/mongo_queue_service-0.2.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "f48af853045fd67cea0b9660711c2a51c5a66153ec13152b82a6ae4e9d3c802c",
"md5": "70b15863bfc639d0d2452cc492c793e1",
"sha256": "160ca2c20a49489cbc82fc6f73c3eadece1d2321b934148ebc76e798c6251274"
},
"downloads": -1,
"filename": "mongo_queue_service-0.2.1.tar.gz",
"has_sig": false,
"md5_digest": "70b15863bfc639d0d2452cc492c793e1",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.6",
"size": 11565,
"upload_time": "2024-05-29T17:39:31",
"upload_time_iso_8601": "2024-05-29T17:39:31.325922Z",
"url": "https://files.pythonhosted.org/packages/f4/8a/f853045fd67cea0b9660711c2a51c5a66153ec13152b82a6ae4e9d3c802c/mongo_queue_service-0.2.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-05-29 17:39:31",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "shunyeka",
"github_project": "mongo_queue",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"requirements": [
{
"name": "pymongo",
"specs": []
}
],
"lcname": "mongo-queue-service"
}