# Nameko-Kafka
[![Build Status](https://travis-ci.com/ketgo/nameko-kafka.svg?branch=master)](https://travis-ci.com/ketgo/nameko-kafka)
[![codecov.io](https://codecov.io/gh/ketgo/nameko-kafka/coverage.svg?branch=master)](https://codecov.io/gh/ketgo/nameko-kafka/coverage.svg?branch=master)
[![MIT licensed](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE)
---
Kafka extension for [Nameko](https://www.nameko.io/) microservice framework.
## Introduction
This is a Nameko microservice framework [extension](https://nameko.readthedocs.io/en/stable/key_concepts.html) that adds support for a
Kafka entrypoint and dependency. The motivation behind the project is issue [569](https://github.com/nameko/nameko/issues/569).
_Nameko-kafka_ provides a simple implementation of the entrypoint based on the approach by [calumpeterwebb](https://medium.com/@calumpeterwebb/nameko-tutorial-creating-a-kafka-consuming-microservice-c4a7adb804d0).
It also includes a dependency provider for publishing Kafka messages from within a Nameko service.
## Installation
The package supports Python >= 3.5:
```bash
$ pip install nameko-kafka
```
## Usage
The extension can be used both as a service dependency and as an entrypoint. Example usage for each case is shown in the
following sections.
### Dependency
This is essentially a [kafka-python](https://github.com/dpkp/kafka-python) producer wrapped as a Nameko dependency.
Nameko uses dependency injection to instantiate the producer. You only need to declare it in your service class as shown:
```python
from nameko.rpc import rpc
from nameko_kafka import KafkaProducer


class MyService:
    """
    My microservice
    """
    name = "my-service"
    # Kafka dependency
    producer = KafkaProducer(bootstrap_servers='localhost:1234')

    @rpc
    def method(self):
        # Publish message using dependency
        self.producer.send("kafka-topic", value=b"my-message", key=b"my-key")
```
Here `KafkaProducer` accepts all options valid for `kafka-python`'s [KafkaProducer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html).
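Because the options are passed on to the underlying producer, a serializer can be supplied instead of encoding each payload by hand. The sketch below is a minimal illustration, assuming `value_serializer` (a documented `KafkaProducer` option in kafka-python) is forwarded unchanged; the helper name `to_json_bytes` is hypothetical.

```python
import json


def to_json_bytes(payload):
    """Encode a JSON-serializable object as UTF-8 bytes for Kafka."""
    return json.dumps(payload, sort_keys=True).encode("utf-8")


# Hypothetical wiring inside a service class (assumes the option is
# forwarded to the underlying producer as described above):
#
#     producer = KafkaProducer(
#         bootstrap_servers='localhost:1234',
#         value_serializer=to_json_bytes,
#     )
#
# self.producer.send("kafka-topic", value={"id": 1}) would then pass
# the dict through to_json_bytes before it is sent.

encoded = to_json_bytes({"id": 1, "event": "created"})
print(encoded)  # b'{"event": "created", "id": 1}'
```

Sorting the keys keeps the encoded bytes stable for equal payloads, which can help when messages are compared or deduplicated downstream.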
### Entrypoint
You can use the `nameko_kafka.consume` decorator in your services to process Kafka messages:
```python
from nameko_kafka import consume


class MyService:
    """
    My microservice
    """
    name = "my-service"

    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234')
    def method(self, message):
        # Your message handler
        handle_message(message)
```
The `consume` decorator accepts all the options valid for `kafka-python`'s [KafkaConsumer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html).
In addition to `kafka-python`'s default autocommit feature, the entrypoint supports three
offset commit strategies: _at least once_, _at most once_ and _exactly once_. The three strategies correspond
to the different message delivery semantics achievable in Kafka. Examples for each are shown in the following subsections.
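As background for those subsections, the difference between _at most once_ and _at least once_ generally comes down to whether the offset is committed before or after the handler runs. The following is a plain-Python simulation of that ordering, not nameko-kafka's implementation; `consume_batch` and `crash_on_b` are hypothetical names, and the "commit" is just a local variable.

```python
def consume_batch(messages, handler, commit_first):
    """Process messages in order and return the last committed offset.

    commit_first=True  -> commit, then handle (at most once)
    commit_first=False -> handle, then commit (at least once)
    """
    committed = -1
    for offset, message in enumerate(messages):
        try:
            if commit_first:
                committed = offset
                handler(message)
            else:
                handler(message)
                committed = offset
        except RuntimeError:
            # Simulated crash: stop consuming. A restarted consumer
            # would resume from committed + 1.
            break
    return committed


def crash_on_b(message):
    """Handler that fails on one specific message."""
    if message == "b":
        raise RuntimeError("handler crashed")


messages = ["a", "b", "c"]

# At most once: offset 1 is already committed, so "b" is skipped on restart.
print(consume_batch(messages, crash_on_b, commit_first=True))   # 1

# At least once: offset 1 was never committed, so "b" is redelivered.
print(consume_batch(messages, crash_on_b, commit_first=False))  # 0
```

With at least once delivery the handler may see the same message twice, so it should be idempotent. Exactly once additionally relies on the persistent offset storage described in its subsection.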
#### At Least Once
```python
from nameko_kafka import consume, Semantic


class MyService:
    """
    My microservice
    """
    name = "my-service"

    # At least once semantic consumer
    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234', semantic=Semantic.AT_LEAST_ONCE)
    def method(self, message):
        # Your message handler
        handle_message(message)
```
#### At Most Once
```python
from nameko_kafka import consume, Semantic


class MyService:
    """
    My microservice
    """
    name = "my-service"

    # At most once semantic consumer
    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234', semantic=Semantic.AT_MOST_ONCE)
    def method(self, message):
        # Your message handler
        handle_message(message)
```
#### Exactly Once
The exactly once semantic requires persistent storage to save message offsets. Such a store can be
implemented using the `OffsetStorage` interface provided by Nameko-kafka. Various backends are possible,
such as an RDBMS or a NoSQL database. Support for some comes out of the box:
##### MongoDB Storage
```python
from nameko_kafka import consume, Semantic
from nameko_kafka.storage import MongoStorage

from pymongo import MongoClient


class MyService:
    """
    My microservice
    """
    name = "my-service"

    # Exactly once semantic consumer
    @consume(
        "kafka-topic",
        group_id="my-group",
        bootstrap_servers='localhost:1234',
        semantic=Semantic.EXACTLY_ONCE,
        storage=MongoStorage(
            # MongoDB backend client
            client=MongoClient('localhost', 27017),
            # Database to use for storage
            db_name="database-name",
            # Collection to use for storage
            collection="collection-name"
        )
    )
    def method(self, message):
        # Your message handler
        handle_message(message)
```
Note: If the `db_name` and `collection` arguments are not specified, the storage uses the default values
`"nameko_kafka_offsets"` and `"offsets"` respectively.
##### SQL Storage
Part of v0.3.0
##### S3 Storage
Part of v0.4.0
##### Azure Block Storage
Part of v0.5.0
##### Create Custom Storage
You can create your own offset storage by implementing the `OffsetStorage` interface. It exposes the following methods:
```python
from nameko_kafka.storage.base import OffsetStorage


class MyStorage(OffsetStorage):
    """
    My custom offset storage.
    """

    def setup(self):
        """
        Method for setup of the storage.
        """

    def stop(self):
        """
        Method to teardown the storage.
        """

    def read(self, topic, partition):
        """
        Read last stored offset from storage for
        given topic and partition.

        :param topic: message topic
        :param partition: partition number of the topic
        :returns: last committed offset value
        """

    def write(self, offsets):
        """
        Write offsets to storage.

        :param offsets: mapping between topic-partition
            tuples and corresponding latest offset value,
            e.g.
            {
                ("topic-1", 0): 1,
                ("topic-1", 1): 3,
                ("topic-2", 1): 10,
                ...
            }
        """
```
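To make the four methods concrete, here is a minimal sketch backed by the standard-library `sqlite3` module. It is not part of nameko-kafka: the class name `SQLiteOffsetStore`, the table layout, and returning `None` for a partition with no stored offset are all assumptions. The class is kept standalone so it runs without the package installed; to use it with the entrypoint it would need to inherit from `OffsetStorage`.

```python
import sqlite3


class SQLiteOffsetStore:
    """Hypothetical SQLite-backed store mirroring the OffsetStorage methods."""

    def __init__(self, path=":memory:"):
        self.path = path
        self.connection = None

    def setup(self):
        # Open the database and create the offsets table if it is missing.
        self.connection = sqlite3.connect(self.path)
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS offsets ("
            "topic TEXT NOT NULL, part INTEGER NOT NULL, "
            "offset_value INTEGER NOT NULL, PRIMARY KEY (topic, part))"
        )
        self.connection.commit()

    def stop(self):
        # Close the connection if one is open.
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    def read(self, topic, partition):
        row = self.connection.execute(
            "SELECT offset_value FROM offsets WHERE topic = ? AND part = ?",
            (topic, partition),
        ).fetchone()
        # Assumption: None signals that no offset has been stored yet.
        return row[0] if row else None

    def write(self, offsets):
        # A single transaction: either every offset is stored or none is.
        with self.connection:
            self.connection.executemany(
                "INSERT OR REPLACE INTO offsets (topic, part, offset_value) "
                "VALUES (?, ?, ?)",
                [(t, p, o) for (t, p), o in offsets.items()],
            )


store = SQLiteOffsetStore()
store.setup()
store.write({("topic-1", 0): 1, ("topic-1", 1): 3})
store.write({("topic-1", 0): 5})
print(store.read("topic-1", 0))  # 5
print(store.read("topic-2", 0))  # None
store.stop()
```

Writing the whole mapping in one transaction avoids a partially updated set of offsets if the process stops mid-write.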
## Configurations
The extension configuration can be set in a Nameko [config.yaml](https://docs.nameko.io/en/stable/cli.html) file, or
through environment variables.
### Config File
```yaml
# Config for entrypoint
KAFKA_CONSUMER:
  bootstrap_servers: 'localhost:1234'
  retry_backoff_ms: 100
  # ...

# Config for dependency
KAFKA_PRODUCER:
  bootstrap_servers: 'localhost:1234'
  retries: 3
  # ...
```
### Environment Variables
```.env
# Config for entrypoint
KAFKA_CONSUMER='{"bootstrap_servers": "localhost:1234", "retry_backoff_ms": 100}'
# Config for dependency
KAFKA_PRODUCER='{"bootstrap_servers": "localhost:1234", "retries": 3}'
```
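Each variable holds a JSON object with the same keys as the config file. As a quick sanity check of such a value, independent of how the extension itself reads it (which is not shown here), the string can be decoded with the standard library. The in-process assignment below only keeps the sketch self-contained.

```python
import json
import os

# Set the variable in-process so the sketch runs on its own; normally it
# would come from the shell or a .env file.
os.environ["KAFKA_CONSUMER"] = (
    '{"bootstrap_servers": "localhost:1234", "retry_backoff_ms": 100}'
)

# json.loads raises a ValueError subclass if the value is not valid JSON.
consumer_config = json.loads(os.environ["KAFKA_CONSUMER"])

print(consumer_config["bootstrap_servers"])  # localhost:1234
print(consumer_config["retry_backoff_ms"])   # 100
```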
## Milestones
- [x] Kafka Entrypoint
- [x] Kafka Dependency
- [x] Commit strategies:
    - _AT_MOST_ONCE_DELIVERY_
    - _AT_LEAST_ONCE_DELIVERY_
    - _EXACTLY_ONCE_DELIVERY_
- [x] Commit storage for _EXACTLY_ONCE_DELIVERY_ strategy
## Developers
For development, a Kafka broker is required. You can spawn one using the [docker-compose.yml](https://github.com/ketgo/nameko-kafka/blob/master/tests/conftest.py)
file in the `tests` folder:
```bash
$ cd tests
$ docker-compose up -d
```
To install all package dependencies:
```bash
$ pip install -e .[dev]
# or
$ make deps
```
Other useful commands:
```bash
$ pytest --cov=nameko_kafka tests/    # to get coverage report
# or
$ make coverage

$ pylint nameko_kafka                 # to check code quality with PyLint
# or
$ make lint
```
## Contributions
Issue reports and pull requests are always welcome. Thanks!
Raw data
{
"_id": null,
"home_page": "https://github.com/ketgo/nameko-kafka",
"name": "nameko-kafka",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.4",
"maintainer_email": "",
"keywords": "nameko,kafka,microservice",
"author": "Ketan Goyal",
"author_email": "ketangoyal1988@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/50/b2/28aef78afcd414892ce1d743817f90ec6993f6ec54fe7d356961faa058cd/nameko_kafka-0.2.1.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "Kafka extension for Nameko microservice framework",
"version": "0.2.1",
"project_urls": {
"Homepage": "https://github.com/ketgo/nameko-kafka"
},
"split_keywords": [
"nameko",
"kafka",
"microservice"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "50b228aef78afcd414892ce1d743817f90ec6993f6ec54fe7d356961faa058cd",
"md5": "3b51e0b403d718553f4d766e23785cd8",
"sha256": "eaa05b19ce3222bc9bb1d1b2544f7f91454c8ff5f4fd8e856040e711b8c1d6ae"
},
"downloads": -1,
"filename": "nameko_kafka-0.2.1.tar.gz",
"has_sig": false,
"md5_digest": "3b51e0b403d718553f4d766e23785cd8",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.4",
"size": 15244,
"upload_time": "2023-07-12T02:26:38",
"upload_time_iso_8601": "2023-07-12T02:26:38.750735Z",
"url": "https://files.pythonhosted.org/packages/50/b2/28aef78afcd414892ce1d743817f90ec6993f6ec54fe7d356961faa058cd/nameko_kafka-0.2.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-07-12 02:26:38",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "ketgo",
"github_project": "nameko-kafka",
"travis_ci": true,
"coveralls": true,
"github_actions": false,
"lcname": "nameko-kafka"
}