taskiq-nats


* Name: taskiq-nats
* Version: 0.4.0
* Home page: https://github.com/taskiq-python/taskiq-nats
* Summary: NATS integration for taskiq
* Upload time: 2024-02-19 12:27:41
* Author: taskiq-team
* Requires Python: >=3.8.1,<4.0.0
* Keywords: taskiq, tasks, distributed, async, nats, result_backend

# Taskiq NATS

Taskiq-nats is a plugin for taskiq that adds a NATS broker.
The package also supports NATS JetStream.

## Installation

To use this project, you must have the core taskiq library installed:

```bash
pip install taskiq taskiq-nats
```

## Usage

Here's a minimal setup example with a broker and one task.

### Default NATS broker
```python
import asyncio
from taskiq_nats import NatsBroker

# Connect to one or more NATS nodes.
# Passing `queue` makes every task be handled only once.
broker = NatsBroker(
    [
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="random_queue_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    # Open the connection before sending tasks.
    await broker.startup()

    # Send the task to the broker.
    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
### NATS broker based on JetStream
```python
import asyncio
from taskiq_nats import (
    PushBasedJetStreamBroker,
    PullBasedJetStreamBroker
)

broker = PushBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="awesome_queue_name",
)

# Alternatively, use the pull-based variant.
# This reassigns `broker`, so keep only one of the two definitions.
broker = PullBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    durable="awesome_durable_consumer_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

## NatsBroker configuration

Here are the constructor parameters:

* `servers` - a single string or a list of strings with NATS node addresses.
* `subject` - name of the subject that will be used to exchange tasks between workers and clients.
* `queue` - optional name of the queue. By default, NatsBroker broadcasts each task to all workers;
    to have every task handled only once, supply this argument.
* `result_backend` - custom result backend.
* `task_id_generator` - custom function to generate task ids.
* Every other keyword argument will be passed to the `nats.connect` function.
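
The parameters above can be combined as in the following minimal sketch. The subject, queue name, and timeout are hypothetical values chosen for illustration, and it assumes that `connect_timeout` is accepted by `nats.connect` in your installed nats-py version.

```python
from typing import Any, Dict

# Hypothetical values for illustration; adjust them to your deployment.
NATS_BROKER_KWARGS: Dict[str, Any] = {
    "servers": ["nats://nats1:4222", "nats://nats2:4222"],
    "subject": "my_app.tasks",  # subject used to exchange tasks
    "queue": "my_app_workers",  # set so that every task is handled only once
    # Not a NatsBroker parameter: assumed to be forwarded to nats.connect.
    "connect_timeout": 5,
}


def make_broker():
    """Build a NatsBroker from the settings above."""
    # Imported lazily so the settings can be inspected without taskiq-nats installed.
    from taskiq_nats import NatsBroker

    return NatsBroker(**NATS_BROKER_KWARGS)
```

Keeping the settings in a plain dictionary makes it easy to see which keys belong to the broker and which are only passed through to the connection.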

## JetStreamBroker configuration
### Common
* `servers` - a single string or a list of strings with NATS node addresses.
* `subject` - name of the subject that will be used to exchange tasks between workers and clients.
* `stream_name` - name of the stream where subjects will be located.
* `queue` - optional name of the queue.
* `result_backend` - custom result backend.
* `task_id_generator` - custom function to generate task ids.
* `stream_config` - a config for the stream.
* `consumer_config` - a config for the consumer.
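
As a rough sketch of how `stream_config` and `consumer_config` might be supplied: this assumes both parameters accept the `StreamConfig` and `ConsumerConfig` objects from nats-py's `nats.js.api` module, which the list above does not state explicitly. The stream name, limits, and timeouts are hypothetical.

```python
from typing import Any, Dict

# Hypothetical limits for illustration only.
STREAM_SETTINGS: Dict[str, Any] = {
    "max_msgs": 10_000,  # keep at most this many messages in the stream
    "max_age": 3600.0,   # maximum message age, in seconds
}
CONSUMER_SETTINGS: Dict[str, Any] = {
    "ack_wait": 30.0,    # seconds to wait for an acknowledgement
    "max_deliver": 3,    # maximum delivery attempts per message
}


def make_jetstream_broker():
    """Build a push-based JetStream broker with custom stream and consumer configs."""
    # Imported lazily so the settings can be inspected without the packages installed.
    # Assumption: the broker accepts nats-py config objects for these parameters.
    from nats.js.api import ConsumerConfig, StreamConfig
    from taskiq_nats import PushBasedJetStreamBroker

    return PushBasedJetStreamBroker(
        servers=["nats://nats1:4222", "nats://nats2:4222"],
        subject="my_app.tasks",
        stream_name="my_app_stream",
        queue="my_app_workers",
        stream_config=StreamConfig(**STREAM_SETTINGS),
        consumer_config=ConsumerConfig(**CONSUMER_SETTINGS),
    )
```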

### PushBasedJetStreamBroker
* `queue` - name of the queue. It's used to share messages between different consumers.

### PullBasedJetStreamBroker
* `durable` - durable name of the consumer. It's used to share messages between different consumers.
* `pull_consume_batch` - maximum number of messages that can be fetched each time.
* `pull_consume_timeout` - timeout for fetching messages. If there are no messages, fetching starts again.
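
The pull-based settings above could be combined as in this minimal sketch. The durable name, batch size, and timeout are hypothetical, and the timeout is assumed to be expressed in seconds.

```python
from typing import Any, Dict

# Hypothetical values for illustration; tune them for your workload.
PULL_BROKER_KWARGS: Dict[str, Any] = {
    "servers": ["nats://nats1:4222", "nats://nats2:4222"],
    "durable": "my_app_durable_consumer",  # shared by all workers of this app
    "pull_consume_batch": 10,              # fetch up to 10 messages per request
    "pull_consume_timeout": 2.0,           # assumed to be seconds
}


def make_pull_broker():
    """Build a PullBasedJetStreamBroker from the settings above."""
    # Imported lazily so the settings can be inspected without taskiq-nats installed.
    from taskiq_nats import PullBasedJetStreamBroker

    return PullBasedJetStreamBroker(**PULL_BROKER_KWARGS)
```

A larger batch reduces the number of fetch requests, while a shorter timeout makes an idle worker retry sooner.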

            
