diff --git a/NEWS.md b/NEWS.md
index 279464ff3..a25a074da 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,3 +1,6 @@
+## 20.2.11 2025-01-27
+* [MODINV-1125](https://folio-org.atlassian.net/browse/MODINV-1125) Fix handling of optimistic locking behavior for instance updates when consuming Marc Bib update events
+
## 20.2.10 2025-01-16
* [MODINV-1136](https://folio-org.atlassian.net/browse/MODINV-1136) The "discoverySuppress" instance attribute is not created in database when instance created by default mapping profile
diff --git a/pom.xml b/pom.xml
index 0df712500..51192eeb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>mod-inventory</artifactId>
<groupId>org.folio</groupId>
- <version>20.2.11-SNAPSHOT</version>
+ <version>20.2.12-SNAPSHOT</version>
<licenses>
<license>
<name>Apache License 2.0</name>
diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java
index 35d07bec1..b85db5657 100644
--- a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java
+++ b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java
@@ -3,9 +3,12 @@
import static java.lang.String.format;
import static java.util.Objects.isNull;
import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS;
+import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_USER_ID;
+import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
+import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
@@ -33,12 +36,10 @@
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
-import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil;
import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
import org.folio.inventory.domain.instances.Instance;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaHeaderUtils;
import org.folio.kafka.SimpleKafkaProducerManager;
import org.folio.kafka.services.KafkaProducerRecordBuilder;
import org.folio.processing.exceptions.EventProcessingException;
@@ -55,8 +56,7 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler<String, String> {
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
try {
- Promise<Instance> promise = Promise.promise();
- MarcBibUpdate instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class);
- Map<String, String> headersMap = KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers());
- HashMap<String, String> metaDataPayload = new HashMap<>();
- var jobId = instanceEvent.getJobId();
+ var instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class);
+ var headers = kafkaHeadersToMap(consumerRecord.headers());
+ Map<String, String> metaDataPayload = new HashMap<>();
LOGGER.info("Event payload has been received with event type: {} by jobId: {}", instanceEvent.getType(), instanceEvent.getJobId());
@@ -90,37 +88,10 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
LOGGER.error(message);
return Future.failedFuture(message);
}
- Context context = EventHandlingUtil.constructContext(instanceEvent.getTenant(), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER), headersMap.get(OKAPI_USER_ID));
- Record marcBibRecord = instanceEvent.getRecord();
-
- io.vertx.core.Context vertxContext = Vertx.currentContext();
-
- if(vertxContext == null) {
- return Future.failedFuture("handle:: operation must be executed by a Vertx thread");
- }
-
- vertxContext.owner().executeBlocking(
- () -> {
- var mappingMetadataDto =
- mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE)
- .orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId)));
- ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto);
- return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context);
- },
- r -> {
- if (r.failed()) {
- LOGGER.warn("handle:: Error during instance update", r.cause());
- promise.fail(r.cause());
- } else {
- LOGGER.debug("handle:: Instance update was successful");
- promise.complete(r.result());
- }
- }
- );
Promise<String> finalPromise = Promise.promise();
- promise.future()
- .onComplete(ar -> processUpdateResult(ar, metaDataPayload, finalPromise, consumerRecord, instanceEvent));
+ processEvent(instanceEvent, headers, metaDataPayload)
+ .onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, metaDataPayload, finalPromise));
return finalPromise.future();
} catch (Exception ex) {
LOGGER.error(format("Failed to process kafka record from topic %s", consumerRecord.topic()), ex);
@@ -128,13 +99,48 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
}
}
+ private Future<Instance> processEvent(MarcBibUpdate instanceEvent, Map<String, String> headers, Map<String, String> metaDataPayload) {
+ Context context = constructContext(instanceEvent.getTenant(), headers.get(OKAPI_TOKEN_HEADER), headers.get(OKAPI_URL_HEADER),
+ headers.get(OKAPI_USER_ID));
+ Record marcBibRecord = instanceEvent.getRecord();
+ var jobId = instanceEvent.getJobId();
+ Promise<Instance> promise = Promise.promise();
+ io.vertx.core.Context vertxContext = Vertx.currentContext();
+
+ if(vertxContext == null) {
+ return Future.failedFuture("handle:: operation must be executed by a Vertx thread");
+ }
+
+ vertxContext.owner().executeBlocking(
+ () -> {
+ var mappingMetadataDto =
+ mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE)
+ .orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId)));
+ ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto);
+ return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context);
+ },
+ r -> {
+ if (r.failed()) {
+ LOGGER.warn("handle:: Error during instance update", r.cause());
+ promise.fail(r.cause());
+ } else {
+ LOGGER.debug("handle:: Instance update was successful");
+ promise.complete(r.result());
+ }
+ }
+ );
+
+ return promise.future();
+ }
+
private void processUpdateResult(AsyncResult<Instance> result,
- HashMap<String, String> eventPayload,
- Promise<String> promise,
KafkaConsumerRecord<String, String> consumerRecord,
- MarcBibUpdate instanceEvent) {
+ MarcBibUpdate instanceEvent,
+ Map<String, String> eventPayload,
+ Promise<String> promise) {
if (result.failed() && result.cause() instanceof OptimisticLockingException) {
- processOLError(consumerRecord, promise, eventPayload, instanceEvent.getJobId(), result);
+ var headers = kafkaHeadersToMap(consumerRecord.headers());
+ processOLError(consumerRecord, instanceEvent, promise, eventPayload, headers);
return;
}
@@ -145,7 +151,6 @@ private void processUpdateResult(AsyncResult<Instance> result,
} else {
var errorCause = result.cause();
LOGGER.error("Failed to update instance by jobId {}:{}", instanceEvent.getJobId(), errorCause);
- eventPayload.remove(CURRENT_RETRY_NUMBER);
linkUpdateReport = mapToLinkReport(instanceEvent, errorCause.getMessage());
promise.fail(errorCause);
}
@@ -153,30 +158,30 @@ private void processUpdateResult(AsyncResult result,
sendEventToKafka(linkUpdateReport, consumerRecord.headers());
}
- private void processOLError(KafkaConsumerRecord<String, String> value,
+ private void processOLError(KafkaConsumerRecord<String, String> consumerRecord,
+ MarcBibUpdate instanceEvent,
Promise<String> promise,
- HashMap<String, String> eventPayload,
- String jobId,
- AsyncResult<Instance> ar) {
- int currentRetryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER))
- .map(Integer::parseInt).orElse(0);
- if (currentRetryNumber < MAX_RETRIES_COUNT) {
- eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(currentRetryNumber + 1));
- LOGGER.warn("Error updating Instance: {}, jobId: {}, Retry MarcBibUpdateKafkaHandler handler...",
- ar.cause().getMessage(), jobId);
- handle(value).onComplete(result -> {
- if (result.succeeded()) {
- promise.complete(value.key());
- } else {
- promise.fail(result.cause());
- }
- });
- } else {
- eventPayload.remove(CURRENT_RETRY_NUMBER);
- String errMessage = format("Current retry number %s exceeded given number %s for the Instance update", MAX_RETRIES_COUNT, currentRetryNumber);
- LOGGER.error(errMessage);
- promise.fail(new OptimisticLockingException(errMessage));
+ Map<String, String> eventPayload,
+ Map<String, String> headers) {
+ int retryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER))
+ .map(Integer::parseInt)
+ .orElse(0);
+ if (retryNumber < MAX_RETRIES_COUNT) {
+ eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(retryNumber + 1));
+ LOGGER.warn("Optimistic Locking Error on updating Instance, jobId: {}, Retry Instance update", instanceEvent.getJobId());
+
+ processEvent(instanceEvent, headers, eventPayload)
+ .onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, eventPayload, promise));
+ return;
}
+
+ eventPayload.remove(CURRENT_RETRY_NUMBER);
+ String errMessage = format("Optimistic Locking Error, current retry number: %s exceeded the given max retry attempt of %s for Instance update", retryNumber, MAX_RETRIES_COUNT);
+ LOGGER.error(errMessage);
+ promise.fail(errMessage);
+
+ var linkUpdateReport = mapToLinkReport(instanceEvent, errMessage);
+ sendEventToKafka(linkUpdateReport, consumerRecord.headers());
}
private void sendEventToKafka(LinkUpdateReport linkUpdateReport, List<KafkaHeader> kafkaHeaders) {
@@ -214,7 +219,7 @@ private static String formatTopicName(String env, String tenant, String eventTyp
return String.join(".", env, tenant, eventType);
}
- private void ensureEventPayloadWithMappingMetadata(HashMap<String, String> eventPayload, MappingMetadataDto mappingMetadataDto) {
+ private void ensureEventPayloadWithMappingMetadata(Map<String, String> eventPayload, MappingMetadataDto mappingMetadataDto) {
eventPayload.put(MAPPING_RULES_KEY, mappingMetadataDto.getMappingRules());
eventPayload.put(MAPPING_PARAMS_KEY, mappingMetadataDto.getMappingParams());
}
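
Taken together, the handler changes replace the old approach of retrying by re-invoking `handle()` on the raw Kafka record with a retry of just the extracted `processEvent()` pipeline, so the record is deserialized once and `CURRENT_RETRY_NUMBER` survives across attempts. A minimal standalone sketch of that retry shape, with stand-in types where the project's own classes would be used:

```java
import io.vertx.core.Future;
import io.vertx.core.Promise;
import java.util.Map;

// Sketch of the retry flow above: on an optimistic-locking failure the handler
// re-runs the fetch-map-update pipeline (processEvent) instead of re-parsing the
// Kafka record via handle(). Names mirror the handler; the stand-ins are illustrative.
class RetrySketch {
  static final String CURRENT_RETRY_NUMBER = "CURRENT_RETRY_NUMBER";
  static final int MAX_RETRIES_COUNT = 2; // assumed value of the handler's constant

  static class OptimisticLockingException extends RuntimeException {
    OptimisticLockingException(String message) { super(message); }
  }

  void updateWithRetry(Map<String, String> payload, Promise<String> promise, String recordKey) {
    processEvent(payload).onComplete(ar -> {
      if (ar.succeeded()) {
        promise.complete(recordKey); // the real handler completes with the record key
        return;
      }
      int attempt = Integer.parseInt(payload.getOrDefault(CURRENT_RETRY_NUMBER, "0"));
      if (ar.cause() instanceof OptimisticLockingException && attempt < MAX_RETRIES_COUNT) {
        payload.put(CURRENT_RETRY_NUMBER, String.valueOf(attempt + 1));
        updateWithRetry(payload, promise, recordKey); // re-run fetch + map + PUT only
        return;
      }
      payload.remove(CURRENT_RETRY_NUMBER); // retries exhausted or non-retryable failure
      promise.fail(ar.cause());
    });
  }

  Future<Void> processEvent(Map<String, String> payload) {
    return Future.succeededFuture(); // placeholder for fetch + map + conditional PUT
  }
}
```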
diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java
index c8072a668..6191900f6 100644
--- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java
+++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java
@@ -61,7 +61,7 @@ public Future<Instance> handle(Map<String, String> eventPayload, Record marcReco
}
}
- public Instance handleBlocking(Map<String, String> eventPayload, Record marcRecord, Context context) {
+ public Instance handleBlocking(Map<String, String> eventPayload, Record marcRecord, Context context) throws Exception {
logParametersUpdateDelegate(LOGGER, eventPayload, marcRecord, context);
try {
JsonObject mappingRules = new JsonObject(eventPayload.get(MAPPING_RULES_KEY));
diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/matching/util/EventHandlingUtil.java b/src/main/java/org/folio/inventory/dataimport/handlers/matching/util/EventHandlingUtil.java
index 373df85c0..ea0eddc49 100644
--- a/src/main/java/org/folio/inventory/dataimport/handlers/matching/util/EventHandlingUtil.java
+++ b/src/main/java/org/folio/inventory/dataimport/handlers/matching/util/EventHandlingUtil.java
@@ -15,6 +15,7 @@
import io.vertx.core.json.JsonObject;
public final class EventHandlingUtil {
+ public static final String OKAPI_USER_ID = "x-okapi-user-id";
private static final String CENTRAL_TENANT_ID = "CENTRAL_TENANT_ID";
private EventHandlingUtil() {}
diff --git a/src/main/java/org/folio/inventory/domain/SynchronousCollection.java b/src/main/java/org/folio/inventory/domain/SynchronousCollection.java
index 487b8eeb2..77fcabe3e 100644
--- a/src/main/java/org/folio/inventory/domain/SynchronousCollection.java
+++ b/src/main/java/org/folio/inventory/domain/SynchronousCollection.java
@@ -5,5 +5,5 @@
public interface SynchronousCollection<T> {
- T findByIdAndUpdate(String id, JsonObject entity, Context context);
+ T findByIdAndUpdate(String id, JsonObject entity, Context context) throws Exception;
}
diff --git a/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java b/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java
index aee0a9680..90550b7fc 100644
--- a/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java
+++ b/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java
@@ -24,6 +24,7 @@
import org.folio.inventory.common.Context;
import org.folio.inventory.common.domain.Failure;
import org.folio.inventory.common.domain.Success;
+import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;
import org.folio.inventory.domain.BatchResult;
import org.folio.inventory.domain.Metadata;
import org.folio.inventory.domain.instances.Instance;
@@ -126,40 +127,40 @@ private boolean isBatchResponse(Response response) {
}
@Override
- public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) {
- try {
- SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext);
- var url = individualRecordLocation(id);
- var response = client.get(url);
- var responseBody = response.getBody();
-
- if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
- LOGGER.warn("Instance not found by id - {} : {}", id, responseBody);
- throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody));
- } else if (response.getStatusCode() != HttpStatus.SC_OK) {
- LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}",
- id, responseBody, response.getStatusCode());
- throw new ExternalResourceFetchException("Failed to fetch Instance record",
- responseBody, response.getStatusCode(), null);
- }
+ public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) throws Exception {
+ SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext);
+ var url = individualRecordLocation(id);
+ var response = client.get(url);
+ var responseBody = response.getBody();
+
+ if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+ LOGGER.warn("Instance not found by id - {} : {}", id, responseBody);
+ throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody));
+ } else if (response.getStatusCode() != HttpStatus.SC_OK) {
+ LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}",
+ id, responseBody, response.getStatusCode());
+ throw new ExternalResourceFetchException("Failed to fetch Instance record",
+ responseBody, response.getStatusCode(), null);
+ }
+
+ var jsonInstance = new JsonObject(responseBody);
+ var existingInstance = mapFromJson(jsonInstance);
+ var modified = modifyInstance(existingInstance, instance);
+ var modifiedInstance = mapToRequest(modified);
+
+ response = client.put(url, modifiedInstance);
+ var statusCode = response.getStatusCode();
+ if (statusCode != HttpStatus.SC_NO_CONTENT) {
+ var errorMessage = format("Failed to update Instance by id : %s, error : %s, status code %s",
+ id, response.getBody(), response.getStatusCode());
+ LOGGER.error(errorMessage);
- var jsonInstance = new JsonObject(responseBody);
- var existingInstance = mapFromJson(jsonInstance);
- var modified = modifyInstance(existingInstance, instance);
- var modifiedInstance = mapToRequest(modified);
-
- response = client.put(url, modifiedInstance);
- if (response.getStatusCode() != 204) {
- var errorMessage = format("Failed to update Instance by id - %s : %s, %s",
- id, response.getBody(), response.getStatusCode());
- LOGGER.warn(errorMessage);
- throw new InternalServerErrorException(errorMessage);
+ if (statusCode == HttpStatus.SC_CONFLICT) {
+ throw new OptimisticLockingException(errorMessage);
}
- return modified;
- } catch (Exception ex) {
- throw new InternalServerErrorException(
- format("Failed to find and update Instance by id - %s : %s", id, ex));
+ throw new InternalServerErrorException(errorMessage);
}
+ return modified;
}
private Instance modifyInstance(Instance existingInstance, JsonObject instance) {
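
The storage-layer change is what makes the retry above reachable: mod-inventory-storage answers a PUT carrying a stale `_version` with HTTP 409, and that status previously vanished into the catch-all `InternalServerErrorException` wrapper. A hedged sketch of the new status mapping, with stand-in exception types in place of the project's own:

```java
import org.apache.http.HttpStatus;

// Sketch of the PUT status handling in findByIdAndUpdate above: 204 means the update
// was applied, 409 is a version conflict and becomes the retryable exception, and any
// other status remains a non-retryable internal error.
class PutStatusSketch {
  static class OptimisticLockingException extends RuntimeException {
    OptimisticLockingException(String message) { super(message); }
  }

  static class InternalServerErrorException extends RuntimeException {
    InternalServerErrorException(String message) { super(message); }
  }

  void checkPutStatus(int statusCode, String errorMessage) {
    if (statusCode == HttpStatus.SC_NO_CONTENT) {
      return; // 204: instance updated
    }
    if (statusCode == HttpStatus.SC_CONFLICT) {
      throw new OptimisticLockingException(errorMessage); // 409: stale _version, retryable
    }
    throw new InternalServerErrorException(errorMessage); // anything else: fail fast
  }
}
```

Dropping the method's outer try/catch matters just as much: it lets the new exception propagate out of `findByIdAndUpdate` (hence the `throws Exception` added to `SynchronousCollection`) instead of being rewrapped on the way up.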
diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandlerTest.java
index d1c424167..5c7fa521f 100644
--- a/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandlerTest.java
+++ b/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandlerTest.java
@@ -13,7 +13,6 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@@ -32,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.ReadKeyValues;
@@ -115,6 +115,7 @@ public static void tearDownClass(TestContext context) {
}
@Before
+ @SneakyThrows
public void setUp() throws IOException {
JsonObject mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
instance = Instance.fromJson(new JsonObject(TestUtil.readFileFromPath(INSTANCE_PATH)));
@@ -163,16 +164,13 @@ public void shouldReturnSucceededFutureWithObtainedRecordKey(TestContext context
context.assertTrue(ar.succeeded());
context.assertEquals(expectedKafkaRecordKey, ar.result());
- context.verify(vo -> {
- verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
- verify(mockedStorage).getInstanceCollection(any());
- verify(mockedInstanceCollection).findByIdAndUpdate(anyString(), any(), any());
- });
+ context.verify(vo -> verify(vo, 1));
async.complete();
});
}
@Test
+ @SneakyThrows
public void shouldReturnSucceededFutureAfterHandlingOptimisticLockingError(TestContext context) {
// given
Async async = context.async();
@@ -194,9 +192,7 @@ public void shouldReturnSucceededFutureAfterHandlingOptimisticLockingError(TestC
// then
future.onComplete(ar -> {
- verify(mappingMetadataCache, times(2)).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
- verify(mockedStorage, times(2)).getInstanceCollection(any());
- verify(mockedInstanceCollection, times(2)).findByIdAndUpdate(anyString(), any(), any());
+ verify(null, 2);
async.complete();
});
@@ -224,7 +220,7 @@ public void shouldReturnFailedFutureWhenMappingRulesNotFound(TestContext context
context.assertTrue(ar.failed());
context.assertTrue(ar.cause().getMessage().contains("MappingParameters and mapping rules snapshots were not found by jobId"));
verifyNoInteractions(mockedInstanceCollection);
- verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
+ Mockito.verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
async.complete();
});
}
@@ -350,4 +346,11 @@ public void shouldSendFailedLinkReportEvent() throws InterruptedException {
Assert.assertEquals("Can't find Instance by id: " + record.getId(), report.getFailCause());
});
}
+
+ @SneakyThrows
+ private void verify(Void vo, int n) {
+ Mockito.verify(mappingMetadataCache, times(n)).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
+ Mockito.verify(mockedStorage, times(n)).getInstanceCollection(any());
+ Mockito.verify(mockedInstanceCollection, times(n)).findByIdAndUpdate(anyString(), any(), any());
+ }
}
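
For the optimistic-locking test to observe two full passes per mock (`verify(null, 2)`), the instance collection presumably has to fail once with the retryable exception and then succeed. A minimal Mockito stubbing sketch along those lines; the real test's stubbing may differ:

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;

import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;

// Hypothetical stubbing: the first findByIdAndUpdate call throws the retryable
// exception, the second returns the instance, so the handler runs exactly twice.
when(mockedInstanceCollection.findByIdAndUpdate(anyString(), any(), any()))
  .thenThrow(new OptimisticLockingException("stale _version on first attempt"))
  .thenReturn(instance);
```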