# Taskiq NATS
Taskiq-nats is a plugin for taskiq that adds a NATS broker.
The package also supports NATS JetStream.
## Installation
To use this project, you must have the core taskiq library installed:
```bash
pip install taskiq taskiq-nats
```
## Usage
Here's a minimal setup example with a broker and one task.
### Default NATS broker
```python
import asyncio

from taskiq_nats import NatsBroker

broker = NatsBroker(
    [
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    # With a queue name, every task is handled by only one worker.
    queue="random_queue_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    # Send the task to the workers.
    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
### NATS broker based on JetStream
```python
import asyncio

from taskiq_nats import (
    PushBasedJetStreamBroker,
    PullBasedJetStreamBroker,
)

broker = PushBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    queue="awesome_queue_name",
)

# Or you can use the pull-based variant
broker = PullBasedJetStreamBroker(
    servers=[
        "nats://nats1:4222",
        "nats://nats2:4222",
    ],
    durable="awesome_durable_consumer_name",
)


@broker.task
async def my_lovely_task():
    print("I love taskiq")


async def main():
    await broker.startup()

    await my_lovely_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
## NatsBroker configuration
Here are the constructor parameters:
* `servers` - a single string or a list of strings with NATS node addresses.
* `subject` - name of the subject that will be used to exchange tasks between workers and clients.
* `queue` - optional name of the queue. By default, NatsBroker broadcasts each task to all workers,
  but if you want every task to be handled only once, you need to supply this argument.
* `result_backend` - custom result backend.
* `task_id_generator` - custom function to generate task ids.
* Every other keyword argument is passed to the `nats.connect` function.
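
The difference between broadcasting and queue-based delivery described for `queue` can be illustrated with a small in-memory model. This is only a sketch: `ToySubject` and `deliveries` are hypothetical names invented for illustration, they do not use taskiq-nats or a NATS server, and the round-robin choice inside a queue group is a simplification made so the result is deterministic (a real server chooses the receiving member itself).

```python
from collections import defaultdict
from typing import Dict, List, Optional


class ToySubject:
    """Toy in-memory stand-in for a NATS subject (not the real client)."""

    def __init__(self) -> None:
        # Subscribers without a queue: each one receives every message.
        self._plain: List[str] = []
        # Subscribers grouped by queue name: one member per group receives a message.
        self._groups: Dict[str, List[str]] = defaultdict(list)
        self._cursor: Dict[str, int] = defaultdict(int)

    def subscribe(self, worker: str, queue: Optional[str] = None) -> None:
        if queue is None:
            self._plain.append(worker)
        else:
            self._groups[queue].append(worker)

    def publish(self, task: str) -> List[str]:
        """Return the names of the workers that would receive `task`."""
        receivers = list(self._plain)
        for name, members in self._groups.items():
            # Round-robin is an assumption of this sketch, chosen for determinism.
            receivers.append(members[self._cursor[name] % len(members)])
            self._cursor[name] += 1
        return receivers


def deliveries(queue: Optional[str], workers: int = 3, tasks: int = 4) -> List[List[str]]:
    """Subscribe `workers` workers and list who receives each of `tasks` tasks."""
    subject = ToySubject()
    for i in range(workers):
        subject.subscribe(f"worker-{i}", queue=queue)
    return [subject.publish(f"task-{n}") for n in range(tasks)]


if __name__ == "__main__":
    # Without a queue, every worker receives every task.
    print(deliveries(queue=None))
    # With a queue, each task goes to exactly one worker.
    print(deliveries(queue="random_queue_name"))
```

In this model, leaving `queue` unset means three workers each process all four tasks, while a shared queue name means each task is processed once.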
## JetStreamBroker configuration
### Common
* `servers` - a single string or a list of strings with NATS node addresses.
* `subject` - name of the subject that will be used to exchange tasks between workers and clients.
* `stream_name` - name of the stream where subjects will be located.
* `queue` - name of the queue shared by consumers.
* `result_backend` - custom result backend.
* `task_id_generator` - custom function to generate task ids.
* `stream_config` - a config for stream.
* `consumer_config` - a config for consumer.
### PushBasedJetStreamBroker
* `queue` - name of the queue. It is used to share messages between different consumers.
### PullBasedJetStreamBroker
* `durable` - durable name of the consumer. It is used to share messages between different consumers.
* `pull_consume_batch` - maximum number of messages that can be fetched at a time.
* `pull_consume_timeout` - timeout for fetching messages. If there are no messages, fetching starts again.
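
The way a batch size and a fetch timeout interact can be sketched with plain `asyncio`. This is a simplified model, not the broker's actual implementation: `fetch`, `consume`, `demo` and `max_rounds` are hypothetical names, and an `asyncio.Queue` stands in for the JetStream consumer.

```python
import asyncio
from typing import List


async def fetch(queue: "asyncio.Queue[str]", batch: int, timeout: float) -> List[str]:
    """Return up to `batch` messages, waiting at most `timeout` seconds for the first."""
    first = await asyncio.wait_for(queue.get(), timeout=timeout)
    messages = [first]
    # Take whatever else is already available, up to the batch limit.
    while len(messages) < batch and not queue.empty():
        messages.append(queue.get_nowait())
    return messages


async def consume(
    queue: "asyncio.Queue[str]",
    batch: int,
    timeout: float,
    max_rounds: int,
) -> List[List[str]]:
    """Fetch repeatedly; a timed-out fetch is simply followed by another attempt."""
    batches: List[List[str]] = []
    for _ in range(max_rounds):
        try:
            batches.append(await fetch(queue, batch, timeout))
        except asyncio.TimeoutError:
            # No messages arrived in time: start fetching again.
            continue
    return batches


async def demo() -> List[List[str]]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    for n in range(5):
        queue.put_nowait(f"task-{n}")
    return await consume(queue, batch=2, timeout=0.05, max_rounds=4)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With five queued messages and a batch size of two, the model fetches two full batches and one partial batch; the fourth round times out and produces nothing.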
## NATS Result Backend
It's possible to use NATS JetStream to store task results.
```python
import asyncio

from taskiq_nats import PullBasedJetStreamBroker
from taskiq_nats.result_backend import NATSObjectStoreResultBackend

result_backend = NATSObjectStoreResultBackend(
    servers="localhost",
)
broker = PullBasedJetStreamBroker(
    servers="localhost",
).with_result_backend(
    result_backend=result_backend,
)


@broker.task
async def awesome_task() -> str:
    return "Hello, NATS!"


async def main() -> None:
    await broker.startup()
    task = await awesome_task.kiq()
    # Wait until a worker has stored the result in the backend.
    res = await task.wait_result()
    print(res)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```