# 🚌 TinyBus
A modern, async-first Event Bus for Python 3.12+ inspired by Eclipse Vert.x. TinyBus provides a clean, type-safe way to implement event-driven architectures with both request-response and publish-subscribe patterns.
## Why
I spent quite a lot of time evaluating event bus libraries, but none seemed to support the async Request-Response pattern I needed. Alternatives like [Ethereum's Lahja](https://github.com/ethereum/lahja) or [lightbus](https://github.com/adamcharnock/lightbus) seemed too heavy and complex, with lots of features I don't need (inter-process communication, queues, RPC...).
The Request-Response pattern is quite useful when growing an async backend with lots of internal messages being passed around (in [Actor Model terminology](https://doc.akka.io/libraries/akka-core/current/typed/guide/actors-intro.html), method calls are considered messages): it decouples caller and callee transparently and fits nicely into the asynchronous "mental model".
## Installation
```bash
uv add tinybus
```
## Quick Example
Here's a simple user service implementation showcasing TinyBus's main features:
```python
import uuid
from enum import Enum
from typing import Optional

from pydantic import BaseModel
from tinybus import EventBus, Message


# First, we must define our addresses.
class Address(str, Enum):
    CREATE_USER = "create_user"
    GET_USER = "get_user"


# Then, the classes that hold the data being sent around
class User(BaseModel):
    user_id: uuid.UUID
    username: str
    email: str


class CreateUserRequest(BaseModel):
    username: str
    email: str


class CreateUserResponse(BaseModel):
    user: User


class GetUserResponse(BaseModel):
    user: User


# This service handles all requests sent to the CREATE_USER and GET_USER
# addresses and runs the matching method when a valid request arrives.
class UserService:
    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus

        # A fake data layer
        self._users: dict[uuid.UUID, User] = {}

        # Register handlers
        # If these methods are not used directly anywhere outside this class, you'll likely
        # want to make them private (self._create_user)
        self._event_bus.consumer(Address.CREATE_USER, self.create_user)
        self._event_bus.consumer(Address.GET_USER, self.get_user)

    async def create_user(self, message: Message[CreateUserRequest]) -> CreateUserResponse:
        # A Message is fairly simple: it has a header and a body.
        # Most of the time, you'll only use the body because it contains the actual data.
        request = message.body

        # In the real world, you'll use a data layer to create your user (likely an async operation)
        user_id = uuid.uuid4()
        user = User(
            user_id=user_id,
            username=request.username,
            email=request.email
        )
        self._users[user_id] = user

        # We can also publish events without expecting a return value
        await self._event_bus.publish("user.created", user)

        # Return a response that will be received by whoever sent the request
        return CreateUserResponse(user=user)

    # Messages can hold any value, including builtins.
    async def get_user(self, message: Message[uuid.UUID]) -> Optional[GetUserResponse]:
        requested_user_id = message.body
        # The parentheses matter: without them, the walrus would bind the
        # result of the `is not None` comparison instead of the user.
        if (found_user := self._users.get(requested_user_id)) is not None:
            return GetUserResponse(user=found_user)
        else:
            # You can return None too (no need to wrap it in a custom object)
            return None


async def main():
    # Create an event bus
    event_bus = EventBus()

    # Create service
    user_service = UserService(event_bus)

    # Register event listener
    @event_bus.on("user.created")
    async def on_user_created(user: User):
        print(f"User created: {user.username}")

    # Send a request to the handler for the CREATE_USER address
    response = await event_bus.request(
        Address.CREATE_USER,
        CreateUserRequest(username="john", email="john@example.com")
    )

    # Likewise, call the handler of the GET_USER address.
    # The handler returns a GetUserResponse, or None if the user is unknown.
    get_response = await event_bus.request(
        Address.GET_USER,
        response.user.user_id
    )

    if get_response is not None:
        print(f"Retrieved user: {get_response.user.username}")

# To run the example, call asyncio.run(main()) after importing asyncio.
```
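The `Address` class above mixes `str` into `Enum`, which is what lets Enum members and plain strings refer to the same address. The snippet below is a small standalone check of that standard-library behavior; the `handlers` dict is a hypothetical stand-in for a registry and says nothing about how TinyBus stores its consumers.

```python
from enum import Enum


class Address(str, Enum):
    CREATE_USER = "create_user"
    GET_USER = "get_user"


# Hypothetical registry keyed by address (not TinyBus internals).
handlers: dict[str, str] = {Address.CREATE_USER: "create_user handler"}

# Members of a str-mixin Enum compare equal to, and hash like, their values,
# so the Enum member and the raw string find the same dict entry.
print(Address.CREATE_USER == "create_user")  # True
print(handlers["create_user"])               # create_user handler

# Looking a member up by value is handy when an address arrives as text.
print(Address("get_user") is Address.GET_USER)  # True
```

Enum members still give you autocompletion and a single place to list every address, while any code that only has the string form keeps working.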
## Key Concepts
### Request-Response Pattern
TinyBus implements an address-based messaging system where consumers register handlers for specific addresses. When a request is made to an address, the corresponding handler processes it and returns a response.
```python
# Register the consumer for a given address.
# Addresses can be strings too, although we recommend using Enums for readability
@event_bus.consumer("greeting")
async def handle_greeting(msg: Message[str]) -> str:
    return f"Hello, {msg.body}!"

# Send a request
response = await event_bus.request("greeting", "World")
print(response)  # Prints: Hello, World!
```
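To make the address-to-handler idea concrete, here is a minimal, self-contained sketch of how such a dispatch could work using only `asyncio`. It is an illustration under stated assumptions, not TinyBus's implementation: `SketchBus` is a hypothetical class, handlers receive the raw body instead of a `Message`, and the one-consumer-per-address rule and the `LookupError` are choices made for the sketch.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


class SketchBus:
    """Hypothetical stand-in for an address-based bus; not TinyBus's code."""

    def __init__(self) -> None:
        self._consumers: dict[str, Handler] = {}

    def consumer(self, address: str, handler: Handler) -> None:
        # Assumption: one handler per address, since a request expects one reply.
        if address in self._consumers:
            raise ValueError(f"address already has a consumer: {address!r}")
        self._consumers[address] = handler

    async def request(self, address: str, body: Any) -> Any:
        # Look up the handler and await its reply. The caller only knows
        # the address, never the handler itself.
        try:
            handler = self._consumers[address]
        except KeyError:
            raise LookupError(f"no consumer for address: {address!r}") from None
        return await handler(body)


async def handle_greeting(name: str) -> str:
    return f"Hello, {name}!"


async def demo() -> str:
    bus = SketchBus()
    bus.consumer("greeting", handle_greeting)
    return await bus.request("greeting", "World")


greeting = asyncio.run(demo())
print(greeting)  # Hello, World!
```

The caller depends only on the address and the shape of the reply, which is the decoupling the pattern is meant to provide.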
### Publish-Subscribe Pattern
The event bus also supports event-based communication, where multiple listeners can subscribe to the same event. Use it when the publisher does not care about the result or about what happens once the event is delivered.
```python
# Register listeners using the .on decorator
@event_bus.on("user.created")
async def notify_admin(user: User):
    print(f"New user registered: {user.email}")

# or by calling the .on method directly
async def send_welcome_email(user: User):
    print(f"Sending welcome email to {user.email}")

event_bus.on("user.created", send_welcome_email)

# Publish an event; every registered listener receives it
await event_bus.publish("user.created", user)
# > "New user registered: john@example.com"
# > "Sending welcome email to john@example.com"
```
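The fan-out behavior described above can be sketched with plain `asyncio`. Again, this is a hedged illustration rather than TinyBus's actual code: `SketchEvents` is a hypothetical class, listeners take a plain string payload, and collecting listener errors with `return_exceptions=True` is an assumption about what "not caring about the result" could mean in practice.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Listener = Callable[[Any], Awaitable[None]]


class SketchEvents:
    """Hypothetical fan-out registry; not TinyBus's code."""

    def __init__(self) -> None:
        self._listeners: defaultdict[str, list[Listener]] = defaultdict(list)

    def on(self, event: str, listener: Listener) -> None:
        # Any number of listeners may subscribe to the same event.
        self._listeners[event].append(listener)

    async def publish(self, event: str, payload: Any) -> None:
        # Run all listeners concurrently and discard their results.
        # With return_exceptions=True, a listener's exception is collected
        # instead of propagating to the publisher.
        listeners = self._listeners.get(event, [])
        await asyncio.gather(
            *(listener(payload) for listener in listeners),
            return_exceptions=True,
        )


received: list[str] = []


async def notify_admin(email: str) -> None:
    received.append(f"admin notified about {email}")


async def broken_listener(email: str) -> None:
    raise RuntimeError("listener failed")


async def send_welcome_email(email: str) -> None:
    received.append(f"welcome email to {email}")


async def demo() -> None:
    events = SketchEvents()
    events.on("user.created", notify_admin)
    events.on("user.created", broken_listener)
    events.on("user.created", send_welcome_email)
    await events.publish("user.created", "john@example.com")
    # Publishing an event nobody listens to does nothing.
    await events.publish("user.deleted", "john@example.com")


asyncio.run(demo())
print(sorted(received))
```

In this sketch the publisher never sees the failing listener, and the other two listeners still run. A real bus might prefer to log such errors rather than drop them silently.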
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
## License
TinyBus is MIT licensed. See the [LICENSE](LICENSE) file for details.
Raw data
```json
{
    "_id": null,
    "home_page": null,
    "name": "tinybus",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.12",
    "maintainer_email": null,
    "keywords": "event bus, event driven design, asynchronous, microservices",
    "author": null,
    "author_email": "Ram\u00f3n Vila Ferreres <ramonvilafer@proton.me>",
    "download_url": "https://files.pythonhosted.org/packages/3b/8e/5b968cb1be035b23ad762b22d720bb02f3f89476cdc04b40e8766ea1c724/tinybus-0.2.2.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "A modern, minimal event bus for python inspired by Eclipse Vert.x",
    "version": "0.2.2",
    "project_urls": {
        "Repository": "https://github.com/rmonvfer/tinybus"
    },
    "split_keywords": [
        "event bus",
        " event driven design",
        " asynchronous",
        " microservices"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "4f7a5e0386a46c8c83dbc88fbcd70f36187e6ebb0ed6234168313300a00fb05c",
                "md5": "eeaab3119848a2d0493391bde62d7836",
                "sha256": "488679176cc84ae3c6409eeabee289d2b683e85005796112dbcf95a076149035"
            },
            "downloads": -1,
            "filename": "tinybus-0.2.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "eeaab3119848a2d0493391bde62d7836",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.12",
            "size": 10759,
            "upload_time": "2024-12-06T18:57:07",
            "upload_time_iso_8601": "2024-12-06T18:57:07.666194Z",
            "url": "https://files.pythonhosted.org/packages/4f/7a/5e0386a46c8c83dbc88fbcd70f36187e6ebb0ed6234168313300a00fb05c/tinybus-0.2.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "3b8e5b968cb1be035b23ad762b22d720bb02f3f89476cdc04b40e8766ea1c724",
                "md5": "c05415483c3c4bceba662e4330c59462",
                "sha256": "9baaff218833914c088cc3a0c23dfd121465ffed89426c1377e61839631c11a5"
            },
            "downloads": -1,
            "filename": "tinybus-0.2.2.tar.gz",
            "has_sig": false,
            "md5_digest": "c05415483c3c4bceba662e4330c59462",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.12",
            "size": 13337,
            "upload_time": "2024-12-06T18:57:08",
            "upload_time_iso_8601": "2024-12-06T18:57:08.975921Z",
            "url": "https://files.pythonhosted.org/packages/3b/8e/5b968cb1be035b23ad762b22d720bb02f3f89476cdc04b40e8766ea1c724/tinybus-0.2.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-12-06 18:57:08",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "rmonvfer",
    "github_project": "tinybus",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "tinybus"
}
```