Name | django-streams JSON |
Version |
3.0.0
JSON |
| download |
home_page | None |
Summary | Django application to produce/consume events from Kafka |
upload_time | 2025-01-07 11:12:41 |
maintainer | None |
docs_url | None |
author | Marcos Schroh |
requires_python | <4.0,>=3.9 |
license | None |
keywords |
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Django application to produce/consume events from Kafka
![django streaming](docs/img/django_worker.png)
![Build status](https://github.com/kpn/django-streams/actions/workflows/pr-tests.yaml/badge.svg?branch=main)
[![codecov](https://codecov.io/gh/kpn/django-streams/branch/main/graph/badge.svg?token=t7pxIPtphF)](https://codecov.io/gh/kpn/django-streams)
![python version](https://img.shields.io/badge/python-3.9%2B-yellowgreen)
## Installation
```bash
pip install django-streams
```
or with poetry
```bash
poetry add django-streams
```
and add it to `INSTALLED_APPS`:
```python
INSTALLED_APPS = [
...
"django_streams",
...
"my_streams_app",
# etc...
]
```
## Documentation
https://kpn.github.io/django-streams/
## Usage
create the `engine`:
```python
# my_streams_app/engine.py
from django_streams import create_engine
from kstreams.backends import Kafka
stream_engine = create_engine(
title="test-engine",
backend=Kafka(),
)
```
*To configure the backend follow the [kstreams backend documentation](https://kpn.github.io/kstreams/backends/)*
### Consuming events
Define your streams:
```python
# my_streams_app/streams.py
from kstreams import ConsumerRecord
from .engine import stream_engine
@stream_engine.stream("dev-kpn-des--hello-kpn", group_id="django-streams-principal-group-id") # your consumer
async def consumer_task(cr: ConsumerRecord):
async for cr in stream:
logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
```
and then in your `apps.py` you must import the `python module` or your `coroutines`
```python
# my_streams_app/apps.py
from django.apps import AppConfig
class StreamingAppConfig(AppConfig):
name = "streaming_app"
def ready(self):
from . import streams # import the streams module
```
Now you can run the worker:
```bash
python manage.py worker
```
### Producing events
Producing events can be `sync` or `async`. If you are in a `sync` context you must use `stream_engine.sync_send`, otherwise [stream_engine.send](https://pages.kpn.org/repos-docs/dsl/django-streams/producer/#producing-in-an-async-context). For both cases a `RecordMetadata` is returned.
```python
# streaming_app/views.py
from django.http import HttpResponse
from django.views.generic import View
from de.core.conf import settings
from .engine import stream_engine
class HelloWorldView(View):
def get(self, request, *args, **kwargs):
topic = f"{settings.KAFKA_TOPIC_PREFIX}hello-kpn"
record_metadata = stream_engine.sync_send(
topic,
value=b"hello world",
key="hello",
partition=None,
timestamp_ms=None,
headers=None,
)
return HttpResponse(f"Event metadata: {record_metadata}")
```
## Benchmark
Producer:
| Total produced events | Time (seconds) |
|--------------|----------------|
| 1 | 0.004278898239135742 |
| 10 | 0.030963897705078125 |
| 100 | 0.07049298286437988 |
| 1000 | 0.6609988212585449 |
| 10000 | 6.501222133636475 |
## Running tests
```bash
./scrtips/test
```
## Code formating
```bash
./scrtips/format
```
Raw data
{
"_id": null,
"home_page": null,
"name": "django-streams",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.9",
"maintainer_email": null,
"keywords": null,
"author": "Marcos Schroh",
"author_email": "marcos.schroh@kpn.com",
"download_url": "https://files.pythonhosted.org/packages/9f/e3/419222adb3fbcf36fd465364f6c0d9432828e6ddc465a7d7cdcfdd808980/django_streams-3.0.0.tar.gz",
"platform": null,
"description": "# Django application to produce/consume events from Kafka\n\n![django streaming](docs/img/django_worker.png)\n\n![Build status](https://github.com/kpn/django-streams/actions/workflows/pr-tests.yaml/badge.svg?branch=main)\n[![codecov](https://codecov.io/gh/kpn/django-streams/branch/main/graph/badge.svg?token=t7pxIPtphF)](https://codecov.io/gh/kpn/django-streams)\n![python version](https://img.shields.io/badge/python-3.9%2B-yellowgreen)\n\n## Installation\n\n```bash\npip install django-streams\n```\n\nor with poetry\n\n```bash\npoetry add django-streams\n```\n\nand add it to `INSTALLED_APPS`:\n\n```python\nINSTALLED_APPS = [\n ...\n \"django_streams\",\n ...\n \"my_streams_app\",\n # etc...\n]\n```\n\n## Documentation\n\nhttps://kpn.github.io/django-streams/\n\n## Usage\n\ncreate the `engine`:\n\n```python\n# my_streams_app/engine.py\nfrom django_streams import create_engine\n\nfrom kstreams.backends import Kafka\n\n\nstream_engine = create_engine(\n title=\"test-engine\",\n backend=Kafka(),\n)\n```\n\n*To configure the backend follow the [kstreams backend documentation](https://kpn.github.io/kstreams/backends/)*\n\n### Consuming events\n\nDefine your streams:\n\n```python\n# my_streams_app/streams.py\nfrom kstreams import ConsumerRecord\nfrom .engine import stream_engine\n\n\n@stream_engine.stream(\"dev-kpn-des--hello-kpn\", group_id=\"django-streams-principal-group-id\") # your consumer\nasync def consumer_task(cr: ConsumerRecord):\n async for cr in stream:\n logger.info(f\"Event consumed: headers: {cr.headers}, value: {cr.value}\")\n```\n\nand then in your `apps.py` you must import the `python module` or your `coroutines`\n\n```python\n# my_streams_app/apps.py\nfrom django.apps import AppConfig\n\n\nclass StreamingAppConfig(AppConfig):\n name = \"streaming_app\"\n\n def ready(self):\n from . import streams # import the streams module\n```\n\nNow you can run the worker:\n\n```bash\npython manage.py worker\n```\n\n### Producing events\n\nProducing events can be `sync` or `async`. If you are in a `sync` context you must use `stream_engine.sync_send`, otherwise [stream_engine.send](https://pages.kpn.org/repos-docs/dsl/django-streams/producer/#producing-in-an-async-context). For both cases a `RecordMetadata` is returned.\n\n```python\n# streaming_app/views.py\nfrom django.http import HttpResponse\nfrom django.views.generic import View\n\nfrom de.core.conf import settings\nfrom .engine import stream_engine\n\n\nclass HelloWorldView(View):\n\n def get(self, request, *args, **kwargs):\n topic = f\"{settings.KAFKA_TOPIC_PREFIX}hello-kpn\"\n\n record_metadata = stream_engine.sync_send(\n topic,\n value=b\"hello world\",\n key=\"hello\",\n partition=None,\n timestamp_ms=None,\n headers=None,\n )\n\n return HttpResponse(f\"Event metadata: {record_metadata}\")\n```\n\n## Benchmark\n\nProducer:\n\n| Total produced events | Time (seconds) |\n|--------------|----------------|\n| 1 | 0.004278898239135742 |\n| 10 | 0.030963897705078125 |\n| 100 | 0.07049298286437988 |\n| 1000 | 0.6609988212585449 |\n| 10000 | 6.501222133636475 |\n\n## Running tests\n\n```bash\n./scrtips/test\n```\n\n## Code formating\n\n```bash\n./scrtips/format\n```\n",
"bugtrack_url": null,
"license": null,
"summary": "Django application to produce/consume events from Kafka",
"version": "3.0.0",
"project_urls": null,
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "2e16cb08c8d13c03923a8685ff3dca0ccad08d5c139cdf70ad1c862ace11bae3",
"md5": "a9f7a1c76325991988f11ebd66efa167",
"sha256": "5fed031c84cca41a66be95ac6ab967ad571f59c705b55895de6f752eb1d50e00"
},
"downloads": -1,
"filename": "django_streams-3.0.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a9f7a1c76325991988f11ebd66efa167",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.9",
"size": 11080,
"upload_time": "2025-01-07T11:12:40",
"upload_time_iso_8601": "2025-01-07T11:12:40.327398Z",
"url": "https://files.pythonhosted.org/packages/2e/16/cb08c8d13c03923a8685ff3dca0ccad08d5c139cdf70ad1c862ace11bae3/django_streams-3.0.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "9fe3419222adb3fbcf36fd465364f6c0d9432828e6ddc465a7d7cdcfdd808980",
"md5": "e195a291973b2513c08706afc7741798",
"sha256": "45b23fe98b2fb688b8172014f90671015551f4212ba4e9274b615a513461136e"
},
"downloads": -1,
"filename": "django_streams-3.0.0.tar.gz",
"has_sig": false,
"md5_digest": "e195a291973b2513c08706afc7741798",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.9",
"size": 14974,
"upload_time": "2025-01-07T11:12:41",
"upload_time_iso_8601": "2025-01-07T11:12:41.994762Z",
"url": "https://files.pythonhosted.org/packages/9f/e3/419222adb3fbcf36fd465364f6c0d9432828e6ddc465a7d7cdcfdd808980/django_streams-3.0.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-01-07 11:12:41",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "django-streams"
}