mongo-queue-service


Namemongo-queue-service JSON
Version 0.2.1 PyPI version JSON
download
home_pagehttps://github.com/shunyeka/mongo_queue/
SummaryQueue service built on top of mongo.
upload_time2024-05-29 17:39:31
maintainerNone
docs_urlNone
authorAmit Chotaliya
requires_python>=3.6
licenseNone
keywords mongo queue priority queue task queue
VCS
bugtrack_url
requirements pymongo
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # mongo_queue
Task queue built on mongo with channels and unique job id.

[Website](http://www.shunyeka.com) • [autobotAI Automation Platform](https://autobot.live/)

Inspired by [kapilt/mongoqueue](https://github.com/kapilt/mongoqueue)

### Change Log:

#### v0.2.1

- Added optional locking when finding job by id, so it can be marked completed.

#### v0.1.9

- Added method to get pending jobs by channels

#### v0.1.8

- Added `delay` while queuing a new job. This allows user to delay the job execution by x seconds. It is similar to `sleep` but this allows the functionality for the fresh jobs.

#### v0.1.7

- Added index for repair operation.

#### v0.1.6

- Added utility methods for getting running jobs and pending jobs count.

#### v0.1.5

- Fixes issue with the next job method, it was picking up jobs with dependency

#### v0.1.4

- Improved the job.next method. Removed the lookup and removed double operation.

#### v0.1.3

- Added dependency index for faster lookup. Update `complete` method to retry 3 times while pulling the dependencies.
- Corrected pull dependency query by adding filter. It was updating all the documents.

#### v0.1.2

- Added diskUsage for larger queue dependency resolution.

#### v0.1.1

- Added find_and_update for finding the next job added process to pick the next job if the previous is already locked with multiple retries..

#### v0.1.0

- Added optional inc_attempt parameter for job.release. This will allow user to choose if they want to increment the attempt when releasing a job.

#### v0.0.9

- Added method find_by_id to find a job by it's id.

#### v0.0.7

- Added mongo backward compatibility. The aggregate function was using lookup which is only available after Mongo 3.6 (Not avaialble in the DocumentDB), Modified lookup to use old syntax.

#### v0.0.6

- Added sleep and state feature while releasing a job. This provides a way to not pickup job until provided seconds and store state for long running jobs.

#### v0.0.5

- Added depends_on feature. You can create dependency between jobs by supplying depends_on[] with previously created job ids. 

#### v0.0.3

-  Added unique index with job_id and channel. This is to make sure that the same job is not added multiple times. If not job id provided an unique id generated by default. 

## Usage

Install the package.

```
pip install mongo_queue
```

###  Usage Example:

*  Create Queue Object
```python
from mongo_queue.queue import Queue
from pymongo import MongoClient

queue = Queue(MongoClient('localhost', 27017).task_queue, consumer_id="consumer-1", timeout=300, max_attempts=3)
```
* Add task to queue default channel

```python
queue.put({"task_id": 1})
```

* Add task to queue with priority to default channel

```python
queue.put({"task_id": 1}, priority=1)
```

* Add task to queue in a specific channel

```python
queue.put({"task_id": 1}, priority=1, channel="channel_1")
```

* Add task to queue with unique job_id

```python
queue.put({"task_id": 1}, priority=1, channel="channel_1", job_id="x_job")
```

* Add task with dependency

```python
job1 = queue.put({"task_id": 1}, priority=1, channel="channel_1", job_id="x_job")
job2 = queue.put({"task_id": 2}, priority=1, channel="channel_1", job_id="x_job", depends_on=[job1])
```

* Get the next job to be executed from the default channel

```python
job = queue.next()
```

* Get the next job to be executed from a specific channel

```python
job = queue.next(channel="channel_1")
```

* Update job progress for long-running jobs

```python
job.progress(count=10)
```

* Put the job back in queue, this will be picked up again later, this will update attempts after max attempts the job will not be picked up again.
* You can also set state and sleep while releaseing a job
* `sleep` in seconds. The job will not be picked up again till the sleep time expires.
* `state` you can store state in the job for long running jobs.

```python
job.release()
# or
job.release(sleep=10, state={"some": "state"})
```

* Put the job back in queue with error, this will be picked up again later, this will update attempts after max attempts the job will not be picked up again.

```python
job.error("Some error occured")
```

* Complete the job. This will delete job from the database.

```python
job.complete()
```


## Build Steps

```bash
# Setup venv of python version 3.6 and above
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
pip install wheel
pip install --upgrade twine
rm -rf dist
# Goto https://pypi.org/manage/account/token/ and capture the `$HOME/.pypirc` config and create the file.
vim $HOME/.pypirc
python3 setup.py sdist bdist_wheel
python3 -m twine upload -r pypi dist/* 
```

# Local Development and Testing

```
export MONGO_URI=mongodb+srv://username:pwd@mongourl/test?retryWrites=true&w=majority
cd mong_queue # Root directory of the package
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python3 -m unittest mongo_queue.test
```

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/shunyeka/mongo_queue/",
    "name": "mongo-queue-service",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.6",
    "maintainer_email": null,
    "keywords": "mongo, queue, priority queue, task queue",
    "author": "Amit Chotaliya",
    "author_email": "amit@shunyeka.com",
    "download_url": "https://files.pythonhosted.org/packages/f4/8a/f853045fd67cea0b9660711c2a51c5a66153ec13152b82a6ae4e9d3c802c/mongo_queue_service-0.2.1.tar.gz",
    "platform": null,
    "description": "# mongo_queue\nTask queue built on mongo with channels and unique job id.\n\n[Website](http://www.shunyeka.com) \u2022 [autobotAI Automation Platform](https://autobot.live/)\n\nInspired by [kapilt/mongoqueue](https://github.com/kapilt/mongoqueue)\n\n### Change Log:\n\n#### v0.2.1\n\n- Added optional locking when finding job by id, so it can be marked completed.\n\n#### v0.1.9\n\n- Added method to get pending jobs by channels\n\n#### v0.1.8\n\n- Added `delay` while queuing a new job. This allows user to delay the job execution by x seconds. It is similar to `sleep` but this allows the functionality for the fresh jobs.\n\n#### v0.1.7\n\n- Added index for repair operation.\n\n#### v0.1.6\n\n- Added utility methods for getting running jobs and pending jobs count.\n\n#### v0.1.5\n\n- Fixes issue with the next job method, it was picking up jobs with dependency\n\n#### v0.1.4\n\n- Improved the job.next method. Removed the lookup and removed double operation.\n\n#### v0.1.3\n\n- Added dependency index for faster lookup. Update `complete` method to retry 3 times while pulling the dependencies.\n- Corrected pull dependency query by adding filter. It was updating all the documents.\n\n#### v0.1.2\n\n- Added diskUsage for larger queue dependency resolution.\n\n#### v0.1.1\n\n- Added find_and_update for finding the next job added process to pick the next job if the previous is already locked with multiple retries..\n\n#### v0.1.0\n\n- Added optional inc_attempt parameter for job.release. This will allow user to choose if they want to increment the attempt when releasing a job.\n\n#### v0.0.9\n\n- Added method find_by_id to find a job by it's id.\n\n#### v0.0.7\n\n- Added mongo backward compatibility. The aggregate function was using lookup which is only available after Mongo 3.6 (Not avaialble in the DocumentDB), Modified lookup to use old syntax.\n\n#### v0.0.6\n\n- Added sleep and state feature while releasing a job. This provides a way to not pickup job until provided seconds and store state for long running jobs.\n\n#### v0.0.5\n\n- Added depends_on feature. You can create dependency between jobs by supplying depends_on[] with previously created job ids. \n\n#### v0.0.3\n\n-  Added unique index with job_id and channel. This is to make sure that the same job is not added multiple times. If not job id provided an unique id generated by default. \n\n## Usage\n\nInstall the package.\n\n```\npip install mongo_queue\n```\n\n###  Usage Example:\n\n*  Create Queue Object\n```python\nfrom mongo_queue.queue import Queue\nfrom pymongo import MongoClient\n\nqueue = Queue(MongoClient('localhost', 27017).task_queue, consumer_id=\"consumer-1\", timeout=300, max_attempts=3)\n```\n* Add task to queue default channel\n\n```python\nqueue.put({\"task_id\": 1})\n```\n\n* Add task to queue with priority to default channel\n\n```python\nqueue.put({\"task_id\": 1}, priority=1)\n```\n\n* Add task to queue in a specific channel\n\n```python\nqueue.put({\"task_id\": 1}, priority=1, channel=\"channel_1\")\n```\n\n* Add task to queue with unique job_id\n\n```python\nqueue.put({\"task_id\": 1}, priority=1, channel=\"channel_1\", job_id=\"x_job\")\n```\n\n* Add task with dependency\n\n```python\njob1 = queue.put({\"task_id\": 1}, priority=1, channel=\"channel_1\", job_id=\"x_job\")\njob2 = queue.put({\"task_id\": 2}, priority=1, channel=\"channel_1\", job_id=\"x_job\", depends_on=[job1])\n```\n\n* Get the next job to be executed from the default channel\n\n```python\njob = queue.next()\n```\n\n* Get the next job to be executed from a specific channel\n\n```python\njob = queue.next(channel=\"channel_1\")\n```\n\n* Update job progress for long-running jobs\n\n```python\njob.progress(count=10)\n```\n\n* Put the job back in queue, this will be picked up again later, this will update attempts after max attempts the job will not be picked up again.\n* You can also set state and sleep while releaseing a job\n* `sleep` in seconds. The job will not be picked up again till the sleep time expires.\n* `state` you can store state in the job for long running jobs.\n\n```python\njob.release()\n# or\njob.release(sleep=10, state={\"some\": \"state\"})\n```\n\n* Put the job back in queue with error, this will be picked up again later, this will update attempts after max attempts the job will not be picked up again.\n\n```python\njob.error(\"Some error occured\")\n```\n\n* Complete the job. This will delete job from the database.\n\n```python\njob.complete()\n```\n\n\n## Build Steps\n\n```bash\n# Setup venv of python version 3.6 and above\npython3.9 -m venv venv\nsource venv/bin/activate\npip install -r requirements.txt\npip install wheel\npip install --upgrade twine\nrm -rf dist\n# Goto https://pypi.org/manage/account/token/ and capture the `$HOME/.pypirc` config and create the file.\nvim $HOME/.pypirc\npython3 setup.py sdist bdist_wheel\npython3 -m twine upload -r pypi dist/* \n```\n\n# Local Development and Testing\n\n```\nexport MONGO_URI=mongodb+srv://username:pwd@mongourl/test?retryWrites=true&w=majority\ncd mong_queue # Root directory of the package\npython3.9 -m venv venv\nsource venv/bin/activate\npip install -r requirements.txt\npython3 -m unittest mongo_queue.test\n```\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Queue service built on top of mongo.",
    "version": "0.2.1",
    "project_urls": {
        "Download": "https://github.com/shunyeka/mongo_queue/archive/v0.1.8.tar.gz",
        "Homepage": "https://github.com/shunyeka/mongo_queue/"
    },
    "split_keywords": [
        "mongo",
        " queue",
        " priority queue",
        " task queue"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "3aee3479d95a2ecbeaeb57f73e950bfaf88dac1fef70eea44a3f7f7716234728",
                "md5": "ebc5f1dbc582b5c82533cb3ae0256d06",
                "sha256": "58748ec80a0c2bdbcc74593ebd72874b8ac72a1ce741fbb86f6e2793a70f9960"
            },
            "downloads": -1,
            "filename": "mongo_queue_service-0.2.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "ebc5f1dbc582b5c82533cb3ae0256d06",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.6",
            "size": 11376,
            "upload_time": "2024-05-29T17:39:29",
            "upload_time_iso_8601": "2024-05-29T17:39:29.009011Z",
            "url": "https://files.pythonhosted.org/packages/3a/ee/3479d95a2ecbeaeb57f73e950bfaf88dac1fef70eea44a3f7f7716234728/mongo_queue_service-0.2.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "f48af853045fd67cea0b9660711c2a51c5a66153ec13152b82a6ae4e9d3c802c",
                "md5": "70b15863bfc639d0d2452cc492c793e1",
                "sha256": "160ca2c20a49489cbc82fc6f73c3eadece1d2321b934148ebc76e798c6251274"
            },
            "downloads": -1,
            "filename": "mongo_queue_service-0.2.1.tar.gz",
            "has_sig": false,
            "md5_digest": "70b15863bfc639d0d2452cc492c793e1",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.6",
            "size": 11565,
            "upload_time": "2024-05-29T17:39:31",
            "upload_time_iso_8601": "2024-05-29T17:39:31.325922Z",
            "url": "https://files.pythonhosted.org/packages/f4/8a/f853045fd67cea0b9660711c2a51c5a66153ec13152b82a6ae4e9d3c802c/mongo_queue_service-0.2.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-05-29 17:39:31",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "shunyeka",
    "github_project": "mongo_queue",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "requirements": [
        {
            "name": "pymongo",
            "specs": []
        }
    ],
    "lcname": "mongo-queue-service"
}
        
Elapsed time: 3.01031s