nats-app


Namenats-app JSON
Version 0.7.4 PyPI version JSON
download
home_pageNone
SummaryNATS App is wrapper client on NATS Connection
upload_time2025-08-28 20:43:27
maintainerNone
docs_urlNone
authorVadym Statishyn
requires_python<4.0,>=3.12
licenseApache-2.0
keywords nats
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # NATS App

[![pypi](https://img.shields.io/pypi/v/nats_app.svg)](https://pypi.python.org/pypi/nats_app)
[![versions](https://img.shields.io/pypi/pyversions/nats_app.svg)](https://github.com/centum/nats_app)
[![test](https://github.com/centum/nats_app/actions/workflows/test.yml/badge.svg)](https://github.com/centum/nats_app/actions/workflows/test.yml)
[![Apache 2 licensed](https://img.shields.io/badge/license-Apache2-blue.svg)](https://raw.githubusercontent.com/centum/nats_app/refs/heads/master/LICENSE)

NATS App is wrapper application on NATS Connection

### Create NATS Application

```python
NATS_URL = "nats://localhost:4222"

nc = NATSApp(NATS_URL)
await nc.connect()
```

### Add RPC Handler

```python
@nc.push_subscribe("app.test.echo", queue="worker")
async def rpc_func_echo(msg: Any) -> str:
    return f"Response with Msg.data: {msg.data}"
```

### Call RPC
```python
res = await nc.request("app.test.echo", {"args": ["test"]})
print(res)
```

### JetStream push subscription

```python
@nc.js_push_subscribe("app.test.js.subs", queue="worker")
async def handler(msg: Msg):
    print(msg.data)
    await msg.ack()
```

### JetStream pull subscription

```python
@nc.js_pull_subscribe("app.subject.js.subs", batch=1)
async def handler(msgs: list[Msg]):
    for m in msgs:
        print(m)
        await m.ack()
```

### JetStream publish

```python
await nc.js.publish("app.subject.js.subs", b"TEST123")
```


            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "nats-app",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.12",
    "maintainer_email": null,
    "keywords": "nats",
    "author": "Vadym Statishyn",
    "author_email": "statishin@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/d5/76/6aad9e146ffac4892856487745d293bfd3bd63896e474a6bd1e2362ddab5/nats_app-0.7.4.tar.gz",
    "platform": null,
    "description": "# NATS App\n\n[![pypi](https://img.shields.io/pypi/v/nats_app.svg)](https://pypi.python.org/pypi/nats_app)\n[![versions](https://img.shields.io/pypi/pyversions/nats_app.svg)](https://github.com/centum/nats_app)\n[![test](https://github.com/centum/nats_app/actions/workflows/test.yml/badge.svg)](https://github.com/centum/nats_app/actions/workflows/test.yml)\n[![Apache 2 licensed](https://img.shields.io/badge/license-Apache2-blue.svg)](https://raw.githubusercontent.com/centum/nats_app/refs/heads/master/LICENSE)\n\nNATS App is wrapper application on NATS Connection\n\n### Create NATS Application\n\n```python\nNATS_URL = \"nats://localhost:4222\"\n\nnc = NATSApp(NATS_URL)\nawait nc.connect()\n```\n\n### Add RPC Handler\n\n```python\n@nc.push_subscribe(\"app.test.echo\", queue=\"worker\")\nasync def rpc_func_echo(msg: Any) -> str:\n    return f\"Response with Msg.data: {msg.data}\"\n```\n\n### Call RPC\n```python\nres = await nc.request(\"app.test.echo\", {\"args\": [\"test\"]})\nprint(res)\n```\n\n### JetStream push subscription\n\n```python\n@nc.js_push_subscribe(\"app.test.js.subs\", queue=\"worker\")\nasync def handler(msg: Msg):\n    print(msg.data)\n    await msg.ack()\n```\n\n### JetStream pull subscription\n\n```python\n@nc.js_pull_subscribe(\"app.subject.js.subs\", batch=1)\nasync def handler(msgs: list[Msg]):\n    for m in msgs:\n        print(m)\n        await m.ack()\n```\n\n### JetStream publish\n\n```python\nawait nc.js.publish(\"app.subject.js.subs\", b\"TEST123\")\n```\n\n",
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "NATS App is wrapper client on NATS Connection",
    "version": "0.7.4",
    "project_urls": null,
    "split_keywords": [
        "nats"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "7e2a4a1ba8502842c27ae3346f8e381c2275a46726dd778e41efd950e18a18c6",
                "md5": "73875743741d7f4644caa18dfc67a515",
                "sha256": "25329752438933275c2763500705fd740ede915064231bb7ea69e27ac8842f12"
            },
            "downloads": -1,
            "filename": "nats_app-0.7.4-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "73875743741d7f4644caa18dfc67a515",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.12",
            "size": 27079,
            "upload_time": "2025-08-28T20:43:26",
            "upload_time_iso_8601": "2025-08-28T20:43:26.167791Z",
            "url": "https://files.pythonhosted.org/packages/7e/2a/4a1ba8502842c27ae3346f8e381c2275a46726dd778e41efd950e18a18c6/nats_app-0.7.4-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "d5766aad9e146ffac4892856487745d293bfd3bd63896e474a6bd1e2362ddab5",
                "md5": "b6991f8f2dde52ed05dd36d243ef03b4",
                "sha256": "83e43fca424a739af55746fdafeee839fecc647531e35679e3fe6d3dc2c6816d"
            },
            "downloads": -1,
            "filename": "nats_app-0.7.4.tar.gz",
            "has_sig": false,
            "md5_digest": "b6991f8f2dde52ed05dd36d243ef03b4",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.12",
            "size": 20940,
            "upload_time": "2025-08-28T20:43:27",
            "upload_time_iso_8601": "2025-08-28T20:43:27.350929Z",
            "url": "https://files.pythonhosted.org/packages/d5/76/6aad9e146ffac4892856487745d293bfd3bd63896e474a6bd1e2362ddab5/nats_app-0.7.4.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-28 20:43:27",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "nats-app"
}
        
Elapsed time: 1.47830s