| Field | Value |
| --- | --- |
| Name | ce-mkc |
| Version | 0.0.2 |
| Summary | A command line utility to set up Kafka, MongoDB, and MongoDB Kafka Connector environments. |
| Upload time | 2024-05-07 16:51:47 |
| Home page | None |
| Maintainer | None |
| Docs URL | None |
| Author | Your Name |
| Requires Python | <3.10,>=3.9 |
| License | MIT |
| Requirements | None recorded |
# mkc
#### A command line utility to set up Kafka, MongoDB, and MongoDB Kafka Connector environments.
mkc is a Python script that creates small stacks built around MongoDB and the MongoDB Kafka Connector. It uses docker-compose (specifically [this compose file](https://github.com/confluentinc/cp-all-in-one/blob/7.5.0-post/cp-all-in-one/docker-compose.yml) maintained by Confluent) to do so. You can configure which pieces of the stack to deploy, which connectors to install (if applicable), and which schemas to register (if applicable).
### Setup mkc
#### 1. Install/Start Docker.
For more information, please visit [Docker's installation page](https://docs.docker.com/engine/install/).
#### 2. Install Docker Compose
```
curl -L "https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m)" -o docker-compose
sudo mv docker-compose /usr/local/bin && sudo chmod +x /usr/local/bin/docker-compose
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
```
Test with
```
docker-compose -v
```
#### 3. Install Python 3
#### 4. Install packages:
```
pip install -r requirements.txt
```
#### 5. Script usage:
```
$ python3 mkc.py --help
usage: mkc.py [-h] [--config CONFIG] [--connectors CONNECTORS]
[--forceDownload] [--cleanOldFiles] [--logLevel LOGLEVEL]
Command line utility to set up Kafka, MongoDB, and MongoDB Kafka Connector
environments
optional arguments:
-h, --help show this help message and exit
--config CONFIG A string representing the path to the config.json file
--connectors CONNECTORS
A comma separated string of paths to the configs
describing the MongoDB connectors to install
--forceDownload Include this flag to force mkc to download the mongodb
kafka connector
--logLevel LOGLEVEL Log level. Possible values are [none, info, verbose]
```
## Example
### Create config:
```
echo '{
"zookeeperVersion" : "6.2.0",
"kafkaVersion" : "6.2.0",
"schemaRegistryVersion" : "6.2.0",
"kafkaConnectVersion" : "6.2.0",
"mongodbVersion" : "5.0.20",
"akhqVersion" : "0.24.0"
}' > config/config.test.json
```
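The same config file can also be generated programmatically. Here is a minimal Python sketch; the `build_config` helper is ours for illustration, not part of mkc:

```python
import json
import os

# Hypothetical helper: mkc itself just reads the resulting JSON file.
# Omitting a version key means that container will not be deployed.
def build_config(**versions):
    return dict(versions)

config = build_config(
    zookeeperVersion="6.2.0",
    kafkaVersion="6.2.0",
    schemaRegistryVersion="6.2.0",
    kafkaConnectVersion="6.2.0",
    mongodbVersion="5.0.20",
    akhqVersion="0.24.0",
)

os.makedirs("config", exist_ok=True)
with open("config/config.test.json", "w") as f:
    json.dump(config, f, indent=4)
```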
### Run mkc:
```
python3 mkc.py --config config/config.test.json \
--logLevel debug \
--connectors "config/connectors/sample.sink.connector.errol.json,config/connectors/sample.source.connector.errol.json" \
--schemas "config/schemas/topic1.value.avro.json"
```
Note that the first run of mkc on a machine may take longer than usual, as it must pull all Docker images locally.
### Once the script has run, check the containers running:
```
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1dddf9756695 confluentinc/cp-kafka-connect-base:6.2.0 "bash -c 'echo \"Inst…" 3 minutes ago Up 3 minutes (healthy) 0.0.0.0:8083->8083/tcp, 0.0.0.0:9201->9201/tcp, 9092/tcp kafkaConnectContainer-mkc
a3baf7ebf92f confluentinc/cp-schema-registry:6.2.0 "/etc/confluent/dock…" 3 minutes ago Up 3 minutes 0.0.0.0:8081->8081/tcp schemaRegistryContainer-mkc
92dad29f2f08 confluentinc/cp-kafka:6.2.0 "/etc/confluent/dock…" 3 minutes ago Up 3 minutes 0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp kafkaBrokerContainer-mkc
d9627518aeb0 mongo:5.0.20 "mongod --config /mo…" 3 minutes ago Up 3 minutes 0.0.0.0:27017->27017/tcp mongodContainer-mkc
c10e59470405 tchiotludo/akhq:0.24.0 "docker-entrypoint.s…" 3 minutes ago Up 3 minutes (healthy) 0.0.0.0:8082->8080/tcp akhqContainer-mkc
fdc9f6e0eb5a confluentinc/cp-zookeeper:6.2.0 "/etc/confluent/dock…" 3 minutes ago Up 3 minutes 2888/tcp, 0.0.0.0:2181->2181/tcp, 0.0.0.0:9001->9001/tcp, 3888/tcp zookeeperContainer-mkc
```
Here we have launched:
* A Kafka broker
* Zookeeper for the Kafka broker
* The Kafka Schema Registry server
* The Kafka Connect server
* A container running AKHQ
On the Connect server, we have two connectors running:
```
$ curl -s -X GET -H "Content-Type:application/json" http://localhost:8083/connectors | jq .
[
"simple-sink",
"mongo-source-connector"
]
```
One sink connector:
```
$ curl -s -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/simple-sink | jq .
{
"name": "simple-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"mongo.errors.log.enable": "true",
"namespace.mapper.value.collection.field": "ns.coll",
"tasks.max": "1",
"topics": "test",
"namespace.mapper": "com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper",
"mongo.errors.tolerance": "all",
"database": "landmark",
"namespace.mapper.error.if.invalid": "true",
"connection.uri": "mongodb+srv://<username>:<pass>@cluster0.bfbxg.mongodb.net/myFirstDatabase",
"name": "simple-sink",
"config.action.reload": "restart",
"errors.log.enable": "true"
},
"tasks": [
{
"connector": "simple-sink",
"task": 0
}
],
"type": "sink"
}
```
One source connector:
```
$ curl -s -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/mongo-source-connector | jq .
{
"name": "mongo-source-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
"batch.size": "0",
"change.stream.full.document": "updateLookup",
"collection": "source",
"pipeline": "[]",
"database": "kafka",
"topic.prefix": "mongo",
"topic.separator": "-",
"poll.await.time.ms": "5000",
"connection.uri": "mongodb+srv://<username>:<pass>@cluster0.bfbxg.mongodb.net/myFirstDatabase",
"name": "mongo-source-connector",
"change.stream.full.document.before.change": "whenAvailable",
"collation": "",
"topic.suffix": "topic",
"poll.max.batch.size": "1000"
},
"tasks": [
{
"connector": "mongo-source-connector",
"task": 0
}
],
"type": "source"
}
```
We also have a schema in our schema registry:
```
curl -X GET http://localhost:8081/subjects | jq .
[
"source-value"
]
```
We can see the configured value matches the schema file:
```
curl -X GET http://localhost:8081/subjects/source-value/versions/latest | jq .
{
"subject": "source-value",
"version": 1,
"id": 1,
"schema": "{\"type\":\"record\",\"name\":\"Purchase\",\"namespace\":\"io.confluent.developer.avro\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"totalCost\",\"type\":\"double\"},{\"name\":\"customerID\",\"type\":\"string\"},{\"name\":\"itemOptions\",\"type\":{\"type\":\"map\",\"values\":\"double\"}}]}"
}
```
#### 6. Write to Kafka and Check Mongo
In this example, we will write to the Kafka topic sink, and the sink connector will write the data through to MongoDB:
sink topic --> MongoDB Sink Connector (simple-mongo-sink) --> kafkaconnector.sink collection
Run the following to insert data into the sink topic in our broker:
```
python3 test/testKafkaProducer --bootstrapServers "localhost:9092" --topic "sink" --loglevel debug
```
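The test/testKafkaProducer script's internals are not shown here; the following is a minimal sketch of a producer along those lines, assuming the kafka-python package. The message shape and helper names are illustrative, not mkc's:

```python
import json

def make_message(doc_id):
    # Illustrative payload: a JSON document for the sink connector
    # to write through to MongoDB.
    return json.dumps({"_id": doc_id, "testMessage": 1}).encode("utf-8")

def send_one(bootstrap_servers="localhost:9092", topic="sink"):
    # Requires kafka-python (pip install kafka-python) and a running broker.
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    producer.send(topic, make_message("example-id"))
    producer.flush()
```

With the stack up, calling `send_one()` publishes one document-shaped message to the sink topic.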
Check that the data was written to MongoDB via mongosh:
```
mongosh --quiet --eval 'db.getSiblingDB("kafkaconnector").getCollection("sink").findOne()'
{
_id: '539d483a-8482-481a-96bb-60704430b5ca',
array: [ { field1: Long("1") }, { field1: 'String1' } ],
keyConverterType: 'jsonconverter',
keyConverterEnableSchemas: false
}
```
<br>
#### 7. Write to Mongo and Check Kafka
In this example, we will write to the kafkaconnector.source namespace.
kafkaconnector.source --> MongoDB source connector (simple-mongo-source) --> mongo-kafkaconnector-source-topic
kafkaconnector.source --> MongoDB source connector (simple-mongo-source-fulldoconly) --> mongo-kafkaconnector-topic-source-fulldoc-only
Once the stack is up, run the following insert statement via the mongosh command:
```
mongosh --quiet --eval 'db.getSiblingDB("kafkaconnector").getCollection("source").insertOne({"testMessage" : 1})'
{
acknowledged: true,
insertedId: ObjectId("6532db9367a1a8a0b943c8d9")
}
```
Run the following to read data from our kafka topic:
```
python3 test/testKafkaConsumer --bootstrapServers "localhost:9092" --topics "mongo-kafkaconnector-source-topic" --groupId consumerGroup1 --loglevel debug
```
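Similarly, a minimal consumer in the spirit of test/testKafkaConsumer can be sketched as follows, assuming kafka-python; the `summarize` helper is ours, for pulling the interesting parts out of the change-stream events the source connector produces:

```python
import json

def summarize(raw_value):
    # Condense a MongoDB change-stream event into "op on db.coll".
    event = json.loads(raw_value)
    ns = event["ns"]
    return f'{event["operationType"]} on {ns["db"]}.{ns["coll"]}'

def consume(topic="mongo-kafkaconnector-source-topic",
            bootstrap_servers="localhost:9092", group_id="consumerGroup1"):
    # Requires kafka-python and a running broker.
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers,
                             group_id=group_id, auto_offset_reset="earliest")
    for msg in consumer:  # blocks waiting for messages; Ctrl+C to stop
        print("INFO: Found message", msg.value.decode("utf-8"))
```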
You should see an INFO log message displaying the document produced by the connector to the topic:
```
INFO: Found message {"_id": {"_data": "826532DB93000000022B022C0100296E5A10049099BCB8684E41D0827CF2FD0F8B88A046645F696400646532DB9367A1A8A0B943C8D90004"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1697831827, "i": 2}}, "fullDocument": {"_id": {"$oid": "6532db9367a1a8a0b943c8d9"}, "testMessage": 1}, "ns": {"db": "kafkaconnector", "coll": "source"}, "documentKey": {"_id": {"$oid": "6532db9367a1a8a0b943c8d9"}}}
```
After that, the consumer will block waiting for new messages. You can kill it with [CTRL] + C.
<br>
<br>
#### 8. Examine Setup with AKHQ
AKHQ is a web-based application for monitoring various parts of the Confluent Platform. Our container is bound to port 8082; to access it, visit localhost:8082. You will see a page like this:
![img.png](img.png)
Here we can see the various topics created in our Kafka broker along with configuration/architecture metadata. If we click on the magnifying glass icon for one of the topics (say, mongo-kafkaconnector-source-topic), we can see information about the topic and the data therein:
![img_1.png](img_1.png)
If we click on the gear icon for that topic, we can set configurations:
![img_2.png](img_2.png)
We can also see logs, ACLs, and consumer groups for this topic.
On the left-hand side, we can select "Consumer Groups" to see all the consumer groups for all topics:
![img_3.png](img_3.png)
You can read more about AKHQ and its capabilities [here](https://akhq.io/).
<br>
#### 9. Shutting Down Stack
To shut down the stack, run docker-compose down against the generated docker-compose file in the present working directory:
```
docker-compose -f docker-compose.XXXX.yml down
```
<br>
### Configuration
mkc can be configured in several ways.
#### Specify container configurations via the --config flag
Here you can specify which containers to launch and which versions to use. The possible containers are:
* Kafka Broker (& Zookeeper)
* Kafka Connect
* Schema Registry
* AKHQ
* MongoDB
<br><br>
Omission of any of these will result in that particular container not being deployed. This allows users to launch only some pieces of the stack.
For example, here is one sample configuration:
```
{
"zookeeperVersion" : "6.2.0",
"kafkaVersion" : "6.2.0",
"schemaRegistryVersion" : "6.2.0",
"kafkaConnectVersion" : "6.2.0",
"kafkaCatVersion" : "1.6.0",
"mongodbVersion" : "5.0.20",
"akhqVersion" : "0.24.0"
}
```
<br>
#### Specify connectors via the --connectors flag
This is a comma-delimited list of paths to config files specifying the following:
* The connector configuration
* The connector download URL
* The number of instances of this connector
<br><br>
Here is a sample config file:
```
{
"name" : "mongodb-kafka-connector",
"version" : "1.10.0",
"config" : {
"name" : "simple-mongo-sink",
"config" : {
"connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",
"database" : "simple-kafka-sink",
"tasks.max" : 1,
"topics" : "purchases",
"connection.uri" : "mongodb://mongodContainer-mkc:27017/test?replicaSet=testRS",
"mongo.errors.log.enable" : "true",
"mongo.errors.tolerance" : "all",
"config.action.reload" : "restart",
"errors.log.enable" : "true",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : false,
"value.converter.schemas.enable" : false
}
},
"num" : 1,
"downloadURL" : "https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/{version}/mongo-kafka-connect-{version}-all.jar"
}
```
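Given such a file, mkc presumably registers the inner "config" document with the Kafka Connect REST API (POST /connectors). A stdlib-only sketch of that call; the helper names are ours, not mkc's:

```python
import json
import urllib.request

def connector_request(connect_url, connector_config):
    # Build the POST /connectors request that registers a connector
    # with a Kafka Connect worker (standard Connect REST API).
    body = json.dumps(connector_config).encode("utf-8")
    return urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def register(connect_url, connector_config):
    # Requires a running Connect worker, e.g. http://localhost:8083.
    with urllib.request.urlopen(connector_request(connect_url, connector_config)) as resp:
        return json.loads(resp.read())
```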
<br>
#### Specify schemas via the --schemas flag
This is a comma-separated list of paths to config files describing the schemas to register.
Here is a sample schema config:
```
{
"schema" : {
"type" : "record",
"namespace" : "io.confluent.developer.avro",
"name" : "Purchase",
"fields" : [
{ "name" : "item", "type" : "string" },
{ "name" : "totalCost", "type" : "double" },
{ "name" : "customerID", "type" : "string" },
{ "name" : "itemOptions", "type" : {
"type" : "map",
"values" : "double"
}
}
]
},
"subjectName": "source",
"keyOrValue" : "value"
}
```
Note that this config must include the following fields:
* A "subjectName" field that indicates the subject for the schema
* A "schema" field that contains the schema itself (currently, only Avro is supported by mkc)
* A "keyOrValue" field that indicates whether the schema is for a key or a value
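From these fields, the registry call mkc presumably makes can be sketched as below. The subject naming `<subjectName>-<keyOrValue>` is inferred from the `source-value` subject seen earlier, and the helper is ours:

```python
import json
import urllib.request

def schema_request(registry_url, schema_doc):
    # Subject appears to be "<subjectName>-<keyOrValue>", e.g. "source-value".
    subject = f'{schema_doc["subjectName"]}-{schema_doc["keyOrValue"]}'
    # The Schema Registry expects the Avro schema as a JSON-encoded string.
    body = json.dumps({"schema": json.dumps(schema_doc["schema"])}).encode("utf-8")
    return urllib.request.Request(
        f'{registry_url.rstrip("/")}/subjects/{subject}/versions',
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )
```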
After registration, we can send a GET request to the schema registry to see the schema:
```
curl -X GET http://localhost:8081/subjects/source-value/versions/latest | jq .
{
"subject": "source-value",
"version": 1,
"id": 1,
"schema": "{\"type\":\"record\",\"name\":\"Purchase\",\"namespace\":\"io.confluent.developer.avro\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"totalCost\",\"type\":\"double\"},{\"name\":\"customerID\",\"type\":\"string\"},{\"name\":\"itemOptions\",\"type\":{\"type\":\"map\",\"values\":\"double\"}}]}"
}
```
<br>
### Known Issues
#### Stopping due to exception Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
This issue can occur when docker-compose cannot authenticate to dockerd. To resolve it, run the following command:
```
sudo ln -s "$HOME/.docker/run/docker.sock" /var/run/docker.sock
```
## Contributing
Please make all changes in the develop branch. We use the [gitflow workflow](https://veerasundar.com/blog/gitflow-animated/) with git for SCM:
1. There's a master branch.
2. You create a develop branch off of master. This develop branch is your bread and butter as most of your changes go in here.
3. feature and release branches are created off of develop.
4. Once you are done with feature, you merge it to develop.
5. Once you are done with release, you merge it to both develop and master. And you tag the release.
6. If there's an issue in production, you create a hotfix branch off of master.
7. Once hotfix is done, you merge it back to master and develop and tag the release.
## TODO
* Switch to the Docker SDK, launching containers individually
* Ability to run multiple Kafka Connect workers to test fault tolerance
* Automatically exclude Zookeeper for newer versions of Kafka that no longer require it