Skip to content

Commit

Permalink
MOSIP-31982 changes done for publishing the data from websub to kafka…
Browse files Browse the repository at this point in the history
… directly (#1230)

Signed-off-by: Neha Farheen <[email protected]>
Co-authored-by: Neha Farheen <[email protected]>
  • Loading branch information
Neha2365 and Neha Farheen authored Apr 1, 2024
1 parent dbcefea commit 0e385e5
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 72 deletions.
6 changes: 6 additions & 0 deletions authentication/authentication-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.3</version>
</dependency>

<!-- Kernel Template -->
<dependency>
<groupId>io.mosip.kernel</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.mosip.authentication.common.service.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

@Value(value = "${mosip.ida.kafka.bootstrap.servers}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.mosip.authentication.common.service.websub.impl;
package io.mosip.authentication.common.service.kafka.impl;

import static io.mosip.authentication.core.constant.IdAuthConfigKeyConstants.ON_DEMAND_TEMPLATE_EXTRACTION_TOPIC;
import static io.mosip.authentication.core.constant.IdAuthConfigKeyConstants.AUTHENTICATION_ERROR_EVENTING_TOPIC;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -9,15 +9,13 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import io.mosip.authentication.common.service.entity.PartnerData;
import io.mosip.authentication.common.service.helper.WebSubHelper;
import io.mosip.authentication.common.service.repository.PartnerDataRepository;
import io.mosip.authentication.common.service.transaction.manager.IdAuthSecurityManager;
import io.mosip.authentication.core.constant.IdAuthCommonConstants;
import io.mosip.authentication.core.exception.IdAuthenticationBusinessException;
import io.mosip.authentication.core.indauth.dto.BaseAuthResponseDTO;
import io.mosip.authentication.core.indauth.dto.BaseRequestDTO;
import io.mosip.authentication.core.logger.IdaLogger;
import io.mosip.authentication.core.partner.dto.PartnerDTO;
Expand All @@ -28,12 +26,12 @@
import io.mosip.kernel.core.websub.model.EventModel;

/**
* The Class OnDemandTemplateEventPublisher.
* The Class AuthenticationErrorEventingPublisher.
*
* @author Neha
*/
@Component
public class OndemandTemplateEventPublisher extends BaseWebSubEventsInitializer {
public class AuthenticationErrorEventingPublisher {

private static final String REQUEST_SIGNATURE = "requestSignature";

Expand All @@ -56,78 +54,44 @@ public class OndemandTemplateEventPublisher extends BaseWebSubEventsInitializer

/** The Constant logger. */

private static final Logger logger = IdaLogger.getLogger(OndemandTemplateEventPublisher.class);
private static final Logger logger = IdaLogger.getLogger(AuthenticationErrorEventingPublisher.class);


/** The on demand template extraction topic. */
@Value("${" + ON_DEMAND_TEMPLATE_EXTRACTION_TOPIC + "}")
private String onDemadTemplateExtractionTopic;
/** The Authenticatrion error eventing topic. */
@Value("${" + AUTHENTICATION_ERROR_EVENTING_TOPIC + "}")
private String authenticationErrorEventingTopic;

@Value("${mosip.ida.ondemand.template.extraction.partner.id}")
@Value("${mosip.ida.authentication.error.eventing.encrypt.partner.id}")
private String partnerId;

/** The web sub event publish helper. */

@Autowired
private WebSubHelper webSubHelper;
private KafkaTemplate<String, Object> kafkaTemplate;

@Autowired
private IdAuthSecurityManager securityManager;

@Autowired
private PartnerDataRepository partnerDataRepo;

/**
* Do subscribe.
*/
@Override
protected void doSubscribe() {
// Nothing to do here since we are just publishing event for this topic
}

/**
* Try register topic partner service events.
*/
private void tryRegisterTopicOnDemandEvent() {
try {
logger.debug(IdAuthCommonConstants.SESSION_ID, "tryRegisterOnDemandEvent", "",
"Trying to register topic: " + onDemadTemplateExtractionTopic);
webSubHelper.registerTopic(onDemadTemplateExtractionTopic);
logger.info(IdAuthCommonConstants.SESSION_ID, "tryRegisterOnDemandEvent", "",
"Registered topic: " + onDemadTemplateExtractionTopic);
} catch (Exception e) {
logger.info(IdAuthCommonConstants.SESSION_ID, "tryRegisterOnDemandEvent", e.getClass().toString(),
"Error registering topic: " + onDemadTemplateExtractionTopic + "\n" + e.getMessage());
}
}

@Override
protected void doRegister() {
logger.info(IdAuthCommonConstants.SESSION_ID, "doRegister", this.getClass().getSimpleName(),
"On demand template event topic..");
tryRegisterTopicOnDemandEvent();
}

public void publishEvent(EventModel eventModel) {
webSubHelper.publishEvent(onDemadTemplateExtractionTopic, eventModel);
}


public void notify(BaseRequestDTO baserequestdto, String headerSignature, Optional<PartnerDTO> partner,
IdAuthenticationBusinessException e, Map<String, Object> metadata) {
try {
sendEvents(baserequestdto, headerSignature, partner, e, metadata);
} catch (Exception exception) {
logger.error(IdRepoSecurityManager.getUser(), "On demand template extraction", "notify",
logger.error(IdRepoSecurityManager.getUser(), "Authentication error eventing", "notify",
exception.getMessage());
}
}

private void sendEvents(BaseRequestDTO baserequestdto, String headerSignature, Optional<PartnerDTO> partner,
IdAuthenticationBusinessException e, Map<String, Object> metadata) {
logger.info("Inside sendEvents ondemand extraction");
logger.info("Inside partner data to get certificate for ondemand extraction encryption");
logger.info("Inside sendEvents authentication error eventing");
logger.info("Inside partner data to get certificate for authentication error eventing encryption");
Optional<PartnerData> partnerDataCert = partnerDataRepo.findByPartnerId(partnerId);
if (partnerDataCert.isEmpty()) {
logger.info("Partner is not configured for on demand extraction.");
logger.info("Partner is not configured for encrypting individual id.");
} else {
Map<String, Object> eventData = new HashMap<>();
eventData.put(ERROR_CODE, e.getErrorCode());
Expand All @@ -139,7 +103,7 @@ private void sendEvents(BaseRequestDTO baserequestdto, String headerSignature, O
eventData.put(INDIVIDUAL_ID_TYPE, baserequestdto.getIndividualIdType());
eventData.put(ENTITY_NAME, partner.map(PartnerDTO::getPartnerName).orElse(null));
eventData.put(REQUEST_SIGNATURE, headerSignature);
EventModel eventModel = createEventModel(onDemadTemplateExtractionTopic, eventData);
EventModel eventModel = createEventModel(authenticationErrorEventingTopic, eventData);
publishEvent(eventModel);
}
}
Expand All @@ -158,10 +122,14 @@ private EventModel createEventModel(String topic, Map<String, Object> eventData)
model.setTopic(topic);
return model;
}

public void publishEvent(EventModel eventModel) {
kafkaTemplate.send(authenticationErrorEventingTopic, eventModel);
}

private String encryptIndividualId(String id, String partnerCertificate) {
try {
logger.info("Inside the method of encryptIndividualId using partner certificate ");
logger.info("Inside the method of encrypting IndividualId using partner certificate ");
return securityManager.asymmetricEncryption(id.getBytes(), partnerCertificate);
} catch (IdAuthenticationBusinessException e) {
// TODO Auto-generated catch block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,6 @@ public String asymmetricEncryption(byte[] dataToEncrypt, String partnerCertifica
X509Certificate x509Certificate = getX509Certificate(partnerCertificate);
PublicKey publicKey = x509Certificate.getPublicKey();
byte[] encryptedData = cryptoCore.asymmetricEncrypt(publicKey, dataToEncrypt);
mosipLogger.info("AssymetricEncrypted data -- Start" + encryptedData+ " End--AssymetricEncrypted data" );
return CryptoUtil.encodeBase64(encryptedData);

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mosip.ida.kafka.bootstrap.servers=kafka-0.kafka-headless.${kafka.profile}:${kafka.port},kafka-1.kafka-headless.${kafka.profile}:${kafka.port},kafka-2.kafka-headless.${kafka.profile}:${kafka.port}
spring.kafka.admin.properties.allow.auto.create.topics=true
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ private IdAuthConfigKeyConstants() {
public static final String AUTH_TRANSACTION_STATUS_TOPIC = "ida-topic-auth-transaction-status";
public static final String AUTH_ANONYMOUS_PROFILE_TOPIC = "ida-topic-auth-anonymous-profile";
public static final String AUTH_FRAUD_ANALYSIS_TOPIC = "ida-topic-fraud-analysis";
public static final String ON_DEMAND_TEMPLATE_EXTRACTION_TOPIC = "ida-topic-on-demand-template-extraction";

public static final String AUTHENTICATION_ERROR_EVENTING_TOPIC = "ida-topic-authentication-error-eventing";

public static final String IDA_MAX_CREDENTIAL_PULL_WINDOW_DAYS = "ida-max-credential-pull-window-days";
public static final String IDA_MAX_WEBSUB_MSG_PULL_WINDOW_DAYS = "ida-max-websub-messages-pull-window-days";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.mosip.authentication.common.service.integration.PartnerServiceManager;
import io.mosip.authentication.common.service.integration.PasswordComparator;
import io.mosip.authentication.common.service.integration.TokenIdManager;
import io.mosip.authentication.common.service.kafka.impl.AuthenticationErrorEventingPublisher;
import io.mosip.authentication.common.service.transaction.manager.IdAuthSecurityManager;
import io.mosip.authentication.common.service.util.EnvUtil;
import io.mosip.authentication.common.service.util.IdaRequestResponsConsumerUtil;
Expand All @@ -54,7 +55,6 @@
import io.mosip.authentication.common.service.websub.impl.AuthTransactionStatusEventPublisher;
import io.mosip.authentication.common.service.websub.impl.IdAuthFraudAnalysisEventPublisher;
import io.mosip.authentication.common.service.websub.impl.MasterDataUpdateEventInitializer;
import io.mosip.authentication.common.service.websub.impl.OndemandTemplateEventPublisher;
import io.mosip.authentication.common.service.websub.impl.PartnerCACertEventInitializer;
import io.mosip.authentication.common.service.websub.impl.PartnerServiceEventsInitializer;
import io.mosip.authentication.core.util.IdTypeUtil;
Expand Down Expand Up @@ -112,7 +112,7 @@
PartnerCACertEventServiceImpl.class, PartnerCACertEventInitializer.class,
IdAuthWebSubInitializer.class, AuthAnonymousEventPublisher.class, EnvUtil.class, KeyBindedTokenMatcherUtil.class,
HSMHealthCheck.class, PrivateKeyDecryptorHelper.class,
PasswordAuthServiceImpl.class, PasswordComparator.class, OndemandTemplateEventPublisher.class })
PasswordAuthServiceImpl.class, PasswordComparator.class, AuthenticationErrorEventingPublisher.class })
@ComponentScan(basePackages = { "io.mosip.authentication.otp.service.*",
"io.mosip.kernel.core.logger.config", "${mosip.auth.adapter.impl.basepackage}" }, excludeFilters = @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"io.mosip.idrepository.core.config.IdRepoDataSourceConfig.*" }))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import io.mosip.authentication.common.service.builder.AuthTransactionBuilder;
import io.mosip.authentication.common.service.helper.AuditHelper;
import io.mosip.authentication.common.service.helper.AuthTransactionHelper;
import io.mosip.authentication.common.service.kafka.impl.AuthenticationErrorEventingPublisher;
import io.mosip.authentication.common.service.transaction.manager.IdAuthSecurityManager;
import io.mosip.authentication.common.service.util.IdaRequestResponsConsumerUtil;
import io.mosip.authentication.common.service.validator.OTPRequestValidator;
import io.mosip.authentication.common.service.websub.impl.OndemandTemplateEventPublisher;
import io.mosip.authentication.core.constant.AuditEvents;
import io.mosip.authentication.core.constant.AuditModules;
import io.mosip.authentication.core.constant.IdAuthCommonConstants;
Expand Down Expand Up @@ -93,7 +93,7 @@ public class OTPController {
private IdAuthSecurityManager securityManager;

@Autowired
private OndemandTemplateEventPublisher ondemandTemplateEventPublisher;
private AuthenticationErrorEventingPublisher authenticationErrorEventingPublisher;

@InitBinder
private void initBinder(WebDataBinder binder) {
Expand Down Expand Up @@ -163,7 +163,7 @@ public OtpResponseDTO generateOTP(@Valid @RequestBody OtpRequestDTO otpRequestDt
} catch (IdAuthenticationBusinessException e) {
logger.error(IdAuthCommonConstants.SESSION_ID, e.getClass().toString(), e.getErrorCode(), e.getErrorText());
if (IdAuthenticationErrorConstants.ID_NOT_AVAILABLE.getErrorCode().equals(e.getErrorCode())) {
ondemandTemplateEventPublisher.notify(otpRequestDto, request.getHeader("signature"), partner, e,
authenticationErrorEventingPublisher.notify(otpRequestDto, request.getHeader("signature"), partner, e,
otpRequestDto.getMetadata());
}
auditHelper.audit(AuditModules.OTP_REQUEST, AuditEvents.OTP_TRIGGER_REQUEST_RESPONSE , otpRequestDto.getTransactionID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.mosip.authentication.common.service.integration.PartnerServiceManager;
import io.mosip.authentication.common.service.integration.PasswordComparator;
import io.mosip.authentication.common.service.integration.TokenIdManager;
import io.mosip.authentication.common.service.kafka.impl.AuthenticationErrorEventingPublisher;
import io.mosip.authentication.common.service.transaction.manager.IdAuthSecurityManager;
import io.mosip.authentication.common.service.util.BioMatcherUtil;
import io.mosip.authentication.common.service.util.EnvUtil;
Expand All @@ -59,7 +60,6 @@
import io.mosip.authentication.common.service.websub.impl.AuthTransactionStatusEventPublisher;
import io.mosip.authentication.common.service.websub.impl.IdAuthFraudAnalysisEventPublisher;
import io.mosip.authentication.common.service.websub.impl.MasterDataUpdateEventInitializer;
import io.mosip.authentication.common.service.websub.impl.OndemandTemplateEventPublisher;
import io.mosip.authentication.common.service.websub.impl.PartnerCACertEventInitializer;
import io.mosip.authentication.common.service.websub.impl.PartnerServiceEventsInitializer;
import io.mosip.authentication.core.util.DemoMatcherUtil;
Expand Down Expand Up @@ -126,7 +126,7 @@
AuthAnonymousProfileServiceImpl.class, AuthAnonymousEventPublisher.class, SessionKeyDecrytorHelper.class, ExternalRestHelperConfig.class, IdaRequestResponsConsumerUtil.class,
PartnerCACertEventServiceImpl.class, PartnerCACertEventInitializer.class, EnvUtil.class, KeyBindedTokenMatcherUtil.class,
HSMHealthCheck.class, TokenValidationHelper.class, VCSchemaProviderUtil.class, PrivateKeyDecryptorHelper.class,
PasswordAuthServiceImpl.class, PasswordComparator.class, OndemandTemplateEventPublisher.class })
PasswordAuthServiceImpl.class, PasswordComparator.class, AuthenticationErrorEventingPublisher.class })
@ComponentScan(basePackages = { "io.mosip.authentication.service.*", "io.mosip.kernel.core.logger.config",
"io.mosip.authentication.common.service.config", "${mosip.auth.adapter.impl.basepackage}" }, excludeFilters = @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"io.mosip.idrepository.core.config.IdRepoDataSourceConfig.*" }))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import io.mosip.authentication.common.service.builder.AuthTransactionBuilder;
import io.mosip.authentication.common.service.helper.AuditHelper;
import io.mosip.authentication.common.service.helper.AuthTransactionHelper;
import io.mosip.authentication.common.service.kafka.impl.AuthenticationErrorEventingPublisher;
import io.mosip.authentication.common.service.util.AuthTypeUtil;
import io.mosip.authentication.common.service.util.IdaRequestResponsConsumerUtil;
import io.mosip.authentication.common.service.validator.AuthRequestValidator;
import io.mosip.authentication.common.service.websub.impl.OndemandTemplateEventPublisher;
import io.mosip.authentication.core.constant.AuditEvents;
import io.mosip.authentication.core.constant.IdAuthCommonConstants;
import io.mosip.authentication.core.constant.IdAuthenticationErrorConstants;
Expand Down Expand Up @@ -91,7 +91,7 @@ public class AuthController {
private PartnerService partnerService;

@Autowired
private OndemandTemplateEventPublisher ondemandTemplateEventPublisher;
private AuthenticationErrorEventingPublisher authenticationErrorEventingPublisher;


/**
Expand Down Expand Up @@ -163,7 +163,7 @@ public AuthResponseDTO authenticateIndividual(@Validated @RequestBody AuthReques
mosipLogger.error(IdAuthCommonConstants.SESSION_ID, this.getClass().getSimpleName(),
"authenticateApplication", e.getErrorCode() + " : " + e.getErrorText());
if (IdAuthenticationErrorConstants.ID_NOT_AVAILABLE.getErrorCode().equals(e.getErrorCode())) {
ondemandTemplateEventPublisher.notify(authrequestdto, request.getHeader("signature"), partner, e,
authenticationErrorEventingPublisher.notify(authrequestdto, request.getHeader("signature"), partner, e,
authrequestdto.getMetadata());
}
auditHelper.auditExceptionForAuthRequestedModules(AuditEvents.AUTH_REQUEST_RESPONSE, authrequestdto, e);
Expand Down
Loading

0 comments on commit 0e385e5

Please sign in to comment.