ce-mkc

Name: ce-mkc
Version: 0.0.2
Summary: A command line utility to set up Kafka, MongoDB, and MongoDB Kafka Connector environments.
Upload time: 2024-05-07 16:51:47
Author: Your Name
Requires Python: <3.10,>=3.9
License: MIT
# mkc
#### A command line utility to set up Kafka, MongoDB, and MongoDB Kafka Connector environments.

mkc is a Python script that creates small stacks built around MongoDB and the MongoDB Kafka Connector. It uses docker-compose (specifically [this compose file](https://github.com/confluentinc/cp-all-in-one/blob/7.5.0-post/cp-all-in-one/docker-compose.yml) maintained by Confluent) to do so. You can configure which pieces of the stack to deploy, which connectors to install (if applicable), and which schemas to register (if applicable).

### Setup mkc

#### 1. Install/Start Docker.

For more information, please visit [Docker's installation page](https://docs.docker.com/engine/install/).

#### 2. Install Docker Compose

```
curl -L "https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m)" -o docker-compose
sudo mv docker-compose /usr/local/bin && sudo chmod +x /usr/local/bin/docker-compose
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
```

Test with 

```
docker-compose -v
```

#### 3. Install Python 3 (mkc requires Python 3.9)
#### 4. Install the required packages:
````
pip install -r requirements.txt
````
#### 5. Script usage:
````
$ python3 mkc.py --help 
usage: mkc.py [-h] [--config CONFIG] [--connectors CONNECTORS]
              [--forceDownload] [--cleanOldFiles] [--logLevel LOGLEVEL]

Command line utility to set up Kafka, MongoDB, and MongoDB Kafka Connector
environments

optional arguments:
  -h, --help            show this help message and exit
  --config CONFIG       A string representing the path to the config.json file
  --connectors CONNECTORS
                        A comma separated string of paths to the configs
                        describing the MongoDB connectors to install
  --forceDownload       Include this flag to force mkc to download the mongodb
                        kafka connector
  --logLevel LOGLEVEL   Log level. Possible values are [none, info, verbose]

````

## Example

### Create config:

```
echo '{
    "zookeeperVersion" : "6.2.0",
    "kafkaVersion" : "6.2.0",
    "schemaRegistryVersion" : "6.2.0",
    "kafkaConnectVersion" : "6.2.0",
    "mongodbVersion" : "5.0.20",
    "akhqVersion" : "0.24.0"
}' >> config/config.test.json
```

### Run mkc:

```
python3 mkc.py --config config/config.test.json \
   --logLevel debug \
   --connectors "config/connectors/sample.sink.connector.errol.json,config/connectors/sample.source.connector.errol.json" \
   --schemas "config/schemas/topic1.value.avro.json"
```

Note that the first run of mkc on a machine may take longer than usual, as Docker must pull all of the required images locally.

### Once the script has run, check the running containers:
```
$ docker ps 
CONTAINER ID   IMAGE                                      COMMAND                   CREATED         STATUS                   PORTS                                                                NAMES
1dddf9756695   confluentinc/cp-kafka-connect-base:6.2.0   "bash -c 'echo \"Inst…"   3 minutes ago   Up 3 minutes (healthy)   0.0.0.0:8083->8083/tcp, 0.0.0.0:9201->9201/tcp, 9092/tcp             kafkaConnectContainer-mkc
a3baf7ebf92f   confluentinc/cp-schema-registry:6.2.0      "/etc/confluent/dock…"    3 minutes ago   Up 3 minutes             0.0.0.0:8081->8081/tcp                                               schemaRegistryContainer-mkc
92dad29f2f08   confluentinc/cp-kafka:6.2.0                "/etc/confluent/dock…"    3 minutes ago   Up 3 minutes             0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp                       kafkaBrokerContainer-mkc
d9627518aeb0   mongo:5.0.20                               "mongod --config /mo…"    3 minutes ago   Up 3 minutes             0.0.0.0:27017->27017/tcp                                             mongodContainer-mkc
c10e59470405   tchiotludo/akhq:0.24.0                     "docker-entrypoint.s…"    3 minutes ago   Up 3 minutes (healthy)   0.0.0.0:8082->8080/tcp                                               akhqContainer-mkc
fdc9f6e0eb5a   confluentinc/cp-zookeeper:6.2.0            "/etc/confluent/dock…"    3 minutes ago   Up 3 minutes             2888/tcp, 0.0.0.0:2181->2181/tcp, 0.0.0.0:9001->9001/tcp, 3888/tcp   zookeeperContainer-mkc
```

Here we have launched:
* Kafka (broker)
* Zookeeper for the Kafka broker
* The Kafka Schema Registry server
* The Kafka Connect server
* A container running AKHQ

On the Connect server, we have the following connectors running:

```
 $ curl -s -X GET -H "Content-Type:application/json" http://localhost:8083/connectors | jq . 
[
  "simple-sink",
  "mongo-source-connector"
]
```
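
If you want to check connector health from a script rather than with curl, a minimal sketch against the Kafka Connect REST API could look like the following (this assumes the `requests` package, which is not part of mkc itself):

```
import requests

CONNECT_URL = "http://localhost:8083"

# List the connectors registered on the Connect worker.
for name in requests.get(f"{CONNECT_URL}/connectors").json():
    # The /status endpoint reports the connector state and the state of each task.
    status = requests.get(f"{CONNECT_URL}/connectors/{name}/status").json()
    tasks = [task["state"] for task in status["tasks"]]
    print(f"{name}: connector={status['connector']['state']}, tasks={tasks}")
```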

One sink connector:
```
 $ curl -s -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/simple-sink | jq . 
{
  "name": "simple-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "mongo.errors.log.enable": "true",
    "namespace.mapper.value.collection.field": "ns.coll",
    "tasks.max": "1",
    "topics": "test",
    "namespace.mapper": "com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper",
    "mongo.errors.tolerance": "all",
    "database": "landmark",
    "namespace.mapper.error.if.invalid": "true",
    "connection.uri": "mongodb+srv://<username>:<pass>@cluster0.bfbxg.mongodb.net/myFirstDatabase",
    "name": "simple-sink",
    "config.action.reload": "restart",
    "errors.log.enable": "true"
  },
  "tasks": [
    {
      "connector": "simple-sink",
      "task": 0
    }
  ],
  "type": "sink"
}
```

One source connector:
```
$ curl -s -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/mongo-source-connector | jq .  
{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",
    "batch.size": "0",
    "change.stream.full.document": "updateLookup",
    "collection": "source",
    "pipeline": "[]",
    "database": "kafka",
    "topic.prefix": "mongo",
    "topic.separator": "-",
    "poll.await.time.ms": "5000",
    "connection.uri": "mongodb+srv://<username>:<pass>@cluster0.bfbxg.mongodb.net/myFirstDatabase",
    "name": "mongo-source-connector",
    "change.stream.full.document.before.change": "whenAvailable",
    "collation": "",
    "topic.suffix": "topic",
    "poll.max.batch.size": "1000"
  },
  "tasks": [
    {
      "connector": "mongo-source-connector",
      "task": 0
    }
  ],
  "type": "source"
}
```

We also have a schema in our schema registry:

```
curl -X GET http://localhost:8081/subjects | jq . 
[
  "source-value"
]
```

We can see the configured value matches the schema file:

```
curl -X GET http://localhost:8081/subjects/source-value/versions/latest | jq . 
{
  "subject": "source-value",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"Purchase\",\"namespace\":\"io.confluent.developer.avro\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"totalCost\",\"type\":\"double\"},{\"name\":\"customerID\",\"type\":\"string\"},{\"name\":\"itemOptions\",\"type\":{\"type\":\"map\",\"values\":\"double\"}}]}"
}
```


#### 6. Write to Kafka and Check Mongo

In this example, we will write to the Kafka topic `sink`, and the data will be written through to MongoDB:

sink topic --> MongoDB Sink Connector (simple-mongo-sink) --> kafkaconnector.sink collection

Run the following to produce data into the `sink` topic on our broker:

```
python3 test/testKafkaProducer --bootstrapServers "localhost:9092" --topic "sink" --loglevel debug
```
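
For reference, a minimal producer equivalent to the test script might look like the sketch below (this assumes the `kafka-python` package; the actual `test/testKafkaProducer` script may differ):

```
import json
from kafka import KafkaProducer  # pip install kafka-python

# Connect to the broker the stack exposes on localhost:9092.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Produce a single JSON document to the "sink" topic; the sink connector
# will pick it up and write it to MongoDB.
producer.send("sink", {"field1": 1, "field2": "String1"})
producer.flush()
```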

Check that the data was written to MongoDB via mongosh:

```
mongosh --quiet --eval 'db.getSiblingDB("kafkaconnector").getCollection("sink").findOne()'
{
  _id: '539d483a-8482-481a-96bb-60704430b5ca',
  array: [ { field1: Long("1") }, { field1: 'String1' } ],
  keyConverterType: 'jsonconverter',
  keyConverterEnableSchemas: false
}
```
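
The same check can be done from Python with pymongo (assuming the `pymongo` package is installed):

```
from pymongo import MongoClient  # pip install pymongo

# The mongod container maps port 27017 onto the host.
client = MongoClient("mongodb://localhost:27017")
print(client["kafkaconnector"]["sink"].find_one())
```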
<br>

#### 7. Write to Mongo and Check Kafka

In this example, we will write to the `kafkaconnector.source` namespace.

* kafkaconnector.source --> MongoDB source connector (simple-mongo-source) --> mongo-kafkaconnector-source-topic
* kafkaconnector.source --> MongoDB source connector (simple-mongo-source-fulldoconly) --> mongo-kafkaconnector-topic-source-fulldoc-only

Once the stack is up, run the following insert statement via mongosh:

```
mongosh --quiet --eval 'db.getSiblingDB("kafkaconnector").getCollection("source").insertOne({"testMessage" : 1})'
{
  acknowledged: true,
  insertedId: ObjectId("6532db9367a1a8a0b943c8d9")
}
```

Run the following to read data from our Kafka topic:

```
python3 test/testKafkaConsumer --bootstrapServers "localhost:9092" --topics "mongo-kafkaconnector-source-topic" --groupId consumerGroup1 --loglevel debug
```

You should see an INFO log message displaying the document produced by the connector to the topic:

```
INFO: Found message {"_id": {"_data": "826532DB93000000022B022C0100296E5A10049099BCB8684E41D0827CF2FD0F8B88A046645F696400646532DB9367A1A8A0B943C8D90004"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1697831827, "i": 2}}, "fullDocument": {"_id": {"$oid": "6532db9367a1a8a0b943c8d9"}, "testMessage": 1}, "ns": {"db": "kafkaconnector", "coll": "source"}, "documentKey": {"_id": {"$oid": "6532db9367a1a8a0b943c8d9"}}}
```

After that, the consumer will block waiting for new messages. You can stop it with [CTRL] + C.
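
If you would rather consume the topic from your own script, a minimal sketch (again assuming `kafka-python`, and not necessarily what `test/testKafkaConsumer` does) could be:

```
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "mongo-kafkaconnector-source-topic",
    bootstrap_servers="localhost:9092",
    group_id="consumerGroup1",
    auto_offset_reset="earliest",  # also read messages produced before we joined
)

# Iterating over the consumer blocks waiting for new messages; stop with [CTRL] + C.
for message in consumer:
    print(message.value.decode("utf-8"))
```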
<br>
<br>

#### 8. Examine Setup with AKHQ

AKHQ is a web-based application for monitoring various parts of the Confluent Platform. Our container binds it to port 8082; to access it, visit localhost:8082. You will see a page like this:

![img.png](img.png)

Here we can see the various topics created in our Kafka broker along with configuration and architecture metadata. If we click the magnifying glass icon for one of the topics (say, mongo-kafkaconnector-source-topic), we can see information about the topic and the data therein:

![img_1.png](img_1.png)

If we click on the gear icon for that topic, we can set configurations:

![img_2.png](img_2.png)

We can also see logs, ACLs, and consumer groups for this topic.

On the left-hand side, we can select "Consumer Groups" to see all the consumer groups across all topics:

![img_3.png](img_3.png)

You can read more about AKHQ and its capabilities [here](https://akhq.io/).


<br>

#### 9. Shutting Down Stack

To shut down the stack, run `docker-compose down` against the generated docker-compose file in the
present working directory:

```
docker-compose -f docker-compose.XXXX.yml down 
```
<br>

### Configuration

mkc can be configured in several ways.

#### Specify container configurations via the --config flag

Here you can specify which containers to launch and which versions to use. The available containers are:

* Kafka Broker (& Zookeeper)
* Kafka Connect 
* Schema Registry 
* AKHQ 
* MongoDB

Omitting any of these means that particular container will not be deployed, which allows you to launch only some pieces of the stack.

For example, here is one sample configuration:
 
```
{
  "zookeeperVersion" : "6.2.0",
  "kafkaVersion" : "6.2.0",
  "schemaRegistryVersion" : "6.2.0",
  "kafkaConnectVersion" : "6.2.0",
  "kafkaCatVersion" : "1.6.0",
  "mongodbVersion" : "5.0.20",
  "akhqVersion" : "0.24.0"
}
```
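
To illustrate the "omission means not deployed" behaviour, the snippet below sketches how such a config could be interpreted. It is an illustration only, not mkc's actual implementation:

```
import json

# Map config keys to the containers they control (illustrative names only).
CONTAINER_KEYS = {
    "zookeeperVersion": "zookeeper",
    "kafkaVersion": "kafka broker",
    "schemaRegistryVersion": "schema registry",
    "kafkaConnectVersion": "kafka connect",
    "mongodbVersion": "mongodb",
    "akhqVersion": "akhq",
}

with open("config/config.test.json") as f:
    config = json.load(f)

# Only containers whose version key appears in the config are deployed.
to_deploy = {name: config[key] for key, name in CONTAINER_KEYS.items() if key in config}
print(to_deploy)
```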

<br>

#### Specify connectors via the --connectors flag

This is a comma-delimited list of paths to config files, each specifying the following:
* The connector configuration
* The connector download URL
* The number of instances of this connector

Here is a sample config file:

```
{
  "name" : "mongodb-kafka-connector",
  "version" : "1.10.0",
  "config" : {
    "name" : "simple-mongo-sink",
    "config" : {
      "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",
      "database" : "simple-kafka-sink",
      "tasks.max" : 1,
      "topics" : "purchases",
      "connection.uri" : "mongodb://mongodContainer-mkc:27017/test?replicaSet=testRS",
      "mongo.errors.log.enable" : "true",
      "mongo.errors.tolerance" : "all",
      "config.action.reload" : "restart",
      "errors.log.enable" : "true",
      "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
      "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable" : false,
      "value.converter.schemas.enable" : false
    }
  },
  "num" : 1,
  "downloadURL" : "https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/{version}/mongo-kafka-connect-{version}-all.jar"
}
```
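
Note the `{version}` placeholder in `downloadURL`. The sketch below shows how such a template could be expanded and the connector jar fetched; it is illustrative only, and mkc's own download logic may differ:

```
import json
import urllib.request

with open("config/connectors/sample.sink.connector.errol.json") as f:
    connector = json.load(f)

# Substitute the connector version into the download URL template.
url = connector["downloadURL"].replace("{version}", connector["version"])
jar_name = url.rsplit("/", 1)[-1]

# Download the connector jar so it can be installed on the Connect worker.
urllib.request.urlretrieve(url, jar_name)
print(f"Downloaded {jar_name} from {url}")
```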
<br>

#### Specify schemas via the --schemas flag

This is a comma-separated string of paths to the configs describing the schemas to register.

Here is one sample schema config:

```
{
  "schema" : {
    "type" : "record",
    "namespace" : "io.confluent.developer.avro",
    "name" : "Purchase",
    "fields" : [
      { "name" :  "item", "type" :  "string" },
      { "name" :  "totalCost", "type" :  "double" },
      { "name" :  "customerID", "type" :  "string" },
      { "name" :  "itemOptions", "type" :  {
            "type" : "map",
            "values" : "double"
        }
      }
    ]
  },
  "subjectName": "source",
  "keyOrValue" : "value"
}
```

Note that this config must include the following fields (see the registration sketch below):
* A "subjectName" field that indicates the subject for the schema
* A "schema" field that contains the schema itself (currently, only Avro is supported by mkc)
* A "keyOrValue" field that indicates whether the schema is for a key or a value

After registration, we can send a GET request to the Schema Registry to see the schema:

```
curl -X GET http://localhost:8081/subjects/source-value/versions/latest | jq . 
{
  "subject": "source-value",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"Purchase\",\"namespace\":\"io.confluent.developer.avro\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"totalCost\",\"type\":\"double\"},{\"name\":\"customerID\",\"type\":\"string\"},{\"name\":\"itemOptions\",\"type\":{\"type\":\"map\",\"values\":\"double\"}}]}"
}
```

<br>

### Known Issues

#### Stopping due to exception: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

This issue can occur when docker-compose cannot reach the Docker daemon socket. To resolve it, run the following command:

```
sudo ln -s "$HOME/.docker/run/docker.sock" /var/run/docker.sock
```


## Contributing 

Please make all changes in the develop branch. We use the [gitflow workflow](https://veerasundar.com/blog/gitflow-animated/) with git for SCM:
1. There is a master branch.
2. You create a develop branch off of master. This develop branch is your bread and butter, as most of your changes go in here.
3. Feature and release branches are created off of develop.
4. Once you are done with a feature, you merge it into develop.
5. Once you are done with a release, you merge it into both develop and master, and you tag the release.
6. If there is an issue in production, you create a hotfix branch off of master.
7. Once the hotfix is done, you merge it back into master and develop and tag the release.


## TODO
* Switch to the Docker SDK, launching containers individually
* Support multiple Kafka Connect workers to test fault tolerance
* Automatically omit Zookeeper for newer versions of Kafka

            
