| Field | Value |
| --- | --- |
| Name | nats-py |
| Version | 2.9.0 |
| Summary | NATS client for Python |
| upload_time | 2024-08-26 15:14:17 |
| requires_python | >=3.7 |
| license | Apache 2 License |
| requirements | No requirements were recorded. |

# NATS - Python3 Client for Asyncio
An [asyncio](https://docs.python.org/3/library/asyncio.html) Python client for the [NATS messaging system](https://nats.io).
## Supported platforms
Should be compatible with [Python 3.8](https://docs.python.org/3.8/library/asyncio.html) and later.
## Installing
```bash
pip install nats-py
```
## Getting started
```python
import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError

async def main():
    # It is very likely that the demo server will see traffic from clients other than yours.
    # To avoid this, start your own locally and modify the example to use it.
    nc = await nats.connect("nats://demo.nats.io:4222")

    # You can also use the following for TLS against the demo server.
    #
    # nc = await nats.connect("tls://demo.nats.io:4443")

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sub = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await sub.unsubscribe(limit=2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    # Synchronous style with iterator also supported.
    sub = await nc.subscribe("bar")
    await nc.publish("bar", b'First')
    await nc.publish("bar", b'Second')

    try:
        async for msg in sub.messages:
            print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
            await sub.unsubscribe()
    except Exception as e:
        pass

    async def help_request(msg):
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        await nc.publish(msg.reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sub = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 500 ms.
    try:
        response = await nc.request("help", b'help me', timeout=0.5)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except TimeoutError:
        print("Request timed out")

    # Remove interest in subscription.
    await sub.unsubscribe()

    # Terminate connection to NATS.
    await nc.drain()

if __name__ == '__main__':
    asyncio.run(main())
```
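The request in the example above gives up after a single 500 ms timeout. A minimal sketch of retrying such a request is shown here. `request_with_retry` is a hypothetical helper, not part of nats-py. It assumes only that the client exposes the `request(subject, payload, timeout=...)` coroutine used above and that its timeout error derives from `asyncio.TimeoutError`. `FakeClient` is a stand-in so the sketch runs without a server.

```python
import asyncio


async def request_with_retry(nc, subject, payload, attempts=3, timeout=0.5):
    """Send a request, retrying on timeout up to `attempts` times.

    Hypothetical helper: `nc` is any object with an awaitable
    `request(subject, payload, timeout=...)`, such as the client above.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for _ in range(attempts):
        try:
            return await nc.request(subject, payload, timeout=timeout)
        except asyncio.TimeoutError as err:
            # Assumption: the client's timeout error derives from
            # asyncio.TimeoutError, so this clause catches it.
            last_error = err
    raise last_error


class FakeClient:
    """Stand-in for a connection: times out `failures` times, then replies."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    async def request(self, subject, payload, timeout=0.5):
        self.calls += 1
        if self.calls <= self.failures:
            raise asyncio.TimeoutError()
        return b"I can help"


fake = FakeClient(failures=2)
reply = asyncio.run(request_with_retry(fake, "help", b"help me"))
print(reply, fake.calls)
```

With a real connection, the same helper would be awaited inside `main()` in place of the bare `nc.request(...)` call.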
## JetStream
Starting with the v2.0.0 series, the client has JetStream support:
```python
import asyncio
import nats
from nats.errors import TimeoutError

async def main():
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)

    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")

    # Fetch and ack messages from consumer.
    for i in range(0, 10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)

    # Create single ephemeral push based subscriber.
    sub = await js.subscribe("foo")
    msg = await sub.next_msg()
    await msg.ack()

    # Create single push based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()

    # Create deliver group whose members receive load-balanced messages.
    async def qsub_a(msg):
        print("QSUB A:", msg)
        await msg.ack()

    async def qsub_b(msg):
        print("QSUB B:", msg)
        await msg.ack()
    await js.subscribe("foo", "workers", cb=qsub_a)
    await js.subscribe("foo", "workers", cb=qsub_b)

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)

    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()

    while True:
        try:
            msg = await osub.next_msg()
            data.extend(msg.data)
        except TimeoutError:
            break
    print("All data in stream:", len(data))

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
```
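The pull consumer above fetches one message at a time for a fixed number of rounds. A minimal sketch of reading in batches until the consumer runs dry could look like the following. `fetch_all` is a hypothetical helper, not part of nats-py. It assumes the subscription exposes `fetch(batch, timeout=...)` as used above and that an empty consumer surfaces as an error derived from `asyncio.TimeoutError`. `FakeMsg` and `FakePullSub` are stand-ins so the sketch runs without a server.

```python
import asyncio


async def fetch_all(psub, batch=5, timeout=1.0):
    """Fetch and ack messages in batches until a fetch times out.

    Hypothetical helper: `psub` is any object with an awaitable
    `fetch(batch, timeout=...)`, such as the pull subscription above.
    """
    payloads = []
    while True:
        try:
            msgs = await psub.fetch(batch, timeout=timeout)
        except asyncio.TimeoutError:
            # Assumption: no pending messages is reported as a timeout.
            break
        for msg in msgs:
            await msg.ack()
            payloads.append(msg.data)
    return payloads


class FakeMsg:
    """Stand-in for a JetStream message with `data` and `ack()`."""

    def __init__(self, data):
        self.data = data
        self.acked = False

    async def ack(self):
        self.acked = True


class FakePullSub:
    """Stand-in for a pull subscription backed by an in-memory list."""

    def __init__(self, msgs):
        self.pending = list(msgs)

    async def fetch(self, batch=1, timeout=5):
        if not self.pending:
            raise asyncio.TimeoutError()
        head, self.pending = self.pending[:batch], self.pending[batch:]
        return head


stored = [FakeMsg(f"hello world: {i}".encode()) for i in range(10)]
payloads = asyncio.run(fetch_all(FakePullSub(stored), batch=4))
print(len(payloads), all(m.acked for m in stored))
```

Larger batches reduce round trips, while the timeout bounds how long the final, empty fetch waits before the loop ends.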
## TLS
TLS connections can be configured with an [ssl context](https://docs.python.org/3/library/ssl.html#context-creation):
```python
import ssl
import nats

# Run this inside a coroutine, since `await` is only valid there.
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
                        keyfile='client-key.pem')
await nats.connect(servers=["tls://127.0.0.1:4443"], tls=ssl_ctx, tls_hostname="localhost")
```
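The context setup above can be wrapped in a small function so that the CA bundle and client certificate become optional. This is a sketch using only the standard `ssl` module. `build_client_tls_context` and its parameter names are hypothetical, not part of nats-py.

```python
import ssl


def build_client_tls_context(ca_file=None, cert_file=None, key_file=None):
    """Build a client-side SSL context for verifying a server.

    Hypothetical helper: with no arguments it relies on the system's
    default trust store; the file arguments mirror the calls shown above.
    """
    ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    if ca_file is not None:
        # Trust a private CA, e.g. 'ca.pem'.
        ctx.load_verify_locations(ca_file)
    if cert_file is not None:
        # Present a client certificate for mutual TLS.
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx


ctx = build_client_tls_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)
```

The returned context would then be passed as the `tls` argument of `nats.connect`, as in the snippet above.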
Setting the scheme to `tls` in the connect URL will make the client create a [default ssl context](https://docs.python.org/3/library/ssl.html#ssl.create_default_context) automatically:
```python
import asyncio
import ssl
from nats.aio.client import Client as NATS

async def run():
    nc = NATS()
    await nc.connect("tls://demo.nats.io:4443")
```
*Note*: If you get SSL certificate errors on macOS, first try installing the `certifi` certificate bundle. With Python 3.7, for example, run:
```ps
$ /Applications/Python\ 3.7/Install\ Certificates.command
-- pip install --upgrade certifi
Collecting certifi
...
-- removing any existing file or link
-- creating symlink to certifi certificate bundle
-- setting permissions
-- update complete
```
## NKEYS and JWT User Credentials
Since the [v0.9.0](https://github.com/nats-io/nats.py/releases/tag/v0.9.0) release,
you can optionally install [NKEYS](https://github.com/nats-io/nkeys.py) to use
the NATS v2.0 auth features:
```sh
pip install nats-py[nkeys]
```
Usage:
```python
await nats.connect("tls://connect.ngs.global:4222", user_credentials="/path/to/secret.creds")
```
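A missing credentials file is easier to diagnose before the connection attempt than after it. The sketch below checks the path first and returns keyword arguments for `nats.connect`. `credentials_options` is a hypothetical helper, not part of nats-py; the only client option it relies on is the `user_credentials` keyword shown above.

```python
import tempfile
from pathlib import Path


def credentials_options(creds_path):
    """Return connect keyword arguments for a credentials file.

    Hypothetical helper: raises FileNotFoundError early instead of
    letting the connection attempt fail later.
    """
    path = Path(creds_path).expanduser()
    if not path.is_file():
        raise FileNotFoundError(f"credentials file not found: {path}")
    return {"user_credentials": str(path)}


# Demonstration with a temporary placeholder file (not a real credential).
with tempfile.TemporaryDirectory() as tmp:
    creds = Path(tmp) / "secret.creds"
    creds.write_text("placeholder")
    options = credentials_options(creds)
    print(sorted(options))
```

Inside a coroutine, the result would be used as `await nats.connect("tls://connect.ngs.global:4222", **credentials_options("/path/to/secret.creds"))`.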
## Development
1. [Install nats server](https://docs.nats.io/running-a-nats-service/introduction/installation).
1. Make sure the server is available in your PATH: `nats-server -v`.
1. Install dependencies: `python3 -m pipenv install --dev`.
1. Run tests: `python3 -m pytest`.
## License
Unless otherwise noted, the NATS source files are distributed under
the Apache Version 2.0 license found in the LICENSE file.
Raw data
{
"_id": null,
"home_page": null,
"name": "nats-py",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.7",
"maintainer_email": null,
"keywords": null,
"author": null,
"author_email": "Waldemar Quevedo <wally@synadia.com>",
"download_url": "https://files.pythonhosted.org/packages/ba/dc/9a01dd9561b736622c0aa3a19f6b40f3eeb22051eaea1475ceb81d5da48d/nats_py-2.9.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "Apache 2 License",
"summary": "NATS client for Python",
"version": "2.9.0",
"project_urls": {
"Bug Tracker": "https://github.com/nats-io/nats.py/issues",
"Homepage": "https://github.com/nats-io/nats.py"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "badc9a01dd9561b736622c0aa3a19f6b40f3eeb22051eaea1475ceb81d5da48d",
"md5": "c8f3db602047c0cb6fe985ab61b080eb",
"sha256": "01886eb9e0a87f0ec630652cf1fae65d2a8556378a609bc6cc07d2ea60c8d0dd"
},
"downloads": -1,
"filename": "nats_py-2.9.0.tar.gz",
"has_sig": false,
"md5_digest": "c8f3db602047c0cb6fe985ab61b080eb",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.7",
"size": 110714,
"upload_time": "2024-08-26T15:14:17",
"upload_time_iso_8601": "2024-08-26T15:14:17.337528Z",
"url": "https://files.pythonhosted.org/packages/ba/dc/9a01dd9561b736622c0aa3a19f6b40f3eeb22051eaea1475ceb81d5da48d/nats_py-2.9.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-08-26 15:14:17",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "nats-io",
"github_project": "nats.py",
"travis_ci": true,
"coveralls": false,
"github_actions": false,
"lcname": "nats-py"
}