django-nats-consumer


Namedjango-nats-consumer JSON
Version 1.2.1 PyPI version JSON
download
home_pagehttps://github.com/dev360/django-nats-consumer
SummaryDjango NATS Consumer
upload_time2025-01-28 05:09:29
maintainerNone
docs_urlNone
authorNone
requires_python>=3.8
licenseBSD-3-Clause
keywords django nats jetstream consumer async
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # django-nats-consumer
NATS + Django = ⚡️

## Installation

Please pay attention to the development status; this is Pre-Alpha software; expect the api to evolve as I start using this more in production.

I hope you find some value in it - writing a good consumer takes some finesse.


```bash
pip install django-nats-consumer
```


## Usage

**settings.py**
```python
INSTALLED_APPS = [
    ...
    "nats_consumer",
]

NATS_CONSUMER = {
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 5,
        "reconnect_time_wait": 1,
        "connect_timeout": 10,
    },
}
```

**{app_name}/consumers.py**
```python
# Consumers need to be in the consumers module in order to be loaded,
# or you can import them to force them to be loaded.
from nats_consumer import JetstreamPushConsumer

import logging

from nats_consumer import JetstreamPushConsumer, operations

logger = logging.getLogger(__name__)


class OrderConsumer(JetstreamPushConsumer):
    stream_name = "orders"
    subjects = [
        "orders.created",
    ]

    # You need to setup the streams
    async def setup(self):
        return [
            operations.CreateStream(
                name=self.stream_name,
                subjects=self.subjects,
                storage="file"
            ),
        ]

    async def handle_message(self, message):
        # The message only shows if its logged as error
        logger.error(f"Received message: {message.data}")

```

**publish.py**
```python
import asyncio

from nats_consumer import get_nats_client

async def publish_messages():
    ns = await get_nats_client()
    js = ns.jetstream()
    for i in range(5):
        data = {"id": i, "name": f"Order {i}"}
        data_b = json.dumps(data).encode("utf-8")
        print(f"Publishing message {i}...")
        await js.publish("orders.created", data_b)

if __name__ == "__main__":
    asyncio.run(publish_messages())

```

## Running Consumers
**To run a single consumer:**
```bash
python manage.py nats_consumer OrderConsumer --setup
```

**To run multiple consumers:**
```bash
python manage.py nats_consumer OrderConsumer AnotherConsumer
```

**To run all consumers:**
```bash
python manage.py nats_consumer
```

## Feature roadmap
- Encoding/decoding of messages (json, protobuf, etc)
- Better error handling, configurable retry
- Better log output from the consumer
- Configurable DLQ strategies
- [insert your feature here]

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/dev360/django-nats-consumer",
    "name": "django-nats-consumer",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "django, nats, jetstream, consumer, async",
    "author": null,
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/8e/cb/6016fcf2e51464b3b3cdfe0e23a8906b4419d07a37926c2de0ad1401ac85/django_nats_consumer-1.2.1.tar.gz",
    "platform": null,
    "description": "# django-nats-consumer\nNATS + Django = \u26a1\ufe0f\n\n## Installation\n\nPlease pay attention to the development status; this is Pre-Alpha software; expect the api to evolve as I start using this more in production.\n\nI hope you find some value in it - writing a good consumer takes some finesse.\n\n\n```bash\npip install django-nats-consumer\n```\n\n\n## Usage\n\n**settings.py**\n```python\nINSTALLED_APPS = [\n    ...\n    \"nats_consumer\",\n]\n\nNATS_CONSUMER = {\n    \"connect_args\": {\n        \"servers\": [\"nats://localhost:4222\"],\n        \"allow_reconnect\": True,\n        \"max_reconnect_attempts\": 5,\n        \"reconnect_time_wait\": 1,\n        \"connect_timeout\": 10,\n    },\n}\n```\n\n**{app_name}/consumers.py**\n```python\n# Consumers need to be in the consumers module in order to be loaded,\n# or you can import them to force them to be loaded.\nfrom nats_consumer import JetstreamPushConsumer\n\nimport logging\n\nfrom nats_consumer import JetstreamPushConsumer, operations\n\nlogger = logging.getLogger(__name__)\n\n\nclass OrderConsumer(JetstreamPushConsumer):\n    stream_name = \"orders\"\n    subjects = [\n        \"orders.created\",\n    ]\n\n    # You need to setup the streams\n    async def setup(self):\n        return [\n            operations.CreateStream(\n                name=self.stream_name,\n                subjects=self.subjects,\n                storage=\"file\"\n            ),\n        ]\n\n    async def handle_message(self, message):\n        # The message only shows if its logged as error\n        logger.error(f\"Received message: {message.data}\")\n\n```\n\n**publish.py**\n```python\nimport asyncio\n\nfrom nats_consumer import get_nats_client\n\nasync def publish_messages():\n    ns = await get_nats_client()\n    js = ns.jetstream()\n    for i in range(5):\n        data = {\"id\": i, \"name\": f\"Order {i}\"}\n        data_b = json.dumps(data).encode(\"utf-8\")\n        print(f\"Publishing message {i}...\")\n        await js.publish(\"orders.created\", data_b)\n\nif __name__ == \"__main__\":\n    asyncio.run(publish_messages())\n\n```\n\n## Running Consumers\n**To run a single consumer:**\n```bash\npython manage.py nats_consumer OrderConsumer --setup\n```\n\n**To run multiple consumers:**\n```bash\npython manage.py nats_consumer OrderConsumer AnotherConsumer\n```\n\n**To run all consumers:**\n```bash\npython manage.py nats_consumer\n```\n\n## Feature roadmap\n- Encoding/decoding of messages (json, protobuf, etc)\n- Better error handling, configurable retry\n- Better log output from the consumer\n- Configurable DLQ strategies\n- [insert your feature here]\n",
    "bugtrack_url": null,
    "license": "BSD-3-Clause",
    "summary": "Django NATS Consumer",
    "version": "1.2.1",
    "project_urls": {
        "Homepage": "https://github.com/dev360/django-nats-consumer",
        "Repository": "https://github.com/dev360/django-nats-consumer"
    },
    "split_keywords": [
        "django",
        " nats",
        " jetstream",
        " consumer",
        " async"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "0c83d37210632f008234391d7a75223a55b7ff827e525779454f0164aed4fe60",
                "md5": "a12aed8adf406497452c137555708933",
                "sha256": "d24b69a07599ac464793aa15243a50e46b293af36685223f89215485db27a677"
            },
            "downloads": -1,
            "filename": "django_nats_consumer-1.2.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a12aed8adf406497452c137555708933",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 10737,
            "upload_time": "2025-01-28T05:09:27",
            "upload_time_iso_8601": "2025-01-28T05:09:27.749069Z",
            "url": "https://files.pythonhosted.org/packages/0c/83/d37210632f008234391d7a75223a55b7ff827e525779454f0164aed4fe60/django_nats_consumer-1.2.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "8ecb6016fcf2e51464b3b3cdfe0e23a8906b4419d07a37926c2de0ad1401ac85",
                "md5": "d04a098148504b5e5efda348a1fc4e63",
                "sha256": "f539dac9a619b577e694ffcb4c1896adfae374eab1b01b2f20e26722f69f9d7c"
            },
            "downloads": -1,
            "filename": "django_nats_consumer-1.2.1.tar.gz",
            "has_sig": false,
            "md5_digest": "d04a098148504b5e5efda348a1fc4e63",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 9569,
            "upload_time": "2025-01-28T05:09:29",
            "upload_time_iso_8601": "2025-01-28T05:09:29.243322Z",
            "url": "https://files.pythonhosted.org/packages/8e/cb/6016fcf2e51464b3b3cdfe0e23a8906b4419d07a37926c2de0ad1401ac85/django_nats_consumer-1.2.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-01-28 05:09:29",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "dev360",
    "github_project": "django-nats-consumer",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "django-nats-consumer"
}
        
Elapsed time: 1.17601s