<p align="center">
<img src="https://raw.githubusercontent.com/asynq-io/eventiq/main/assets/logo.svg" style="width: 250px">
</p>
<p align="center">
<em>Asyncio native pub/sub framework for Python</em>
</p>
![Tests](https://github.com/asynq-io/eventiq/workflows/Tests/badge.svg)
![Build](https://github.com/asynq-io/eventiq/workflows/Publish/badge.svg)
![License](https://img.shields.io/github/license/asynq-io/eventiq)
![Mypy](https://img.shields.io/badge/mypy-checked-blue)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/charliermarsh/ruff/main/assets/badge/v1.json)](https://github.com/charliermarsh/ruff)
[![Pydantic v2](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/pydantic/pydantic/main/docs/badge/v2.json)](https://docs.pydantic.dev/latest/contributing/#badges)
[![security: bandit](https://img.shields.io/badge/security-bandit-yellow.svg)](https://github.com/PyCQA/bandit)
![Python](https://img.shields.io/pypi/pyversions/eventiq)
![Format](https://img.shields.io/pypi/format/eventiq)
![PyPi](https://img.shields.io/pypi/v/eventiq)
## Installation
```shell
pip install eventiq
```
or
```shell
poetry add eventiq
```
### Installing optional dependencies
```shell
pip install 'eventiq[broker]'
```
### Available brokers
- `nats`
- `rabbitmq`
- `kafka`
- `redis`
## Features
- Modern, `asyncio`-based Python 3.9+ syntax
- Fully type annotated
- Minimal external dependencies (`anyio`, `pydantic`, `typer`)
- Automatic message parsing based on type annotations using `pydantic`
- Code hot-reload
- Highly scalable: each service can process hundreds of tasks concurrently, and messages are load-balanced across all instances by default
- Resilient: at-least-once delivery for all messages by default (except for Redis*)
- Customizable & pluggable message encoder/decoder (`json` as default)
- Multiple broker support
  - Memory (for testing)
  - Nats
  - Kafka
  - Rabbitmq
  - Redis
- Result Backend implementation for Nats & Redis
- Lifespan protocol support
- Lightweight (and completely optional) dependency injection system based on type annotations
- Easy and lightweight (~3k lines of code including type definitions and broker implementations)
- [Cloud Events](https://cloudevents.io/) standard as base message structure (no more Python-specific `*args` and `**kwargs` in messages)
- [AsyncAPI](https://www.asyncapi.com/en) documentation generation from code
- Twelve factor app approach - stdout logging, configuration through environment variables
- Easily extensible via Middlewares
- Multiple extensions and integrations including:
  - Prometheus - metrics exporter
  - OpenTelemetry - tracing and metrics
  - Message Pack - message pack encoder for messages
  - FastAPI - integrating eventiq Service with FastAPI applications (WIP)
  - Dataref - data reference resolver for messages (WIP)
  - Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
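
To illustrate what a Cloud Events based message structure means in practice, here is a minimal standard-library sketch of a CloudEvents 1.0 style envelope. It is not eventiq's `CloudEvent` class: the helper `make_cloud_event`, the event type `com.example.counter` and the source `/example-service` are hypothetical names chosen for illustration, and how eventiq maps its `topic` onto these attributes is not shown here.

```python
import json
import uuid
from datetime import datetime, timezone


def make_cloud_event(event_type: str, source: str, data: dict) -> dict:
    """Build a CloudEvents 1.0 style envelope as a plain dict (hypothetical helper)."""
    return {
        "specversion": "1.0",  # required attribute: version of the CloudEvents spec
        "id": str(uuid.uuid4()),  # required attribute: unique per source
        "source": source,  # required attribute: where the event originated
        "type": event_type,  # required attribute: kind of event
        "time": datetime.now(timezone.utc).isoformat(),  # optional timestamp
        "datacontenttype": "application/json",  # optional: format of `data`
        "data": data,  # the payload itself, no *args/**kwargs
    }


event = make_cloud_event("com.example.counter", "/example-service", {"counter": 1})
payload = json.dumps(event)  # what would travel over the wire with a JSON encoder
decoded = json.loads(payload)  # what a consumer would get back after decoding
```

Because the envelope is plain structured data, a consumer written in any language can decode it without knowing anything about the producer's Python function signatures.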
## Basic Usage
```Python
import asyncio

from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker


class SendMessageMiddleware(Middleware):
    async def after_broker_connect(self):
        # `service` is the module-level Service instance defined below
        print(f"After service start, running with {service.broker}")
        await asyncio.sleep(10)
        for i in range(100):
            message = CloudEvent(topic="test.topic", data={"counter": i})
            await service.publish(message)
        print("Published message(s)")


broker = JetStreamBroker(url="nats://localhost:4222")

service = Service(
    name="example-service",
    broker=broker,
)
service.add_middleware(SendMessageMiddleware)


# Function-based consumer
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")


# Class-based consumer
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
    async def process(self, message: CloudEvent):
        print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"}))
```
Run with
```shell
eventiq run app:service --log-level=info
```
## Watching for changes
```shell
eventiq run app:service --log-level=info --reload=.
```
## Testing
The `StubBroker` class is provided as an in-memory replacement for a real broker when running unit tests:
```python
import os


def get_broker(**kwargs):
    # Use the in-memory stub when ENV=TEST, otherwise a real RabbitMQ broker
    if os.getenv('ENV') == 'TEST':
        from eventiq.backends.stub import StubBroker
        return StubBroker()
    else:
        from eventiq.backends.rabbitmq import RabbitmqBroker
        return RabbitmqBroker(**kwargs)


broker = get_broker()
```
Furthermore, subscribers are regular Python coroutines, so they can be tested by calling them directly:
```python
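# main.py
from eventiq import CloudEvent

# `service` is a Service instance created as in Basic Usage
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
    return 42


# tests.py
from main import my_subscriber


# An async test function needs an async-aware test runner
async def test_my_subscriber():
    result = await my_subscriber(None)
    assert result == 42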
```
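
If no async-aware test runner is available, the same idea can be sketched with the standard library alone by driving the coroutine with `asyncio.run`. The `my_subscriber` below is a plain, undecorated stand-in for the subscriber above (an assumption made so the snippet runs without eventiq or a broker).

```python
import asyncio


async def my_subscriber(message):
    # Stand-in for the decorated subscriber from main.py
    return 42


def test_my_subscriber():
    # asyncio.run creates an event loop, runs the coroutine to completion
    # and returns its result, so a synchronous test can assert on it
    result = asyncio.run(my_subscriber(None))
    assert result == 42


test_my_subscriber()
```

Written this way, the test is an ordinary synchronous function and needs no extra plugin.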
## CLI
Getting help:
```shell
eventiq --help
```
Installing shell autocompletion:
```shell
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
```
Basic commands
- `run` - run service
- `docs` - generate AsyncAPI docs
- `send` - send message to broker
Raw data
{
"_id": null,
"home_page": null,
"name": "eventiq",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "asyncapi, asyncio, cloud-events, event-driven, framework, microservice, python",
"author": null,
"author_email": "RaRhAeu <rarha_eu@protonmail.com>",
"download_url": "https://files.pythonhosted.org/packages/ce/cb/136811bf4ced129fc811479f55cabc7ecfdd73317e29f1bb07bb301c48a6/eventiq-1.1.7.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": null,
"summary": "Publish/Subscribe asyncio framework for Python",
"version": "1.1.7",
"project_urls": {
"Documentation": "https://github.com/asynq-io/eventiq#readme",
"Issues": "https://github.com/asynq-io/eventiq/issues",
"Source": "https://github.com/asynq-io/eventiq"
},
"split_keywords": [
"asyncapi",
" asyncio",
" cloud-events",
" event-driven",
" framework",
" microservice",
" python"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "357d9f132559677bb44aaf59f599a38739b1c9241c191944a2d3c565f659a59a",
"md5": "8efd98815ded7bd4a80c45844d1c7da4",
"sha256": "38fd376451c8b9c5778bdb422b9189bb58eb0c51d0010eb0714d6e460c24f59e"
},
"downloads": -1,
"filename": "eventiq-1.1.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "8efd98815ded7bd4a80c45844d1c7da4",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 45148,
"upload_time": "2024-10-17T21:20:15",
"upload_time_iso_8601": "2024-10-17T21:20:15.241182Z",
"url": "https://files.pythonhosted.org/packages/35/7d/9f132559677bb44aaf59f599a38739b1c9241c191944a2d3c565f659a59a/eventiq-1.1.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "cecb136811bf4ced129fc811479f55cabc7ecfdd73317e29f1bb07bb301c48a6",
"md5": "51e21d45ee3283d3085493c1b763b08c",
"sha256": "b50778cd92d6ee7eaa016f0b4b5b3c7ba864781c81ba59e4055e9e7212ef8827"
},
"downloads": -1,
"filename": "eventiq-1.1.7.tar.gz",
"has_sig": false,
"md5_digest": "51e21d45ee3283d3085493c1b763b08c",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 37761,
"upload_time": "2024-10-17T21:20:16",
"upload_time_iso_8601": "2024-10-17T21:20:16.987720Z",
"url": "https://files.pythonhosted.org/packages/ce/cb/136811bf4ced129fc811479f55cabc7ecfdd73317e29f1bb07bb301c48a6/eventiq-1.1.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-10-17 21:20:16",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "asynq-io",
"github_project": "eventiq#readme",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "eventiq"
}