| Field | Value |
| --- | --- |
| Name | nats-app |
| Version | 0.1.6 |
| home_page | None |
| Summary | NATS App is wrapper client on NATS Connection |
| upload_time | 2025-01-14 18:17:15 |
| maintainer | None |
| docs_url | None |
| author | Vadym Statishyn |
| requires_python | <4.0,>=3.12 |
| license | Apache-2.0 |
| keywords | nats |
| VCS | |
| bugtrack_url | |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |

# NATS App
[![pypi](https://img.shields.io/pypi/v/nats_app.svg)](https://pypi.python.org/pypi/nats_app)
[![versions](https://img.shields.io/pypi/pyversions/nats_app.svg)](https://github.com/centum/nats_app)
[![test](https://github.com/centum/nats_app/actions/workflows/test.yml/badge.svg)](https://github.com/centum/nats_app/actions/workflows/test.yml)
[![Apache 2 licensed](https://img.shields.io/badge/license-Apache2-blue.svg)](https://raw.githubusercontent.com/centum/nats_app/refs/heads/master/LICENSE)
NATS App is a wrapper application around a NATS connection.
### Create NATS Application
```python
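from nats_app import NATSApp  # import path assumed from the package name

NATS_URL = "nats://localhost:4222"

nc = NATSApp(NATS_URL)
await nc.connect()  # must run inside a coroutine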
```
### Add RPC Handler
```python
from typing import Any

@nc.push_subscribe("app.test.echo", queue="worker")
async def rpc_func_echo(msg: Any) -> str:
    return f"Response with Msg.data: {msg.data}"
```
### Call RPC
```python
res = await nc.request("app.test.echo", {"args": ["test"]})
print(res)
```
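
The handler and call snippets above follow a register-then-request pattern. The sketch below imitates that pattern in memory, with no NATS server involved, to show how a decorator can bind a coroutine to a subject and how a request is routed to it. `MiniApp` and everything inside it are hypothetical names made up for this illustration; this is not how `nats_app` is implemented, and queue groups and serialization are left out.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


class MiniApp:
    """Hypothetical in-memory stand-in; not the nats_app implementation."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def push_subscribe(self, subject: str) -> Callable[[Handler], Handler]:
        # Return a decorator that records the coroutine under the subject.
        def decorator(func: Handler) -> Handler:
            self._handlers[subject] = func
            return func

        return decorator

    async def request(self, subject: str, payload: Any) -> Any:
        # Look up the handler and await its reply directly.
        handler = self._handlers.get(subject)
        if handler is None:
            raise LookupError(f"no responder for {subject!r}")
        return await handler(payload)


app = MiniApp()


@app.push_subscribe("app.test.echo")
async def echo(payload: Any) -> str:
    return f"Response with payload: {payload}"


async def main() -> str:
    return await app.request("app.test.echo", {"args": ["test"]})


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a real broker the reply travels over the network and the handler may live in another process; the sketch only shows the subject-to-handler routing.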
### JetStream push subscription
```python
from nats.aio.msg import Msg  # nats-py message type, assumed to be the Msg used here

@nc.js_push_subscribe("app.test.js.subs", queue="worker")
async def handler(msg: Msg):
    print(msg.data)
    await msg.ack()
```
### JetStream pull subscription
```python
from nats.aio.msg import Msg  # nats-py message type, assumed to be the Msg used here

@nc.js_pull_subscribe("app.subject.js.subs", batch=1)
async def handler(msgs: list[Msg]):
    for m in msgs:
        print(m)
        await m.ack()
```
```
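
The pull handler above receives a list because a pull consumer asks for up to `batch` messages at a time. As a rough illustration of that idea only, the sketch below collects batches from an `asyncio.Queue`. The `fetch_batch` helper, its `timeout` parameter, and the queue standing in for a stream are all assumptions for this example and are not part of `nats_app` or nats-py.

```python
import asyncio


async def fetch_batch(queue: asyncio.Queue, batch: int, timeout: float = 0.1) -> list:
    """Collect up to `batch` items, stopping early if none arrives within `timeout`."""
    items = []
    while len(items) < batch:
        try:
            items.append(await asyncio.wait_for(queue.get(), timeout))
        except asyncio.TimeoutError:
            break
    return items


async def demo() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for payload in (b"one", b"two", b"three"):
        queue.put_nowait(payload)
    # Each loop iteration plays the role of one handler invocation.
    while msgs := await fetch_batch(queue, batch=2):
        print(msgs)


if __name__ == "__main__":
    asyncio.run(demo())
```

A larger `batch` means fewer handler calls per message, at the cost of waiting longer when traffic is light.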
### JetStream publish
```python
await nc.js.publish("app.subject.js.subs", b"TEST123")
```
Raw data
{
"_id": null,
"home_page": null,
"name": "nats-app",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.12",
"maintainer_email": null,
"keywords": "nats",
"author": "Vadym Statishyn",
"author_email": "statishin@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/29/9b/abafb11083f9974a945676d9c890d624c00d9ee3951ed5bf873f5ad3ac03/nats_app-0.1.6.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "NATS App is wrapper client on NATS Connection",
"version": "0.1.6",
"project_urls": null,
"split_keywords": [
"nats"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "5e08b46b4e9fc00d131955e3dbc7cc093bf647ec3e45c409a715673b8bcf3c90",
"md5": "b4026004f6067172c8b4c1ddde384a25",
"sha256": "e7f3702290aa91b066a2e0a8f8be19a9105c669640cb9f59e655c6c91798105c"
},
"downloads": -1,
"filename": "nats_app-0.1.6-py3-none-any.whl",
"has_sig": false,
"md5_digest": "b4026004f6067172c8b4c1ddde384a25",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.12",
"size": 20183,
"upload_time": "2025-01-14T18:17:13",
"upload_time_iso_8601": "2025-01-14T18:17:13.658991Z",
"url": "https://files.pythonhosted.org/packages/5e/08/b46b4e9fc00d131955e3dbc7cc093bf647ec3e45c409a715673b8bcf3c90/nats_app-0.1.6-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "299babafb11083f9974a945676d9c890d624c00d9ee3951ed5bf873f5ad3ac03",
"md5": "e06b5b027130347fdca6461fcd91639f",
"sha256": "3a9853ec8bc694622edad88fb7f2997aee0e8c36e502da2246edff42c3608d91"
},
"downloads": -1,
"filename": "nats_app-0.1.6.tar.gz",
"has_sig": false,
"md5_digest": "e06b5b027130347fdca6461fcd91639f",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.12",
"size": 15531,
"upload_time": "2025-01-14T18:17:15",
"upload_time_iso_8601": "2025-01-14T18:17:15.803423Z",
"url": "https://files.pythonhosted.org/packages/29/9b/abafb11083f9974a945676d9c890d624c00d9ee3951ed5bf873f5ad3ac03/nats_app-0.1.6.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-01-14 18:17:15",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "nats-app"
}
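
The `digests` entries in the raw data can be used to check a downloaded file before installing it. The following is a minimal sketch using only the standard library; the helper names `sha256_of` and `matches` are made up for this example, and the file path in the usage note assumes the archive was saved to the current directory.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches(path: Path, expected_hex: str) -> bool:
    """Compare a file's digest with the value listed in the metadata."""
    return sha256_of(path) == expected_hex.lower()
```

For the sdist listed above, the call would be `matches(Path("nats_app-0.1.6.tar.gz"), "3a9853ec8bc694622edad88fb7f2997aee0e8c36e502da2246edff42c3608d91")`.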