
Expose Kafka Consumer & Producer metrics to Metrics Endpoint. Introduce custom error counters for Invalid Txn Payload & Processing Failures #137

Merged · 6 commits · Mar 13, 2024
This file was deleted.

hlf/java/rest/client/config/CustomTxnListenerMetricsConfig.java (new file)
@@ -0,0 +1,32 @@
package hlf.java.rest.client.config;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty("kafka.integration-points[0].brokerHost")
public class CustomTxnListenerMetricsConfig {

@Bean
public Counter customKafkaSuccessCounter(MeterRegistry meterRegistry) {
return meterRegistry.counter("kafka.messages.processed.messages");
}

@Bean
public Counter invalidInboundTransactionMessageCounter(MeterRegistry meterRegistry) {
return meterRegistry.counter("transaction.messages.unrecognized.failures");
}

@Bean
public Counter inboundTxnProcessingFailureCounter(MeterRegistry meterRegistry) {
return meterRegistry.counter("transaction.messages.process.failures");
}

@Bean
public Counter inboundTxnContractExceptionCounter(MeterRegistry meterRegistry) {
return meterRegistry.counter("transaction.messages.contract.failures");
}
}
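
Note: these are plain Micrometer counters, so they surface under whatever registry backs the /actuator/metrics endpoint with exactly the names registered above. A minimal sketch of resolving and exercising one of them outside Spring (the SimpleMeterRegistry stands in for the injected MeterRegistry; the wiring is hypothetical):

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class CounterSketch {
  public static void main(String[] args) {
    // Throwaway registry standing in for the Spring-managed MeterRegistry.
    SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();

    // Same name the customKafkaSuccessCounter bean registers above.
    Counter processed = meterRegistry.counter("kafka.messages.processed.messages");

    processed.increment();                 // what the listener path does on success
    System.out.println(processed.count()); // cumulative count since registration -> 1.0
  }
}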
hlf/java/rest/client/config/KafkaConsumerConfig.java (modified)
@@ -1,6 +1,9 @@
package hlf.java.rest.client.config;

import hlf.java.rest.client.util.FabricClientConstants;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@@ -9,10 +12,12 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.MicrometerConsumerListener;

/*
* This class is the configuration class for setting the properties for the kafka consumers.
@@ -24,6 +29,8 @@
@RefreshScope
public class KafkaConsumerConfig {

@Autowired private MeterRegistry meterRegistry;

public DefaultKafkaConsumerFactory<String, String> consumerFactory(
KafkaProperties.Consumer kafkaConsumerProperties) {
Map<String, Object> props = new HashMap<>();
@@ -60,7 +67,7 @@ public DefaultKafkaConsumerFactory<String, String> consumerFactory(
// Adding SSL configuration if Kafka Cluster is SSL secured
if (kafkaConsumerProperties.isSslAuthRequired()) {

SSLAuthFilesCreationHelper.createSSLAuthFiles(kafkaConsumerProperties);
SSLAuthFilesHelper.createSSLAuthFiles(kafkaConsumerProperties);

props.put(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
@@ -78,9 +85,41 @@ public DefaultKafkaConsumerFactory<String, String> consumerFactory(
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
kafkaConsumerProperties.getSslTruststorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaConsumerProperties.getSslKeyPassword());

try {
Timestamp keyStoreCertExpiryTimestamp =
SSLAuthFilesHelper.getExpiryTimestampForKeyStore(
kafkaConsumerProperties.getSslKeystoreLocation(),
kafkaConsumerProperties.getSslKeystorePassword());
Timestamp trustStoreCertExpiryTimestamp =
SSLAuthFilesHelper.getExpiryTimestampForKeyStore(
kafkaConsumerProperties.getSslTruststoreLocation(),
kafkaConsumerProperties.getSslTruststorePassword());

Gauge.builder(
"consumer." + kafkaConsumerProperties.getTopic() + ".keystore.expiryTs",
keyStoreCertExpiryTimestamp::getTime)
.strongReference(true)
.register(meterRegistry);

Gauge.builder(
"consumer." + kafkaConsumerProperties.getTopic() + ".truststore.expiryTs",
trustStoreCertExpiryTimestamp::getTime)
.strongReference(true)
.register(meterRegistry);

} catch (Exception e) {
log.error(
"Failed to extract expiry details of Consumer SSL Certs. Metrics for Consumer SSL cert-expiry will not be available.");
}
}

log.info("Created kafka consumer factory");
return new DefaultKafkaConsumerFactory<>(props);
log.info("Generating Kafka consumer factory..");

DefaultKafkaConsumerFactory<String, String> defaultKafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(props);
defaultKafkaConsumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));

return defaultKafkaConsumerFactory;
}
}
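
Note: MicrometerConsumerListener binds a KafkaClientMetrics instance to each consumer this factory creates, so the Kafka client's own metrics (fetch throughput, records consumed, and so on) land in the same MeterRegistry as the custom counters. A hedged sketch of a helper that could enumerate them, assuming Micrometer's usual kafka.consumer.* naming (meter names vary with the Kafka client and Micrometer versions):

  // Hypothetical helper inside this config class; relies on the injected
  // meterRegistry and the @Slf4j logger already present above.
  private void logConsumerMeters() {
    meterRegistry.getMeters().stream()
        .map(meter -> meter.getId().getName())
        .filter(name -> name.startsWith("kafka.consumer"))
        .forEach(name -> log.info("Bound Kafka consumer meter: {}", name));
    // e.g. kafka.consumer.fetch.manager.records.consumed.total
  }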
hlf/java/rest/client/config/KafkaProducerConfig.java (modified)
@@ -1,6 +1,9 @@
package hlf.java.rest.client.config;

import hlf.java.rest.client.util.FabricClientConstants;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@@ -15,6 +18,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;

/** This class is the configuration class for sending to Chaincode event to eventHub/Kafka Topic. */
@@ -26,6 +30,8 @@ public class KafkaProducerConfig {

@Autowired private KafkaProperties kafkaProperties;

@Autowired private MeterRegistry meterRegistry;

public ProducerFactory<String, String> eventProducerFactory(
KafkaProperties.Producer kafkaProducerProperties) {
Map<String, Object> props = new HashMap<>();
@@ -53,7 +59,7 @@ public ProducerFactory<String, String> eventProducerFactory(
// Adding SSL configuration if Kafka Cluster is SSL secured
if (kafkaProducerProperties.isSslAuthRequired()) {

SSLAuthFilesCreationHelper.createSSLAuthFiles(kafkaProducerProperties);
SSLAuthFilesHelper.createSSLAuthFiles(kafkaProducerProperties);

props.put(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
@@ -71,10 +77,42 @@ public ProducerFactory<String, String> eventProducerFactory(
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
kafkaProducerProperties.getSslTruststorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProducerProperties.getSslKeyPassword());

try {
Timestamp keyStoreCertExpiryTimestamp =
SSLAuthFilesHelper.getExpiryTimestampForKeyStore(
kafkaProducerProperties.getSslKeystoreLocation(),
kafkaProducerProperties.getSslKeystorePassword());
Timestamp trustStoreCertExpiryTimestamp =
SSLAuthFilesHelper.getExpiryTimestampForKeyStore(
kafkaProducerProperties.getSslTruststoreLocation(),
kafkaProducerProperties.getSslTruststorePassword());

Gauge.builder(
"producer." + kafkaProducerProperties.getTopic() + ".keystore.expiryTs",
keyStoreCertExpiryTimestamp::getTime)
.strongReference(true)
.register(meterRegistry);

Gauge.builder(
"producer." + kafkaProducerProperties.getTopic() + ".truststore.expiryTs",
trustStoreCertExpiryTimestamp::getTime)
.strongReference(true)
.register(meterRegistry);

} catch (Exception e) {
log.error(
"Failed to extract expiry details of Producer SSL Certs. Metrics for Producer SSL cert-expiry will not be available.");
}
}

log.info("Created kafka producer factory");
return new DefaultKafkaProducerFactory<>(props);
log.info("Generating Kafka producer factory..");

DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(props);
defaultKafkaProducerFactory.addListener(new MicrometerProducerListener<>(meterRegistry));

return defaultKafkaProducerFactory;
}

@Bean

This file was deleted.

hlf/java/rest/client/config/SSLAuthFilesHelper.java (new file)
@@ -0,0 +1,82 @@
package hlf.java.rest.client.config;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.sql.Timestamp;
import java.util.Base64;
import java.util.Enumeration;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@UtilityClass
public class SSLAuthFilesHelper {

void createSSLAuthFiles(KafkaProperties.SSLProperties kafkaSSLProperties) {

log.info("Creating Kafka ssl keystore file");
createSSLFileFromBase64(
kafkaSSLProperties.getSslKeystoreBase64(), kafkaSSLProperties.getSslKeystoreLocation());

log.info("Keystore file created at {}", kafkaSSLProperties.getSslKeystoreLocation());

log.info("Creating Kafka ssl truststore file");
createSSLFileFromBase64(
kafkaSSLProperties.getSslTruststoreBase64(), kafkaSSLProperties.getSslTruststoreLocation());

log.info("TrustStore file created at {}", kafkaSSLProperties.getSslTruststoreLocation());
}

private static void createSSLFileFromBase64(
String sslKeystoreBase64, String sslKeystoreLocation) {
log.info("Creating file at: {}", sslKeystoreLocation);
try {
final Path path = Paths.get(sslKeystoreLocation);
final Path parentPath = path.getParent();
if (!Files.isDirectory(parentPath)) {
log.info("Creating directory: {}", parentPath);
Files.createDirectory(parentPath);
}
Files.write(path, Base64.getDecoder().decode(sslKeystoreBase64));
} catch (IOException e) {
log.error("Failed to create the ssl auth file at location: {}", sslKeystoreLocation, e);
}
}

Timestamp getExpiryTimestampForKeyStore(String keyStorePath, String keyStorePassword)
throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException {

KeyStore keyStore = loadKeyStore(keyStorePath, keyStorePassword);

Enumeration<String> aliases = keyStore.aliases();
while (aliases.hasMoreElements()) {
String alias = aliases.nextElement();
Certificate cert = keyStore.getCertificate(alias);
if (cert instanceof X509Certificate) {
X509Certificate x509Cert = (X509Certificate) cert;
return new Timestamp(x509Cert.getNotAfter().getTime());
}
}

throw new CertificateException(
"Couldn't extract an instance of X509Certificate for fetching expiry details");
}

private static KeyStore loadKeyStore(String path, String password)
throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
try (FileInputStream fis = new FileInputStream(path)) {
keyStore.load(fis, password.toCharArray());
}
return keyStore;
}
}
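
Note: a hedged usage sketch for the expiry helper, e.g. from a test in the same package (the methods become package-private statics under @UtilityClass; the path and password below are placeholders, not values from this PR):

  Timestamp expiry =
      SSLAuthFilesHelper.getExpiryTimestampForKeyStore(
          "/var/kafka-ssl/keystore.jks", "changeit"); // placeholder path/password
  long daysLeft =
      java.time.Duration.between(java.time.Instant.now(), expiry.toInstant()).toDays();
  log.info("Keystore certificate expires in {} days", daysLeft);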
hlf/java/rest/client/exception/UnrecognizedTransactionPayloadException.java (new file)
@@ -0,0 +1,10 @@
package hlf.java.rest.client.exception;

public class UnrecognizedTransactionPayloadException extends BaseException {

private static final long serialVersionUID = 6585724920013541503L;

public UnrecognizedTransactionPayloadException(ErrorCode code, String message) {
super(code, message);
}
}
TransactionConsumer.java (modified)
@@ -4,7 +4,8 @@
import hlf.java.rest.client.exception.ErrorCode;
import hlf.java.rest.client.exception.FabricTransactionException;
import hlf.java.rest.client.exception.ServiceException;
import hlf.java.rest.client.metrics.EmitKafkaCustomMetrics;
import hlf.java.rest.client.exception.UnrecognizedTransactionPayloadException;
import hlf.java.rest.client.metrics.EmitCustomTransactionListenerMetrics;
import hlf.java.rest.client.model.MultiDataTransactionPayload;
import hlf.java.rest.client.service.TransactionFulfillment;
import hlf.java.rest.client.util.FabricClientConstants;
@@ -38,7 +39,7 @@ public class TransactionConsumer {
*
* @param message ConsumerRecord payload from upstream system
*/
@EmitKafkaCustomMetrics
@EmitCustomTransactionListenerMetrics
public void listen(ConsumerRecord<String, String> message) {
log.info(
"Incoming Message details : Topic : "
@@ -108,7 +109,7 @@ public void listen(ConsumerRecord<String, String> message) {
multiDataTransactionPayload =
objectMapper.readValue(transactionParams, MultiDataTransactionPayload.class);
} catch (Exception e) {
throw new ServiceException(
throw new UnrecognizedTransactionPayloadException(
ErrorCode.VALIDATION_FAILED, "Invalid transaction payload provided");
}

@@ -163,7 +164,7 @@

} else {
log.error("Incorrect Transaction Payload");
throw new ServiceException(
throw new UnrecognizedTransactionPayloadException(
ErrorCode.VALIDATION_FAILED,
"Inbound transaction format is incorrect or doesn't contain valid parameters.");
}
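
Note: the renamed @EmitCustomTransactionListenerMetrics annotation is what ties these exception types back to the counters registered in CustomTxnListenerMetricsConfig. The aspect itself is not visible in this diff view; a hypothetical sketch of what such advice could look like, assuming the counter beans are injected by name:

import hlf.java.rest.client.exception.FabricTransactionException;
import hlf.java.rest.client.exception.UnrecognizedTransactionPayloadException;
import io.micrometer.core.instrument.Counter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// Hypothetical sketch only -- the real aspect lives elsewhere in the repo
// and is not part of this PR view.
@Aspect
@Component
public class TransactionListenerMetricsAspectSketch {

  // Field names match the @Bean method names above, so by-name injection applies.
  @Autowired private Counter customKafkaSuccessCounter;
  @Autowired private Counter invalidInboundTransactionMessageCounter;
  @Autowired private Counter inboundTxnProcessingFailureCounter;
  @Autowired private Counter inboundTxnContractExceptionCounter;

  @Around("@annotation(hlf.java.rest.client.metrics.EmitCustomTransactionListenerMetrics)")
  public Object emitMetrics(ProceedingJoinPoint pjp) throws Throwable {
    try {
      Object result = pjp.proceed();
      customKafkaSuccessCounter.increment(); // message processed cleanly
      return result;
    } catch (UnrecognizedTransactionPayloadException e) {
      invalidInboundTransactionMessageCounter.increment(); // invalid payload
      throw e;
    } catch (FabricTransactionException e) {
      inboundTxnContractExceptionCounter.increment(); // contract/chaincode failure
      throw e;
    } catch (Exception e) {
      inboundTxnProcessingFailureCounter.increment(); // any other processing failure
      throw e;
    }
  }
}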