nats-py


Name: nats-py (JSON)
Version: 2.7.2 (PyPI version JSON)
download:
home_page:
Summary: NATS client for Python
upload_time: 2024-02-27 17:22:23
maintainer:
docs_url: None
author:
requires_python: >=3.7
license: Apache 2 License
keywords:
VCS:
bugtrack_url:
requirements: No requirements were recorded.
Travis-CI:
coveralls test coverage: No coveralls.
# NATS - Python3 Client for Asyncio

An [asyncio](https://docs.python.org/3/library/asyncio.html) Python client for the [NATS messaging system](https://nats.io).

[![docs](https://img.shields.io/static/v1?label=docs&message=docs&color=informational)](https://nats-io.github.io/nats.py/)
[![pypi](https://img.shields.io/pypi/v/nats-py.svg)](https://pypi.org/project/nats-py)
[![Build Status](https://travis-ci.com/nats-io/nats.py.svg?branch=main)](http://travis-ci.com/nats-io/nats.py)
[![Versions](https://img.shields.io/pypi/pyversions/nats-py.svg)](https://pypi.org/project/nats-py)
[![License Apache 2.0](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)

## Supported platforms

Compatible with [Python 3.7 and later](https://docs.python.org/3.7/library/asyncio.html).

## Installing

```bash
pip install nats-py
```

## Getting started

```python
import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError

async def main():
    # It is very likely that the demo server will see traffic from clients other than yours.
    # To avoid this, start your own locally and modify the example to use it.
    nc = await nats.connect("nats://demo.nats.io:4222")

    # You can also use the following for TLS against the demo server.
    #
    # nc = await nats.connect("tls://demo.nats.io:4443")

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sub = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await sub.unsubscribe(limit=2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    # Synchronous style with iterator also supported.
    sub = await nc.subscribe("bar")
    await nc.publish("bar", b'First')
    await nc.publish("bar", b'Second')

    try:
        async for msg in sub.messages:
            print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
            await sub.unsubscribe()
    except Exception as e:
        pass

    async def help_request(msg):
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        await nc.publish(msg.reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sub = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 500 ms.
    try:
        response = await nc.request("help", b'help me', timeout=0.5)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except TimeoutError:
        print("Request timed out")

    # Remove interest in subscription.
    await sub.unsubscribe()

    # Terminate connection to NATS.
    await nc.drain()

if __name__ == '__main__':
    asyncio.run(main())
```

## JetStream

Starting with the v2.0.0 series, the client has JetStream support:

```python
import asyncio
import nats
from nats.errors import TimeoutError

async def main():
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)

    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")

    # Fetch and ack messages from consumer.
    for i in range(0, 10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)

    # Create single ephemeral push based subscriber.
    sub = await js.subscribe("foo")
    msg = await sub.next_msg()
    await msg.ack()

    # Create single push based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()

    # Create a deliver group whose messages will be load balanced among its members.
    async def qsub_a(msg):
        print("QSUB A:", msg)
        await msg.ack()

    async def qsub_b(msg):
        print("QSUB B:", msg)
        await msg.ack()
    await js.subscribe("foo", "workers", cb=qsub_a)
    await js.subscribe("foo", "workers", cb=qsub_b)

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)

    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()

    while True:
        try:
            msg = await osub.next_msg()
            data.extend(msg.data)
        except TimeoutError:
            break
    print("All data in stream:", len(data))

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
```

## TLS

TLS connections can be configured with an [ssl context](https://docs.python.org/3/library/ssl.html#context-creation):

```python
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
                        keyfile='client-key.pem')
await nats.connect(servers=["tls://127.0.0.1:4443"], tls=ssl_ctx, tls_hostname="localhost")
```

Setting the scheme to `tls` in the connect URL will make the client create a [default ssl context](https://docs.python.org/3/library/ssl.html#ssl.create_default_context) automatically:

```python
import asyncio
import ssl
from nats.aio.client import Client as NATS

async def run():
    nc = NATS()
    await nc.connect("tls://demo.nats.io:4443")
```

*Note*: If you get SSL certificate errors on OS X (macOS), first try installing the `certifi` certificate bundle. For example, if you are using Python 3.7, run:

```ps
$ /Applications/Python\ 3.7/Install\ Certificates.command
 -- pip install --upgrade certifi
Collecting certifi
...
 -- removing any existing file or link
 -- creating symlink to certifi certificate bundle
 -- setting permissions
 -- update complete
```

## NKEYS and JWT User Credentials

Since the [v0.9.0](https://github.com/nats-io/nats.py/releases/tag/v0.9.0) release,
you can also optionally install [NKEYS](https://github.com/nats-io/nkeys.py) in order to use
the NATS v2.0 auth features:

```sh
pip install nats-py[nkeys]
```

Usage:

```python
await nats.connect("tls://connect.ngs.global:4222", user_credentials="/path/to/secret.creds")
```

## Development

1. [Install nats server](https://docs.nats.io/running-a-nats-service/introduction/installation).
1. Make sure the server is available in your PATH: `nats-server -v`.
1. Install dependencies: `python3 -m pipenv install --dev`.
1. Run tests: `python3 -m pytest`.

## License

Unless otherwise noted, the NATS source files are distributed under
the Apache Version 2.0 license found in the LICENSE file.

            

Raw data

            {
    "_id": null,
    "home_page": "",
    "name": "nats-py",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.7",
    "maintainer_email": "",
    "keywords": "",
    "author": "",
    "author_email": "Waldemar Quevedo <wally@synadia.com>",
    "download_url": "https://files.pythonhosted.org/packages/a4/30/16d3c830715d5b6e48e323a53db6796525a725bcf71a83b97991f8cc6866/nats-py-2.7.2.tar.gz",
    "platform": null,
    "description": "# NATS - Python3 Client for Asyncio\n\nAn [asyncio](https://docs.python.org/3/library/asyncio.html) Python client for the [NATS messaging system](https://nats.io).\n\n[![docs](https://img.shields.io/static/v1?label=docs&message=docs&color=informational)](https://nats-io.github.io/nats.py/)\n[![pypi](https://img.shields.io/pypi/v/nats-py.svg)](https://pypi.org/project/nats-py)\n[![Build Status](https://travis-ci.com/nats-io/nats.py.svg?branch=main)](http://travis-ci.com/nats-io/nats.py)\n[![Versions](https://img.shields.io/pypi/pyversions/nats-py.svg)](https://pypi.org/project/nats-py)\n[![License Apache 2.0](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)\n\n## Supported platforms\n\nShould be compatible with at least [Python +3.7](https://docs.python.org/3.7/library/asyncio.html).\n\n## Installing\n\n```bash\npip install nats-py\n```\n\n## Getting started\n\n```python\nimport asyncio\nimport nats\nfrom nats.errors import ConnectionClosedError, TimeoutError, NoServersError\n\nasync def main():\n    # It is very likely that the demo server will see traffic from clients other than yours.\n    # To avoid this, start your own locally and modify the example to use it.\n    nc = await nats.connect(\"nats://demo.nats.io:4222\")\n\n    # You can also use the following for TLS against the demo server.\n    #\n    # nc = await nats.connect(\"tls://demo.nats.io:4443\")\n\n    async def message_handler(msg):\n        subject = msg.subject\n        reply = msg.reply\n        data = msg.data.decode()\n        print(\"Received a message on '{subject} {reply}': {data}\".format(\n            subject=subject, reply=reply, data=data))\n\n    # Simple publisher and async subscriber via coroutine.\n    sub = await nc.subscribe(\"foo\", cb=message_handler)\n\n    # Stop receiving after 2 messages.\n    await sub.unsubscribe(limit=2)\n    await nc.publish(\"foo\", b'Hello')\n    await nc.publish(\"foo\", b'World')\n 
   await nc.publish(\"foo\", b'!!!!!')\n\n    # Synchronous style with iterator also supported.\n    sub = await nc.subscribe(\"bar\")\n    await nc.publish(\"bar\", b'First')\n    await nc.publish(\"bar\", b'Second')\n\n    try:\n        async for msg in sub.messages:\n            print(f\"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}\")\n            await sub.unsubscribe()\n    except Exception as e:\n        pass\n\n    async def help_request(msg):\n        print(f\"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}\")\n        await nc.publish(msg.reply, b'I can help')\n\n    # Use queue named 'workers' for distributing requests\n    # among subscribers.\n    sub = await nc.subscribe(\"help\", \"workers\", help_request)\n\n    # Send a request and expect a single response\n    # and trigger timeout if not faster than 500 ms.\n    try:\n        response = await nc.request(\"help\", b'help me', timeout=0.5)\n        print(\"Received response: {message}\".format(\n            message=response.data.decode()))\n    except TimeoutError:\n        print(\"Request timed out\")\n\n    # Remove interest in subscription.\n    await sub.unsubscribe()\n\n    # Terminate connection to NATS.\n    await nc.drain()\n\nif __name__ == '__main__':\n    asyncio.run(main())\n```\n\n## JetStream\n\nStarting v2.0.0 series, the client now has JetStream support:\n\n```python\nimport asyncio\nimport nats\nfrom nats.errors import TimeoutError\n\nasync def main():\n    nc = await nats.connect(\"localhost\")\n\n    # Create JetStream context.\n    js = nc.jetstream()\n\n    # Persist messages on 'foo's subject.\n    await js.add_stream(name=\"sample-stream\", subjects=[\"foo\"])\n\n    for i in range(0, 10):\n        ack = await js.publish(\"foo\", f\"hello world: {i}\".encode())\n        print(ack)\n\n    # Create pull based consumer on 'foo'.\n    psub = await js.pull_subscribe(\"foo\", \"psub\")\n\n    # Fetch and ack messagess from consumer.\n 
   for i in range(0, 10):\n        msgs = await psub.fetch(1)\n        for msg in msgs:\n            await msg.ack()\n            print(msg)\n\n    # Create single ephemeral push based subscriber.\n    sub = await js.subscribe(\"foo\")\n    msg = await sub.next_msg()\n    await msg.ack()\n\n    # Create single push based subscriber that is durable across restarts.\n    sub = await js.subscribe(\"foo\", durable=\"myapp\")\n    msg = await sub.next_msg()\n    await msg.ack()\n\n    # Create deliver group that will be have load balanced messages.\n    async def qsub_a(msg):\n        print(\"QSUB A:\", msg)\n        await msg.ack()\n\n    async def qsub_b(msg):\n        print(\"QSUB B:\", msg)\n        await msg.ack()\n    await js.subscribe(\"foo\", \"workers\", cb=qsub_a)\n    await js.subscribe(\"foo\", \"workers\", cb=qsub_b)\n\n    for i in range(0, 10):\n        ack = await js.publish(\"foo\", f\"hello world: {i}\".encode())\n        print(\"\\t\", ack)\n\n    # Create ordered consumer with flow control and heartbeats\n    # that auto resumes on failures.\n    osub = await js.subscribe(\"foo\", ordered_consumer=True)\n    data = bytearray()\n\n    while True:\n        try:\n            msg = await osub.next_msg()\n            data.extend(msg.data)\n        except TimeoutError:\n            break\n    print(\"All data in stream:\", len(data))\n\n    await nc.close()\n\nif __name__ == '__main__':\n    asyncio.run(main())\n```\n\n## TLS\n\nTLS connections can be configured with an [ssl context](https://docs.python.org/3/library/ssl.html#context-creation)\n\n```python\nssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)\nssl_ctx.load_verify_locations('ca.pem')\nssl_ctx.load_cert_chain(certfile='client-cert.pem',\n                        keyfile='client-key.pem')\nawait nats.connect(servers=[\"tls://127.0.0.1:4443\"], tls=ssl_ctx, tls_hostname=\"localhost\")\n```\n\nSetting the scheme to `tls` in the connect URL will make the client create a [default 
ssl context](https://docs.python.org/3/library/ssl.html#ssl.create_default_context) automatically:\n\n```python\nimport asyncio\nimport ssl\nfrom nats.aio.client import Client as NATS\n\nasync def run():\n    nc = NATS()\n    await nc.connect(\"tls://demo.nats.io:4443\")\n```\n\n*Note*: If getting SSL certificate errors in OS X, try first installing the `certifi` certificate bundle. If using Python 3.7 for example, then run:\n\n```ps\n$ /Applications/Python\\ 3.7/Install\\ Certificates.command\n -- pip install --upgrade certifi\nCollecting certifi\n...\n -- removing any existing file or link\n -- creating symlink to certifi certificate bundle\n -- setting permissions\n -- update complete\n```\n\n## NKEYS and JWT User Credentials\n\nSince [v0.9.0](https://github.com/nats-io/nats.py/releases/tag/v0.9.0) release,\nyou can also optionally install [NKEYS](https://github.com/nats-io/nkeys.py) in order to use\nthe new NATS v2.0 auth features:\n\n```sh\npip install nats-py[nkeys]\n```\n\nUsage:\n\n```python\nawait nats.connect(\"tls://connect.ngs.global:4222\", user_credentials=\"/path/to/secret.creds\")\n```\n\n## Development\n\n1. [Install nats server](https://docs.nats.io/running-a-nats-service/introduction/installation).\n1. Make sure the server is available in your PATH: `nats-server -v`.\n1. Install dependencies: `python3 -m pipenv install --dev`.\n1. Run tests: `python3 -m pytest`.\n\n## License\n\nUnless otherwise noted, the NATS source files are distributed under\nthe Apache Version 2.0 license found in the LICENSE file.\n",
    "bugtrack_url": null,
    "license": "Apache 2 License",
    "summary": "NATS client for Python",
    "version": "2.7.2",
    "project_urls": {
        "Bug Tracker": "https://github.com/nats-io/nats.py/issues",
        "Homepage": "https://github.com/nats-io/nats.py"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "a43016d3c830715d5b6e48e323a53db6796525a725bcf71a83b97991f8cc6866",
                "md5": "417b37ef5a8fca6191d456e8123213a8",
                "sha256": "0c97b4a57bed0ef1ff9ae6c19bc115ec7ca8ede5ab3e001fd00a377056a547cf"
            },
            "downloads": -1,
            "filename": "nats-py-2.7.2.tar.gz",
            "has_sig": false,
            "md5_digest": "417b37ef5a8fca6191d456e8123213a8",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.7",
            "size": 98698,
            "upload_time": "2024-02-27T17:22:23",
            "upload_time_iso_8601": "2024-02-27T17:22:23.316302Z",
            "url": "https://files.pythonhosted.org/packages/a4/30/16d3c830715d5b6e48e323a53db6796525a725bcf71a83b97991f8cc6866/nats-py-2.7.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-02-27 17:22:23",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "nats-io",
    "github_project": "nats.py",
    "travis_ci": true,
    "coveralls": false,
    "github_actions": false,
    "lcname": "nats-py"
}
        
Elapsed time: 0.19671s