# Taskiq NATS
Taskiq-nats is a plugin for taskiq that adds a NATS broker.
It also supports NATS JetStream.
## Installation
To use this project, you must have the core taskiq library installed:
```bash
pip install taskiq taskiq-nats
```
## Usage
Here's a minimal setup example with a broker and one task.
### Default NATS broker
```python
import asyncio

from taskiq_nats import NatsBroker

# Connect to one or more NATS nodes; `queue` makes each task run only once.
broker = NatsBroker(
    [
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="random_queue_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    # Send the task to the broker for a worker to pick up.
    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
### NATS broker based on JetStream
```python
import asyncio

from taskiq_nats import (
    PullBasedJetStreamBroker,
    PushBasedJetStreamBroker,
)

broker = PushBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="awesome_queue_name",
)

# Or use the pull-based variant instead.
# Keep only one of the two assignments: this one replaces the broker above.
broker = PullBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    durable="awesome_durable_consumer_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    # Send the task to the broker for a worker to pick up.
    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
## NatsBroker configuration
Here are the constructor parameters:
* `servers` - a single string or a list of strings with the addresses of NATS nodes.
* `subject` - name of the subject used to exchange tasks between workers and clients.
* `queue` - optional name of the queue. By default, NatsBroker broadcasts each task to all workers.
  To have every task handled only once, supply this argument.
* `result_backend` - custom result backend.
* `task_id_generator` - custom function to generate task ids.
* Every other keyword argument is passed to the `nats.connect` function.
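
The difference the `queue` argument makes can be illustrated with a small in-memory model. This is only a sketch of the delivery semantics described above, not taskiq-nats or NATS code: `ToySubject` and the worker names are hypothetical, and the round-robin choice inside a group is an assumption made for determinism (NATS itself picks a group member at random).

```python
from collections import defaultdict
from typing import Dict, List, Optional


class ToySubject:
    """Toy in-memory stand-in for a NATS subject (hypothetical, not real NATS code)."""

    def __init__(self) -> None:
        # Subscribers without a queue group: each one gets every message.
        self._plain: List[str] = []
        # Subscribers grouped by queue name: one member per group gets each message.
        self._groups: Dict[str, List[str]] = defaultdict(list)
        self._cursors: Dict[str, int] = defaultdict(int)

    def subscribe(self, worker: str, queue: Optional[str] = None) -> None:
        if queue is None:
            self._plain.append(worker)
        else:
            self._groups[queue].append(worker)

    def publish(self, task: str) -> List[str]:
        """Return the names of the workers that receive `task`."""
        receivers = list(self._plain)
        for queue, members in self._groups.items():
            # Round-robin inside the group (assumption; NATS chooses randomly).
            index = self._cursors[queue] % len(members)
            receivers.append(members[index])
            self._cursors[queue] += 1
        return receivers


# Without a queue: the task is broadcast to every worker.
broadcast = ToySubject()
broadcast.subscribe("worker-1")
broadcast.subscribe("worker-2")
print(broadcast.publish("task-a"))

# With a shared queue name: exactly one worker receives the task.
grouped = ToySubject()
grouped.subscribe("worker-1", queue="random_queue_name")
grouped.subscribe("worker-2", queue="random_queue_name")
print(grouped.publish("task-a"))
```

In practice this means that running several workers without `queue` executes each task once per worker, which is rarely what a task queue should do.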
## JetStreamBroker configuration
### Common
* `servers` - a single string or a list of strings with the addresses of NATS nodes.
* `subject` - name of the subject used to exchange tasks between workers and clients.
* `stream_name` - name of the stream where subjects will be located.
* `queue` - name of the queue used to share messages between consumers.
* `result_backend` - custom result backend.
* `task_id_generator` - custom function to generate task ids.
* `stream_config` - a config for the stream.
* `consumer_config` - a config for the consumer.
### PushBasedJetStreamBroker
* `queue` - name of the queue. It's used to share messages between different consumers.
### PullBasedJetStreamBroker
* `durable` - durable name of the consumer. It's used to share messages between different consumers.
* `pull_consume_batch` - maximum number of messages that can be fetched each time.
* `pull_consume_timeout` - timeout for fetching messages. If there are no messages, fetching starts again.
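
How a batch size and a fetch timeout interact can be sketched with a plain `asyncio.Queue`. This is a toy model under stated assumptions, not the broker's implementation: `fetch_batch` and `demo` are hypothetical helpers, and the queue stands in for a JetStream consumer.

```python
import asyncio
from typing import List


async def fetch_batch(queue: "asyncio.Queue[str]", batch: int, timeout: float) -> List[str]:
    """Collect up to `batch` messages, waiting at most `timeout` seconds for the first."""
    messages: List[str] = []
    try:
        # Block until at least one message arrives or the timeout expires.
        messages.append(await asyncio.wait_for(queue.get(), timeout))
    except asyncio.TimeoutError:
        return messages  # nothing arrived; the caller simply fetches again
    # Take whatever else is immediately available, up to the batch size.
    while len(messages) < batch and not queue.empty():
        messages.append(queue.get_nowait())
    return messages


async def demo() -> List[List[str]]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    for n in range(5):
        queue.put_nowait(f"task-{n}")
    fetched = []
    # Three fetches drain five messages with a batch size of 2;
    # the fourth one times out and returns an empty batch.
    for _ in range(4):
        fetched.append(await fetch_batch(queue, batch=2, timeout=0.05))
    return fetched


batches = asyncio.run(demo())
print(batches)
```

A larger batch reduces the number of round trips per message, while the timeout bounds how long a single fetch waits when the stream is idle.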