Skip to content

Commit

Permalink
Support multiple outbound topics for publishing blockchain events
Browse files Browse the repository at this point in the history
Signed-off-by: Abhay Kishore <[email protected]>
  • Loading branch information
abhakish committed Oct 21, 2024
1 parent 41078ec commit 7eb24ff
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,21 @@ public static class Events {
// preferred for providing Chaincode details for Event subscription
private List<String> chaincode;
private boolean standardCCEventEnabled;
private List<String> block;
private List<BlockDetails> blockDetails;
private List<ChaincodeDetails> chaincodeDetails;
}

/**
 * Configuration holder binding one block-event subscription: the Fabric channel to listen on and
 * the outbound Kafka topics its block events are published to.
 *
 * <p>Lombok {@code @Data} generates getters/setters/equals/hashCode/toString; field names must
 * match the external configuration keys, so do not rename them.
 */
@Data
public static class BlockDetails {
// Name of the Fabric channel whose block events are subscribed to.
private String channelName;
// Outbound Kafka topics for this channel's block events — presumably each must match a
// configured event-listener producer topic; TODO confirm against the routing template.
private List<String> listenerTopics;
}

/**
 * Configuration holder binding one chaincode-event subscription: the Fabric channel, the chaincode
 * to listen to, and the outbound Kafka topics its events are published to.
 *
 * <p>Lombok {@code @Data} generates getters/setters/equals/hashCode/toString; field names must
 * match the external configuration keys, so do not rename them.
 */
@Data
public static class ChaincodeDetails {
// Name of the Fabric channel whose chaincode events are subscribed to.
private String channelName;
// Identifier of the chaincode whose events are listened to on that channel.
private String chaincodeId;
// Outbound Kafka topics for this chaincode's events — presumably each must match a
// configured event-listener producer topic; TODO confirm against the routing template.
private List<String> listenerTopics;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -22,6 +23,7 @@
import org.springframework.kafka.listener.ConsumerAwareRecordRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.CollectionUtils;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
Expand Down Expand Up @@ -59,11 +61,16 @@ public CommonErrorHandler topicTransactionErrorHandler() {

deadLetterPublishingRecoverer =
generateRecordRecovererWithPublisher(kafkaProperties.getFailedMessageListener());
} else if (Objects.nonNull(kafkaProperties.getEventListener())
&& kafkaProperties.getEventListener().isListenToFailedMessages()) {

deadLetterPublishingRecoverer =
generateRecordRecovererWithPublisher(kafkaProperties.getEventListener());
} else if (!CollectionUtils.isEmpty(kafkaProperties.getEventListeners())) {

Optional<KafkaProperties.EventProducer> eventProducerOptional =
kafkaProperties.getEventListeners().stream()
.filter(KafkaProperties.EventProducer::isListenToFailedMessages)
.findAny();
if (eventProducerOptional.isPresent()) {
deadLetterPublishingRecoverer =
generateRecordRecovererWithPublisher(eventProducerOptional.get());
}
}

/*
Expand Down Expand Up @@ -103,7 +110,7 @@ public void accept(
private DeadLetterPublishingRecoverer generateRecordRecovererWithPublisher(
KafkaProperties.Producer destination) {

KafkaTemplate<String, String> deadLetterPublisherTemplate =
KafkaTemplate<Object, Object> deadLetterPublisherTemplate =
new KafkaTemplate<>(kafkaProducerConfig.eventProducerFactory(destination));
deadLetterPublisherTemplate.setDefaultTopic(destination.getTopic());

Expand Down
36 changes: 29 additions & 7 deletions src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
package hlf.java.rest.client.config;

import hlf.java.rest.client.exception.ErrorCode;
import hlf.java.rest.client.exception.ServiceException;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.RoutingKafkaTemplate;

/** This class is the configuration class for sending to Chaincode event to eventHub/Kafka Topic. */
@Slf4j
@Configuration
@ConditionalOnProperty("kafka.event-listener.brokerHost")
@ConditionalOnProperty("kafka.event-listeners[0].brokerHost")
@RefreshScope
public class KafkaProducerConfig extends BaseKafkaConfig {

private static final String PRODUCER_ALL_ACKS = "all";
private static final int RETRIES_CONFIG_FOR_AT_MOST_ONCE = 0;

@Autowired private KafkaProperties kafkaProperties;

@Autowired private MeterRegistry meterRegistry;

public ProducerFactory<String, String> eventProducerFactory(
public ProducerFactory<Object, Object> eventProducerFactory(
KafkaProperties.Producer kafkaProducerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties.getBrokerHost());
Expand Down Expand Up @@ -60,7 +66,7 @@ public ProducerFactory<String, String> eventProducerFactory(

log.info("Generating Kafka producer factory..");

DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory =
DefaultKafkaProducerFactory<Object, Object> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(props);
defaultKafkaProducerFactory.addListener(new MicrometerProducerListener<>(meterRegistry));

Expand All @@ -69,8 +75,24 @@ public ProducerFactory<String, String> eventProducerFactory(

@Bean
@RefreshScope
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(eventProducerFactory(kafkaProperties.getEventListener()));
/**
 * Builds a {@link RoutingKafkaTemplate} that selects, per outbound record, the producer factory
 * registered for the record's destination topic.
 *
 * <p>One {@link ProducerFactory} is created per configured event-listener producer and also
 * registered as a Spring bean (named {@code <topic>PF}) so its lifecycle is container-managed.
 *
 * @param context application context used to register the per-topic producer-factory beans
 * @return routing template whose pattern map is keyed by each configured topic name
 * @throws ServiceException if two event-listener producers declare the same topic name
 */
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {

  Set<String> seenTopics = new HashSet<>();
  // LinkedHashMap preserves configuration order: RoutingKafkaTemplate evaluates the
  // patterns in insertion order, so earlier entries win on overlapping matches.
  Map<Pattern, ProducerFactory<Object, Object>> topicPatternToFactory = new LinkedHashMap<>();

  for (KafkaProperties.EventProducer eventProducer : kafkaProperties.getEventListeners()) {
    String topic = eventProducer.getTopic();
    // Set.add returns false when the element was already present — duplicate topic config.
    if (!seenTopics.add(topic)) {
      throw new ServiceException(ErrorCode.NOT_SUPPORTED, "Topic name should be unique");
    }

    ProducerFactory<Object, Object> producerFactory = eventProducerFactory(eventProducer);
    // Register each factory as a bean so the container manages its refresh/destroy lifecycle.
    context.registerBean(topic + "PF", ProducerFactory.class, () -> producerFactory);

    // NOTE(review): the topic name is compiled as a regular expression; Kafka topic names
    // containing regex metacharacters (e.g. '.') will match more broadly than intended.
    // Consider Pattern.compile(Pattern.quote(topic)) if topics are always literal — confirm
    // no caller relies on regex routing before changing.
    topicPatternToFactory.put(Pattern.compile(topic), producerFactory);
  }
  return new RoutingKafkaTemplate(topicPatternToFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class KafkaProperties {

private List<Consumer> integrationPoints;
private EventProducer eventListener;
private List<EventProducer> eventListeners;
private Producer failedMessageListener;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ public void onRefresh(RefreshScopeRefreshedEvent event) {
startEventListener();
}

public void startEventListener() {
private void startEventListener() {

try {
List<String> blockChannelNames = fabricProperties.getEvents().getBlock();
if (!CollectionUtils.isEmpty(blockChannelNames)) {
List<FabricProperties.BlockDetails> blockDetailsList =
fabricProperties.getEvents().getBlockDetails();
if (!CollectionUtils.isEmpty(blockDetailsList)) {

for (String channelName : blockChannelNames) {
for (FabricProperties.BlockDetails blockDetails : blockDetailsList) {
String channelName = blockDetails.getChannelName();
log.info("channel names {}", channelName);
Network network = gateway.getNetwork(channelName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* The EventPublishService is a service class, which include the kafka template. It sends the
* Message to the Event Kafka message topic
*/
@ConditionalOnProperty("kafka.event-listener.brokerHost")
@ConditionalOnProperty("kafka.event-listeners[0].brokerHost")
public interface EventPublishService {

/**
Expand All @@ -25,9 +25,8 @@ boolean sendMessage(
* @param eventName String chaincode event-name
* @param channelName String Name of the channel where the event was generated.
* @param messageKey associated key for the payload.
* @return status boolean status of msg sent
*/
boolean publishChaincodeEvents(
void publishChaincodeEvents(
final String payload,
String chaincodeName,
String fabricTxId,
Expand All @@ -42,9 +41,8 @@ boolean publishChaincodeEvents(
* @param channelName String Name of the channel where the event was generated.
* @param functionName String Name of the function name.
* @param isPrivateDataPresent boolean flag to check if privateData present in payload
* @return status boolean status of msg sent
*/
boolean publishBlockEvents(
void publishBlockEvents(
final String payload,
String fabricTxId,
String channelName,
Expand Down
Loading

0 comments on commit 7eb24ff

Please sign in to comment.