# django-nats-consumer
NATS + Django = ⚡️
## Installation
Please note the development status: this is Pre-Alpha software, so expect the API to evolve as I start using it more in production.
I hope you find some value in it; writing a good consumer takes some finesse.
```bash
pip install django-nats-consumer
```
## Usage
**settings.py**
```python
INSTALLED_APPS = [
    ...
    "nats_consumer",
]

NATS_CONSUMER = {
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 5,
        "reconnect_time_wait": 1,
        "connect_timeout": 10,
    },
}
```
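
The server list above is hard-coded. As a minimal sketch, the same `connect_args` could be built from the environment instead. The `NATS_SERVERS` variable name (comma-separated URLs) and the `build_connect_args` helper are assumptions made for illustration and are not part of this package; the keys are the ones shown above.

```python
import os


def build_connect_args(environ=os.environ):
    """Build the connect_args dict from environment variables.

    NATS_SERVERS is a hypothetical variable name chosen for this sketch.
    """
    raw = environ.get("NATS_SERVERS", "nats://localhost:4222")
    # Split on commas and drop empty entries and stray whitespace.
    servers = [url.strip() for url in raw.split(",") if url.strip()]
    return {
        "servers": servers,
        "allow_reconnect": True,
        "max_reconnect_attempts": 5,
        "reconnect_time_wait": 1,
        "connect_timeout": 10,
    }


NATS_CONSUMER = {"connect_args": build_connect_args()}
```
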
**{app_name}/consumers.py**
```python
# Consumers must live in the app's consumers module to be loaded automatically;
# otherwise, import them explicitly to force loading.
import logging

from nats_consumer import JetstreamPushConsumer, operations

logger = logging.getLogger(__name__)


class OrderConsumer(JetstreamPushConsumer):
    stream_name = "orders"
    subjects = [
        "orders.created",
    ]

    # You need to set up the streams
    async def setup(self):
        return [
            operations.CreateStream(
                name=self.stream_name,
                subjects=self.subjects,
                storage="file",
            ),
        ]

    async def handle_message(self, message):
        # The message only shows up if it's logged as an error
        logger.error(f"Received message: {message.data}")
```
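
The handler above logs the raw bytes in `message.data`. Since message decoding is still on the roadmap, one possible approach is to decode the payload yourself. The sketch below assumes the payload is UTF-8 JSON, as in the publisher example; `decode_order` is a hypothetical helper, not something the package provides.

```python
import json
import logging

logger = logging.getLogger(__name__)


def decode_order(data: bytes):
    """Decode a UTF-8 JSON payload; return None if it is malformed."""
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        logger.warning("Discarding malformed payload: %r", data)
        return None


# Inside the consumer, the helper might be used like this:
#
#     async def handle_message(self, message):
#         order = decode_order(message.data)
#         if order is not None:
#             logger.error("Received order %s", order["id"])
```
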
**publish.py**
```python
import asyncio
import json

from nats_consumer import get_nats_client


async def publish_messages():
    ns = await get_nats_client()
    js = ns.jetstream()
    for i in range(5):
        data = {"id": i, "name": f"Order {i}"}
        # JetStream payloads are bytes, so serialize and encode the dict
        data_b = json.dumps(data).encode("utf-8")
        print(f"Publishing message {i}...")
        await js.publish("orders.created", data_b)


if __name__ == "__main__":
    asyncio.run(publish_messages())
```
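
The serialization step in the publisher can be factored out so it can be checked without a running NATS server. This is only a sketch: `encode_order` is a hypothetical helper name, and the payload shape simply mirrors the loop above.

```python
import json


def encode_order(order_id: int) -> bytes:
    """Serialize one order the same way the publisher loop does."""
    data = {"id": order_id, "name": f"Order {order_id}"}
    return json.dumps(data).encode("utf-8")


# The five payloads the publisher would send to "orders.created".
payloads = [encode_order(i) for i in range(5)]
```
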
## Running Consumers
**To run a single consumer:**
```bash
python manage.py nats_consumer OrderConsumer --setup
```
**To run multiple consumers:**
```bash
python manage.py nats_consumer OrderConsumer AnotherConsumer
```
**To run all consumers:**
```bash
python manage.py nats_consumer
```
## Feature roadmap
- Encoding/decoding of messages (JSON, protobuf, etc.)
- Better error handling, configurable retry
- Better log output from the consumer
- Configurable DLQ strategies
- [insert your feature here]