From 1e769348bd3d40c651f4a445ed40c12f7791d371 Mon Sep 17 00:00:00 2001 From: Vishwa Date: Tue, 30 Nov 2021 12:18:57 +0530 Subject: [PATCH] MOSIP-18534 : Bus out halt config and conditions added to allow stop sending messages to few addressess from all the stages --- .../core/abstractverticle/MosipVerticleManager.java | 13 +++++++++++++ .../abstractverticle/ConsumerVerticle.java | 2 ++ 2 files changed, 15 insertions(+) diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/MosipVerticleManager.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/MosipVerticleManager.java index 55fa4ac04bd..8362451528e 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/MosipVerticleManager.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/MosipVerticleManager.java @@ -7,6 +7,7 @@ import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -94,6 +95,12 @@ public abstract class MosipVerticleManager extends AbstractVerticle @Autowired protected PropertiesUtil propertiesUtil; + /* + * Comma separated out bus message addresses for which message will not be sent out from any stage + */ + @Value("#{T(java.util.Arrays).asList('${mosip.regproc.stage-common.bus-out-halt-addresses:}')}") + protected List busOutHaltAddresses; + @Autowired private MosipEventBusFactory mosipEventBusFactory; @@ -173,6 +180,10 @@ public MosipEventBus getEventBus(Object verticleName, String clusterManagerUrl, @Override public void consumeAndSend(MosipEventBus mosipEventBus, MessageBusAddress fromAddress, MessageBusAddress toAddress, long messageExpiryTimeLimit) { + if(busOutHaltAddresses.contains(toAddress.getAddress())) { + consume(mosipEventBus, fromAddress, messageExpiryTimeLimit); + return; + } mosipEventBus.consumeAndSend(fromAddress, toAddress, (msg, handler) -> { logger.debug("consumeAndSend received from {} {}",fromAddress.toString(), msg.getBody()); Map mdc = MDC.getCopyOfContextMap(); @@ -218,6 +229,8 @@ public void consumeAndSend(MosipEventBus mosipEventBus, MessageBusAddress fromAd * The message that needs to be sent */ public void send(MosipEventBus mosipEventBus, MessageBusAddress toAddress, MessageDTO message) { + if(busOutHaltAddresses.contains(toAddress.getAddress())) + return; addTagsToMessageDTO(message); message.setLastHopTimestamp(DateUtils.formatToISOString(DateUtils.getUTCCurrentDateTime())); mosipEventBus.send(toAddress, message); diff --git a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/abstractverticle/ConsumerVerticle.java b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/abstractverticle/ConsumerVerticle.java index 53a75c14d8a..974cf840308 100644 --- a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/abstractverticle/ConsumerVerticle.java +++ b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/abstractverticle/ConsumerVerticle.java @@ -1,6 +1,7 @@ package io.mosip.registration.processor.abstractverticle; import java.net.URL; +import java.util.ArrayList; import brave.Tracing; import io.mosip.registration.processor.core.tracing.EventTracingHandler; @@ -39,6 +40,7 @@ public void start() throws UnsupportedEventBusTypeException { this.messageDTO.setIsValid(true); this.messageDTO.setInternalError(false); this.messageDTO.setReg_type(RegistrationType.NEW.name()); + this.busOutHaltAddresses = new ArrayList(); //this.consume(mosipEventBus, MessageBusAddress.PACKET_VALIDATOR_BUS_IN); //this.consumeAndSend(mosipEventBus, MessageBusAddress.PACKET_VALIDATOR_BUS_OUT, MessageBusAddress.RETRY_BUS);