Listener Improvements & Incorporate ITs for Listener Tx flows. #142

Merged

17 changes: 16 additions & 1 deletion pom.xml
@@ -202,7 +202,22 @@
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<!-- this section is for auto version increments -->
<profiles>
@@ -2,6 +2,8 @@

import hlf.java.rest.client.exception.FabricTransactionException;
import hlf.java.rest.client.exception.RetryableServiceException;
import hlf.java.rest.client.exception.ServiceException;
import hlf.java.rest.client.exception.UnrecognizedTransactionPayloadException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -33,6 +35,9 @@ public class KafkaConsumerErrorHandler {
private static final List<Class<? extends Exception>> connectorRetryableExceptions =
Arrays.asList(RetryableServiceException.class, FabricTransactionException.class);

private static final List<Class<? extends Exception>> connectorNonRetryableExceptions =
Arrays.asList(UnrecognizedTransactionPayloadException.class, ServiceException.class);

@Autowired private KafkaProperties kafkaProperties;

@Autowired private KafkaProducerConfig kafkaProducerConfig;
@@ -88,6 +93,10 @@ public void accept(
defaultErrorHandler.addRetryableExceptions(retryableExceptionClass);
}

for (Class<? extends Exception> nonRetryableExceptionClass : connectorNonRetryableExceptions) {
defaultErrorHandler.addNotRetryableExceptions(nonRetryableExceptionClass);
}

return defaultErrorHandler;
}
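
A minimal sketch (assumed wiring; the exception classes mirror this PR, but the recoverer and back-off are illustrative, not the connector's exact configuration) of how spring-kafka's DefaultErrorHandler treats the two lists: retryable exceptions are retried per the configured back-off before recovery, while not-retryable ones skip retries and go straight to recovery, e.g. dead-letter publishing.

    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    // kafkaTemplate is an assumed, pre-configured KafkaTemplate<Object, Object>
    DefaultErrorHandler handler =
        new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(1000L, 3));
    // Retried up to 3 times before recovery:
    handler.addRetryableExceptions(RetryableServiceException.class, FabricTransactionException.class);
    // Recovered immediately, no retries:
    handler.addNotRetryableExceptions(UnrecognizedTransactionPayloadException.class, ServiceException.class);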

@@ -7,6 +7,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -36,9 +37,9 @@
@ConditionalOnProperty("kafka.integration-points[0].brokerHost")
public class DynamicKafkaListener {

private static final int MAX_CONCURRENT_LISTENERS_PER_CONSUMER = 6;
private static final int MAX_CONCURRENT_LISTENERS_PER_CONSUMER = 12;

private List<ConcurrentMessageListenerContainer> existingContainers = new ArrayList<>();
@Getter private List<ConcurrentMessageListenerContainer> existingContainers = new ArrayList<>();

@Autowired private KafkaProperties kafkaProperties;

@@ -92,7 +93,7 @@ public void generateAndStartConsumerGroup(KafkaProperties.Consumer consumer) {

int consumerListenerConcurrency = 1; // Kafka default if no concurrency is set.

if (consumer.isEnableParallelListenerCapabilities() && consumer.getTopicPartitions() > 1) {
if (consumer.getTopicPartitions() > 1) {
consumerListenerConcurrency =
Math.min(consumer.getTopicPartitions(), MAX_CONCURRENT_LISTENERS_PER_CONSUMER);
}
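
Worked example of the revised rule: listener concurrency now follows the partition count alone (the parallel-listener feature flag no longer gates it), capped by the raised constant.

    // Assuming a topic with 20 partitions:
    int consumerListenerConcurrency = Math.min(20, MAX_CONCURRENT_LISTENERS_PER_CONSUMER); // -> 12
    // ...and with 4 partitions:
    consumerListenerConcurrency = Math.min(4, MAX_CONCURRENT_LISTENERS_PER_CONSUMER); // -> 4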
@@ -102,6 +103,7 @@ public void generateAndStartConsumerGroup(KafkaProperties.Consumer consumer) {

container.start();
existingContainers.add(container);

log.debug(
"Created kafka message listener container"
+ container.metrics().keySet().iterator().next());
@@ -117,9 +119,9 @@ private Object determineMessageListenerForTransactions(KafkaProperties.Consumer
/**
* A Message listener, where each Consumer container would get the list of Records fetched as part
* of poll() to process. The records are then supplied to an Async Task pool so that multiple
* individual Records can be processed in Parallel aynchronously. In case if one of the
individual Records can be processed in parallel asynchronously. In case one of the
* tasks/record fails with an Exception, we perform a partial Batch commit, in which the next
* poll() from the server would contain the non committed records of the previous Batch to
* poll() from the server would contain the non-committed records of the previous Batch to
* process.
*
* @return
@@ -173,13 +175,11 @@ public void onMessage(

private Object getPerRecordAcknowledgingListener() {

return new AcknowledgingMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
transactionConsumer.listen(message);
// Manually ack the single Record
acknowledgment.acknowledge();
}
};
return (AcknowledgingMessageListener<String, String>)
(message, acknowledgment) -> {
transactionConsumer.listen(message);
// Manually ack the single Record
acknowledgment.acknowledge();
};
}
}
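
The class javadoc above describes a partial Batch commit on failure. A hedged sketch of that pattern in spring-kafka (illustrative only; process(...) is a hypothetical per-record handler, and this is not the PR's exact batch listener): acknowledging with nack(index, sleep) commits every record before the failed index and redelivers the rest on the next poll().

    import java.time.Duration;
    import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;

    BatchAcknowledgingMessageListener<String, String> batchListener =
        (records, acknowledgment) -> {
          for (int i = 0; i < records.size(); i++) {
            try {
              process(records.get(i)); // hypothetical record handler
            } catch (Exception e) {
              // Commit offsets for records [0, i); redeliver from i on the next poll()
              acknowledgment.nack(i, Duration.ofMillis(500));
              return;
            }
          }
          acknowledgment.acknowledge(); // whole batch succeeded
        };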
@@ -2,8 +2,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import hlf.java.rest.client.exception.ErrorCode;
import hlf.java.rest.client.exception.FabricTransactionException;
import hlf.java.rest.client.exception.ServiceException;
import hlf.java.rest.client.exception.UnrecognizedTransactionPayloadException;
import hlf.java.rest.client.metrics.EmitCustomTransactionListenerMetrics;
import hlf.java.rest.client.model.MultiDataTransactionPayload;
@@ -169,16 +167,13 @@ public void listen(ConsumerRecord<String, String> message) {
"Inbound transaction format is incorrect or doesn't contain valid parameters.");
}

} catch (FabricTransactionException fte) {
log.error("Error in Submitting Transaction - Exception - " + fte.getMessage());
} catch (Exception exception) {
log.error("Error in Submitting Transaction - Exception - " + exception.getMessage());
/*
If the error handler has dead letter publish enabled, the errored Record header will be enriched by extracting
the error cause and message from the thrown exception.
*/
throw fte;
} catch (Exception ex) {
log.error("Error in Kafka Listener - Message Format exception - " + ex.getMessage());
throw new ServiceException(ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, ex.getMessage());
throw exception;
}
}
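
Because the exception is now rethrown, dead-letter publishing (when enabled on the error handler) enriches the outgoing record's headers with the error cause and message. A test could assert on that roughly as follows (the header constant is a spring-kafka default from org.springframework.kafka.support.KafkaHeaders; testDltConsumer and OUTBOUND_DLT_NAME come from the IT base class further down):

    ConsumerRecord<Object, Object> dltRecord =
        KafkaTestUtils.getSingleRecord(testDltConsumer, OUTBOUND_DLT_NAME);
    assertNotNull(dltRecord.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE));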

2 changes: 1 addition & 1 deletion src/main/resources/logback.xml
@@ -2,7 +2,7 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<pattern>%d{yyyy-MM-dd | HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
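
With the date added to the pattern, a log line renders along these lines (illustrative output):

    2024-05-01 | 13:22:41.307 [main] INFO  h.j.r.c.listener.DynamicKafkaListener - Created kafka message listener container...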

145 changes: 145 additions & 0 deletions src/test/java/hlf/java/rest/client/IT/KafkaBaseIT.java
@@ -0,0 +1,145 @@
package hlf.java.rest.client.IT;

import hlf.java.rest.client.util.FabricClientConstants;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public abstract class KafkaBaseIT {

private static final String IN_MEMORY_BROKER_ADDRESS = "PLAINTEXT://localhost:9092";
protected static final String INBOUND_TOPIC_NAME = "test-consumer-inbound-topic";
protected static final String OUTBOUND_TOPIC_NAME = "test-publisher-event-topic";

protected static final String OUTBOUND_DLT_NAME = "test-consumer-dlt";

protected static final String DEFAULT_CHANNEL_NAME = "test-channel";
protected static final String DEFAULT_CONTRACT_NAME = "test-contract";
protected static final String DEFAULT_FUNCTION_NAME = "test-function";

protected static final String DEFAULT_TRANSACTION_BODY = "stringified-transaction-message";

private static EmbeddedKafkaBroker embeddedKafkaBroker;
protected static Producer<Object, Object> testProducer;

protected static Consumer<Object, Object> testDltConsumer;

@BeforeAll
public static void setUpClass() {
startEmbeddedKafkaBroker();
}

@AfterAll
public static void tearDownClass() {
if (embeddedKafkaBroker != null) {
embeddedKafkaBroker.destroy();
embeddedKafkaBroker = null;
}
}

private static void startEmbeddedKafkaBroker() {
if (embeddedKafkaBroker == null) {
embeddedKafkaBroker =
new EmbeddedKafkaBroker(1, false, getDefaultPartitionSize(), getTopicsToBootstrap())
.brokerProperties(getBrokerProperties())
.kafkaPorts(9092);
embeddedKafkaBroker.afterPropertiesSet();

testProducer = configureProducer();
testDltConsumer = configureDltConsumer();
}
}

private static Producer<Object, Object> configureProducer() {
Map<String, Object> producerProps =
new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
return new DefaultKafkaProducerFactory<>(producerProps).createProducer();
}

private static Consumer<Object, Object> configureDltConsumer() {
Map<String, Object> consumerProps =
        new HashMap<>(
            // consumerProps(...) already sets the group id and auto-commit flag
            KafkaTestUtils.consumerProps("dlt_group", "true", embeddedKafkaBroker));
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Object, Object> dltConsumer =
new DefaultKafkaConsumerFactory<>(consumerProps).createConsumer();

dltConsumer.subscribe(Collections.singleton(OUTBOUND_DLT_NAME));

return dltConsumer;
}

private static Map<String, String> getBrokerProperties() {
Map<String, String> brokerProperties = new HashMap<>();
brokerProperties.put("listeners", IN_MEMORY_BROKER_ADDRESS);
brokerProperties.put("port", "9092");
return brokerProperties;
}

protected static String[] getTopicsToBootstrap() {
return new String[] {INBOUND_TOPIC_NAME, OUTBOUND_TOPIC_NAME, OUTBOUND_DLT_NAME};
}

protected static int getDefaultPartitionSize() {
return 12;
}

protected static String getBrokerAddress() {
return IN_MEMORY_BROKER_ADDRESS;
}

protected void publishValidTransactionToInboundTopic(
String channelName, String contractName, String functionName) {

ProducerRecord<Object, Object> producerRecord =
        new ProducerRecord<>(INBOUND_TOPIC_NAME, DEFAULT_TRANSACTION_BODY);

producerRecord.headers().add(getHeader(FabricClientConstants.CHANNEL_NAME, channelName));
producerRecord.headers().add(getHeader(FabricClientConstants.CHAINCODE_NAME, contractName));
producerRecord.headers().add(getHeader(FabricClientConstants.FUNCTION_NAME, functionName));

testProducer.send(producerRecord);
}

private Header getHeader(String headerName, String headerValue) {
return new Header() {
@Override
public String key() {
return headerName;
}

@Override
public byte[] value() {
return headerValue.getBytes(StandardCharsets.UTF_8);
}
};
}

protected long getCurrentCommittedMessageCountForInboundTopic(String groupId) throws Exception {

long currentOffset = 0;

for (int i = 0; i < getDefaultPartitionSize(); i++) {
currentOffset +=
KafkaTestUtils.getCurrentOffset(getBrokerAddress(), groupId, INBOUND_TOPIC_NAME, i)
.offset();
}

return currentOffset;
}
}
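
A hypothetical subclass sketch (class name, test group id, and expected count are illustrative, not part of this PR) showing how the helpers above combine with Awaitility from the new test-scoped pom dependencies:

    import static org.awaitility.Awaitility.await;

    import java.time.Duration;
    import org.junit.jupiter.api.Test;

    public class ListenerFlowIT extends KafkaBaseIT {

      @Test
      void inboundRecordIsConsumedAndCommitted() {
        // Publish a well-formed transaction, then poll until the listener's
        // consumer group has committed the offset for it.
        publishValidTransactionToInboundTopic(
            DEFAULT_CHANNEL_NAME, DEFAULT_CONTRACT_NAME, DEFAULT_FUNCTION_NAME);

        await()
            .atMost(Duration.ofSeconds(30))
            .until(() -> getCurrentCommittedMessageCountForInboundTopic("test-group") == 1);
      }
    }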