eventiq


- Name: eventiq
- Version: 1.1.7
- Summary: Publish/Subscribe asyncio framework for Python
- Upload time: 2024-10-17 21:20:16
- Requires Python: >=3.9
- Keywords: asyncapi, asyncio, cloud-events, event-driven, framework, microservice, python

<p align="center">
<img src="https://raw.githubusercontent.com/asynq-io/eventiq/main/assets/logo.svg" style="width: 250px">

</p>
<p align="center">
<em>Asyncio native pub/sub framework for Python</em>
</p>

![Tests](https://github.com/asynq-io/eventiq/workflows/Tests/badge.svg)
![Build](https://github.com/asynq-io/eventiq/workflows/Publish/badge.svg)
![License](https://img.shields.io/github/license/asynq-io/eventiq)
![Mypy](https://img.shields.io/badge/mypy-checked-blue)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/charliermarsh/ruff/main/assets/badge/v1.json)](https://github.com/charliermarsh/ruff)
[![Pydantic v2](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/pydantic/pydantic/main/docs/badge/v2.json)](https://docs.pydantic.dev/latest/contributing/#badges)
[![security: bandit](https://img.shields.io/badge/security-bandit-yellow.svg)](https://github.com/PyCQA/bandit)
![Python](https://img.shields.io/pypi/pyversions/eventiq)
![Format](https://img.shields.io/pypi/format/eventiq)
![PyPi](https://img.shields.io/pypi/v/eventiq)

## Installation
```shell
pip install eventiq
```
or
```shell
poetry add eventiq
```

### Installing optional dependencies

```shell
pip install 'eventiq[broker]'
```

### Available brokers

- `nats`
- `rabbitmq`
- `kafka`
- `redis`

## Features

- Modern, `asyncio`-based Python 3.9+ syntax
- Fully type annotated
- Minimal external dependencies (`anyio`, `pydantic`, `typer`)
- Automatic message parsing based on type annotations using `pydantic`
- Code hot-reload
- Highly scalable: each service can process hundreds of tasks concurrently,
    all messages are load balanced between all instances by default
- Resilient: at-least-once delivery for all messages by default (except for Redis)
- Customizable & pluggable message encoder/decoder (`json` as default)
- Multiple broker support
    - Memory (for testing)
    - NATS
    - Kafka
    - RabbitMQ
    - Redis
- Result Backend implementation for NATS & Redis
- Lifespan protocol support
- Lightweight (and completely optional) dependency injection system based on type annotations
- Easy and lightweight (~3k lines of code, including type definitions and broker implementations)
- [Cloud Events](https://cloudevents.io/) standard as base message structure (no more python specific `*args` and `**kwargs` in messages)
- [AsyncAPI](https://www.asyncapi.com/en) documentation generation from code
- Twelve factor app approach - stdout logging, configuration through environment variables
- Easily extensible via Middlewares
- Multiple extensions and integrations including:
  - Prometheus - metrics exporter
  - OpenTelemetry - tracing and metrics
  - Message Pack - message pack encoder for messages
  - FastAPI - integrating eventiq Service with FastAPI applications (WIP)
  - Dataref - data reference resolver for messages (WIP)
  - Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
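
Two of the features above, the Cloud Events message structure and the pluggable encoder/decoder, can be sketched with the standard library alone. This is a minimal illustration, not eventiq's implementation: `Envelope`, `Encoder`, `JsonEncoder`, and `roundtrip` are hypothetical names, and the fields shown are only the required CloudEvents 1.0 attributes (`id`, `source`, `specversion`, `type`) plus `data`.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Protocol


@dataclass
class Envelope:
    """Hypothetical CloudEvents-style message (not eventiq's CloudEvent class)."""

    type: str
    source: str
    data: Any = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    specversion: str = "1.0"


class Encoder(Protocol):
    """Anything with encode/decode can be plugged in (JSON, MessagePack, ...)."""

    def encode(self, obj: dict) -> bytes: ...

    def decode(self, raw: bytes) -> dict: ...


class JsonEncoder:
    def encode(self, obj: dict) -> bytes:
        return json.dumps(obj).encode("utf-8")

    def decode(self, raw: bytes) -> dict:
        return json.loads(raw.decode("utf-8"))


def roundtrip(event: Envelope, encoder: Encoder) -> Envelope:
    # Serialize to bytes as a broker would carry them, then rebuild the message.
    raw = encoder.encode(asdict(event))
    return Envelope(**encoder.decode(raw))


event = Envelope(
    type="com.example.counter",
    source="example-service",
    data={"counter": 1},
)
restored = roundtrip(event, JsonEncoder())
```

Because the message is a plain, language-neutral structure rather than pickled `*args`/`**kwargs`, any consumer that understands the encoding can read it.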

## Basic Usage

```Python
import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker

class SendMessageMiddleware(Middleware):
    async def after_broker_connect(self):
        print(f"After service start, running with {service.broker}")
        await asyncio.sleep(10)
        for i in range(100):
            message = CloudEvent(topic="test.topic", data={"counter": i})
            await service.publish(message)
        print("Published message(s)")

broker = JetStreamBroker(url="nats://localhost:4222")

service = Service(
    name="example-service",
    broker=broker,
)
service.add_middleware(SendMessageMiddleware)

@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")

@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
    async def process(self, message: CloudEvent):
        print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"}))

```

Run with

```shell
eventiq run app:service --log-level=info
```
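
To make the subscribe/publish flow from the example concrete, here is a toy in-memory version that uses only `asyncio`. It is a sketch of the general pattern, not of eventiq's internals: `ToyBus` and its methods are hypothetical, and a real broker adds persistence, acknowledgements, and load balancing between instances.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class ToyBus:
    """Hypothetical in-memory bus; illustrates the pattern only."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str) -> Callable[[Handler], Handler]:
        # Register the decorated coroutine as a handler for the topic.
        def decorator(func: Handler) -> Handler:
            self._handlers[topic].append(func)
            return func

        return decorator

    async def publish(self, topic: str, data: Any) -> None:
        # Run every handler registered for the topic concurrently.
        await asyncio.gather(*(handler(data) for handler in self._handlers[topic]))


bus = ToyBus()
received: list[dict] = []


@bus.subscribe("test.topic")
async def on_message(data: dict) -> None:
    received.append(data)


async def main() -> None:
    for i in range(3):
        await bus.publish("test.topic", {"counter": i})


asyncio.run(main())
```

The decorator returns the original coroutine unchanged, which is what keeps handlers callable directly from tests.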

## Watching for changes

```shell
eventiq run app:service --log-level=info --reload=.
```
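
How eventiq detects changes is not described here. As a rough idea of what a reloader has to do, the sketch below compares two snapshots of file modification times; `snapshot` and `changed_files` are hypothetical helpers, and real reloaders often rely on OS file-system notifications instead of polling.

```python
import os
import tempfile
from pathlib import Path


def snapshot(root: str) -> dict[str, int]:
    """Map every .py file under root to its modification time in nanoseconds."""
    return {str(path): path.stat().st_mtime_ns for path in Path(root).rglob("*.py")}


def changed_files(before: dict[str, int], after: dict[str, int]) -> set[str]:
    """Return files that were added, removed, or modified between snapshots."""
    return {
        path
        for path in before.keys() | after.keys()
        if before.get(path) != after.get(path)
    }


with tempfile.TemporaryDirectory() as root:
    app = Path(root) / "app.py"
    app.write_text("x = 1\n")
    before = snapshot(root)

    # Bump the modification time explicitly so the demo is deterministic.
    stat = app.stat()
    os.utime(app, ns=(stat.st_atime_ns, stat.st_mtime_ns + 1_000_000_000))
    (Path(root) / "new.py").write_text("y = 2\n")

    after = snapshot(root)
    changed = sorted(Path(path).name for path in changed_files(before, after))
```

A reloader would run such a comparison in a loop and restart the service whenever the result is non-empty.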

## Testing

The `StubBroker` class is provided as an in-memory replacement for running unit tests:

```python
import os


def get_broker(**kwargs):
    if os.getenv('ENV') == 'TEST':
        from eventiq.backends.stub import StubBroker
        return StubBroker()
    else:
        from eventiq.backends.rabbitmq import RabbitmqBroker
        return RabbitmqBroker(**kwargs)

broker = get_broker()

```

Furthermore, subscribers are just regular Python coroutines, so they can be tested by calling them directly:

```python

# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
    return 42

# tests.py
from main import my_subscriber

async def test_my_subscriber():
    result = await my_subscriber(None)
    assert result == 42

```
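
An `async def` test like the one above is only awaited when an async-aware pytest plugin (for example `pytest-asyncio` or the plugin shipped with `anyio`) is installed and configured. Where that is not the case, a plain synchronous test can drive the coroutine with `asyncio.run`. The subscriber below is a stand-in defined inline so the sketch is self-contained; in a real project it would be imported from `main.py`.

```python
import asyncio


# Stand-in for the subscriber from main.py (defined inline for this sketch).
async def my_subscriber(message):
    return 42


def test_my_subscriber():
    # asyncio.run drives the coroutine to completion without an async test plugin.
    result = asyncio.run(my_subscriber(None))
    assert result == 42


test_my_subscriber()
```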

## CLI

Getting help:
```shell
eventiq --help
```

Installing shell autocompletion:
```shell
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
```

Basic commands:

- `run` - run service
- `docs` - generate AsyncAPI docs
- `send` - send message to broker

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "eventiq",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": null,
    "keywords": "asyncapi, asyncio, cloud-events, event-driven, framework, microservice, python",
    "author": null,
    "author_email": "RaRhAeu <rarha_eu@protonmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/ce/cb/136811bf4ced129fc811479f55cabc7ecfdd73317e29f1bb07bb301c48a6/eventiq-1.1.7.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "Publish/Subscribe asyncio framework for Python",
    "version": "1.1.7",
    "project_urls": {
        "Documentation": "https://github.com/asynq-io/eventiq#readme",
        "Issues": "https://github.com/asynq-io/eventiq/issues",
        "Source": "https://github.com/asynq-io/eventiq"
    },
    "split_keywords": [
        "asyncapi",
        " asyncio",
        " cloud-events",
        " event-driven",
        " framework",
        " microservice",
        " python"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "357d9f132559677bb44aaf59f599a38739b1c9241c191944a2d3c565f659a59a",
                "md5": "8efd98815ded7bd4a80c45844d1c7da4",
                "sha256": "38fd376451c8b9c5778bdb422b9189bb58eb0c51d0010eb0714d6e460c24f59e"
            },
            "downloads": -1,
            "filename": "eventiq-1.1.7-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "8efd98815ded7bd4a80c45844d1c7da4",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 45148,
            "upload_time": "2024-10-17T21:20:15",
            "upload_time_iso_8601": "2024-10-17T21:20:15.241182Z",
            "url": "https://files.pythonhosted.org/packages/35/7d/9f132559677bb44aaf59f599a38739b1c9241c191944a2d3c565f659a59a/eventiq-1.1.7-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "cecb136811bf4ced129fc811479f55cabc7ecfdd73317e29f1bb07bb301c48a6",
                "md5": "51e21d45ee3283d3085493c1b763b08c",
                "sha256": "b50778cd92d6ee7eaa016f0b4b5b3c7ba864781c81ba59e4055e9e7212ef8827"
            },
            "downloads": -1,
            "filename": "eventiq-1.1.7.tar.gz",
            "has_sig": false,
            "md5_digest": "51e21d45ee3283d3085493c1b763b08c",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 37761,
            "upload_time": "2024-10-17T21:20:16",
            "upload_time_iso_8601": "2024-10-17T21:20:16.987720Z",
            "url": "https://files.pythonhosted.org/packages/ce/cb/136811bf4ced129fc811479f55cabc7ecfdd73317e29f1bb07bb301c48a6/eventiq-1.1.7.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-10-17 21:20:16",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "asynq-io",
    "github_project": "eventiq#readme",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "eventiq"
}
        