Skip to content

Commit

Permalink
MODINV-944 OOM issue in mod-inventory (#668)
Browse files Browse the repository at this point in the history
* close kafka producer to fix memory leaks

* increase test coverage

* increase test coverage, add wrapper for SimpleKafkaProducerManager

* remove unnecessary tests

* correct spaces

* correct spaces

* remove code smell

* close producers

* add shutdown method in marc bib update kafka

* add shutdown method in quick marc kafka

* add shutdown hooks to close producers

* increase test coverage

* add test ConsortiumInstanceSharingConsumerVerticleTest
  • Loading branch information
JavokhirAbdullayev authored Jan 23, 2024
1 parent 4c8ca59 commit e0c3f01
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 42 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Apply 005 logic before saving MARC Bib in SRS [MODINV-921](https://issues.folio.org/browse/MODINV-921)
* Remove extra fields from 'holdings/move' mechanism [MODINV-948](https://issues.folio.org/browse/MODINV-948)
* Allow to link local instance to shared instance [MODINV-901](https://issues.folio.org/browse/MODINV-901)
* OOM issue in mod-inventory ([MODINV-944](https://issues.folio.org/browse/MODINV-944))

## 20.1.0 2023-10-13
* Update status when user attempts to update shared auth record from member tenant ([MODDATAIMP-926](https://issues.folio.org/browse/MODDATAIMP-926))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ConsortiumInstanceSharingConsumerVerticle extends AbstractVerticle
private final int loadLimit = getLoadLimit();

private KafkaConsumerWrapper<String, String> consumer;
private ConsortiumInstanceSharingHandler consortiumInstanceSharingHandler;

@Override
public void start(Promise<Void> startPromise) {
Expand All @@ -46,7 +47,7 @@ public void start(Promise<Void> startPromise) {
HttpClient httpClient = vertx.createHttpClient();
Storage storage = Storage.basedUpon(config, httpClient);
SharedInstanceEventIdStorageServiceImpl sharedInstanceEventIdStorageService = new SharedInstanceEventIdStorageServiceImpl(new EventIdStorageDaoImpl(new PostgresClientFactory(vertx)));
ConsortiumInstanceSharingHandler consortiumInstanceSharingHandler = new ConsortiumInstanceSharingHandler(vertx, httpClient, storage, kafkaConfig, sharedInstanceEventIdStorageService);
consortiumInstanceSharingHandler = new ConsortiumInstanceSharingHandler(vertx, httpClient, storage, kafkaConfig, sharedInstanceEventIdStorageService);

var kafkaConsumerFuture = createKafkaConsumerWrapper(kafkaConfig, consortiumInstanceSharingHandler);
kafkaConsumerFuture.onFailure(startPromise::fail)
Expand Down Expand Up @@ -90,11 +91,13 @@ private KafkaConfig getKafkaConfig(JsonObject config) {

@Override
public void stop(Promise<Void> stopPromise) {
consumer.stop().onComplete(ar -> stopPromise.complete());
consumer.stop().onComplete(ar -> {
consortiumInstanceSharingHandler.shutdown();
stopPromise.complete();
});
}

/**
 * Reads the consumer load limit from the
 * {@code inventory.kafka.ConsortiumInstanceSharingConsumer.loadLimit}
 * system property, falling back to 5 when the property is unset.
 */
private int getLoadLimit() {
  String configuredLimit =
      System.getProperty("inventory.kafka.ConsortiumInstanceSharingConsumer.loadLimit", "5");
  return Integer.parseInt(configuredLimit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class MarcBibUpdateConsumerVerticle extends AbstractVerticle {
private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds";
private final int loadLimit = getLoadLimit();
private KafkaConsumerWrapper<String, String> marcBibUpdateConsumerWrapper;
private MarcBibUpdateKafkaHandler marcBibUpdateKafkaHandler;

@Override
public void start(Promise<Void> startPromise) {
Expand All @@ -43,7 +44,7 @@ public void start(Promise<Void> startPromise) {
var mappingMetadataExpirationTime = getCacheEnvVariable(config, METADATA_EXPIRATION_TIME);
MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));

var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache);
marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache);

marcBibUpdateConsumerWrapper = createConsumer(kafkaConfig, SRS_MARC_BIB_TOPIC_NAME);
marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName())
Expand All @@ -70,7 +71,10 @@ private KafkaConsumerWrapper<String, String> createConsumer(KafkaConfig kafkaCon
@Override
public void stop(Promise<Void> stopPromise) {
marcBibUpdateConsumerWrapper.stop()
.onComplete(ar -> stopPromise.complete());
.onComplete(ar -> {
marcBibUpdateKafkaHandler.shutdown();
stopPromise.complete();
});
}

private int getLoadLimit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;

import static java.lang.String.format;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
Expand All @@ -36,6 +35,7 @@ public class QuickMarcConsumerVerticle extends AbstractVerticle {
private final int loadLimit = getLoadLimit();
private final int maxDistributionNumber = getMaxDistributionNumber();
private KafkaConsumerWrapper<String, String> consumer;
private QuickMarcKafkaHandler handler;

@Override
public void start(Promise<Void> startPromise) {
Expand All @@ -47,7 +47,7 @@ public void start(Promise<Void> startPromise) {

var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(client));
HoldingsCollectionService holdingsCollectionService = new HoldingsCollectionService();
var handler = new QuickMarcKafkaHandler(vertx, storage, maxDistributionNumber, kafkaConfig, precedingSucceedingTitlesHelper, holdingsCollectionService);
handler = new QuickMarcKafkaHandler(vertx, storage, maxDistributionNumber, kafkaConfig, precedingSucceedingTitlesHelper, holdingsCollectionService);

var kafkaConsumerFuture = createKafkaConsumer(kafkaConfig, QMEventTypes.QM_SRS_MARC_RECORD_UPDATED, handler);

Expand All @@ -74,7 +74,10 @@ private KafkaConfig getKafkaConfig(JsonObject config) {

@Override
public void stop(Promise<Void> stopPromise) {
consumer.stop().onComplete(ar -> stopPromise.complete());
consumer.stop().onComplete(ar -> {
handler.shutdown();
stopPromise.complete();
});
}

private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumer(KafkaConfig kafkaConfig, QMEventTypes eventType,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package org.folio.inventory.consortium.consumers;

import static java.lang.String.format;
import static org.apache.commons.lang.StringUtils.EMPTY;
import static org.folio.inventory.consortium.entities.SharingInstanceEventType.CONSORTIUM_INSTANCE_SHARING_COMPLETE;
import static org.folio.inventory.consortium.entities.SharingStatus.COMPLETE;
import static org.folio.inventory.consortium.handlers.InstanceSharingHandlerFactory.getInstanceSharingHandler;
import static org.folio.inventory.consortium.handlers.InstanceSharingHandlerFactory.values;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand All @@ -10,6 +20,11 @@
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.consortium.entities.SharingInstance;
Expand All @@ -33,26 +48,11 @@
import org.folio.kafka.exception.DuplicateEventException;
import org.folio.kafka.services.KafkaProducerRecordBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.apache.commons.lang.StringUtils.EMPTY;
import static org.folio.inventory.consortium.entities.SharingInstanceEventType.CONSORTIUM_INSTANCE_SHARING_COMPLETE;
import static org.folio.inventory.consortium.entities.SharingStatus.COMPLETE;
import static org.folio.inventory.consortium.handlers.InstanceSharingHandlerFactory.getInstanceSharingHandler;
import static org.folio.inventory.consortium.handlers.InstanceSharingHandlerFactory.values;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;

public class ConsortiumInstanceSharingHandler implements AsyncRecordHandler<String, String> {

private static final Logger LOGGER = LogManager.getLogger(ConsortiumInstanceSharingHandler.class);
public static final String SOURCE = "source";
public static final SharingInstanceEventType eventType = CONSORTIUM_INSTANCE_SHARING_COMPLETE;

public static final String ID = "id";
private final Vertx vertx;
Expand All @@ -61,6 +61,7 @@ public class ConsortiumInstanceSharingHandler implements AsyncRecordHandler<Stri
private final KafkaConfig kafkaConfig;
private final InstanceOperationsHelper instanceOperations;
private final EventIdStorageService eventIdStorageService;
private final KafkaProducer<String, String> sharedProducer;

public ConsortiumInstanceSharingHandler(Vertx vertx, HttpClient httpClient, Storage storage, KafkaConfig kafkaConfig, EventIdStorageService eventIdStorageService) {
this.vertx = vertx;
Expand All @@ -69,6 +70,7 @@ public ConsortiumInstanceSharingHandler(Vertx vertx, HttpClient httpClient, Stor
this.kafkaConfig = kafkaConfig;
this.instanceOperations = new InstanceOperationsHelper();
this.eventIdStorageService = eventIdStorageService;
this.sharedProducer = createSharedProducer(eventType.name());
}

@Override
Expand Down Expand Up @@ -217,9 +219,6 @@ private void sendErrorResponseAndPrintLogMessage(String errorMessage, SharingIns

private void sendCompleteEventToKafka(SharingInstance sharingInstance, SharingStatus status, String errorMessage,
Map<String, String> kafkaHeaders) {

SharingInstanceEventType evenType = CONSORTIUM_INSTANCE_SHARING_COMPLETE;

try {
String tenantId = kafkaHeaders.get(OKAPI_TENANT_HEADER);
List<KafkaHeader> kafkaHeadersList = convertKafkaHeadersMap(kafkaHeaders);
Expand All @@ -228,24 +227,23 @@ private void sendCompleteEventToKafka(SharingInstance sharingInstance, SharingSt
" to tenant {}. Status: {}, Message: {}", sharingInstance.getInstanceIdentifier(), tenantId, status.getValue(), errorMessage);

KafkaProducerRecord<String, String> kafkaRecord =
createProducerRecord(getTopicName(tenantId, evenType),
createProducerRecord(getTopicName(tenantId, eventType),
sharingInstance,
status,
errorMessage,
kafkaHeadersList);

var kafkaProducer = createProducer(tenantId, getTopicName(tenantId, evenType));
kafkaProducer.send(kafkaRecord)
sharedProducer.send(kafkaRecord)
.onSuccess(res -> LOGGER.info("Event with type {}, was sent to kafka about sharing instance with InstanceId={}",
evenType.value(), sharingInstance.getInstanceIdentifier()))
eventType.value(), sharingInstance.getInstanceIdentifier()))
.onFailure(err -> {
var cause = err.getCause();
LOGGER.info("Failed to sent event {} to kafka about sharing instance with InstanceId={}, cause: {}",
evenType.value(), sharingInstance.getInstanceIdentifier(), cause);
eventType.value(), sharingInstance.getInstanceIdentifier(), cause);
});
} catch (Exception e) {
LOGGER.error("Failed to send an event for eventType {} about sharing instance with InstanceId={}, cause {}",
evenType.value(), sharingInstance.getInstanceIdentifier(), e);
eventType.value(), sharingInstance.getInstanceIdentifier(), e);
}
}

Expand Down Expand Up @@ -276,9 +274,9 @@ private String getTopicName(String tenantId, SharingInstanceEventType eventType)
KafkaTopicNameHelper.getDefaultNameSpace(), tenantId, eventType.value());
}

private KafkaProducer<String, String> createProducer(String tenantId, String topicName) {
LOGGER.info("createProducer :: tenantId: {}, topicName: {}", tenantId, topicName);
return new SimpleKafkaProducerManager(vertx, kafkaConfig).createShared(topicName);
private KafkaProducer<String, String> createSharedProducer(String name) {
LOGGER.info("createSharedProducer :: topicName: {}", name);
return new SimpleKafkaProducerManager(vertx, kafkaConfig).createShared(name);
}

private SharingInstance parseSharingInstance(String eventValue) {
Expand All @@ -302,4 +300,9 @@ private List<KafkaHeader> convertKafkaHeadersMap(Map<String, String> kafkaHeader
);
}

/**
 * Releases the shared Kafka producer held by this handler. Invoked from the
 * owning verticle's stop() hook so the producer is not leaked (MODINV-944).
 */
public void shutdown() {
  if (sharedProducer == null) {
    return;
  }
  sharedProducer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,10 @@ private LinkUpdateReport mapToLinkReport(MarcBibUpdate marcBibUpdate, String ins
.withTenant(marcBibUpdate.getTenant())
.withTs(marcBibUpdate.getTs());
}

/**
 * Releases the Kafka producer owned by this handler; called during verticle
 * shutdown to prevent producer/resource leaks (MODINV-944).
 */
public void shutdown() {
  if (producer == null) {
    return;
  }
  producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,11 @@ private KafkaProducerRecord<String, String> createRecord(String eventPayload, St
return producerRecord;
}

/**
 * Closes every Kafka producer cached in {@code producerMap} so their
 * underlying network resources are released (MODINV-944 OOM fix).
 */
public void shutdown() {
  for (var producer : producerMap.values()) {
    if (producer != null) {
      producer.close();
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.folio.inventory.consortium.consumers;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import org.folio.inventory.ConsortiumInstanceSharingConsumerVerticle;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

@RunWith(VertxUnitRunner.class)
public class ConsortiumInstanceSharingConsumerVerticleTest {

  private static final String KAFKA_ENV_NAME = "test-env";
  private static Vertx vertx = Vertx.vertx();
  public static EmbeddedKafkaCluster cluster;

  // NOTE(review): this mock is never injected into the verticle under test —
  // the verticle constructs its own real handler internally. Confirm whether
  // the mock can be removed entirely.
  @Mock
  private static ConsortiumInstanceSharingHandler consortiumInstanceSharingHandler;

  @Before
  public void setUp() {
    MockitoAnnotations.openMocks(this);
  }

  /**
   * Deploys {@link ConsortiumInstanceSharingConsumerVerticle} against an
   * embedded Kafka cluster and asserts that deployment succeeds.
   */
  @Test
  public void shouldDeployVerticle(TestContext context) {
    Async async = context.async();
    cluster = provisionWith(defaultClusterConfig());
    cluster.start();
    String[] hostAndPort = cluster.getBrokerList().split(":");
    DeploymentOptions options = new DeploymentOptions()
      .setConfig(new JsonObject()
        .put(KAFKA_HOST, hostAndPort[0])
        .put(KAFKA_PORT, hostAndPort[1])
        .put(KAFKA_REPLICATION_FACTOR, "1")
        .put(KAFKA_ENV, KAFKA_ENV_NAME)
        .put(KAFKA_MAX_REQUEST_SIZE, "1048576"))
      .setWorker(true);

    Promise<String> promise = Promise.promise();
    vertx.deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(), options, promise);

    promise.future().onComplete(ar -> {
      context.assertTrue(ar.succeeded());
      async.complete();
    });

  }

  @AfterClass
  public static void tearDownClass(TestContext context) {
    Async async = context.async();
    vertx.close(ar -> {
      // Guard against NPE: the cluster is provisioned lazily inside
      // shouldDeployVerticle, so it is null if that test never ran.
      if (cluster != null) {
        cluster.stop();
      }
      // Removed the previous consortiumInstanceSharingHandler.shutdown() call:
      // invoking shutdown() on a Mockito mock is a no-op and verified nothing.
      async.complete();
    });
  }

}
Loading

0 comments on commit e0c3f01

Please sign in to comment.