django-nats-consumer


Name: django-nats-consumer
Version: 1.3.0
Home page: https://github.com/dev360/django-nats-consumer
Summary: Django NATS Consumer
Upload time: 2025-02-09 21:01:13
Requires Python: >=3.9
License: BSD-3-Clause
Keywords: django, nats, jetstream, consumer, async
Requirements: No requirements were recorded.

# django-nats-consumer
NATS + Django = ⚡️

## Installation

Note the development status: this is Pre-Alpha software. Expect the API to evolve as I start using it more in production.

```bash
pip install django-nats-consumer
```


## Usage

**settings.py**
```python
INSTALLED_APPS = [
    ...
    "nats_consumer",
    ...
]

NATS_CONSUMER = {
    "connect_args": {
        "servers": ["nats://localhost:4222"],
        "allow_reconnect": True,
        "max_reconnect_attempts": 5,
        "reconnect_time_wait": 1,
        "connect_timeout": 10,
    },
}
```
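
The server list usually differs between environments. A minimal sketch of building the same `connect_args` dictionary from environment variables follows. It assumes the hypothetical variable names `NATS_SERVERS` and `NATS_CONNECT_TIMEOUT`, which django-nats-consumer itself does not define or read, and a helper name, `build_connect_args`, chosen only for illustration:

```python
import os


def build_connect_args(environ=None):
    """Build the connect_args dict from environment variables.

    NATS_SERVERS and NATS_CONNECT_TIMEOUT are hypothetical names chosen for
    this example; the library itself does not read them.
    """
    environ = os.environ if environ is None else environ
    # Comma-separated list of server URLs, defaulting to the README's value.
    raw_servers = environ.get("NATS_SERVERS", "nats://localhost:4222")
    servers = [s.strip() for s in raw_servers.split(",") if s.strip()]
    return {
        "servers": servers,
        "allow_reconnect": True,
        "max_reconnect_attempts": 5,
        "reconnect_time_wait": 1,
        "connect_timeout": int(environ.get("NATS_CONNECT_TIMEOUT", "10")),
    }


NATS_CONSUMER = {"connect_args": build_connect_args()}
```

With no variables set, this produces the same values as the static configuration above.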

**{app_name}/consumers.py**
```python
# Consumers must live in the app's consumers module to be loaded automatically;
# otherwise, import them explicitly to force them to be loaded.
import logging

from nats_consumer import JetstreamPushConsumer, operations

logger = logging.getLogger(__name__)


class OrderConsumer(JetstreamPushConsumer):
    stream_name = "orders"
    subjects = [
        "orders.created",
    ]

    # You need to set up the streams
    async def setup(self):
        return [
            operations.CreateStream(
                name=self.stream_name,
                subjects=self.subjects,
                storage="file"
            ),
        ]

    async def handle_message(self, message):
        # The message only shows up if it's logged as an error
        logger.error(f"Received message: {message.data}")

```
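
The handler above logs `message.data` verbatim. A minimal sketch of decoding the JSON payloads sent by `publish.py` follows, assuming `message.data` holds the raw bytes that were published. The helper `decode_order` is hypothetical and not part of django-nats-consumer; `handle_message` could call it as `decode_order(message.data)`:

```python
import json
from typing import Optional


def decode_order(data: bytes) -> Optional[dict]:
    """Decode a UTF-8 JSON payload; return None if it is not a JSON object."""
    try:
        payload = json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Malformed bytes or invalid JSON: let the caller decide what to do.
        return None
    return payload if isinstance(payload, dict) else None


# The same encoding that publish.py uses for its messages.
raw = json.dumps({"id": 1, "name": "Order 1"}).encode("utf-8")
order = decode_order(raw)
```

Returning `None` instead of raising keeps the decision about bad messages (log, skip, retry) in the handler, since how this library acknowledges or redelivers messages is not covered here.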

**publish.py**
```python
import asyncio
import json

from nats_consumer import get_nats_client

async def publish_messages():
    ns = await get_nats_client()
    js = ns.jetstream()
    for i in range(5):
        data = {"id": i, "name": f"Order {i}"}
        data_b = json.dumps(data).encode("utf-8")
        print(f"Publishing message {i}...")
        await js.publish("orders.created", data_b)

if __name__ == "__main__":
    asyncio.run(publish_messages())

```

## Running Consumers
**To run a single consumer:**
```bash
python manage.py nats_consumer OrderConsumer --setup
```

**To run multiple consumers:**
```bash
python manage.py nats_consumer OrderConsumer AnotherConsumer
```

**To run all consumers:**
```bash
python manage.py nats_consumer
```

**Additional Options:**
```bash
# Enable uvloop for better performance
python manage.py nats_consumer --uvloop

# Enable auto-reload for development (watches for file changes)
python manage.py nats_consumer --reload

# Combine options
python manage.py nats_consumer OrderConsumer --uvloop --reload
```

**Note**
In production, uvloop typically performs better than the default asyncio event loop, but it is only available on Unix-like systems (Linux, macOS). To use uvloop, install the optional extra:

```bash
pip install "django-nats-consumer[uvloop]"
```

And add the following to your settings.py:
```python
NATS_CONSUMER = {
    "event_loop_policy": "uvloop.EventLoopPolicy",
    ...
}
```
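
Because uvloop is not available on every platform, a settings file shared across machines may want to enable it only where it can be imported. A minimal sketch follows, assuming a hypothetical helper named `nats_consumer_settings`; only the `event_loop_policy` key and its value come from this README:

```python
import importlib.util


def nats_consumer_settings(base, uvloop_available=None):
    """Return a copy of base, adding the uvloop policy only when uvloop is importable."""
    if uvloop_available is None:
        # find_spec returns None when the top-level module is not installed.
        uvloop_available = importlib.util.find_spec("uvloop") is not None
    settings = dict(base)
    if uvloop_available:
        settings["event_loop_policy"] = "uvloop.EventLoopPolicy"
    return settings


NATS_CONSUMER = nats_consumer_settings(
    {"connect_args": {"servers": ["nats://localhost:4222"]}}
)
```

The `uvloop_available` parameter exists only so the behavior can be exercised without installing or uninstalling uvloop.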

            
