tinybus


Nametinybus JSON
Version 0.2.2 PyPI version JSON
download
home_pageNone
SummaryA modern, minimal event bus for python inspired by Eclipse Vert.x
upload_time2024-12-06 18:57:08
maintainerNone
docs_urlNone
authorNone
requires_python>=3.12
licenseNone
keywords event bus event driven design asynchronous microservices
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # 🚌 TinyBus

A modern, async-first Event Bus for Python 3.12+ inspired by Eclipse Vert.x. TinyBus provides a clean, type-safe way to implement event-driven architectures with both request-response and publish-subscribe patterns.

## Why
Because I've spent quite a lot of time evaluating event bus libraries but none seemed to support the async Request-Response pattern I needed. Some alternatives like [Ethereum's Lahja](https://github.com/ethereum/lahja) or [lightbus](https://github.com/adamcharnock/lightbus) seem too heavy and complex with lots of features I don't need (inter-process communication, queues, RPC...)

The Request-Response Events pattern is quite useful when growing an async backend with lots of internal messages being passed around (using the Actor [Model terminology](https://doc.akka.io/libraries/akka-core/current/typed/guide/actors-intro.html), method calls are considered messages) as it allows to decouple caller and callee very transparently and fits nicely in the asynchronous "mental model".

## Installation
```bash
uv add tinybus
```

## Quick Example

Here's a simple user service implementation showcasing TinyBus's main features

```python
import uuid

from enum import Enum
from typing import Optional

from pydantic import BaseModel
from tinybus import EventBus, Message

# First, we must define our addresses.
class Address(str, Enum):
    CREATE_USER = "create_user"
    GET_USER = "get_user"

# Then, the clases that hold the data being sent around
class User(BaseModel):
    user_id: uuid.UUID
    username: str
    email: str

class CreateUserRequest(BaseModel):
    username: str
    email: str

class CreateUserResponse(BaseModel):
    user: User

class GetUserResponse(BaseModel):
    user: User

# This service is subscribed to all updates sent to the CREATE_USER and GET_USER
# addresses and will run the appropriate methods when a valid request is sent.
class UserService:
    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus

        # A fake data layer
        self._users: dict[uuid.UUID, User] = {}
        
        # Register handlers
        # If these methods are not used directly anywhere outside this class, you'll likely
        # want to make them private (self._create_user)
        self._event_bus.consumer(Address.CREATE_USER, self.create_user)
        self._event_bus.consumer(Address.GET_USER, self.get_user)
    
    async def create_user(self, message: Message[CreateUserRequest]) -> CreateUserResponse:
        # A Message is fairly simple, it has a header and a body.
        # Most of the time, you'll only be using the body because it contains the actual data you'll use.
        request = message.body

        # In the real world, you'll use a data layer to create your user (likely an async operation)
        user_id = uuid.uuid4()
        user = User(
            id=user_id,
            username=request.username,
            email=request.email
        )
        self.users[user_id] = user
        
        # We can also publish events without expecting a return value
        await self.event_bus.publish("user.created", user)

        # Return a response that will be received by the Address listener
        return CreateUserResponse(user=user)
    
    # Messages can hold any value, including builtins.
    async def get_user(self, message: Message[uuid.UUID]) -> Optional[GetUserResponse]:
        requested_user_id = message.body
        if found_user := self.users.get(user_id) is not None:
            return GetUserResponse(user=found_user)
        else:
            # You can return None too (no need to wrap it in a custom object)
            return None


async def main():
    # Create an event bus
    event_bus = EventBus()
    
    # Create service
    user_service = UserService(event_bus)
    
    # Register event listener
    @event_bus.on("user.created")
    async def on_user_created(user: User):
        print(f"User created: {user.username}")
    
    # Send a request to the handler for the CREATE_USER address
    response = await event_bus.request(
        Address.CREATE_USER,
        CreateUserRequest(username="john", email="john@example.com")
    )
    
    # Likewise, call the handler of the GET_USER address
    user = await event_bus.request(
        Address.GET_USER,
        response.id
    )
    
    print(f"Retrieved user: {user.username}")
```


## Key Concepts

### Request-Response Pattern

TinyBus implements an address-based messaging system where consumers register handlers for specific addresses. When a request is made to an address, the corresponding handler processes it and returns a response

```python
# Register the consumer for a given address.
# Addresses can be strings too although we recommend using Enums for readability
@event_bus.consumer("greeting")
async def handle_greeting(msg: Message[str]) -> str:
    return f"Hello, {msg.body}!"

# Send a request
response = await event_bus.request("greeting", "World")
print(response) # Prints: Hello, World!
```

### Publish-Subscribe Pattern

The event bus also supports event-based communication where multiple listeners can subscribe to events where you do not care about the result or what happens when it is delivered.

```python
# Register listeners using the .on annotation
@event_bus.on("user.created")
async def notify_admin(user: User):
    print(f"New user registered: {user.email}")

# or the .on method directly
async def send_welcome_email(user: User):
    print(f"Sending welcome email to {user.email}")
event_bus.on("user.created", send_welcome_email)

# Publish an event
await event_bus.publish("user.created", user)
# > "New user registered: john@example.com"
```


## Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

## License

TinyBus is MIT licensed. See the [LICENSE](LICENSE) file for details.
            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "tinybus",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.12",
    "maintainer_email": null,
    "keywords": "event bus, event driven design, asynchronous, microservices",
    "author": null,
    "author_email": "Ram\u00f3n Vila Ferreres <ramonvilafer@proton.me>",
    "download_url": "https://files.pythonhosted.org/packages/3b/8e/5b968cb1be035b23ad762b22d720bb02f3f89476cdc04b40e8766ea1c724/tinybus-0.2.2.tar.gz",
    "platform": null,
    "description": "# \ud83d\ude8c TinyBus\n\nA modern, async-first Event Bus for Python 3.12+ inspired by Eclipse Vert.x. TinyBus provides a clean, type-safe way to implement event-driven architectures with both request-response and publish-subscribe patterns.\n\n## Why\nBecause I've spent quite a lot of time evaluating event bus libraries but none seemed to support the async Request-Response pattern I needed. Some alternatives like [Ethereum's Lahja](https://github.com/ethereum/lahja) or [lightbus](https://github.com/adamcharnock/lightbus) seem too heavy and complex with lots of features I don't need (inter-process communication, queues, RPC...)\n\nThe Request-Response Events pattern is quite useful when growing an async backend with lots of internal messages being passed around (using the Actor [Model terminology](https://doc.akka.io/libraries/akka-core/current/typed/guide/actors-intro.html), method calls are considered messages) as it allows to decouple caller and callee very transparently and fits nicely in the asynchronous \"mental model\".\n\n## Installation\n```bash\nuv add tinybus\n```\n\n## Quick Example\n\nHere's a simple user service implementation showcasing TinyBus's main features\n\n```python\nimport uuid\n\nfrom enum import Enum\nfrom typing import Optional\n\nfrom pydantic import BaseModel\nfrom tinybus import EventBus, Message\n\n# First, we must define our addresses.\nclass Address(str, Enum):\n    CREATE_USER = \"create_user\"\n    GET_USER = \"get_user\"\n\n# Then, the clases that hold the data being sent around\nclass User(BaseModel):\n    user_id: uuid.UUID\n    username: str\n    email: str\n\nclass CreateUserRequest(BaseModel):\n    username: str\n    email: str\n\nclass CreateUserResponse(BaseModel):\n    user: User\n\nclass GetUserResponse(BaseModel):\n    user: User\n\n# This service is subscribed to all updates sent to the CREATE_USER and GET_USER\n# addresses and will run the appropriate methods when a valid request is sent.\nclass UserService:\n    def __init__(self, event_bus: EventBus):\n        self._event_bus = event_bus\n\n        # A fake data layer\n        self._users: dict[uuid.UUID, User] = {}\n        \n        # Register handlers\n        # If these methods are not used directly anywhere outside this class, you'll likely\n        # want to make them private (self._create_user)\n        self._event_bus.consumer(Address.CREATE_USER, self.create_user)\n        self._event_bus.consumer(Address.GET_USER, self.get_user)\n    \n    async def create_user(self, message: Message[CreateUserRequest]) -> CreateUserResponse:\n        # A Message is fairly simple, it has a header and a body.\n        # Most of the time, you'll only be using the body because it contains the actual data you'll use.\n        request = message.body\n\n        # In the real world, you'll use a data layer to create your user (likely an async operation)\n        user_id = uuid.uuid4()\n        user = User(\n            id=user_id,\n            username=request.username,\n            email=request.email\n        )\n        self.users[user_id] = user\n        \n        # We can also publish events without expecting a return value\n        await self.event_bus.publish(\"user.created\", user)\n\n        # Return a response that will be received by the Address listener\n        return CreateUserResponse(user=user)\n    \n    # Messages can hold any value, including builtins.\n    async def get_user(self, message: Message[uuid.UUID]) -> Optional[GetUserResponse]:\n        requested_user_id = message.body\n        if found_user := self.users.get(user_id) is not None:\n            return GetUserResponse(user=found_user)\n        else:\n            # You can return None too (no need to wrap it in a custom object)\n            return None\n\n\nasync def main():\n    # Create an event bus\n    event_bus = EventBus()\n    \n    # Create service\n    user_service = UserService(event_bus)\n    \n    # Register event listener\n    @event_bus.on(\"user.created\")\n    async def on_user_created(user: User):\n        print(f\"User created: {user.username}\")\n    \n    # Send a request to the handler for the CREATE_USER address\n    response = await event_bus.request(\n        Address.CREATE_USER,\n        CreateUserRequest(username=\"john\", email=\"john@example.com\")\n    )\n    \n    # Likewise, call the handler of the GET_USER address\n    user = await event_bus.request(\n        Address.GET_USER,\n        response.id\n    )\n    \n    print(f\"Retrieved user: {user.username}\")\n```\n\n\n## Key Concepts\n\n### Request-Response Pattern\n\nTinyBus implements an address-based messaging system where consumers register handlers for specific addresses. When a request is made to an address, the corresponding handler processes it and returns a response\n\n```python\n# Register the consumer for a given address.\n# Addresses can be strings too although we recommend using Enums for readability\n@event_bus.consumer(\"greeting\")\nasync def handle_greeting(msg: Message[str]) -> str:\n    return f\"Hello, {msg.body}!\"\n\n# Send a request\nresponse = await event_bus.request(\"greeting\", \"World\")\nprint(response) # Prints: Hello, World!\n```\n\n### Publish-Subscribe Pattern\n\nThe event bus also supports event-based communication where multiple listeners can subscribe to events where you do not care about the result or what happens when it is delivered.\n\n```python\n# Register listeners using the .on annotation\n@event_bus.on(\"user.created\")\nasync def notify_admin(user: User):\n    print(f\"New user registered: {user.email}\")\n\n# or the .on method directly\nasync def send_welcome_email(user: User):\n    print(f\"Sending welcome email to {user.email}\")\nevent_bus.on(\"user.created\", send_welcome_email)\n\n# Publish an event\nawait event_bus.publish(\"user.created\", user)\n# > \"New user registered: john@example.com\"\n```\n\n\n## Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.\n\n## License\n\nTinyBus is MIT licensed. See the [LICENSE](LICENSE) file for details.",
    "bugtrack_url": null,
    "license": null,
    "summary": "A modern, minimal event bus for python inspired by Eclipse Vert.x",
    "version": "0.2.2",
    "project_urls": {
        "Repository": "https://github.com/rmonvfer/tinybus"
    },
    "split_keywords": [
        "event bus",
        " event driven design",
        " asynchronous",
        " microservices"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "4f7a5e0386a46c8c83dbc88fbcd70f36187e6ebb0ed6234168313300a00fb05c",
                "md5": "eeaab3119848a2d0493391bde62d7836",
                "sha256": "488679176cc84ae3c6409eeabee289d2b683e85005796112dbcf95a076149035"
            },
            "downloads": -1,
            "filename": "tinybus-0.2.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "eeaab3119848a2d0493391bde62d7836",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.12",
            "size": 10759,
            "upload_time": "2024-12-06T18:57:07",
            "upload_time_iso_8601": "2024-12-06T18:57:07.666194Z",
            "url": "https://files.pythonhosted.org/packages/4f/7a/5e0386a46c8c83dbc88fbcd70f36187e6ebb0ed6234168313300a00fb05c/tinybus-0.2.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "3b8e5b968cb1be035b23ad762b22d720bb02f3f89476cdc04b40e8766ea1c724",
                "md5": "c05415483c3c4bceba662e4330c59462",
                "sha256": "9baaff218833914c088cc3a0c23dfd121465ffed89426c1377e61839631c11a5"
            },
            "downloads": -1,
            "filename": "tinybus-0.2.2.tar.gz",
            "has_sig": false,
            "md5_digest": "c05415483c3c4bceba662e4330c59462",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.12",
            "size": 13337,
            "upload_time": "2024-12-06T18:57:08",
            "upload_time_iso_8601": "2024-12-06T18:57:08.975921Z",
            "url": "https://files.pythonhosted.org/packages/3b/8e/5b968cb1be035b23ad762b22d720bb02f3f89476cdc04b40e8766ea1c724/tinybus-0.2.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-12-06 18:57:08",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "rmonvfer",
    "github_project": "tinybus",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "tinybus"
}
        
Elapsed time: 0.56223s