django-streams


Namedjango-streams JSON
Version 3.0.0 PyPI version JSON
download
home_pageNone
SummaryDjango application to produce/consume events from Kafka
upload_time2025-01-07 11:12:41
maintainerNone
docs_urlNone
authorMarcos Schroh
requires_python<4.0,>=3.9
licenseNone
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Django application to produce/consume events from Kafka

![django streaming](docs/img/django_worker.png)

![Build status](https://github.com/kpn/django-streams/actions/workflows/pr-tests.yaml/badge.svg?branch=main)
[![codecov](https://codecov.io/gh/kpn/django-streams/branch/main/graph/badge.svg?token=t7pxIPtphF)](https://codecov.io/gh/kpn/django-streams)
![python version](https://img.shields.io/badge/python-3.9%2B-yellowgreen)

## Installation

```bash
pip install django-streams
```

or with poetry

```bash
poetry add django-streams
```

and add it to `INSTALLED_APPS`:

```python
INSTALLED_APPS = [
    ...
    "django_streams",
    ...
    "my_streams_app",
    # etc...
]
```

## Documentation

https://kpn.github.io/django-streams/

## Usage

create the `engine`:

```python
# my_streams_app/engine.py
from django_streams import create_engine

from kstreams.backends import Kafka


stream_engine = create_engine(
    title="test-engine",
    backend=Kafka(),
)
```

*To configure the backend follow the [kstreams backend documentation](https://kpn.github.io/kstreams/backends/)*

### Consuming events

Define your streams:

```python
# my_streams_app/streams.py
from kstreams import ConsumerRecord
from .engine import stream_engine


@stream_engine.stream("dev-kpn-des--hello-kpn", group_id="django-streams-principal-group-id")  # your consumer
async def consumer_task(cr: ConsumerRecord):
    async for cr in stream:
        logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
```

and then in your `apps.py` you must import the `python module` or your `coroutines`

```python
# my_streams_app/apps.py
from django.apps import AppConfig


class StreamingAppConfig(AppConfig):
    name = "streaming_app"

    def ready(self):
        from . import streams  # import the streams module
```

Now you can run the worker:

```bash
python manage.py worker
```

### Producing events

Producing events can be `sync` or `async`. If you are in a `sync` context you must use `stream_engine.sync_send`, otherwise [stream_engine.send](https://pages.kpn.org/repos-docs/dsl/django-streams/producer/#producing-in-an-async-context). For both cases a `RecordMetadata` is returned.

```python
# streaming_app/views.py
from django.http import HttpResponse
from django.views.generic import View

from de.core.conf import settings
from .engine import stream_engine


class HelloWorldView(View):

    def get(self, request, *args, **kwargs):
        topic = f"{settings.KAFKA_TOPIC_PREFIX}hello-kpn"

        record_metadata = stream_engine.sync_send(
            topic,
            value=b"hello world",
            key="hello",
            partition=None,
            timestamp_ms=None,
            headers=None,
        )

        return HttpResponse(f"Event metadata: {record_metadata}")
```

## Benchmark

Producer:

| Total produced events | Time (seconds) |
|--------------|----------------|
| 1 | 0.004278898239135742 |
| 10 | 0.030963897705078125 |
| 100 | 0.07049298286437988 |
| 1000 | 0.6609988212585449 |
| 10000 | 6.501222133636475 |

## Running tests

```bash
./scrtips/test
```

## Code formating

```bash
./scrtips/format
```

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "django-streams",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.9",
    "maintainer_email": null,
    "keywords": null,
    "author": "Marcos Schroh",
    "author_email": "marcos.schroh@kpn.com",
    "download_url": "https://files.pythonhosted.org/packages/9f/e3/419222adb3fbcf36fd465364f6c0d9432828e6ddc465a7d7cdcfdd808980/django_streams-3.0.0.tar.gz",
    "platform": null,
    "description": "# Django application to produce/consume events from Kafka\n\n![django streaming](docs/img/django_worker.png)\n\n![Build status](https://github.com/kpn/django-streams/actions/workflows/pr-tests.yaml/badge.svg?branch=main)\n[![codecov](https://codecov.io/gh/kpn/django-streams/branch/main/graph/badge.svg?token=t7pxIPtphF)](https://codecov.io/gh/kpn/django-streams)\n![python version](https://img.shields.io/badge/python-3.9%2B-yellowgreen)\n\n## Installation\n\n```bash\npip install django-streams\n```\n\nor with poetry\n\n```bash\npoetry add django-streams\n```\n\nand add it to `INSTALLED_APPS`:\n\n```python\nINSTALLED_APPS = [\n    ...\n    \"django_streams\",\n    ...\n    \"my_streams_app\",\n    # etc...\n]\n```\n\n## Documentation\n\nhttps://kpn.github.io/django-streams/\n\n## Usage\n\ncreate the `engine`:\n\n```python\n# my_streams_app/engine.py\nfrom django_streams import create_engine\n\nfrom kstreams.backends import Kafka\n\n\nstream_engine = create_engine(\n    title=\"test-engine\",\n    backend=Kafka(),\n)\n```\n\n*To configure the backend follow the [kstreams backend documentation](https://kpn.github.io/kstreams/backends/)*\n\n### Consuming events\n\nDefine your streams:\n\n```python\n# my_streams_app/streams.py\nfrom kstreams import ConsumerRecord\nfrom .engine import stream_engine\n\n\n@stream_engine.stream(\"dev-kpn-des--hello-kpn\", group_id=\"django-streams-principal-group-id\")  # your consumer\nasync def consumer_task(cr: ConsumerRecord):\n    async for cr in stream:\n        logger.info(f\"Event consumed: headers: {cr.headers}, value: {cr.value}\")\n```\n\nand then in your `apps.py` you must import the `python module` or your `coroutines`\n\n```python\n# my_streams_app/apps.py\nfrom django.apps import AppConfig\n\n\nclass StreamingAppConfig(AppConfig):\n    name = \"streaming_app\"\n\n    def ready(self):\n        from . import streams  # import the streams module\n```\n\nNow you can run the worker:\n\n```bash\npython manage.py worker\n```\n\n### Producing events\n\nProducing events can be `sync` or `async`. If you are in a `sync` context you must use `stream_engine.sync_send`, otherwise [stream_engine.send](https://pages.kpn.org/repos-docs/dsl/django-streams/producer/#producing-in-an-async-context). For both cases a `RecordMetadata` is returned.\n\n```python\n# streaming_app/views.py\nfrom django.http import HttpResponse\nfrom django.views.generic import View\n\nfrom de.core.conf import settings\nfrom .engine import stream_engine\n\n\nclass HelloWorldView(View):\n\n    def get(self, request, *args, **kwargs):\n        topic = f\"{settings.KAFKA_TOPIC_PREFIX}hello-kpn\"\n\n        record_metadata = stream_engine.sync_send(\n            topic,\n            value=b\"hello world\",\n            key=\"hello\",\n            partition=None,\n            timestamp_ms=None,\n            headers=None,\n        )\n\n        return HttpResponse(f\"Event metadata: {record_metadata}\")\n```\n\n## Benchmark\n\nProducer:\n\n| Total produced events | Time (seconds) |\n|--------------|----------------|\n| 1 | 0.004278898239135742 |\n| 10 | 0.030963897705078125 |\n| 100 | 0.07049298286437988 |\n| 1000 | 0.6609988212585449 |\n| 10000 | 6.501222133636475 |\n\n## Running tests\n\n```bash\n./scrtips/test\n```\n\n## Code formating\n\n```bash\n./scrtips/format\n```\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Django application to produce/consume events from Kafka",
    "version": "3.0.0",
    "project_urls": null,
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "2e16cb08c8d13c03923a8685ff3dca0ccad08d5c139cdf70ad1c862ace11bae3",
                "md5": "a9f7a1c76325991988f11ebd66efa167",
                "sha256": "5fed031c84cca41a66be95ac6ab967ad571f59c705b55895de6f752eb1d50e00"
            },
            "downloads": -1,
            "filename": "django_streams-3.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a9f7a1c76325991988f11ebd66efa167",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.9",
            "size": 11080,
            "upload_time": "2025-01-07T11:12:40",
            "upload_time_iso_8601": "2025-01-07T11:12:40.327398Z",
            "url": "https://files.pythonhosted.org/packages/2e/16/cb08c8d13c03923a8685ff3dca0ccad08d5c139cdf70ad1c862ace11bae3/django_streams-3.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "9fe3419222adb3fbcf36fd465364f6c0d9432828e6ddc465a7d7cdcfdd808980",
                "md5": "e195a291973b2513c08706afc7741798",
                "sha256": "45b23fe98b2fb688b8172014f90671015551f4212ba4e9274b615a513461136e"
            },
            "downloads": -1,
            "filename": "django_streams-3.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "e195a291973b2513c08706afc7741798",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.9",
            "size": 14974,
            "upload_time": "2025-01-07T11:12:41",
            "upload_time_iso_8601": "2025-01-07T11:12:41.994762Z",
            "url": "https://files.pythonhosted.org/packages/9f/e3/419222adb3fbcf36fd465364f6c0d9432828e6ddc465a7d7cdcfdd808980/django_streams-3.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-01-07 11:12:41",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "django-streams"
}
        
Elapsed time: 0.39503s