
Commit 5388856

Merge branch 'master' of https://github.com/folio-org/mod-inventory into MODINV-948

# Conflicts:
#	NEWS.md

VRohach committed Jan 25, 2024
2 parents 9f4c159 + 8abc611
Showing 30 changed files with 484 additions and 88 deletions.
4 changes: 3 additions & 1 deletion NEWS.md
@@ -4,7 +4,9 @@
* Remove step of initial saving of incoming records to SRS [MODSOURMAN-1022](https://issues.folio.org/browse/MODSOURMAN-1022)
* Apply 005 logic before saving MARC Bib in SRS [MODINV-921](https://issues.folio.org/browse/MODINV-921)
* Remove extra fields from 'holdings/move' mechanism [MODINV-948](https://issues.folio.org/browse/MODINV-948)

+ * Allow linking local instance to shared instance [MODINV-901](https://issues.folio.org/browse/MODINV-901)
+ * OOM issue in mod-inventory ([MODINV-944](https://issues.folio.org/browse/MODINV-944))
+ * Make configurable params for instance sharing [MODINV-950](https://issues.folio.org/browse/MODINV-950)

## 20.1.0 2023-10-13
* Update status when user attempts to update shared auth record from member tenant ([MODDATAIMP-926](https://issues.folio.org/browse/MODDATAIMP-926))
2 changes: 1 addition & 1 deletion descriptors/ModuleDescriptor-template.json
@@ -4,7 +4,7 @@
"provides": [
{
"id": "inventory",
"version": "13.0",
"version": "13.1",
"handlers": [
{
"methods": ["GET"],
4 changes: 4 additions & 0 deletions ramls/item.json
@@ -31,6 +31,10 @@
"type": "boolean",
"description": "Records the fact that the record should not be displayed in a discovery system"
},
"displaySummary": {
"description": "Display summary about the item",
"type": "string"
},
"title": {
"type": "string",
"description": "Resouce title (read only, inherited from associated instance record)",
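
Consumers of the item schema can read the new optional field like any other string property. A minimal sketch using Vert.x's JsonObject; the sample id, status, and summary values are illustrative and not taken from this commit:

```java
import io.vertx.core.json.JsonObject;

// Sketch: an item record carrying the new optional displaySummary property.
// All sample values below are made up for illustration.
public class DisplaySummaryExample {
  public static void main(String[] args) {
    JsonObject item = new JsonObject()
      .put("id", "0b96a642-5e7f-452d-9cae-9cee66c9a892")
      .put("status", new JsonObject().put("name", "Available"))
      .put("displaySummary", "v.3, no.2 (Spring 2023)");

    // displaySummary is optional, so read it with a default.
    System.out.println(item.getString("displaySummary", ""));
  }
}
```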
@@ -36,6 +36,7 @@ public class ConsortiumInstanceSharingConsumerVerticle extends AbstractVerticle
private final int loadLimit = getLoadLimit();

private KafkaConsumerWrapper<String, String> consumer;
+ private ConsortiumInstanceSharingHandler consortiumInstanceSharingHandler;

@Override
public void start(Promise<Void> startPromise) {
@@ -46,7 +47,7 @@ public void start(Promise<Void> startPromise) {
HttpClient httpClient = vertx.createHttpClient();
Storage storage = Storage.basedUpon(config, httpClient);
SharedInstanceEventIdStorageServiceImpl sharedInstanceEventIdStorageService = new SharedInstanceEventIdStorageServiceImpl(new EventIdStorageDaoImpl(new PostgresClientFactory(vertx)));
- ConsortiumInstanceSharingHandler consortiumInstanceSharingHandler = new ConsortiumInstanceSharingHandler(vertx, httpClient, storage, kafkaConfig, sharedInstanceEventIdStorageService);
+ consortiumInstanceSharingHandler = new ConsortiumInstanceSharingHandler(vertx, httpClient, storage, kafkaConfig, sharedInstanceEventIdStorageService);

var kafkaConsumerFuture = createKafkaConsumerWrapper(kafkaConfig, consortiumInstanceSharingHandler);
kafkaConsumerFuture.onFailure(startPromise::fail)
@@ -90,11 +91,13 @@ private KafkaConfig getKafkaConfig(JsonObject config) {

@Override
public void stop(Promise<Void> stopPromise) {
- consumer.stop().onComplete(ar -> stopPromise.complete());
+ consumer.stop().onComplete(ar -> {
+   consortiumInstanceSharingHandler.shutdown();
+   stopPromise.complete();
+ });
}

private int getLoadLimit() {
return Integer.parseInt(System.getProperty("inventory.kafka.ConsortiumInstanceSharingConsumer.loadLimit", "5"));
}

}
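
This verticle, like the other consumer verticles touched below, now keeps a field reference to its Kafka handler so that stop() can release the handler's producer after the consumer has stopped. A reduced sketch of that ordering; StoppableConsumer and ShutdownableHandler are stand-ins for the real KafkaConsumerWrapper and handler types:

```java
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;

// Reduced sketch of the shutdown ordering used in this commit: stop the
// consumer first, then close producer resources held by the handler, then
// complete the stop promise.
abstract class GracefulConsumerVerticle extends AbstractVerticle {

  interface StoppableConsumer { Future<Void> stop(); }
  interface ShutdownableHandler { void shutdown(); }

  protected StoppableConsumer consumer;
  protected ShutdownableHandler handler;

  @Override
  public void stop(Promise<Void> stopPromise) {
    consumer.stop().onComplete(ar -> {
      handler.shutdown();      // closes the handler's shared Kafka producer
      stopPromise.complete();  // complete even if the consumer stop failed
    });
  }
}
```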
10 changes: 8 additions & 2 deletions src/main/java/org/folio/inventory/InventoryVerticle.java
@@ -6,6 +6,9 @@
import org.apache.logging.log4j.Logger;
import org.folio.inventory.common.WebRequestDiagnostics;
import org.folio.inventory.common.dao.PostgresClientFactory;
+ import org.folio.inventory.consortium.cache.ConsortiumDataCache;
+ import org.folio.inventory.consortium.services.ConsortiumService;
+ import org.folio.inventory.consortium.services.ConsortiumServiceImpl;
import org.folio.inventory.resources.AdminApi;
import org.folio.inventory.resources.Holdings;
import org.folio.inventory.resources.Instances;
@@ -53,12 +56,15 @@ public void start(Promise<Void> started) {

router.route().handler(WebRequestDiagnostics::outputDiagnostics);

+ ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, client);
+ ConsortiumService consortiumService = new ConsortiumServiceImpl(client, consortiumDataCache);
+
new AdminApi().register(router);
new Items(storage, client).register(router);
new MoveApi(storage, client).register(router);
- new Instances(storage, client).register(router);
+ new Instances(storage, client, consortiumService).register(router);
new Holdings(storage).register(router);
- new InstancesBatch(storage, client).register(router);
+ new InstancesBatch(storage, client, consortiumService).register(router);
new IsbnUtilsApi().register(router);
new ItemsByHoldingsRecordId(storage, client).register(router);
new InventoryConfigApi().register(router);
@@ -30,6 +30,7 @@ public class MarcBibUpdateConsumerVerticle extends AbstractVerticle {
private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds";
private final int loadLimit = getLoadLimit();
private KafkaConsumerWrapper<String, String> marcBibUpdateConsumerWrapper;
+ private MarcBibUpdateKafkaHandler marcBibUpdateKafkaHandler;

@Override
public void start(Promise<Void> startPromise) {
@@ -43,7 +44,7 @@ public void start(Promise<Void> startPromise) {
var mappingMetadataExpirationTime = getCacheEnvVariable(config, METADATA_EXPIRATION_TIME);
MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));

- var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache);
+ marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache);

marcBibUpdateConsumerWrapper = createConsumer(kafkaConfig, SRS_MARC_BIB_TOPIC_NAME);
marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName())
@@ -70,7 +71,10 @@ private KafkaConsumerWrapper<String, String> createConsumer(KafkaConfig kafkaCon
@Override
public void stop(Promise<Void> stopPromise) {
marcBibUpdateConsumerWrapper.stop()
- .onComplete(ar -> stopPromise.complete());
+ .onComplete(ar -> {
+   marcBibUpdateKafkaHandler.shutdown();
+   stopPromise.complete();
+ });
}

private int getLoadLimit() {
@@ -19,7 +19,6 @@
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;

- import static java.lang.String.format;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
@@ -36,6 +35,7 @@ public class QuickMarcConsumerVerticle extends AbstractVerticle {
private final int loadLimit = getLoadLimit();
private final int maxDistributionNumber = getMaxDistributionNumber();
private KafkaConsumerWrapper<String, String> consumer;
+ private QuickMarcKafkaHandler handler;

@Override
public void start(Promise<Void> startPromise) {
@@ -47,7 +47,7 @@

var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(client));
HoldingsCollectionService holdingsCollectionService = new HoldingsCollectionService();
- var handler = new QuickMarcKafkaHandler(vertx, storage, maxDistributionNumber, kafkaConfig, precedingSucceedingTitlesHelper, holdingsCollectionService);
+ handler = new QuickMarcKafkaHandler(vertx, storage, maxDistributionNumber, kafkaConfig, precedingSucceedingTitlesHelper, holdingsCollectionService);

var kafkaConsumerFuture = createKafkaConsumer(kafkaConfig, QMEventTypes.QM_SRS_MARC_RECORD_UPDATED, handler);

@@ -74,7 +74,10 @@ private KafkaConfig getKafkaConfig(JsonObject config) {

@Override
public void stop(Promise<Void> stopPromise) {
- consumer.stop().onComplete(ar -> stopPromise.complete());
+ consumer.stop().onComplete(ar -> {
+   handler.shutdown();
+   stopPromise.complete();
+ });
}

private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumer(KafkaConfig kafkaConfig, QMEventTypes eventType,
@@ -1,5 +1,15 @@
package org.folio.inventory.consortium.consumers;

+ import static java.lang.String.format;
+ import static org.apache.commons.lang.StringUtils.EMPTY;
+ import static org.folio.inventory.consortium.entities.SharingInstanceEventType.CONSORTIUM_INSTANCE_SHARING_COMPLETE;
+ import static org.folio.inventory.consortium.entities.SharingStatus.COMPLETE;
+ import static org.folio.inventory.consortium.handlers.InstanceSharingHandlerFactory.getInstanceSharingHandler;
+ import static org.folio.inventory.consortium.handlers.InstanceSharingHandlerFactory.values;
+ import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
+ import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
+ import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
+
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
@@ -10,6 +20,11 @@
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Optional;
+ import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.consortium.entities.SharingInstance;
@@ -33,26 +48,11 @@
import org.folio.kafka.exception.DuplicateEventException;
import org.folio.kafka.services.KafkaProducerRecordBuilder;

- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Optional;
- import java.util.stream.Stream;
-
- import static java.lang.String.format;
- import static org.apache.commons.lang.StringUtils.EMPTY;
- import static org.folio.inventory.consortium.entities.SharingInstanceEventType.CONSORTIUM_INSTANCE_SHARING_COMPLETE;
- import static org.folio.inventory.consortium.entities.SharingStatus.COMPLETE;
- import static org.folio.inventory.consortium.handlers.InstanceSharingHandlerFactory.getInstanceSharingHandler;
- import static org.folio.inventory.consortium.handlers.InstanceSharingHandlerFactory.values;
- import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
- import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
- import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;

public class ConsortiumInstanceSharingHandler implements AsyncRecordHandler<String, String> {

private static final Logger LOGGER = LogManager.getLogger(ConsortiumInstanceSharingHandler.class);
public static final String SOURCE = "source";
+ public static final SharingInstanceEventType eventType = CONSORTIUM_INSTANCE_SHARING_COMPLETE;

public static final String ID = "id";
private final Vertx vertx;
@@ -61,6 +61,7 @@ public class ConsortiumInstanceSharingHandler implements AsyncRecordHandler<Stri
private final KafkaConfig kafkaConfig;
private final InstanceOperationsHelper instanceOperations;
private final EventIdStorageService eventIdStorageService;
+ private final KafkaProducer<String, String> sharedProducer;

public ConsortiumInstanceSharingHandler(Vertx vertx, HttpClient httpClient, Storage storage, KafkaConfig kafkaConfig, EventIdStorageService eventIdStorageService) {
this.vertx = vertx;
@@ -69,6 +70,7 @@ public ConsortiumInstanceSharingHandler(Vertx vertx, HttpClient httpClient, Stor
this.kafkaConfig = kafkaConfig;
this.instanceOperations = new InstanceOperationsHelper();
this.eventIdStorageService = eventIdStorageService;
+ this.sharedProducer = createSharedProducer(eventType.name());
}

@Override
@@ -217,9 +219,6 @@ private void sendErrorResponseAndPrintLogMessage(String errorMessage, SharingIns

private void sendCompleteEventToKafka(SharingInstance sharingInstance, SharingStatus status, String errorMessage,
Map<String, String> kafkaHeaders) {

- SharingInstanceEventType evenType = CONSORTIUM_INSTANCE_SHARING_COMPLETE;
-
try {
String tenantId = kafkaHeaders.get(OKAPI_TENANT_HEADER);
List<KafkaHeader> kafkaHeadersList = convertKafkaHeadersMap(kafkaHeaders);
@@ -228,24 +227,23 @@ private void sendCompleteEventToKafka(SharingInstance sharingInstance, SharingSt
" to tenant {}. Status: {}, Message: {}", sharingInstance.getInstanceIdentifier(), tenantId, status.getValue(), errorMessage);

KafkaProducerRecord<String, String> kafkaRecord =
- createProducerRecord(getTopicName(tenantId, evenType),
+ createProducerRecord(getTopicName(tenantId, eventType),
sharingInstance,
status,
errorMessage,
kafkaHeadersList);

- var kafkaProducer = createProducer(tenantId, getTopicName(tenantId, evenType));
- kafkaProducer.send(kafkaRecord)
+ sharedProducer.send(kafkaRecord)
.onSuccess(res -> LOGGER.info("Event with type {}, was sent to kafka about sharing instance with InstanceId={}",
- evenType.value(), sharingInstance.getInstanceIdentifier()))
+ eventType.value(), sharingInstance.getInstanceIdentifier()))
.onFailure(err -> {
var cause = err.getCause();
LOGGER.info("Failed to sent event {} to kafka about sharing instance with InstanceId={}, cause: {}",
- evenType.value(), sharingInstance.getInstanceIdentifier(), cause);
+ eventType.value(), sharingInstance.getInstanceIdentifier(), cause);
});
} catch (Exception e) {
LOGGER.error("Failed to send an event for eventType {} about sharing instance with InstanceId={}, cause {}",
- evenType.value(), sharingInstance.getInstanceIdentifier(), e);
+ eventType.value(), sharingInstance.getInstanceIdentifier(), e);
}
}

@@ -276,9 +274,9 @@ private String getTopicName(String tenantId, SharingInstanceEventType eventType)
KafkaTopicNameHelper.getDefaultNameSpace(), tenantId, eventType.value());
}

- private KafkaProducer<String, String> createProducer(String tenantId, String topicName) {
-   LOGGER.info("createProducer :: tenantId: {}, topicName: {}", tenantId, topicName);
-   return new SimpleKafkaProducerManager(vertx, kafkaConfig).createShared(topicName);
+ private KafkaProducer<String, String> createSharedProducer(String name) {
+   LOGGER.info("createSharedProducer :: topicName: {}", name);
+   return new SimpleKafkaProducerManager(vertx, kafkaConfig).createShared(name);
}

private SharingInstance parseSharingInstance(String eventValue) {
Expand All @@ -302,4 +300,9 @@ private List<KafkaHeader> convertKafkaHeadersMap(Map<String, String> kafkaHeader
);
}

+ public void shutdown() {
+   if (sharedProducer != null) {
+     sharedProducer.close();
+   }
+ }
}
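
The net effect of the changes in this handler: rather than building a new KafkaProducer for every CONSORTIUM_INSTANCE_SHARING_COMPLETE event, one shared producer is created in the constructor and closed once in shutdown(). A reduced sketch of that lifecycle; the Map-based configuration passed to KafkaProducer.createShared stands in for the SimpleKafkaProducerManager used in the real code:

```java
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.Map;

// Create once, reuse for every event, close once on shutdown.
class SharedProducerLifecycle {
  private final KafkaProducer<String, String> sharedProducer;

  SharedProducerLifecycle(Vertx vertx, String name, Map<String, String> producerProps) {
    // One shared producer per handler instance instead of one per event.
    this.sharedProducer = KafkaProducer.createShared(vertx, name, producerProps);
  }

  void sendCompleteEvent(String topic, String key, String payload) {
    sharedProducer.send(KafkaProducerRecord.create(topic, key, payload));
  }

  void shutdown() {
    if (sharedProducer != null) {
      sharedProducer.close();
    }
  }
}
```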
@@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.folio.Record;
import org.folio.inventory.consortium.entities.SharingInstance;
+ import org.folio.kafka.SimpleConfigurationReader;
import org.folio.rest.client.ChangeManagerClient;
import org.folio.rest.jaxrs.model.InitJobExecutionsRqDto;
import org.folio.rest.jaxrs.model.InitialRecord;
@@ -46,10 +47,23 @@ public class RestDataImportHelper {
public static final String STATUS_COMMITTED = "COMMITTED";
public static final String STATUS_ERROR = "ERROR";

+ private static final String IMPORT_STATUS_POLL_INTERVAL_SEC_PARAM =
+   "inventory.sharing.di.status.poll.interval.seconds";
+ private static final String IMPORT_STATUS_POLL_NUMBER_PARAM =
+   "inventory.sharing.di.status.poll.number";
+ private static final String DEFAULT_IMPORT_STATUS_POLL_INTERVAL_SEC = "5";
+ private static final String DEFAULT_IMPORT_STATUS_POLL_NUMBER = "5";
+
private final Vertx vertx;
+ private final long durationInSec;
+ private final int attemptsNumber;

public RestDataImportHelper(Vertx vertx) {
this.vertx = vertx;
+   this.durationInSec = Integer.parseInt(SimpleConfigurationReader.getValue(
+     IMPORT_STATUS_POLL_INTERVAL_SEC_PARAM, DEFAULT_IMPORT_STATUS_POLL_INTERVAL_SEC));
+   this.attemptsNumber = Integer.parseInt(SimpleConfigurationReader.getValue(
+     IMPORT_STATUS_POLL_NUMBER_PARAM, DEFAULT_IMPORT_STATUS_POLL_NUMBER));
}

public static final JobProfileInfo JOB_PROFILE_INFO = new JobProfileInfo()
@@ -73,11 +87,6 @@ public Future<String> importMarcRecord(Record marcRecord, SharingInstance sharin

ChangeManagerClient changeManagerClient = getChangeManagerClient(kafkaHeaders);

- //TODO: move to config
- //Constants for checkDataImportStatus method
- final long durationInSec = 20;
- final int attemptsNumber = 3;

return initJobExecution(instanceId, changeManagerClient, kafkaHeaders)
.compose(jobExecutionId -> setDefaultJobProfileToJobExecution(jobExecutionId, changeManagerClient))
.compose(jobExecutionId -> {
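
With the hard-coded 20-second interval and 3 attempts gone, both polling knobs are read through SimpleConfigurationReader and default to 5 seconds and 5 attempts. A hedged illustration of overriding them; the property names come from the diff above, and in a real deployment they would normally arrive as -D JVM options rather than System.setProperty calls:

```java
// Illustration only: override the data-import status polling knobs before
// a RestDataImportHelper is constructed; otherwise the 5/5 defaults apply.
public class PollConfigExample {
  public static void main(String[] args) {
    System.setProperty("inventory.sharing.di.status.poll.interval.seconds", "10");
    System.setProperty("inventory.sharing.di.status.poll.number", "12");
    // A RestDataImportHelper built after this point polls the import status
    // every 10 seconds, up to 12 times.
  }
}
```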
@@ -168,4 +168,10 @@ private LinkUpdateReport mapToLinkReport(MarcBibUpdate marcBibUpdate, String ins
.withTenant(marcBibUpdate.getTenant())
.withTs(marcBibUpdate.getTs());
}

+ public void shutdown() {
+   if (producer != null) {
+     producer.close();
+   }
+ }
}
@@ -203,4 +203,11 @@ private KafkaProducerRecord<String, String> createRecord(String eventPayload, St
return producerRecord;
}

+ public void shutdown() {
+   producerMap.values().forEach(producer -> {
+     if (producer != null) {
+       producer.close();
+     }
+   });
+ }
}