# kafkacrypto
Message Layer Encryption for Kafka
Available on PyPI at https://pypi.org/project/kafkacrypto/
Available on Github at https://github.com/tmcqueen-materials/kafkacrypto
Java implementation available on Github at https://github.com/tmcqueen-materials/kafkacrypto-java
## Quick Start
On every Kafka consumer or producer node, do:
1. `pip3 install kafkacrypto`
1. [Download](https://github.com/tmcqueen-materials/kafkacrypto/raw/master/tools/simple-provision.py) `simple-provision.py`
1. Run: `./simple-provision.py` and follow the instructions. Use the same root of trust password on all nodes.
In your producer/consumer code:
```python
from kafkacrypto import KafkaCrypto, KafkaConsumer, KafkaProducer
nodeId = 'my-node-ID'
# setup separate consumer/producers for the crypto key passing messages. DO NOT use these for
# other messages.
kcc = KafkaConsumer(...your server params in kafka-python form...)
kcp = KafkaProducer(...your server params in kafka-python form...)
kc = KafkaCrypto(nodeId,kcp,kcc)
... Your code here ...
# Here is how you configure your producer/consumer objects to use the crypto (de)serializers
producer = KafkaProducer(...,key_serializer=kc.getKeySerializer(), value_serializer=kc.getValueSerializer())
consumer = KafkaConsumer(...,key_deserializer=kc.getKeyDeserializer(), value_deserializer=kc.getValueDeserializer())
... Your code here ...
```
And that's it! Your producers and consumers should function as normal, but all traffic within Kafka is encrypted.
If automatic topic creation is disabled, then one more action is needed. For each "root topic" you must create the requisite key-passing topics. By default these are `root.reqs` and `root.keys`, where root is replaced with the root topic name. It is safe to enable regular log compaction on these topics.
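For example, here is a minimal sketch that pre-creates these topics for a root topic `example001` using kafka-python's `KafkaAdminClient`. The broker address, partition count, and replication factor are placeholders for your deployment; compaction is optional, per above.
```python
# Hypothetical sketch: pre-create the key-passing topics for root topic "example001".
# Broker address, partitions, and replication factor are placeholders.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([
    NewTopic(name='example001.reqs', num_partitions=1, replication_factor=1,
             topic_configs={'cleanup.policy': 'compact'}),  # compaction optional but safe
    NewTopic(name='example001.keys', num_partitions=1, replication_factor=1,
             topic_configs={'cleanup.policy': 'compact'}),
])
admin.close()
```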
## Root Topics
kafkacrypto uses unique keys on a per-"root topic" basis. A root topic is defined as the topic name before the first user-defined separator. The default separator is "`.`". Thus all of these:
`example001`
`example001.foo.bar.baz`
`example001.foo.bar`
`example001.foo`
have the same root topic of `example001`, whereas `example001_baz.bar.foo` has the root topic `example001_baz`. Since Kafka does not recommend using both "`.`" and "`_`" in topic names, if you wish every topic to use a unique set of keys, use "`_`" (and not "`.`") in names, or change the defined topic separator.
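In other words, the root topic is simply the prefix before the first separator. A trivial illustration (this helper is not part of the kafkacrypto API):
```python
# Illustration only: derive the root topic with the default separator "."
def root_topic(topic, separator='.'):
    return topic.split(separator, 1)[0]

assert root_topic('example001.foo.bar.baz') == 'example001'
assert root_topic('example001_baz.bar.foo') == 'example001_baz'
```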
## Undecryptable Messages
kafkacrypto is designed so that messages being sent can **always** be encrypted once a KafkaCrypto object is successfully created. However, it is possible for a consumer to receive a message for which it does not have a decryption key, i.e. an undecryptable message. This is most often because the asynchronous key exchange process has not completed before the message is received, or because the consumer is not authorized to receive on that topic.
To handle this scenario, all deserialized messages are returned as [KafkaCryptoMessage](https://github.com/tmcqueen-materials/kafkacrypto/blob/master/kafkacrypto/message.py) objects. The `.isCleartext()` method can be used to determine whether the message component was successfully decrypted or not:
```python
# consumer is setup with KafkaCrypto deserializers as shown above
# 'key' refers to the key of key->value pairs from Kafka, not a cryptographic key
for msg in consumer:
    if (msg.key.isCleartext()):
        # message key was decrypted. bytes(msg.key) is the decrypted message key
        key = bytes(msg.key)
    else:
        # message key was not decrypted. bytes(msg.key) is the raw (undecrypted) message key.
        # msg.key can be discarded, or saved and decryption attempted at a later time
        # by doing kc.getKeyDeserializer().deserialize(msg.topic, msg.key)
        key = None
    if (msg.value.isCleartext()):
        # message value was decrypted. bytes(msg.value) is the decrypted message value
        value = bytes(msg.value)
    else:
        # message value was not decrypted. bytes(msg.value) is the raw (undecrypted) message value.
        # msg.value can be discarded, or saved and decryption attempted at a later time
        # by doing kc.getValueDeserializer().deserialize(msg.topic, msg.value)
        value = None
```
The convenience method `.getMessage()` can be used instead to return the message as bytes if successfully decrypted, or to raise a `KafkaCryptoMessageError` if decryption failed.
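A sketch of that pattern, assuming `KafkaCryptoMessageError` is importable from the package root (it is defined in the package's exceptions module):
```python
from kafkacrypto import KafkaCryptoMessageError

for msg in consumer:
    try:
        value = msg.value.getMessage()  # bytes, if decryption succeeded
    except KafkaCryptoMessageError:
        continue  # undecryptable for now; discard, or save and retry later as above
    # ... process value ...
```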
## Stacking (De)Serializers
Although not recommended, it is possible to combine multiple (de)serializers in a single chain to, e.g., both encrypt/decrypt and JSON encode/decode a message. An example:
```python
import json
import kafka.serializer

class CompoundDeSes(kafka.serializer.Serializer, kafka.serializer.Deserializer):
    def __init__(self, *args):
        self._ser = list(args)
    def serialize(self, topic, keyvalue):
        for ser in self._ser:
            if isinstance(ser, kafka.serializer.Serializer):
                keyvalue = ser.serialize(topic, keyvalue)
            else:
                keyvalue = ser(keyvalue)
        return keyvalue
    def deserialize(self, topic, keyvalue):
        for ser in self._ser:
            if isinstance(ser, kafka.serializer.Deserializer):
                keyvalue = ser.deserialize(topic, keyvalue)
            else:
                keyvalue = ser(keyvalue)
        return keyvalue
...
# Stacked (De)Serializers. Usually, you will encrypt last on serialization, and decrypt
# first on deserialization. Do not forget that exceptions are likely as undecryptable
# messages appear.
producer = KafkaProducer(...,key_serializer=CompoundDeSes(json.dumps,kc.getKeySerializer()), value_serializer=CompoundDeSes(json.dumps,kc.getValueSerializer()))
consumer = KafkaConsumer(...,key_deserializer=CompoundDeSes(kc.getKeyDeserializer(),json.loads), value_deserializer=CompoundDeSes(kc.getValueDeserializer(),json.loads))
```
## Troubleshooting
If something is not working, enable logging to get detailed information:
```python
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("kafkacrypto").setLevel(level=logging.INFO) # set to logging.DEBUG for more verbosity
```
## Universal Configuration File
kafkacrypto separates the storage of cryptographic secrets and non-secret configuration information:
1. `my-node-ID.config`: Non-secret parameters, in Python ConfigParser format.
1. `my-node-ID.seed`: Next ratchet seed, when using the default implementation of Ratchet. Secret key material; it should never be stored or transmitted in plaintext.
 1. `my-node-ID.crypto`: Identification private key, when using the default implementation of Cryptokey. Secret key material; it should never be stored or transmitted in plaintext.
Alternative implementations of Ratchet and Cryptokey enable secrets to be managed by specialized hardware (e.g. HSMs).
It is also possible to use `my-node-ID.config` to manage all configuration directives, including those that control Kafka. A sample implementation, which reads the node ID from `node_id` in the `DEFAULT` section, is:
```python
#!/usr/bin/python3
from sys import argv
from kafkacrypto import KafkaCrypto, KafkaCryptoStore, KafkaConsumer, KafkaProducer
# Process configuration file
if len(argv) != 2:
    exit('Invalid command line.')
kcs = KafkaCryptoStore(argv[1])
# Setup KafkaCrypto
kcc = KafkaConsumer(**kcs.get_kafka_config('consumer',extra='crypto'))
kcp = KafkaProducer(**kcs.get_kafka_config('producer',extra='crypto'))
kc = KafkaCrypto(None,kcp,kcc,config=kcs)
# read program specific values
value1 = kcs.load_value('value1')
value2 = kcs.load_value('value2')
## End read configuration
# Setup Kafka Consumer and Producer
kafka_config = kcs.get_kafka_config('consumer')
kafka_config['key_deserializer'] = kc.getKeyDeserializer()
kafka_config['value_deserializer'] = kc.getValueDeserializer()
consumer = KafkaConsumer(**kafka_config)
kafka_config = kcs.get_kafka_config('producer')
kafka_config['key_serializer'] = kc.getKeySerializer()
kafka_config['value_serializer'] = kc.getValueSerializer()
producer = KafkaProducer(**kafka_config)
... your code here ...
# Save new values
kcs.store_value('value1', 'value-of-value1')
kcs.store_value('value2', 'value-of-value2')
```
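For reference, a minimal excerpt of such a config file might look like the following. Only the `node_id`-in-`DEFAULT` convention comes from the description above; the remaining contents (Kafka directives, stored values) are managed by KafkaCryptoStore.
```
[DEFAULT]
node_id : my-node-ID
```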
## Kafka Python Interfaces
kafkacrypto has been extensively tested with kafka-python. If available, it will use confluent_kafka via a thin compatibility wrapper. Other wrappers can be added (submit a pull request!).
## Post Quantum Secure Cryptography
### Prerequisites
Use requires installing [liboqs-python](https://github.com/open-quantum-safe/liboqs-python) with the sntrup761, ML-KEM-1024, and SPHINCS+-SHAKE-128f-simple algorithms enabled (the default).
This in turn bumps the minimum Python version required to 3.7.
For Raspberry Pis and other devices not officially supported by liboqs, the following may help:
```
sudo apt-get install cmake ninja-build git
sudo pip3 install pytest pytest-xdist pyyaml
mkdir oqs
cd oqs
git clone --depth 1 --branch main https://github.com/open-quantum-safe/liboqs
git clone --depth 1 --branch main https://github.com/open-quantum-safe/liboqs-python.git
cd liboqs
mkdir build
cd build
# newer versions of liboqs disable SIKE/SIDH always and do not require turning off on this command line
cmake -G"Ninja" .. -DOQS_DIST_BUILD=ON -DBUILD_SHARED_LIBS=ON -DOQS_PERMIT_UNSUPPORTED_ARCHITECTURE=ON -DOQS_USE_OPENSSL=OFF -DOQS_ENABLE_KEM_SIKE=OFF -DOQS_ENABLE_KEM_SIDH=OFF
ninja
# tests will take about 8 hours on a Raspberry Pi Zero (v1)
ninja run_tests
sudo ninja install
cd ../../liboqs-python
sudo pip3 install .
# the below may or may not be needed, depending on raspi os version
sudo ldconfig
```
### Key Exchange
Starting with version v0.9.10.0, kafkacrypto supports key exchange using Curve25519+sntrup761, a hybrid classical-pq key exchange algorithm. This mirrors support for the same hybrid added in OpenSSH 8.5.
Starting with version v0.9.11.0, kafkacrypto supports key exchange using Curve25519+ML-KEM-1024, a hybrid classical-pq key exchange algorithm, including the FIPS-standardized ML-KEM.
The script `enable-pq-exchange.py` assists in enabling pq key exchange. It must be enabled on both consumers and producers. Optionally, it can be used to select only a pq hybrid algorithm (see code documentation).
### Signing Keys
Starting with version v0.9.11.0, kafkacrypto supports key signing using Ed25519+SLH-DSA-SHAKE-128f, a hybrid classical-pq signing algorithm, including the FIPS-standardized SLH-DSA.
To enable pq signing, simply select a pq signing key when provisioning. Note that provisioning can be run multiple times for a single node to create keys of multiple types. Adding the line `keytypes : 4` just under the cryptokey line in `my-node-ID.config` can be used to enable only hybrid pq signing.
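For example, the relevant excerpt of `my-node-ID.config` would then read as follows (the `cryptokey` value is elided and left unchanged; the `keytypes` line is the only addition described above):
```
cryptokey : <existing value, unchanged>
keytypes : 4
```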
Note that to use password-based deterministic key provisioners, you also need to install [pyspx-slhdsa](https://github.com/tmcqueen-materials/pyspx-slhdsa). We hope to remove this dependency once liboqs-python exposes seed-based key generation.
## Advanced Usage
kafkacrypto has been designed to seamlessly support a range of key exchange authorization and delegation mechanisms beyond the simple single-password root of trust. An example of a simple "controller-based" intermediary is included in the main package. The requisite controller can be set up as:
```python
#!/usr/bin/python3
from kafkacrypto import KafkaCryptoController, KafkaConsumer, KafkaProducer
nodeId = 'controller-name'
# use your normal server parameters in place of the ...
kcc = KafkaConsumer(..., enable_auto_commit=False, group_id=nodeId)
kcp = KafkaProducer(...)
controller = KafkaCryptoController(nodeId,kcp,kcc)
controller._mgmt_thread.join()
```
The configuration parameters inside the provision script should be adjusted so that the "subscribe" and "key request" suffixes are distinct (see comment in `simple-provision.py`, or use `provision.py` instead). If automatic topic creation is disabled, then the topic `root.subs` must also be created. It is safe to enable regular log compaction on this topic.
Another common desire is to use very short chain lifetimes. Chains can be refreshed automatically and pushed to users. The requisite ChainServer can be set up as:
```python
#!/usr/bin/python3
from kafkacrypto import KafkaCryptoChainServer
nodeId = 'chain-server-name'
chainserver = KafkaCryptoChainServer(nodeId)
chainserver._mgmt_thread.join()
```
The sample provision script can set up keys for the ChainServer appropriately as well.
## Design, Specification, and Security Analysis
kafkacrypto is already in limited production use, and should be stable enough for broad adoption. However, a detailed security analysis of the kafkacrypto framework is still in progress, and use of this code should be considered experimental.
## Version Compatibility
kafkacrypto is compatible with all versions of Python 3.3+, and can utilize both [kafka-python](https://github.com/dpkp/kafka-python) and [confluent-kafka](https://github.com/confluentinc/confluent-kafka-python) backends. It will automatically use one of these if already installed (preferring the higher performance confluent-kafka one).
For Python 3.12, kafka-python must be 2.0.3 or later; this version has not been released on PyPI, so you either need to use confluent-kafka or install kafka-python directly from their GitHub.
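For example, installing kafka-python from GitHub with pip's standard git+ syntax:
```
pip3 install git+https://github.com/dpkp/kafka-python.git
```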