# Kstreams
`kstreams` is a library/micro framework to use with `kafka`. It has simple kafka streams implementation that gives certain guarantees, see below.

[](https://codecov.io/gh/kpn/kstreams)

---
**Documentation**: https://kpn.github.io/kstreams/
---
## Installation
```bash
pip install kstreams
```
You will need a worker, we recommend [aiorun](https://github.com/cjrh/aiorun)
```bash
pip install aiorun
```
## Usage
```python
import aiorun
from kstreams import create_engine, ConsumerRecord
stream_engine = create_engine(title="my-stream-engine")
@stream_engine.stream("local--kstream")
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
async def produce():
payload = b'{"message": "Hello world!"}'
for i in range(5):
metadata = await stream_engine.send("local--kstreams", value=payload)
print(f"Message sent: {metadata}")
async def start():
await stream_engine.start()
await produce()
async def shutdown(loop):
await stream_engine.stop()
if __name__ == "__main__":
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
```
## Features
- [x] Produce events
- [x] Consumer events with `Streams`
- [x] Subscribe to topics by `pattern`
- [x] `Prometheus` metrics and custom monitoring
- [x] TestClient
- [x] Custom Serialization and Deserialization
- [x] Easy to integrate with any `async` framework. No tied to any library!!
- [x] Yield events from streams
- [x] [Opentelemetry Instrumentation](https://github.com/kpn/opentelemetry-instrumentation-kstreams)
- [x] Middlewares
- [x] Hooks (on_startup, on_stop, after_startup, after_stop)
- [ ] Store (kafka streams pattern)
- [ ] Stream Join
- [ ] Windowing
## Development
This repo requires the use of [poetry](https://python-poetry.org/docs/basic-usage/) instead of pip.
*Note*: If you want to have the `virtualenv` in the same path as the project first you should run `poetry config --local virtualenvs.in-project true`
To install the dependencies just execute:
```bash
poetry install
```
Then you can activate the `virtualenv` with
```bash
poetry shell
```
Run test:
```bash
./scripts/test
```
Run code formatting with ruff:
```bash
./scripts/format
```
### Commit messages
We use [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) for the commit message.
The use of [commitizen](https://commitizen-tools.github.io/commitizen/) is recommended. Commitizen is part of the dev dependencies.
```bash
cz commit
```
Raw data
{
"_id": null,
"home_page": null,
"name": "kstreams",
"maintainer": "Santiago Fraire Willemo\u00ebs",
"docs_url": null,
"requires_python": "<4.0,>=3.9",
"maintainer_email": "santiago.fraire@kpn.com",
"keywords": "stream, processing, kafka, event streaming",
"author": "Marcos Schroh",
"author_email": "marcos.schroh@kpn.com",
"download_url": "https://files.pythonhosted.org/packages/b9/71/950ebfb5810c865dbe4f459690fb75db17d3db26123b057029e3fd835f1a/kstreams-0.26.7.tar.gz",
"platform": null,
"description": "# Kstreams\n\n`kstreams` is a library/micro framework to use with `kafka`. It has simple kafka streams implementation that gives certain guarantees, see below.\n\n\n[](https://codecov.io/gh/kpn/kstreams)\n\n\n---\n\n**Documentation**: https://kpn.github.io/kstreams/\n\n---\n\n## Installation\n\n```bash\npip install kstreams\n```\n\nYou will need a worker, we recommend [aiorun](https://github.com/cjrh/aiorun)\n\n```bash\npip install aiorun\n```\n\n## Usage\n\n```python\nimport aiorun\nfrom kstreams import create_engine, ConsumerRecord\n\n\nstream_engine = create_engine(title=\"my-stream-engine\")\n\n@stream_engine.stream(\"local--kstream\")\nasync def consume(cr: ConsumerRecord):\n print(f\"Event consumed: headers: {cr.headers}, payload: {cr.value}\")\n\n\nasync def produce():\n payload = b'{\"message\": \"Hello world!\"}'\n\n for i in range(5):\n metadata = await stream_engine.send(\"local--kstreams\", value=payload)\n print(f\"Message sent: {metadata}\")\n\n\nasync def start():\n await stream_engine.start()\n await produce()\n\n\nasync def shutdown(loop):\n await stream_engine.stop()\n\n\nif __name__ == \"__main__\":\n aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)\n```\n\n## Features\n\n- [x] Produce events\n- [x] Consumer events with `Streams`\n- [x] Subscribe to topics by `pattern`\n- [x] `Prometheus` metrics and custom monitoring\n- [x] TestClient\n- [x] Custom Serialization and Deserialization\n- [x] Easy to integrate with any `async` framework. No tied to any library!!\n- [x] Yield events from streams\n- [x] [Opentelemetry Instrumentation](https://github.com/kpn/opentelemetry-instrumentation-kstreams)\n- [x] Middlewares\n- [x] Hooks (on_startup, on_stop, after_startup, after_stop)\n- [ ] Store (kafka streams pattern)\n- [ ] Stream Join\n- [ ] Windowing\n\n## Development\n\nThis repo requires the use of [poetry](https://python-poetry.org/docs/basic-usage/) instead of pip.\n*Note*: If you want to have the `virtualenv` in the same path as the project first you should run `poetry config --local virtualenvs.in-project true`\n\nTo install the dependencies just execute:\n\n```bash\npoetry install\n```\n\nThen you can activate the `virtualenv` with\n\n```bash\npoetry shell\n```\n\nRun test:\n\n```bash\n./scripts/test\n```\n\nRun code formatting with ruff:\n\n```bash\n./scripts/format\n```\n\n### Commit messages\n\nWe use [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) for the commit message.\n\nThe use of [commitizen](https://commitizen-tools.github.io/commitizen/) is recommended. Commitizen is part of the dev dependencies.\n\n```bash\ncz commit\n```\n\n",
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "Build simple kafka streams applications",
"version": "0.26.7",
"project_urls": null,
"split_keywords": [
"stream",
" processing",
" kafka",
" event streaming"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "d6aae60dd8dbfb90407e0184c04e4ce7642d32c168999e5ccd47260263be6832",
"md5": "51649241ca7323d87b0c75bd29bc00c4",
"sha256": "1adaac49204234fb7b48c934a75e88ac5e066b7131522828de1a3455f9a0dddb"
},
"downloads": -1,
"filename": "kstreams-0.26.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "51649241ca7323d87b0c75bd29bc00c4",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.9",
"size": 38094,
"upload_time": "2025-03-19T11:13:32",
"upload_time_iso_8601": "2025-03-19T11:13:32.435941Z",
"url": "https://files.pythonhosted.org/packages/d6/aa/e60dd8dbfb90407e0184c04e4ce7642d32c168999e5ccd47260263be6832/kstreams-0.26.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "b971950ebfb5810c865dbe4f459690fb75db17d3db26123b057029e3fd835f1a",
"md5": "7d9d6924499cb48d536e35eb32f17d96",
"sha256": "bdd5f0f00fbb139440cf23708e33817d128d0a4f92e2e6092b3c611a07b5f316"
},
"downloads": -1,
"filename": "kstreams-0.26.7.tar.gz",
"has_sig": false,
"md5_digest": "7d9d6924499cb48d536e35eb32f17d96",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.9",
"size": 30305,
"upload_time": "2025-03-19T11:13:33",
"upload_time_iso_8601": "2025-03-19T11:13:33.817325Z",
"url": "https://files.pythonhosted.org/packages/b9/71/950ebfb5810c865dbe4f459690fb75db17d3db26123b057029e3fd835f1a/kstreams-0.26.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-03-19 11:13:33",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "kstreams"
}