taskiq-nats


* Name: taskiq-nats
* Version: 0.5.1
* Home page: https://github.com/taskiq-python/taskiq-nats
* Summary: NATS integration for taskiq
* Upload time: 2024-11-06 18:44:39
* Author: taskiq-team
* Requires Python: <4.0.0,>=3.8.1
* Keywords: taskiq, tasks, distributed, async, nats, result_backend

# Taskiq NATS

Taskiq-nats is a plugin for taskiq that adds a NATS broker.
This package has support for NATS JetStream.

## Installation

To use this project, you must have the core taskiq library installed:

```bash
pip install taskiq taskiq-nats
```

## Usage

Here's a minimal setup example with a broker and one task.

### Default NATS broker
```python
import asyncio
from taskiq_nats import NatsBroker

broker = NatsBroker(
    [
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="random_queue_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

```
### NATS broker based on JetStream
```python
import asyncio
from taskiq_nats import (
    PushBasedJetStreamBroker,
    PullBasedJetStreamBroker
)

broker = PushBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="awesome_queue_name",
)

# Alternatively, use the pull-based variant (this replaces the broker defined above)
broker = PullBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    durable="awesome_durable_consumer_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

## NatsBroker configuration

Here are the constructor parameters:

* `servers` - a single string or a list of strings with NATS node addresses.
* `subject` - name of the subject that will be used to exchange tasks between workers and clients.
* `queue` - optional name of the queue. By default, NatsBroker broadcasts each task to all workers,
    but if you want every task to be handled only once, you need to supply this argument.
* `result_backend` - custom result backend.
* `task_id_generator` - custom function to generate task ids.
* Every other keyword argument is passed to the `nats.connect` function.
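
The difference between broadcasting and supplying `queue` can be pictured with a toy in-memory model. This is only a sketch of the delivery semantics described above, not code from taskiq-nats or the NATS client: `ToySubject` and `deliveries` are hypothetical names, and the round-robin choice inside a queue is a simplification made here for predictability.

```python
from collections import defaultdict
from itertools import cycle


class ToySubject:
    """In-memory stand-in for one subject (illustration only)."""

    def __init__(self):
        self.plain = []                  # subscribers without a queue
        self.groups = defaultdict(list)  # queue name -> subscribers
        self._next = {}                  # queue name -> round-robin iterator

    def subscribe(self, name, inbox, queue=None):
        if queue is None:
            self.plain.append((name, inbox))
        else:
            self.groups[queue].append((name, inbox))
            # Rebuild the round-robin iterator whenever membership changes.
            self._next[queue] = cycle(self.groups[queue])

    def publish(self, message):
        # Every subscriber without a queue gets its own copy.
        for _, inbox in self.plain:
            inbox.append(message)
        # Each queue gets exactly one copy, handed to a single member.
        for queue in self.groups:
            _, inbox = next(self._next[queue])
            inbox.append(message)


def deliveries(queue):
    """Publish four task ids to two workers and return what each received."""
    subject = ToySubject()
    inboxes = {"worker-1": [], "worker-2": []}
    for name, inbox in inboxes.items():
        subject.subscribe(name, inbox, queue=queue)
    for task_id in range(4):
        subject.publish(task_id)
    return inboxes


broadcast = deliveries(queue=None)
shared = deliveries(queue="random_queue_name")
print(broadcast)  # both workers see every task
print(shared)     # each task is seen by exactly one worker
```

Without a queue, both workers receive all four tasks; with a shared queue name, the four tasks are split between them so that each one is handled once.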

## JetStreamBroker configuration
### Common
* `servers` - a single string or a list of strings with NATS node addresses.
* `subject` - name of the subject that will be used to exchange tasks between workers and clients.
* `stream_name` - name of the stream where subjects will be located.
* `queue` - name of the queue, used to share messages between consumers.
* `result_backend` - custom result backend.
* `task_id_generator` - custom function to generate task ids.
* `stream_config` - a config for stream.
* `consumer_config` - a config for consumer.

### PushBasedJetStreamBroker
* `queue` - name of the queue. It's used to share messages between different consumers.

### PullBasedJetStreamBroker
* `durable` - durable name of the consumer. It's used to share messages between different consumers.
* `pull_consume_batch` - maximum number of messages that can be fetched each time.
* `pull_consume_timeout` - timeout for fetching messages. If no messages arrive within it, fetching simply starts again.
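
How a batch size and a fetch timeout interact can be sketched with the standard library alone. The code below is an assumption-laden illustration of the pattern described above, not the library's implementation: an `asyncio.Queue` stands in for the JetStream consumer, and `fetch_batch`, `consume`, and `demo` are hypothetical helpers.

```python
import asyncio


async def fetch_batch(source, batch, timeout):
    """Return up to `batch` items, waiting at most `timeout` seconds for the first."""
    try:
        first = await asyncio.wait_for(source.get(), timeout)
    except asyncio.TimeoutError:
        # Nothing arrived in time; the caller just fetches again.
        return []
    items = [first]
    # Take whatever else is already waiting, without blocking.
    while len(items) < batch and not source.empty():
        items.append(source.get_nowait())
    return items


async def consume(source, batch, timeout, rounds):
    """Run a fixed number of fetch rounds and record what each one returned."""
    fetched = []
    for _ in range(rounds):
        fetched.append(await fetch_batch(source, batch, timeout))
    return fetched


async def demo():
    source = asyncio.Queue()
    for task_id in range(5):
        source.put_nowait(task_id)
    # Five queued messages, at most two per fetch, four fetch rounds.
    return await consume(source, batch=2, timeout=0.05, rounds=4)


batches = asyncio.run(demo())
print(batches)  # [[0, 1], [2, 3], [4], []]
```

The last round returns an empty list because the timeout expired with nothing to fetch; a real consumer loop would keep going rather than stop after a fixed number of rounds.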


## NATS Result Backend
It's possible to use NATS JetStream to store task results.
```python
import asyncio
from taskiq_nats import PullBasedJetStreamBroker
from taskiq_nats.result_backend import NATSObjectStoreResultBackend


result_backend = NATSObjectStoreResultBackend(
    servers="localhost",
)
broker = PullBasedJetStreamBroker(
    servers="localhost",
).with_result_backend(
    result_backend=result_backend,
)


@broker.task
async def awesome_task() -> str:
    return "Hello, NATS!"


async def main() -> None:
    await broker.startup()
    task = await awesome_task.kiq()
    res = await task.wait_result()
    print(res)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

```

            
