brq


- Name: brq
- Version: 0.4.1
- Summary: brq
- Upload time: 2024-09-13 02:37:10
- Requires Python: >=3.10
- License: BSD 3-Clause License
- Keywords: brq
- Requirements: No requirements were recorded.

![](https://img.shields.io/github/license/wh1isper/brq)
![](https://img.shields.io/github/v/release/wh1isper/brq)
![](https://img.shields.io/docker/image-size/wh1isper/brq)
![](https://img.shields.io/pypi/dm/brq)
![](https://img.shields.io/github/last-commit/wh1isper/brq)
![](https://img.shields.io/pypi/pyversions/brq)
[![codecov](https://codecov.io/gh/Wh1isper/brq/graph/badge.svg?token=84A7BQZIS2)](https://codecov.io/gh/Wh1isper/brq)

# brq

`brq` is a lightweight Python job queue library built on Redis streams. It has no central server; each `Consumer` organizes itself.

![Architecture.png](./assets/Architecture.png)

## Prerequisites

Redis >= 6.2 is required; brq is tested against the latest Redis 6 and 7 Docker images. Redis >= 7 is recommended because it includes more inspection features.

## Install

`pip install brq`

## Features

> See [examples](./examples) for runnable examples.

- Deferred jobs and automatic retry of failed jobs
- Dead queue for unprocessable jobs, which you can process later
- Multiple consumers in one consumer group
- No scheduler needed; each consumer manages itself
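
The retry and dead-queue behaviour listed above can be pictured with a small in-memory model. This is only a conceptual sketch: `drain`, `max_retries` and the in-process `deque` are hypothetical stand-ins, not brq's API, and brq itself keeps its state in Redis.

```python
from collections import deque


def drain(queue, handler, max_retries=3):
    """Process jobs until the queue is empty (hypothetical helper, not brq's API).

    A failing job is re-queued until it has been attempted `max_retries`
    times; after that it is moved to the dead list for later inspection.
    """
    dead = []
    attempts = {}
    while queue:
        job = queue.popleft()
        try:
            handler(job)
        except Exception:
            attempts[job] = attempts.get(job, 0) + 1
            if attempts[job] >= max_retries:
                dead.append(job)  # give up: park the job in the dead queue
            else:
                queue.append(job)  # retry later
    return dead


def flaky_handler(job):
    # Illustrative handler: jobs named "bad" always fail.
    if job == "bad":
        raise ValueError(job)


dead_jobs = drain(deque(["ok", "bad"]), flaky_handler)
print(dead_jobs)
```

Jobs in this model must be hashable (plain strings here) because attempts are counted per job in a dict.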

## Configuration

If you use `BrqConfig` (for example, through `@task`), you can configure brq with a `.env` file and environment variables. Environment variables use the prefix `BRQ_`.

> For example, run `BRQ_REDIS_PORT=6379 python consumer.py` to specify the Redis port.

See [configs](./brq/configs.py) for more details.
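
To make the prefix convention concrete, here is a minimal standard-library sketch of how `BRQ_`-prefixed variables could be picked out of an environment. `collect_prefixed_env` is a hypothetical helper written for illustration; the real loading is done by `BrqConfig` and may work differently.

```python
import os

PREFIX = "BRQ_"  # prefix stated in this README


def collect_prefixed_env(environ, prefix=PREFIX):
    """Map prefixed variables to lowercased names without the prefix.

    Hypothetical helper for illustration only; it is not part of brq.
    """
    return {
        key[len(prefix):].lower(): value
        for key, value in environ.items()
        if key.startswith(prefix)
    }


# Values stay strings here; converting them to int/bool is left out.
print(collect_prefixed_env({"BRQ_REDIS_PORT": "6379", "HOME": "/root"}))
# The same helper also works on the real environment:
settings = collect_prefixed_env(os.environ)
```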

## Echo job overview

### Producer

```python
from brq.producer import Producer
from brq.configs import BrqConfig


async def main():
    config = BrqConfig()
    async with config.open_redis_client() as async_redis_client:
        await Producer(
            async_redis_client,
            redis_prefix=config.redis_key_prefix,
            redis_seperator=config.redis_key_seperator,
            max_message_len=config.producer_max_message_length,
        ).run_job("echo", ["hello"])


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```

### Consumer

All you need is `@task`. The target function can be sync or async; a sync function is automatically converted to an async function and run in a thread.

```python
from brq import task


@task
def echo(message):
    print(f"Received message: {message}")


if __name__ == "__main__":
    # Run the task once, for local debug
    # echo("hello")

    # Run as a daemon
    echo.serve()
```

This is equivalent to the following classic approach, which is more flexible.

```python
import os

from brq.consumer import Consumer
from brq.daemon import Daemon
from brq.tools import get_redis_client, get_redis_url


async def echo(message):
    print(message)


async def main():
    redis_url = get_redis_url(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", 6379)),
        db=int(os.getenv("REDIS_DB", 0)),
        cluster=bool(os.getenv("REDIS_CLUSTER", "false") in ["True", "true", "1"]),
        tls=bool(os.getenv("REDIS_TLS", "false") in ["True", "true", "1"]),
        username=os.getenv("REDIS_USERNAME", ""),
        password=os.getenv("REDIS_PASSWORD", ""),
    )
    async with get_redis_client(redis_url) as async_redis_client:
        daemon = Daemon(Consumer(async_redis_client, echo))
        await daemon.run_forever()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```
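
The sync-to-async conversion mentioned for `@task` can be sketched with the standard library alone. `ensure_async` below is a hypothetical helper that illustrates the general technique (wrapping a sync callable with `asyncio.to_thread`); it is an assumption about the approach, not brq's actual implementation.

```python
import asyncio
import functools
import inspect


def ensure_async(func):
    """Return `func` unchanged if it is already a coroutine function;
    otherwise wrap it so each call runs in a worker thread.

    Hypothetical helper for illustration; not part of brq.
    """
    if inspect.iscoroutinefunction(func):
        return func

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # asyncio.to_thread keeps the event loop free while func blocks.
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def echo(message):
    return f"Received message: {message}"


async_echo = ensure_async(echo)
print(asyncio.run(async_echo("hello")))
```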

## Technical details: deferred jobs

To defer a job, pass either `defer_until` as a `datetime`, or `defer_hours` + `defer_minutes` + `defer_seconds`, which are added to the current Redis timestamp to calculate the target time. Use `unique` to control whether the job is treated as unique.
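
As a rough illustration of that calculation, the sketch below derives a target timestamp from either form. The function name, the millisecond unit and the rule that `defer_until` takes precedence are all assumptions made for the example, not confirmed brq behaviour.

```python
from datetime import datetime, timedelta, timezone


def compute_defer_timestamp(
    now_ms, defer_until=None, defer_hours=0, defer_minutes=0, defer_seconds=0
):
    """Return the target time in milliseconds since the epoch.

    Hypothetical helper: `now_ms` stands in for the current Redis time.
    Assumes `defer_until` wins when both forms are given.
    """
    if defer_until is not None:
        return int(defer_until.timestamp() * 1000)
    offset = timedelta(
        hours=defer_hours, minutes=defer_minutes, seconds=defer_seconds
    )
    return now_ms + int(offset.total_seconds() * 1000)


# Relative form: 1 hour and 30 seconds after "now".
print(compute_defer_timestamp(0, defer_hours=1, defer_seconds=30))
# Absolute form: a timezone-aware datetime.
print(compute_defer_timestamp(0, defer_until=datetime(2030, 1, 1, tzinfo=timezone.utc)))
```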

By default, `unique=True`: every `Job` is treated as unique, even when another `Job` has **exactly** the same `function_name`, `args` and `kwargs`. This allows the same `Job` to be added to the deferred queue more than once. In this case, tasks are differentiated by the current Redis timestamp (`Job.create_at`) and an additional uuid (`Job.uid`), similar to how a Redis stream identifies its entries.

If `unique=False`, the same `Job` is added to the deferred queue only once; adding a duplicate updates the job's defer time. In this case, you can put your own uuid in `args` (or `kwargs`) to differentiate `Job`s.
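
The difference between the two modes can be modelled in a few lines. In this sketch a plain dict maps a serialized job to its defer time and stands in for whatever structure brq actually uses in Redis; `member_key` and `defer` are hypothetical names, and the serialization format is an assumption.

```python
import json
import uuid


def member_key(function_name, args, kwargs, unique, create_at):
    """Serialize a job; unique jobs also carry create_at and a fresh uid."""
    payload = {"function_name": function_name, "args": args, "kwargs": kwargs}
    if unique:
        payload["create_at"] = create_at
        payload["uid"] = uuid.uuid4().hex
    return json.dumps(payload, sort_keys=True)


def defer(deferred, function_name, args=(), kwargs=None, *,
          unique=True, create_at=0, defer_until=0):
    """Add a job to the model queue (a dict of member -> defer time)."""
    key = member_key(function_name, list(args), kwargs or {}, unique, create_at)
    deferred[key] = defer_until  # an identical key overwrites the old time
    return key


unique_queue = {}
defer(unique_queue, "echo", ["hello"], unique=True, defer_until=10)
defer(unique_queue, "echo", ["hello"], unique=True, defer_until=20)
print(len(unique_queue))  # two separate entries

shared_queue = {}
defer(shared_queue, "echo", ["hello"], unique=False, defer_until=10)
defer(shared_queue, "echo", ["hello"], unique=False, defer_until=20)
print(len(shared_queue))  # one entry whose defer time was updated
```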

## Develop

Install pre-commit before committing:

```
pip install pre-commit
pre-commit install
```

Install the package locally:

```
pip install -e .[test]
```

Run the unit tests before opening a PR:

```
pytest -v
```

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "brq",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "brq",
    "author": null,
    "author_email": "wh1isper <jizhongsheng957@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/c7/d8/c8cd5cb08fde18c254a353865e737893994235cf161af94bebbf1c320cc0/brq-0.4.1.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "BSD 3-Clause License",
    "summary": "brq",
    "version": "0.4.1",
    "project_urls": {
        "Source": "https://github.com/b-scheduler/brq"
    },
    "split_keywords": [
        "brq"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "58585c78d9493e38e3557bf5be6adbb1a5cf368cd5c13e72f3c12bbcaef35da4",
                "md5": "715c25f8f2effcc2c3c0ff841e0fb57c",
                "sha256": "f689f35ed93ea99ae97a156580ddcca7ea46ae5d01b19315532521c047c602b9"
            },
            "downloads": -1,
            "filename": "brq-0.4.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "715c25f8f2effcc2c3c0ff841e0fb57c",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 21129,
            "upload_time": "2024-09-13T02:37:09",
            "upload_time_iso_8601": "2024-09-13T02:37:09.654507Z",
            "url": "https://files.pythonhosted.org/packages/58/58/5c78d9493e38e3557bf5be6adbb1a5cf368cd5c13e72f3c12bbcaef35da4/brq-0.4.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "c7d8c8cd5cb08fde18c254a353865e737893994235cf161af94bebbf1c320cc0",
                "md5": "deb7c7a76a3198559a24f627270774a6",
                "sha256": "0332cf2bccb0e296c5ad7c45e02150123beeed35de291ccfcfdd6f4afd201644"
            },
            "downloads": -1,
            "filename": "brq-0.4.1.tar.gz",
            "has_sig": false,
            "md5_digest": "deb7c7a76a3198559a24f627270774a6",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 79652,
            "upload_time": "2024-09-13T02:37:10",
            "upload_time_iso_8601": "2024-09-13T02:37:10.861577Z",
            "url": "https://files.pythonhosted.org/packages/c7/d8/c8cd5cb08fde18c254a353865e737893994235cf161af94bebbf1c320cc0/brq-0.4.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-13 02:37:10",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "b-scheduler",
    "github_project": "brq",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": true,
    "lcname": "brq"
}
        