# django-nats-consumer
NATS + Django = ⚡️
## Installation
Please pay attention to the development status; this is Pre-Alpha software; expect the api to evolve as I start using this more in production.
```bash
pip install django-nats-consumer
```
## Usage
**settings.py**
```python
INSTALLED_APPS = [
...
"nats_consumer",
...
]
NATS_CONSUMER = {
"connect_args": {
"servers": ["nats://localhost:4222"],
"allow_reconnect": True,
"max_reconnect_attempts": 5,
"reconnect_time_wait": 1,
"connect_timeout": 10,
},
}
```
**{app_name}/consumers.py**
```python
# Consumers need to be in the consumers module in order to be loaded,
# or you can import them to force them to be loaded.
from nats_consumer import JetstreamPushConsumer
import logging
from nats_consumer import JetstreamPushConsumer, operations
logger = logging.getLogger(__name__)
class OrderConsumer(JetstreamPushConsumer):
stream_name = "orders"
subjects = [
"orders.created",
]
# You need to setup the streams
async def setup(self):
return [
operations.CreateStream(
name=self.stream_name,
subjects=self.subjects,
storage="file"
),
]
async def handle_message(self, message):
# The message only shows if its logged as error
logger.error(f"Received message: {message.data}")
```
**publish.py**
```python
import asyncio
from nats_consumer import get_nats_client
async def publish_messages():
ns = await get_nats_client()
js = ns.jetstream()
for i in range(5):
data = {"id": i, "name": f"Order {i}"}
data_b = json.dumps(data).encode("utf-8")
print(f"Publishing message {i}...")
await js.publish("orders.created", data_b)
if __name__ == "__main__":
asyncio.run(publish_messages())
```
## Running Consumers
**To run a single consumer:**
```bash
python manage.py nats_consumer OrderConsumer --setup
```
**To run multiple consumers:**
```bash
python manage.py nats_consumer OrderConsumer AnotherConsumer
```
**To run all consumers:**
```bash
python manage.py nats_consumer
```
**Additional Options:**
```bash
# Enable uvloop for better performance
python manage.py nats_consumer --uvloop
# Enable auto-reload for development (watches for file changes)
python manage.py nats_consumer --reload
# Combine options
python manage.py nats_consumer OrderConsumer --uvloop --reload
```
**Note**
When running your code in production, uvloop typically provides better performance than the default asyncio event loop, but it's only available on Unix-like systems (Linux, macOS). To use uvloop, please install using:
```bash
pip install django-nats-consumer[uvloop]
```
And add the following to your settings.py:
```python
NATS_CONSUMER = {
"event_loop_policy": "uvloop.EventLoopPolicy",
...
}
```
Raw data
{
"_id": null,
"home_page": "https://github.com/dev360/django-nats-consumer",
"name": "django-nats-consumer",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "django, nats, jetstream, consumer, async",
"author": null,
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/1b/f0/0f381ba1efb9d01830a0906150f6ca002be45e4f618c08ca6d161e6295d7/django_nats_consumer-1.3.0.tar.gz",
"platform": null,
"description": "# django-nats-consumer\nNATS + Django = \u26a1\ufe0f\n\n## Installation\n\nPlease pay attention to the development status; this is Pre-Alpha software; expect the api to evolve as I start using this more in production.\n\n```bash\npip install django-nats-consumer\n```\n\n\n## Usage\n\n**settings.py**\n```python\nINSTALLED_APPS = [\n ...\n \"nats_consumer\",\n ...\n]\n\nNATS_CONSUMER = {\n \"connect_args\": {\n \"servers\": [\"nats://localhost:4222\"],\n \"allow_reconnect\": True,\n \"max_reconnect_attempts\": 5,\n \"reconnect_time_wait\": 1,\n \"connect_timeout\": 10,\n },\n}\n```\n\n**{app_name}/consumers.py**\n```python\n# Consumers need to be in the consumers module in order to be loaded,\n# or you can import them to force them to be loaded.\nfrom nats_consumer import JetstreamPushConsumer\n\nimport logging\n\nfrom nats_consumer import JetstreamPushConsumer, operations\n\nlogger = logging.getLogger(__name__)\n\n\nclass OrderConsumer(JetstreamPushConsumer):\n stream_name = \"orders\"\n subjects = [\n \"orders.created\",\n ]\n\n # You need to setup the streams\n async def setup(self):\n return [\n operations.CreateStream(\n name=self.stream_name,\n subjects=self.subjects,\n storage=\"file\"\n ),\n ]\n\n async def handle_message(self, message):\n # The message only shows if its logged as error\n logger.error(f\"Received message: {message.data}\")\n\n```\n\n**publish.py**\n```python\nimport asyncio\n\nfrom nats_consumer import get_nats_client\n\nasync def publish_messages():\n ns = await get_nats_client()\n js = ns.jetstream()\n for i in range(5):\n data = {\"id\": i, \"name\": f\"Order {i}\"}\n data_b = json.dumps(data).encode(\"utf-8\")\n print(f\"Publishing message {i}...\")\n await js.publish(\"orders.created\", data_b)\n\nif __name__ == \"__main__\":\n asyncio.run(publish_messages())\n\n```\n\n## Running Consumers\n**To run a single consumer:**\n```bash\npython manage.py nats_consumer OrderConsumer --setup\n```\n\n**To run multiple consumers:**\n```bash\npython manage.py nats_consumer OrderConsumer AnotherConsumer\n```\n\n**To run all consumers:**\n```bash\npython manage.py nats_consumer\n```\n\n**Additional Options:**\n```bash\n# Enable uvloop for better performance\npython manage.py nats_consumer --uvloop\n\n# Enable auto-reload for development (watches for file changes)\npython manage.py nats_consumer --reload\n\n# Combine options\npython manage.py nats_consumer OrderConsumer --uvloop --reload\n```\n\n**Note**\nWhen running your code in production, uvloop typically provides better performance than the default asyncio event loop, but it's only available on Unix-like systems (Linux, macOS). To use uvloop, please install using:\n\n```bash\npip install django-nats-consumer[uvloop]\n```\n\nAnd add the following to your settings.py:\n```python\nNATS_CONSUMER = {\n \"event_loop_policy\": \"uvloop.EventLoopPolicy\",\n ...\n}\n```\n",
"bugtrack_url": null,
"license": "BSD-3-Clause",
"summary": "Django NATS Consumer",
"version": "1.3.0",
"project_urls": {
"Homepage": "https://github.com/dev360/django-nats-consumer",
"Repository": "https://github.com/dev360/django-nats-consumer"
},
"split_keywords": [
"django",
" nats",
" jetstream",
" consumer",
" async"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "e794dd6be61dd3f77af85b41d34a4600d97715ee099ad0f8e4abf6988b3c1c14",
"md5": "9ac6e0f206573e97ae7b46cd2e37a340",
"sha256": "ac996f0d06433ae21e5fc93943f7b4d0a855e172d76c70cc805549ad82fe1af5"
},
"downloads": -1,
"filename": "django_nats_consumer-1.3.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "9ac6e0f206573e97ae7b46cd2e37a340",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 11574,
"upload_time": "2025-02-09T21:01:11",
"upload_time_iso_8601": "2025-02-09T21:01:11.597603Z",
"url": "https://files.pythonhosted.org/packages/e7/94/dd6be61dd3f77af85b41d34a4600d97715ee099ad0f8e4abf6988b3c1c14/django_nats_consumer-1.3.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "1bf00f381ba1efb9d01830a0906150f6ca002be45e4f618c08ca6d161e6295d7",
"md5": "b4f410593ffd4ca397f0a2146dd0b77e",
"sha256": "a8d72b7663400c6061ba2c643d1518591f2a9f112985f8563834234dc47e1d8f"
},
"downloads": -1,
"filename": "django_nats_consumer-1.3.0.tar.gz",
"has_sig": false,
"md5_digest": "b4f410593ffd4ca397f0a2146dd0b77e",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 10501,
"upload_time": "2025-02-09T21:01:13",
"upload_time_iso_8601": "2025-02-09T21:01:13.546518Z",
"url": "https://files.pythonhosted.org/packages/1b/f0/0f381ba1efb9d01830a0906150f6ca002be45e4f618c08ca6d161e6295d7/django_nats_consumer-1.3.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-02-09 21:01:13",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "dev360",
"github_project": "django-nats-consumer",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "django-nats-consumer"
}