- **Package**: nameko-kafka 0.2.1 (released 2023-07-12)
- **Homepage**: https://github.com/ketgo/nameko-kafka
- **Summary**: Kafka extension for the Nameko microservice framework
- **Author**: Ketan Goyal
- **Requires Python**: >=3.4
- **License**: MIT
- **Keywords**: nameko, kafka, microservice

# Nameko-Kafka

[![Build Status](https://travis-ci.com/ketgo/nameko-kafka.svg?branch=master)](https://travis-ci.com/ketgo/nameko-kafka)
[![codecov.io](https://codecov.io/gh/ketgo/nameko-kafka/coverage.svg?branch=master)](https://codecov.io/gh/ketgo/nameko-kafka/coverage.svg?branch=master)
[![MIT licensed](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE)
---

Kafka extension for the [Nameko](https://www.nameko.io/) microservice framework.

## Introduction

This is a Nameko microservice framework [extension](https://nameko.readthedocs.io/en/stable/key_concepts.html) that adds support for a Kafka entrypoint and dependency. The motivation behind the project is issue [569](https://github.com/nameko/nameko/issues/569).
_Nameko-kafka_ provides a simple implementation of the entrypoint based on the approach by [calumpeterwebb](https://medium.com/@calumpeterwebb/nameko-tutorial-creating-a-kafka-consuming-microservice-c4a7adb804d0).
It also includes a dependency provider for publishing Kafka messages from within a Nameko service.

## Installation

The package supports Python >= 3.5.
```bash
$ pip install nameko-kafka
```

## Usage

The extension can be used both as a service dependency and as an entrypoint. Example usage for both cases is shown in the
following sections.

### Dependency

This is essentially a [kafka-python](https://github.com/dpkp/kafka-python) producer in the form of a Nameko dependency.
Nameko uses dependency injection to instantiate the producer. You just need to declare it in your service class as shown:

```python
from nameko.rpc import rpc
from nameko_kafka import KafkaProducer


class MyService:
    """
        My microservice
    """
    name = "my-service"
    # Kafka producer dependency
    producer = KafkaProducer(bootstrap_servers='localhost:1234')
    
    @rpc
    def method(self):
        # Publish message using dependency
        self.producer.send("kafka-topic", value=b"my-message", key=b"my-key")
```

Here `KafkaProducer` accepts all options valid for `kafka-python`'s [KafkaProducer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html).
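
Since all options are forwarded, you can, for example, attach serializers so the service publishes structured payloads instead of raw bytes. A minimal sketch, assuming JSON-encoded values (`value_serializer` and `key_serializer` are standard `kafka-python` producer options; the service and topic names here are made up):

```python
import json

from nameko.rpc import rpc
from nameko_kafka import KafkaProducer


class MyJsonService:
    """
        Example service publishing JSON payloads.
    """
    name = "my-json-service"

    # Serializer options are forwarded to kafka-python's KafkaProducer
    producer = KafkaProducer(
        bootstrap_servers='localhost:1234',
        value_serializer=lambda value: json.dumps(value).encode("utf-8"),
        key_serializer=lambda key: key.encode("utf-8"),
    )

    @rpc
    def method(self):
        # The dict value and string key are serialized to bytes before publishing
        self.producer.send("kafka-topic", value={"event": "created"}, key="my-key")
```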

### Entrypoint

You can use the `nameko_kafka.consume` decorator in your services to process Kafka messages:

```python
from nameko_kafka import consume


class MyService:
    """
        My microservice 
    """
    name = "my-service"

    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234')
    def method(self, message):
        # Your message handler
        handle_message(message) 
```
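
The `message` argument is the consumed Kafka record. A minimal handler sketch, assuming the entrypoint hands the handler `kafka-python` `ConsumerRecord` objects, whose `topic`, `partition`, `offset`, `key` and `value` attributes are standard:

```python
from nameko_kafka import consume


class MyAuditService:
    """
        Example service inspecting consumed records.
    """
    name = "my-audit-service"

    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234')
    def method(self, message):
        # Each record carries its topic coordinates and the raw payload bytes
        print("got %s[%d]@%d: key=%r value=%r" % (
            message.topic, message.partition, message.offset,
            message.key, message.value,
        ))
```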

The `consume` decorator accepts all the options valid for `kafka-python`'s [KafkaConsumer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html).

On top of `kafka-python`'s default auto-commit feature, the entrypoint also supports three offset commit strategies:
_at least once_, _at most once_ and _exactly once_. The three strategies correspond to the different message delivery
semantics achievable in Kafka. Examples of each are shown in the following subsections.

#### At Least Once

```python
from nameko_kafka import consume, Semantic


class MyService:
    """
        My microservice 
    """
    name = "my-service"
    
    # At least once semantic consumer
    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234', semantic=Semantic.AT_LEAST_ONCE)
    def method(self, message):
        # Your message handler
        handle_message(message) 
```

#### At Most Once

```python
from nameko_kafka import consume, Semantic


class MyService:
    """
        My microservice 
    """
    name = "my-service"
    
    # At most once semantic consumer
    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234', semantic=Semantic.AT_MOST_ONCE)
    def method(self, message):
        # Your message handler
        handle_message(message) 
```

#### Exactly Once

The exactly once semantic requires persistent storage for message offsets. Such a store can be implemented using the
`OffsetStorage` interface provided by Nameko-kafka, with various possible backends such as an RDBMS or a NoSQL database.
Support for some comes out of the box:

##### MongoDB Storage 

```python
from nameko_kafka import consume, Semantic
from nameko_kafka.storage import MongoStorage

from pymongo import MongoClient


class MyService:
    """
        My microservice 
    """
    name = "my-service"
    
    # Exactly once semantic consumer
    @consume(
        "kafka-topic", 
        group_id="my-group", 
        bootstrap_servers='localhost:1234', 
        semantic=Semantic.EXACTLY_ONCE,
        storage=MongoStorage(
            # MongoDB backend client
            client=MongoClient('localhost', 27017),
            # Database to use for storage
            db_name="database-name",
            # Collection to use for storage
            collection="collection-name"
        )       
    )
    def method(self, message):
        # Your message handler
        handle_message(message) 
```

Note: If the `db_name` and `collection` arguments are not specified, the storage defaults to `"nameko_kafka_offsets"`
and `"offsets"` respectively.
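
A storage relying on those defaults therefore only needs the client. Sketch:

```python
from nameko_kafka.storage import MongoStorage

from pymongo import MongoClient

# Offsets are kept in the "offsets" collection of the
# "nameko_kafka_offsets" database by default
storage = MongoStorage(client=MongoClient('localhost', 27017))
```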

##### SQL Storage

Planned for v0.3.0.

##### S3 Storage

Planned for v0.4.0.

##### Azure Block Storage

Planned for v0.5.0.

##### Create Custom Storage

You can create your own offset storage by implementing the `OffsetStorage` interface. It exposes the following methods:

```python
from nameko_kafka.storage.base import OffsetStorage

class MyStorage(OffsetStorage):
    """
        My custom offset storage.
    """

    def setup(self):
        """
            Method for setup of the storage.
        """

    def stop(self):
        """
            Method to teardown the storage.
        """

    def read(self, topic, partition):
        """
            Read last stored offset from storage for 
            given topic and partition.

            :param topic: message topic
            :param partition: partition number of the topic
            :returns: last committed offset value
        """

    def write(self, offsets):
        """
            Write offsets to storage.

            :param offsets: mapping between topic-partition
                tuples and corresponding latest offset value, 
                e.g.
                {
                    ("topic-1", 0): 1,
                    ("topic-1", 1): 3,
                    ("topic-2", 1): 10,
                    ...
                }
        """
```
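
As an illustration, here is a minimal in-memory implementation of the interface. It is a sketch for tests and local experiments only: a plain dict does not survive restarts, so it cannot give true exactly-once behaviour across process restarts, and the fallback offset of 0 for an unseen partition is an assumption of this sketch:

```python
from nameko_kafka.storage.base import OffsetStorage


class InMemoryStorage(OffsetStorage):
    """
        Offset storage backed by a plain dict (illustrative only).
    """

    def setup(self):
        # Called when the service starts
        self._offsets = {}

    def stop(self):
        # Called when the service stops
        self._offsets.clear()

    def read(self, topic, partition):
        # Assumption: report 0 when no offset has been stored yet
        return self._offsets.get((topic, partition), 0)

    def write(self, offsets):
        # `offsets` maps (topic, partition) tuples to latest offset values
        self._offsets.update(offsets)
```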


## Configurations

The extension configuration can be set in a Nameko [config.yaml](https://docs.nameko.io/en/stable/cli.html) file, or
via environment variables.

### Config File

```yaml
# Config for entrypoint
KAFKA_CONSUMER:
  bootstrap_servers: 'localhost:1234'
  retry_backoff_ms: 100
  ...

# Config for dependency
KAFKA_PRODUCER:
  bootstrap_servers: 'localhost:1234'
  retries: 3
  ...
```

### Environment Variables

```.env
# Config for entrypoint
KAFKA_CONSUMER='{"bootstrap_servers": "localhost:1234", "retry_backoff_ms": 100}'

# Config for dependency
KAFKA_PRODUCER='{"bootstrap_servers": "localhost:1234", "retries": 3}'
```
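
Both variables hold JSON objects whose keys mirror the config file entries. A quick sanity check of that format (the parsing shown here is illustrative, not the extension's internal code):

```python
import json
import os

# Assuming KAFKA_CONSUMER is set as in the example above
consumer_config = json.loads(os.environ["KAFKA_CONSUMER"])
assert consumer_config["bootstrap_servers"] == "localhost:1234"
assert consumer_config["retry_backoff_ms"] == 100
```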

## Milestones

- [x] Kafka Entrypoint
- [x] Kafka Dependency
- [x] Commit strategies:
    - _AT_MOST_ONCE_DELIVERY_
    - _AT_LEAST_ONCE_DELIVERY_
    - _EXACTLY_ONCE_DELIVERY_
- [x] Commit storage for _EXACTLY_ONCE_DELIVERY_ strategy

## Developers

For development, a Kafka broker is required. You can spawn one using the [docker-compose.yml](https://github.com/ketgo/nameko-kafka/blob/master/tests/docker-compose.yml)
file in the `tests` folder:
```bash
$ cd tests
$ docker-compose up -d 
```

To install all package dependencies:
```bash
$ pip install ".[dev]"
# or
$ make deps
```

Other useful commands:
```bash
$ pytest --cov=nameko_kafka tests/			# to get coverage report
# or
$ make coverage

$ pylint nameko_kafka       # to check code quality with PyLint
# or
$ make lint
```

## Contributions

Issue reports and pull requests are always welcome. Thanks!

            
