Name | python-inbound JSON |
Version |
1.0.7
JSON |
| download |
home_page | None |
Summary | An Event Bus framework for event driven systems. |
upload_time | 2024-12-09 07:42:57 |
maintainer | None |
docs_url | None |
author | None |
requires_python | <4.0,>=3.10 |
license | MIT |
keywords |
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Inbound
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/python-inbound?style=flat-square)
Inbound is an asyncio framework for building event-driven systems in Python. It provides a seamless interface for working with different Message Brokers, including In-memory, RabbitMQ, and Kafka.
## Features
- **Asynchronous Operations**: Build scalable and responsive systems with asyncio.
- Multiple Broker Support: Easily switch between In-memory, RabbitMQ, and soon to be more.
- **Elegant API**: Subscribe to channels, publish events, and listen to event streams with a simple and intuitive interface.
- **Serialization Flexibility**: Choose between serializers like msgpack and json based on your requirements or bring your own.
- **Streamlined Callbacks API**: Implement event-based systems quickly and effectively.
## Quick Start
### Installation
```bash
pip install python-inbound
```
### Basic Usage
```python
import asyncio
from inbound import EventBus
async def consume(bus: EventBus):
async with bus.subscribe("test-channel") as stream:
async for envelope in stream:
# Get the event from the envelope
event = envelope.event
print(event)
# After processing the event, acknowledge it
await envelope.ack()
# Unless you want to reject the event
# await envelope.nack()
async def produce(bus: EventBus):
coros = [
bus.publish(
channel="test-channel",
type="test-event",
data={"message": f"Hello World {i}"},
)
for i in range(20)
]
await asyncio.gather(*coros)
async def main():
event_bus = EventBus()
tasks = [asyncio.create_task(consume(event_bus)), asyncio.create_task(produce(event_bus))]
async with event_bus:
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
if __name__ == "__main__":
asyncio.run(main())
```
### Using Callbacks API
```python
import asyncio
from inbound import CallbackGroup, Event, broker_from_url
group = CallbackGroup()
@group.callback("test-channel", "test-event")
def callback(event: Event):
# Note: Envelopes are automatically acknowledged
# for callbacks
print(event)
async def main():
event_bus = EventBus(broker=broker_from_url("memory://?serializer=msgpack"))
event_bus.add_group(group)
async with event_bus:
for i in range(3):
await event_bus.publish(
channel="test-channel",
type="test-event",
data={"message": f"Hello World {i}"},
)
await asyncio.sleep(0.01)
await event_bus.wait_until_finished()
if __name__ == "__main__":
asyncio.run(main())
```
You can find more examples in the [examples](examples/) directory.
## Documentation
For comprehensive documentation, including advanced features and configurations, refer to Link to Documentation.
## License
This project is licensed under MIT License.
## Support & Feedback
If you encounter any issues or have feedback, please open an issue. We'd love to hear from you!
Made with ❤️ by Emergent Methods
Raw data
{
"_id": null,
"home_page": null,
"name": "python-inbound",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.10",
"maintainer_email": null,
"keywords": null,
"author": null,
"author_email": "Tim Pogue <tim@emergentmethods.ai>",
"download_url": null,
"platform": null,
"description": "# Inbound\n\n![PyPI - Python Version](https://img.shields.io/pypi/pyversions/python-inbound?style=flat-square)\n\nInbound is an asyncio framework for building event-driven systems in Python. It provides a seamless interface for working with different Message Brokers, including In-memory, RabbitMQ, and Kafka.\n\n## Features\n\n- **Asynchronous Operations**: Build scalable and responsive systems with asyncio.\n- Multiple Broker Support: Easily switch between In-memory, RabbitMQ, and soon to be more.\n- **Elegant API**: Subscribe to channels, publish events, and listen to event streams with a simple and intuitive interface.\n- **Serialization Flexibility**: Choose between serializers like msgpack and json based on your requirements or bring your own.\n- **Streamlined Callbacks API**: Implement event-based systems quickly and effectively.\n\n## Quick Start\n\n### Installation\n\n```bash\npip install python-inbound\n```\n\n### Basic Usage\n\n```python\nimport asyncio\nfrom inbound import EventBus\n\nasync def consume(bus: EventBus):\n async with bus.subscribe(\"test-channel\") as stream:\n async for envelope in stream:\n # Get the event from the envelope\n event = envelope.event\n print(event)\n # After processing the event, acknowledge it\n await envelope.ack()\n # Unless you want to reject the event\n # await envelope.nack()\n\n\nasync def produce(bus: EventBus):\n coros = [\n bus.publish(\n channel=\"test-channel\",\n type=\"test-event\",\n data={\"message\": f\"Hello World {i}\"},\n )\n for i in range(20)\n ]\n await asyncio.gather(*coros)\n\nasync def main():\n event_bus = EventBus()\n\n tasks = [asyncio.create_task(consume(event_bus)), asyncio.create_task(produce(event_bus))]\n async with event_bus:\n await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n### Using Callbacks API\n\n```python\nimport asyncio\nfrom inbound import CallbackGroup, Event, broker_from_url\n\ngroup = CallbackGroup()\n\n@group.callback(\"test-channel\", \"test-event\")\ndef callback(event: Event):\n # Note: Envelopes are automatically acknowledged\n # for callbacks\n print(event)\n\nasync def main():\n event_bus = EventBus(broker=broker_from_url(\"memory://?serializer=msgpack\"))\n event_bus.add_group(group)\n\n async with event_bus:\n for i in range(3):\n await event_bus.publish(\n channel=\"test-channel\",\n type=\"test-event\",\n data={\"message\": f\"Hello World {i}\"},\n )\n await asyncio.sleep(0.01)\n await event_bus.wait_until_finished()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\nYou can find more examples in the [examples](examples/) directory.\n\n## Documentation\nFor comprehensive documentation, including advanced features and configurations, refer to Link to Documentation.\n\n## License\nThis project is licensed under MIT License.\n\n## Support & Feedback\nIf you encounter any issues or have feedback, please open an issue. We'd love to hear from you!\n\nMade with \u2764\ufe0f by Emergent Methods\n\n\n\n\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "An Event Bus framework for event driven systems.",
"version": "1.0.7",
"project_urls": {
"repository": "https://gitlab.com/emergentmethods/python-inbound"
},
"split_keywords": [],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "de8d18a928650e4e389e98ebb14e3ff11af60eb36c1275328bd1a742dd7b1555",
"md5": "d5f7f259e8ee637bd3ab91ea43e9a9e0",
"sha256": "b2375aab4080c9fd7da809b8c1eaf66f6edd41be4ec164e56ba2fcf6ff79043f"
},
"downloads": -1,
"filename": "python_inbound-1.0.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "d5f7f259e8ee637bd3ab91ea43e9a9e0",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.10",
"size": 17874,
"upload_time": "2024-12-09T07:42:57",
"upload_time_iso_8601": "2024-12-09T07:42:57.067002Z",
"url": "https://files.pythonhosted.org/packages/de/8d/18a928650e4e389e98ebb14e3ff11af60eb36c1275328bd1a742dd7b1555/python_inbound-1.0.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-12-09 07:42:57",
"github": false,
"gitlab": true,
"bitbucket": false,
"codeberg": false,
"gitlab_user": "emergentmethods",
"gitlab_project": "python-inbound",
"lcname": "python-inbound"
}