<p align="center">
<img src="https://raw.githubusercontent.com/asynq-io/eventiq/main/assets/logo.svg" style="width: 250px">
</p>
<p align="center">
<em>Asyncio native pub/sub framework for Python</em>
</p>







## Installation
```shell
pip install eventiq
```
or
```shell
poetry add eventiq
```
### Installing optional dependencies
```shell
pip install 'eventiq[broker]'
```
### Available brokers
- `nats`
- `rabbitmq`
- `kafka`
- `redis`
## Features
- Modern, `asyncio`-based Python 3.9+ syntax
- Fully type annotated
- Minimal external dependencies (`anyio`, `pydantic`, `typer`)
- Automatic message parsing based on type annotations using `pydantic`
- Code hot-reload
- Highly scalable: each service can process hundreds of tasks concurrently, and
  all messages are load balanced between all instances by default
- Resilient - at-least-once delivery for all messages by default (except for Redis*)
- Customizable & pluggable message encoder/decoder (`json` as default)
- Multiple broker support
  - Memory (for testing)
  - Nats
  - Kafka
  - Rabbitmq
  - Redis
- Result Backend implementation for Nats & Redis
- Lifespan protocol support
- Lightweight (and completely optional) dependency injection system based on type annotations
- Easy and lightweight (~3k lines of code including type definitions and broker implementations)
- [Cloud Events](https://cloudevents.io/) standard as base message structure (no more Python-specific `*args` and `**kwargs` in messages)
- [AsyncAPI](https://www.asyncapi.com/en) documentation generation from code
- Twelve factor app approach - stdout logging, configuration through environment variables
- Easily extensible via Middlewares
- Multiple extensions and integrations including:
  - Prometheus - metrics exporter
  - OpenTelemetry - tracing and metrics
  - Message Pack - message pack encoder for messages
  - FastAPI - integrating eventiq Service with FastAPI applications (WIP)
  - Dataref - data reference resolver for messages (WIP)
  - Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
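
Because delivery is at-least-once, a consumer may occasionally receive the same message more than once (for example after a redelivery), so handlers are usually written to be idempotent. The sketch below shows one simple way to do that using only the standard library. `Message`, `handle_once`, and the in-memory `seen_ids` set are hypothetical names for illustration and are not part of eventiq's API; a real deployment would more likely keep the seen IDs in a shared, durable store.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Set


@dataclass
class Message:
    """Hypothetical stand-in for a broker message with a unique id."""
    id: str
    data: Dict[str, Any] = field(default_factory=dict)


seen_ids: Set[str] = set()   # in-memory dedup store (illustration only)
processed: List[str] = []    # records the side effects for this demo


async def handle_once(message: Message) -> bool:
    """Process a message unless its id was already handled.

    Returns True if the message was processed, False if it was a duplicate.
    """
    if message.id in seen_ids:
        return False
    processed.append(message.id)  # the actual "work"
    # Mark as seen only after the work succeeded, so a failed attempt
    # can still be retried on redelivery.
    seen_ids.add(message.id)
    return True


async def main() -> List[bool]:
    # The second "a" simulates a redelivery of the same message.
    deliveries = [Message("a"), Message("b"), Message("a")]
    return [await handle_once(m) for m in deliveries]


results = asyncio.run(main())
```

Here the third delivery is recognised as a duplicate and skipped, so the side effect runs once per message id.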
## Basic Usage
```python
import asyncio

from eventiq import CloudEvent, GenericConsumer, Middleware, Service
from eventiq.backends.nats import JetStreamBroker


class SendMessageMiddleware(Middleware):
    async def after_broker_connect(self):
        # `service` is the module-level Service defined below.
        print(f"After service start, running with {service.broker}")
        await asyncio.sleep(10)
        for i in range(100):
            message = CloudEvent(topic="test.topic", data={"counter": i})
            await service.publish(message)
        print("Published message(s)")


broker = JetStreamBroker(url="nats://localhost:4222")

service = Service(
    name="example-service",
    broker=broker,
)
service.add_middleware(SendMessageMiddleware)


# Function-based subscriber
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")


# Class-based subscriber
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
    async def process(self, message: CloudEvent):
        print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"}))
```
Run with
```shell
eventiq run app:service --log-level=info
```
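
The messages in the example follow the Cloud Events model: an envelope of metadata attributes plus a `data` payload, rather than Python-specific call arguments. As a rough illustration, the sketch below builds such an envelope with the standard library only. The required attribute names (`specversion`, `id`, `source`, `type`) come from the CloudEvents 1.0 specification; the helper `make_event`, the `source` value, and the mapping of a topic onto `type` are assumptions made for this sketch and may not match how eventiq serializes its own `CloudEvent` class.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict


def make_event(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Build a CloudEvents 1.0 style envelope as a plain dict (hypothetical helper)."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": "example-service",  # assumed value for this sketch
        "type": event_type,
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }


event = make_event("test.topic", {"counter": 1})

# Round-trip through JSON, the default encoding mentioned in the feature list.
encoded = json.dumps(event).encode("utf-8")
decoded = json.loads(encoded)
```

Since the envelope is plain JSON, a consumer written in any language can read it without knowing anything about the producer's function signatures.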
## Watching for changes
```shell
eventiq run app:service --log-level=info --reload=.
```
## Testing
The `StubBroker` class is provided as an in-memory replacement for running unit tests:
```python
import os


def get_broker(**kwargs):
    if os.getenv('ENV') == 'TEST':
        from eventiq.backends.stub import StubBroker
        return StubBroker()
    else:
        from eventiq.backends.rabbitmq import RabbitmqBroker
        return RabbitmqBroker(**kwargs)


broker = get_broker()
```
Furthermore, subscribers are just regular Python coroutines, so they can be tested simply by calling them:
```python
# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
    return 42


# tests.py
from main import my_subscriber


async def test_my_subscriber():
    result = await my_subscriber(None)
    assert result == 42
```
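
An `async def` test like the one above needs an async-aware test runner. Where none is configured, the coroutine can be driven directly with `asyncio.run`. The sketch below is self-contained and uses only the standard library: `my_subscriber` is a plain coroutine standing in for the decorated subscriber, and `FakeMessage` is a hypothetical stand-in for `CloudEvent`. Neither is part of eventiq's API.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class FakeMessage:
    """Hypothetical stand-in for CloudEvent, carrying only a data payload."""
    data: Dict[str, Any]


async def my_subscriber(message: FakeMessage) -> int:
    # Stand-in for the decorated subscriber: doubles the counter it receives.
    return message.data["counter"] * 2


def test_my_subscriber() -> None:
    # Drive the coroutine to completion without any pytest plugin.
    result = asyncio.run(my_subscriber(FakeMessage(data={"counter": 21})))
    assert result == 42


test_my_subscriber()
```

Keeping the test function synchronous means it runs under any test runner, at the cost of starting a fresh event loop per call.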
## CLI
Getting help:
```shell
eventiq --help
```
Installing shell autocompletion:
```shell
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
```
Basic commands:
- `run` - run service
- `docs` - generate AsyncAPI docs
- `send` - send message to broker