- Build an EncryptionKeyProvider
- Full-Message-Encryption workflow
- Field-Level-Encryption workflow
- Caching
- Key Rotation
// Declares the de.otto kafka-messaging-e2ee library, version 2.1.0, as an implementation dependency.
dependencies {
implementation "de.otto:kafka-messaging-e2ee:2.1.0"
}
/**
 * Example: builds an {@code EncryptionKeyProvider} from a vault configuration
 * that covers a single Kafka topic.
 */
class Example {

    void example() {
        // Vault connection settings; authenticate with a token or an appRole.
        final VaultConnectionConfig vaultConnectionConfig = VaultConnectionConfig.builder()
                .token("some token")
                .build();

        // Single-topic configuration: the topic is flagged as encrypted and
        // points at one vault path.
        final VaultEncryptionKeyProviderConfig keyProviderConfig =
                SingleTopicVaultEncryptionKeyProviderConfig.builder()
                        .isEncryptedTopic(true)
                        .vaultConnectionConfig(vaultConnectionConfig)
                        .vaultPath("galapagos/local/galapagos_someTeam/someKafkaTopic")
                        .build();

        final EncryptionKeyProvider encryptionKeyProvider =
                new VaultEncryptionKeyProvider(keyProviderConfig);
    }
}
Option 2: Use MultiTopicVaultEncryptionKeyProviderConfig when you consume from and/or publish to multiple kafka topics.
/**
 * Example: builds an {@code EncryptionKeyProvider} from a vault configuration
 * that covers several Kafka topics.
 */
class Example {
    void example() {
        // Vault connection settings; use token or appRole.
        // Built once and handed to the provider configuration below.
        VaultConnectionConfig vaultConnectionConfig = VaultConnectionConfig.builder()
                .token("dev-token")
                .build();

        VaultEncryptionKeyProviderConfig vaultEncryptionKeyProviderConfig = MultiTopicVaultEncryptionKeyProviderConfig.builder()
                .vaultConnectionConfig(vaultConnectionConfig)
                // Default entry: a vault path template with %TEAMNAME% and %TOPICNAME% placeholders.
                .configEntry(KafkaTopicConfigEntry.builder()
                        .isDefault(true)
                        .vaultPathTemplate("galapagos/local/galapagos_%TEAMNAME%/%TOPICNAME%")
                        .build())
                // Entry for topic "some-topic" owned by team "someTeam-One".
                .configEntry(KafkaTopicConfigEntry.builder()
                        .teamName("someTeam-One")
                        .kafkaTopicName("some-topic")
                        .build())
                // Entry for topic "some-other-topic" owned by team "someTeam-Two",
                // with an explicit encryption key attribute name.
                .configEntry(KafkaTopicConfigEntry.builder()
                        .teamName("someTeam-Two")
                        .kafkaTopicName("some-other-topic")
                        .encryptionKeyAttributeName("aes")
                        .build())
                .build();

        EncryptionKeyProvider encryptionKeyProvider = new VaultEncryptionKeyProvider(
                vaultEncryptionKeyProviderConfig);
    }
}
The main class you need is an EncryptionService and/or a DecryptionService.
/**
 * Example: creates the encryption and decryption services on top of an
 * existing {@code encryptionKeyProvider}.
 */
class Example {

    void example() {
        // Both services are constructed from the same key provider instance.
        final EncryptionService encryptionService =
                new EncryptionService(encryptionKeyProvider);
        final DecryptionService decryptionService =
                new DecryptionService(encryptionKeyProvider);
    }
}
/**
 * Example: encrypts a plain-text payload for a Kafka topic and prints the
 * encrypted payload together with its metadata.
 */
class Example {
    void example() {
        String kafkaTopicName = "some-topic";
        String plainTextPayload = "Hello World!";

        // Pass the topic variable rather than repeating the literal.
        AesEncryptedPayload aesEncryptedPayload = encryptionService.encryptPayloadWithAes(kafkaTopicName,
                plainTextPayload);

        // now you have to store the metadata next to the encrypted payload itself
        System.out.println("Encrypted Payload: " + Arrays.toString(aesEncryptedPayload.encryptedPayload()));
        System.out.println("Key Version: " + aesEncryptedPayload.keyVersion());
        System.out.println("Initialization Vector: " + aesEncryptedPayload.initializationVectorBase64());

        // in order to get the Kafka Headers use the helper below; headers are
        // only derived when the payload is actually encrypted
        if (aesEncryptedPayload.isEncrypted()) {
            Map<String, byte[]> kafkaHeaders = KafkaEncryptionHelper.mapToKafkaHeadersForValue(aesEncryptedPayload);
            System.out.println("Kafka Headers: " + kafkaHeaders);
        }
    }
}
/**
 * Example: decrypts a payload using a map of Kafka header values.
 */
class Example {

    void example() {
        final String topic = "some-topic";

        // The encrypted payload, given here as a Base64 string.
        final byte[] cipherBytes = Base64.getDecoder().decode("6ttHpHYw7eYQ1OnvrhZAFi0PPsUGl9NR18hXFQ==");

        // Metadata attributes taken from the Kafka headers (values can be String or byte[]).
        final Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaEncryptionHelper.KAFKA_HEADER_IV_VALUE, "2rW2tDnRdwRg87Ta");
        headers.put(KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE, "[{\"encryption_key\":{\"cipherVersionString\":null,\"cipherVersion\":3,\"cipherName\":\"encryption_key\"}}]");
        headers.put(KafkaEncryptionHelper.KAFKA_CE_HEADER_IV_VALUE, "2rW2tDnRdwRg87Ta");
        headers.put(KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_VERSION_VALUE, "3");
        headers.put(KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_NAME_VALUE, "encryption_key");

        // Combine payload and headers, then decrypt.
        final AesEncryptedPayload encryptedPayload =
                KafkaEncryptionHelper.aesEncryptedPayloadOfKafkaForValue(cipherBytes, headers);
        final String decrypted = decryptionService.decryptToString(topic, encryptedPayload);

        System.out.println("Plain Text Payload: " + decrypted);
    }
}
/**
 * Example: decrypts a payload by passing each Kafka header value individually.
 */
class Example {

    void example() {
        final String topic = "some-topic";

        // Metadata attributes read from the Kafka headers.
        final byte[] ivHeader = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_HEADER_IV_VALUE);
        final byte[] cipherHeader = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE);
        final byte[] ceIvHeader = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_CE_HEADER_IV_VALUE);
        final byte[] ceCipherVersionHeader = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_VERSION_VALUE);
        final byte[] ceCipherNameHeader = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_NAME_VALUE);

        // The encrypted payload, given here as a Base64 string.
        final byte[] cipherBytes = Base64.getDecoder().decode("6ttHpHYw7eYQ1OnvrhZAFi0PPsUGl9NR18hXFQ==");

        // Combine payload and header values, then decrypt.
        final AesEncryptedPayload encryptedPayload = KafkaEncryptionHelper.aesEncryptedPayloadOfKafka(
                cipherBytes,
                ivHeader,
                cipherHeader,
                ceIvHeader,
                ceCipherVersionHeader,
                ceCipherNameHeader);
        final String decrypted = decryptionService.decryptToString(topic, encryptedPayload);

        System.out.println("Plain Text Payload: " + decrypted);
    }
}
Step 1: create a SingleTopicFieldLevelEncryptionService and/or SingleTopicFieldLevelDecryptionService
The main class you need is a SingleTopicFieldLevelEncryptionService and/or SingleTopicFieldLevelDecryptionService.
You can also use the classes FieldLevelEncryptionService and/or FieldLevelDecryptionService.
/**
 * Example: creates the field-level encryption and decryption services for one
 * Kafka topic.
 */
class Example {

    void example() {
        final String topic = "some-topic";

        // Both services take the same key provider and topic name.
        final SingleTopicFieldLevelEncryptionService fieldLevelEncryptionService =
                new SingleTopicFieldLevelEncryptionService(encryptionKeyProvider, topic);
        final SingleTopicFieldLevelDecryptionService fieldLevelDecryptionService =
                new SingleTopicFieldLevelDecryptionService(encryptionKeyProvider, topic);
    }
}
/**
 * Example: encrypts a single field value and prints the resulting encrypted string.
 */
class Example {

    void example() {
        final String fieldValue = "Hello World!";
        final EncryptedString encryptedString =
                fieldLevelEncryptionService.encryptToEncryptedString(fieldValue);

        // Store this value in place of the plain-text field.
        // NOTE(review): the "encAesV1.<key version>.<iv>.<payload>" string format appears to
        // embed the key version and initialization vector, so no separate metadata
        // should be needed - confirm.
        System.out.println("Encrypted Payload: " + encryptedString.value());
    }
}
/**
 * Example: decrypts a field value that is given as an encrypted string.
 */
class Example {

    void example() {
        // The encrypted payload. Format:
        // encAesV1.<key version>.<initialization vector, Base64 encoded>.<encrypted payload, Base64 encoded>
        final EncryptedString encryptedString =
                EncryptedString.of("encAesV1.3.2rW2tDnRdwRg87Ta.6ttHpHYw7eYQ1OnvrhZAFi0PPsUGl9NR18hXFQ==");

        // Decrypt and print the result.
        final String decrypted = fieldLevelDecryptionService.decrypt(encryptedString);
        System.out.println("Plain Text Payload: " + decrypted);
    }
}
The encryption keys retrieved from the vault will be cached for 1 hour. It is recommended to use a 2nd-Level-Cache to avoid outages when the vault is not reachable, so that your service keeps operating during a downtime of the central vault.
See CACHING.md.
In order to rotate the encryption key (for the example vault) see ROTATE-SHARED-ENCRYPTION-KEY.md.