BrokRPC


* Name: BrokRPC
* Version: 0.2.5
* Summary: framework for gRPC like server-client communication over message brokers
* Upload time: 2025-01-07 16:24:33
* Author: zerlok
* Requires Python: <4.0,>=3.12
* License: MIT
* Keywords: python, protobuf, amqp, grpc, message-queue, message-broker
# BrokRPC

[![Latest Version](https://img.shields.io/pypi/v/BrokRPC.svg)](https://pypi.python.org/pypi/BrokRPC)
[![Python Supported versions](https://img.shields.io/pypi/pyversions/BrokRPC.svg)](https://pypi.python.org/pypi/BrokRPC)
[![MyPy Strict](https://img.shields.io/badge/mypy-strict-blue)](https://mypy.readthedocs.io/en/stable/getting_started.html#strict-mode-and-configuration)
[![Test Coverage](https://codecov.io/gh/zerlok/BrokRPC/branch/main/graph/badge.svg)](https://codecov.io/gh/zerlok/BrokRPC)
[![Downloads](https://img.shields.io/pypi/dm/BrokRPC.svg)](https://pypistats.org/packages/BrokRPC)
[![GitHub stars](https://img.shields.io/github/stars/zerlok/BrokRPC)](https://github.com/zerlok/BrokRPC/stargazers)

BrokRPC (**Brok**er **R**emote **P**rocedure **C**all) is a framework for gRPC-like server-client communication over
message brokers.

## key features

* strict typing (even `disallow_any_expr=true`)
* same protobuf structures as in gRPC
* similar calls as in gRPC
  * unary-unary
  * (TODO) unary-stream
  * (TODO) stream-unary
  * (TODO) stream-stream
* declarative style that abstracts away broker commands (such as `declare_exchange` / `queue_bind`)
* publisher & consumer middlewares
* message serializers

## codegen

You can generate Python code for the server & client from `.proto` files.
The [pyprotostuben](https://github.com/zerlok/pyprotostuben) project provides the protoc plugin `protoc-gen-brokrpc`. See
the pyprotostuben project's example for more details.

You may configure codegen output using protobuf extensions from the [buf schema registry](https://buf.build/zerlok/brokrpc).

## supported brokers & protocols

* [AMQP](https://www.rabbitmq.com/tutorials/amqp-concepts)
  * [aiormq](https://github.com/mosquito/aiormq)
* (TODO) redis
* (TODO) kafka
* (TODO) NATS

## usage

[pypi package](https://pypi.python.org/pypi/BrokRPC)

install with your favorite python package manager

```shell
# quotes keep shells such as zsh from treating the brackets as a glob pattern
pip install "BrokRPC[aiormq]"
```

### Broker

use `Broker` as a high-level API to create consumers & publishers

```python
from brokrpc.broker import Broker

# create & connect to broker with specified params
async with Broker(...) as broker:
    assert broker.is_connected
    # work with broker
    ...
```

### Consumer

```python
import asyncio
from brokrpc.broker import Broker
from brokrpc.message import Message
from brokrpc.options import BindingOptions, QueueOptions, ExchangeOptions
from brokrpc.serializer.json import JSONSerializer


async def register_consumer(broker: Broker) -> None:
    # define consumer function (you can also use an async function or the `Consumer` interface).
    def consume_binary_message(message: Message[bytes]) -> None:
        print(message)

    # consumer is not attached yet

    async with broker.consumer(consume_binary_message, BindingOptions(binding_keys=["my-consumer"])):
        # in this code block consumer is attached to broker and can receive messages
        ...

    # outside the context manager consumer is detached from broker and cannot receive messages

    async def consume_json(message: Message[object]) -> bool:
        obj = message.body
        if not isinstance(obj, dict):
            return False
    
        username = obj.get("username")
        if not username:
            return False
        
        print(f"Hello, {username}")
        await asyncio.sleep(1.0)  # simulate long processing
    
        return True
  
    async with broker.consumer(
        consume_json,
        BindingOptions(
            exchange=ExchangeOptions(name="json"),
            binding_keys=["my-json-consumer"],
            queue=QueueOptions(name="jsons", auto_delete=True),
        ),
        serializer=JSONSerializer(),
    ):
        ...
```

### Consumer Middlewares

...
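
This README does not document BrokRPC's middleware interface yet, so the following is only a generic sketch of the middleware idea: a function that wraps a consumer and adds behavior around each call. The names `ConsumerFunc`, `logging_middleware`, `consume` and `demo` are hypothetical and are not part of BrokRPC.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical alias for this sketch only; not BrokRPC's actual interface.
ConsumerFunc = Callable[[object], Awaitable[bool]]


def logging_middleware(inner: ConsumerFunc, log: list[str]) -> ConsumerFunc:
    """Return a consumer that records each message and result around `inner`."""

    async def wrapper(message: object) -> bool:
        log.append(f"received {message!r}")
        result = await inner(message)
        log.append(f"result {result}")
        return result

    return wrapper


async def consume(message: object) -> bool:
    # Accept only dict messages that carry a non-empty username.
    return isinstance(message, dict) and bool(message.get("username"))


async def demo() -> list[str]:
    log: list[str] = []
    wrapped = logging_middleware(consume, log)
    await wrapped({"username": "John"})
    await wrapped("not a dict")
    return log
```

Because the wrapper has the same signature as the function it wraps, several such wrappers can be stacked without the consumer knowing about them.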

### Publisher

```python
from brokrpc.broker import Broker
from brokrpc.message import AppMessage
from brokrpc.serializer.json import JSONSerializer


async def publish_messages(broker: Broker) -> None:
    async with broker.publisher() as pub:
        # in this code block publisher is attached to broker and can send messages
        # routing key matches the binding key of the binary consumer from the Consumer section
        await pub.publish(AppMessage(body=b"this is a binary message", routing_key="my-consumer"))
    
    async with broker.publisher(serializer=JSONSerializer()) as json_pub:
        await json_pub.publish(AppMessage(body={"username": "John Smith"}, routing_key="my-json-consumer"))
```
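
To illustrate how a publisher's `routing_key` relates to a consumer's `binding_keys`, here is a minimal in-memory sketch, assuming a direct-type AMQP exchange (exact key match). `DirectExchangeSketch` and the key names are hypothetical; the sketch does not use BrokRPC or a real broker, and it treats every bound handler as if it had its own queue.

```python
from collections import defaultdict
from collections.abc import Callable

Handler = Callable[[bytes], None]


class DirectExchangeSketch:
    """In-memory stand-in for a direct exchange: routing key must equal a binding key."""

    def __init__(self) -> None:
        self._bindings: dict[str, list[Handler]] = defaultdict(list)

    def bind(self, binding_keys: list[str], handler: Handler) -> None:
        # Register the handler under every binding key it listens on.
        for key in binding_keys:
            self._bindings[key].append(handler)

    def publish(self, body: bytes, routing_key: str) -> int:
        # Deliver to every handler bound to exactly this key; return the delivery count.
        handlers = self._bindings.get(routing_key, [])
        for handler in handlers:
            handler(body)
        return len(handlers)


received: list[bytes] = []
exchange = DirectExchangeSketch()
exchange.bind(["my-consumer"], received.append)

delivered = exchange.publish(b"hello", routing_key="my-consumer")  # key matches a binding
unrouted = exchange.publish(b"lost", routing_key="other-key")      # no binding for this key
```

In this sketch a message whose routing key has no matching binding reaches no consumer, which is why the publisher and consumer keys have to agree.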

### Publisher Middlewares

...

### Message

* `Message`
* `AppMessage`
* `PackedMessage`
* `UnpackedMessage`

### Serializer

* `JSONSerializer`
* `ProtobufSerializer`
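
Conceptually, a serializer converts between application objects and the bytes carried in a broker message. The sketch below shows that round trip with the standard `json` module; `JSONSerializerSketch` and its `dump` / `load` method names are assumptions made for illustration and may not match BrokRPC's actual serializer interface.

```python
import json


class JSONSerializerSketch:
    """Hypothetical two-method serializer: Python object <-> bytes."""

    def dump(self, obj: object) -> bytes:
        # Compact separators keep the encoded payload small.
        return json.dumps(obj, separators=(",", ":")).encode("utf-8")

    def load(self, data: bytes) -> object:
        return json.loads(data.decode("utf-8"))


serializer = JSONSerializerSketch()
payload = serializer.dump({"username": "John Smith"})
restored = serializer.load(payload)
```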

### RPC Server

* `Server`

### RPC Handler

...

### RPC Client

* `Client`

### RPC Caller

...

## RPC example

### RPC server

run the server process with the following code

```python
import asyncio

from brokrpc.broker import Broker
from brokrpc.options import ExchangeOptions
from brokrpc.rpc.model import Request
from brokrpc.rpc.server import Server
from brokrpc.serializer.json import JSONSerializer


# define app RPC handler
async def handle_request(request: Request[object]) -> str:
    print(f"{request=!s}")
    print(f"{request.body=}")
    return f"I greet you, {request.body}"


async def main() -> None:
    # create broker & RPC server
    broker = Broker(
        options="amqp://guest:guest@localhost:5672/",
        default_exchange=ExchangeOptions(name="simple-test-app"),
    )
    server = Server(broker)

    # register RPC handler
    server.register_unary_unary_handler(
        func=handle_request,
        routing_key="test-greeting",
        serializer=JSONSerializer(),
    )

    # connect to broker
    async with broker:
        # start RPC server until SIGINT or SIGTERM
        await server.run_until_terminated()


if __name__ == "__main__":
    asyncio.run(main())
```

### RPC client

call the RPC server via the RPC client with the following code

```python
import asyncio

from brokrpc.broker import Broker
from brokrpc.options import ExchangeOptions
from brokrpc.rpc.client import Client
from brokrpc.serializer.json import JSONSerializer


async def main() -> None:
    async with (
        # create & connect to broker
        Broker(
            options="amqp://guest:guest@localhost:5672/",
            default_exchange=ExchangeOptions(name="simple-test-app"),
        ) as broker,
        # create RPC client & get RPC caller
        Client(broker).unary_unary_caller(
            routing_key="test-greeting",
            serializer=JSONSerializer(),
        ) as caller,
    ):
        # publish app message & receive RPC response
        response = await caller.invoke("John")

        print(response)
        print(f"{response.body=}")


if __name__ == "__main__":
    asyncio.run(main())
```
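
For readers new to RPC over message brokers, the sketch below shows a common way a unary-unary call can be mapped onto queues: the caller tags the request with a correlation id and a reply queue, and the server publishes its response there. This is a general pattern simulated with `asyncio.Queue`; it is not taken from BrokRPC's source and BrokRPC's implementation may differ. The functions `serve`, `call` and `demo` are hypothetical.

```python
import asyncio
import uuid


async def serve(requests: asyncio.Queue) -> None:
    """Toy server: read requests and put each reply on that request's reply queue."""
    while True:
        correlation_id, reply_to, body = await requests.get()
        await reply_to.put((correlation_id, f"I greet you, {body}"))


async def call(requests: asyncio.Queue, body: str) -> str:
    """Toy unary-unary call: send one request and wait for the matching reply."""
    correlation_id = uuid.uuid4().hex
    reply_to: asyncio.Queue = asyncio.Queue()  # stands in for an exclusive reply queue
    await requests.put((correlation_id, reply_to, body))
    while True:
        reply_id, reply_body = await asyncio.wait_for(reply_to.get(), timeout=1.0)
        if reply_id == correlation_id:  # skip replies that belong to other calls
            return reply_body


async def demo() -> str:
    requests: asyncio.Queue = asyncio.Queue()
    server = asyncio.create_task(serve(requests))
    try:
        return await call(requests, "John")
    finally:
        server.cancel()  # stop the toy server once the call has finished
```

The timeout in `call` keeps the caller from waiting forever when no server is consuming the request queue.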


            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "BrokRPC",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.12",
    "maintainer_email": null,
    "keywords": "python, protobuf, amqp, grpc, message-queue, message-broker",
    "author": "zerlok",
    "author_email": "danil.troshnev@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/95/8e/d3a5b1efea98dc97aa26fd4629a1b9fb91f855af203468873dfede37c243/brokrpc-0.2.5.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "framework for gRPC like server-client communication over message brokers",
    "version": "0.2.5",
    "project_urls": {
        "Homepage": "https://github.com/zerlok/BrokRPC",
        "Issues": "https://github.com/zerlok/BrokRPC/issues"
    },
    "split_keywords": [
        "python",
        " protobuf",
        " amqp",
        " grpc",
        " message-queue",
        " message-broker"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "57c2681ac5dd5b9caaeb395bcdfbc1d4648e56ac5b455e479f40b19a548a7596",
                "md5": "ba00d7dbcce5c9168f0ebac5073f7f60",
                "sha256": "05fd7eacc2b3ea04086097bbfbf65f7232288a7deab0e95b3b069ad0d630ed5b"
            },
            "downloads": -1,
            "filename": "brokrpc-0.2.5-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "ba00d7dbcce5c9168f0ebac5073f7f60",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.12",
            "size": 43169,
            "upload_time": "2025-01-07T16:24:30",
            "upload_time_iso_8601": "2025-01-07T16:24:30.853365Z",
            "url": "https://files.pythonhosted.org/packages/57/c2/681ac5dd5b9caaeb395bcdfbc1d4648e56ac5b455e479f40b19a548a7596/brokrpc-0.2.5-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "958ed3a5b1efea98dc97aa26fd4629a1b9fb91f855af203468873dfede37c243",
                "md5": "26d152da9fc5b5f9977d33765832a50d",
                "sha256": "583dfd1225c0523b42f53527a1584e4b6e8d3454d01164392ea4823aa54124bb"
            },
            "downloads": -1,
            "filename": "brokrpc-0.2.5.tar.gz",
            "has_sig": false,
            "md5_digest": "26d152da9fc5b5f9977d33765832a50d",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.12",
            "size": 31498,
            "upload_time": "2025-01-07T16:24:33",
            "upload_time_iso_8601": "2025-01-07T16:24:33.487879Z",
            "url": "https://files.pythonhosted.org/packages/95/8e/d3a5b1efea98dc97aa26fd4629a1b9fb91f855af203468873dfede37c243/brokrpc-0.2.5.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-01-07 16:24:33",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "zerlok",
    "github_project": "BrokRPC",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "brokrpc"
}
        