From d1745e75523195a2fba53bd8e6b261cad6f1de89 Mon Sep 17 00:00:00 2001 From: Mukhiddin Yusupov Date: Tue, 3 Dec 2024 21:03:16 +0500 Subject: [PATCH 1/3] MODINV-1125: fix handling optimistic locking error and retry mechanism on instance update --- .../consumers/MarcBibUpdateKafkaHandler.java | 134 +++++++++--------- .../actions/InstanceUpdateDelegate.java | 2 +- .../domain/SynchronousCollection.java | 2 +- ...ternalStorageModuleInstanceCollection.java | 63 ++++---- .../MarcBibUpdateKafkaHandlerTest.java | 23 +-- 5 files changed, 116 insertions(+), 108 deletions(-) diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java index 04e1840dc..ce523527c 100644 --- a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java @@ -5,9 +5,11 @@ import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS; import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_REQUEST_ID; import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_USER_ID; +import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext; import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I; import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999; import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE; +import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap; import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL; import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS; import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER; @@ -35,12 +37,10 @@ import org.folio.inventory.dataimport.cache.MappingMetadataCache; import org.folio.inventory.dataimport.exceptions.OptimisticLockingException; import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate; -import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil; import org.folio.inventory.dataimport.util.AdditionalFieldsUtil; import org.folio.inventory.domain.instances.Instance; import org.folio.kafka.AsyncRecordHandler; import org.folio.kafka.KafkaConfig; -import org.folio.kafka.KafkaHeaderUtils; import org.folio.kafka.SimpleKafkaProducerManager; import org.folio.kafka.services.KafkaProducerRecordBuilder; import org.folio.processing.exceptions.EventProcessingException; @@ -78,11 +78,9 @@ public MarcBibUpdateKafkaHandler(Vertx vertx, int maxDistributionNumber, KafkaCo @Override public Future handle(KafkaConsumerRecord consumerRecord) { try { - Promise promise = Promise.promise(); - MarcBibUpdate instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class); - Map headersMap = KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers()); - HashMap metaDataPayload = new HashMap<>(); - var jobId = instanceEvent.getJobId(); + var instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class); + var headers = kafkaHeadersToMap(consumerRecord.headers()); + Map metaDataPayload = new HashMap<>(); LOGGER.info("Event payload has been received with event type: {} by jobId: {}", instanceEvent.getType(), instanceEvent.getJobId()); @@ -91,38 +89,10 @@ public Future handle(KafkaConsumerRecord consumerRecord) LOGGER.error(message); return Future.failedFuture(message); } - Context context = EventHandlingUtil.constructContext(instanceEvent.getTenant(), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER), - headersMap.get(OKAPI_USER_ID), headersMap.get(OKAPI_REQUEST_ID)); - Record marcBibRecord = instanceEvent.getRecord(); - - io.vertx.core.Context vertxContext = Vertx.currentContext(); - - if(vertxContext == null) { - return Future.failedFuture("handle:: operation must be executed by a Vertx thread"); - } - - vertxContext.owner().executeBlocking( - () -> { - var mappingMetadataDto = - mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE) - .orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId))); - ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto); - return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context); - }, - r -> { - if (r.failed()) { - LOGGER.warn("handle:: Error during instance update", r.cause()); - promise.fail(r.cause()); - } else { - LOGGER.debug("handle:: Instance update was successful"); - promise.complete(r.result()); - } - } - ); Promise finalPromise = Promise.promise(); - promise.future() - .onComplete(ar -> processUpdateResult(ar, metaDataPayload, finalPromise, consumerRecord, instanceEvent)); + processEvent(instanceEvent, headers, metaDataPayload) + .onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, metaDataPayload, finalPromise)); return finalPromise.future(); } catch (Exception ex) { LOGGER.error(format("Failed to process kafka record from topic %s", consumerRecord.topic()), ex); @@ -130,13 +100,48 @@ public Future handle(KafkaConsumerRecord consumerRecord) } } + private Future processEvent(MarcBibUpdate instanceEvent, Map headers, Map metaDataPayload) { + Context context = constructContext(instanceEvent.getTenant(), headers.get(OKAPI_TOKEN_HEADER), headers.get(OKAPI_URL_HEADER), + headers.get(OKAPI_USER_ID), headers.get(OKAPI_REQUEST_ID)); + Record marcBibRecord = instanceEvent.getRecord(); + var jobId = instanceEvent.getJobId(); + Promise promise = Promise.promise(); + io.vertx.core.Context vertxContext = Vertx.currentContext(); + + if(vertxContext == null) { + return Future.failedFuture("handle:: operation must be executed by a Vertx thread"); + } + + vertxContext.owner().executeBlocking( + () -> { + var mappingMetadataDto = + mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE) + .orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId))); + ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto); + return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context); + }, + r -> { + if (r.failed()) { + LOGGER.warn("handle:: Error during instance update", r.cause()); + promise.fail(r.cause()); + } else { + LOGGER.debug("handle:: Instance update was successful"); + promise.complete(r.result()); + } + } + ); + + return promise.future(); + } + private void processUpdateResult(AsyncResult result, - HashMap eventPayload, - Promise promise, KafkaConsumerRecord consumerRecord, - MarcBibUpdate instanceEvent) { + MarcBibUpdate instanceEvent, + Map eventPayload, + Promise promise) { if (result.failed() && result.cause() instanceof OptimisticLockingException) { - processOLError(consumerRecord, promise, eventPayload, instanceEvent.getJobId(), result); + var headers = kafkaHeadersToMap(consumerRecord.headers()); + processOLError(consumerRecord, instanceEvent, promise, eventPayload, headers); return; } @@ -147,7 +152,6 @@ private void processUpdateResult(AsyncResult result, } else { var errorCause = result.cause(); LOGGER.error("Failed to update instance by jobId {}:{}", instanceEvent.getJobId(), errorCause); - eventPayload.remove(CURRENT_RETRY_NUMBER); linkUpdateReport = mapToLinkReport(instanceEvent, errorCause.getMessage()); promise.fail(errorCause); } @@ -155,30 +159,30 @@ private void processUpdateResult(AsyncResult result, sendEventToKafka(linkUpdateReport, consumerRecord.headers()); } - private void processOLError(KafkaConsumerRecord value, + private void processOLError(KafkaConsumerRecord consumerRecord, + MarcBibUpdate instanceEvent, Promise promise, - HashMap eventPayload, - String jobId, - AsyncResult ar) { - int currentRetryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER)) - .map(Integer::parseInt).orElse(0); - if (currentRetryNumber < MAX_RETRIES_COUNT) { - eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(currentRetryNumber + 1)); - LOGGER.warn("Error updating Instance: {}, jobId: {}, Retry MarcBibUpdateKafkaHandler handler...", - ar.cause().getMessage(), jobId); - handle(value).onComplete(result -> { - if (result.succeeded()) { - promise.complete(value.key()); - } else { - promise.fail(result.cause()); - } - }); - } else { - eventPayload.remove(CURRENT_RETRY_NUMBER); - String errMessage = format("Current retry number %s exceeded given number %s for the Instance update", MAX_RETRIES_COUNT, currentRetryNumber); - LOGGER.error(errMessage); - promise.fail(new OptimisticLockingException(errMessage)); + Map eventPayload, + Map headers) { + int retryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER)) + .map(Integer::parseInt) + .orElse(0); + if (retryNumber < MAX_RETRIES_COUNT) { + eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(retryNumber + 1)); + LOGGER.warn("Optimistic Locking Error on updating Instance, jobId: {}, Retry Instance update", instanceEvent.getJobId()); + + processEvent(instanceEvent, headers, eventPayload) + .onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, eventPayload, promise)); + return; } + + eventPayload.remove(CURRENT_RETRY_NUMBER); + String errMessage = format("Optimistic Locking Error, current retry number: %s exceeded the given max retry attempt of %s for Instance update", retryNumber, MAX_RETRIES_COUNT); + LOGGER.error(errMessage); + promise.fail(errMessage); + + var linkUpdateReport = mapToLinkReport(instanceEvent, errMessage); + sendEventToKafka(linkUpdateReport, consumerRecord.headers()); } private void sendEventToKafka(LinkUpdateReport linkUpdateReport, List kafkaHeaders) { @@ -216,7 +220,7 @@ private static String formatTopicName(String env, String tenant, String eventTyp return String.join(".", env, tenant, eventType); } - private void ensureEventPayloadWithMappingMetadata(HashMap eventPayload, MappingMetadataDto mappingMetadataDto) { + private void ensureEventPayloadWithMappingMetadata(Map eventPayload, MappingMetadataDto mappingMetadataDto) { eventPayload.put(MAPPING_RULES_KEY, mappingMetadataDto.getMappingRules()); eventPayload.put(MAPPING_PARAMS_KEY, mappingMetadataDto.getMappingParams()); } diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java index c0cb085f4..04ff9602d 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java @@ -59,7 +59,7 @@ public Future handle(Map eventPayload, Record marcReco } } - public Instance handleBlocking(Map eventPayload, Record marcRecord, Context context) { + public Instance handleBlocking(Map eventPayload, Record marcRecord, Context context) throws Exception { logParametersUpdateDelegate(LOGGER, eventPayload, marcRecord, context); try { JsonObject mappingRules = new JsonObject(eventPayload.get(MAPPING_RULES_KEY)); diff --git a/src/main/java/org/folio/inventory/domain/SynchronousCollection.java b/src/main/java/org/folio/inventory/domain/SynchronousCollection.java index 487b8eeb2..77fcabe3e 100644 --- a/src/main/java/org/folio/inventory/domain/SynchronousCollection.java +++ b/src/main/java/org/folio/inventory/domain/SynchronousCollection.java @@ -5,5 +5,5 @@ public interface SynchronousCollection { - T findByIdAndUpdate(String id, JsonObject entity, Context context); + T findByIdAndUpdate(String id, JsonObject entity, Context context) throws Exception; } diff --git a/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java b/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java index 5dab1e11d..dcfc31803 100644 --- a/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java +++ b/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java @@ -24,6 +24,7 @@ import org.folio.inventory.common.Context; import org.folio.inventory.common.domain.Failure; import org.folio.inventory.common.domain.Success; +import org.folio.inventory.dataimport.exceptions.OptimisticLockingException; import org.folio.inventory.domain.BatchResult; import org.folio.inventory.domain.Metadata; import org.folio.inventory.domain.instances.Instance; @@ -128,40 +129,40 @@ private boolean isBatchResponse(Response response) { } @Override - public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) { - try { - SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext); - var url = individualRecordLocation(id); - var response = client.get(url); - var responseBody = response.getBody(); - - if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) { - LOGGER.warn("Instance not found by id - {} : {}", id, responseBody); - throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody)); - } else if (response.getStatusCode() != HttpStatus.SC_OK) { - LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}", - id, responseBody, response.getStatusCode()); - throw new ExternalResourceFetchException("Failed to fetch Instance record", - responseBody, response.getStatusCode(), null); - } + public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) throws Exception { + SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext); + var url = individualRecordLocation(id); + var response = client.get(url); + var responseBody = response.getBody(); + + if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) { + LOGGER.warn("Instance not found by id - {} : {}", id, responseBody); + throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody)); + } else if (response.getStatusCode() != HttpStatus.SC_OK) { + LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}", + id, responseBody, response.getStatusCode()); + throw new ExternalResourceFetchException("Failed to fetch Instance record", + responseBody, response.getStatusCode(), null); + } + + var jsonInstance = new JsonObject(responseBody); + var existingInstance = mapFromJson(jsonInstance); + var modified = modifyInstance(existingInstance, instance); + var modifiedInstance = mapToRequest(modified); + + response = client.put(url, modifiedInstance); + var statusCode = response.getStatusCode(); + if (statusCode != HttpStatus.SC_NO_CONTENT) { + var errorMessage = format("Failed to update Instance by id : %s, error : %s, status code %s", + id, response.getBody(), response.getStatusCode()); + LOGGER.error(errorMessage); - var jsonInstance = new JsonObject(responseBody); - var existingInstance = mapFromJson(jsonInstance); - var modified = modifyInstance(existingInstance, instance); - var modifiedInstance = mapToRequest(modified); - - response = client.put(url, modifiedInstance); - if (response.getStatusCode() != 204) { - var errorMessage = format("Failed to update Instance by id - %s : %s, %s", - id, response.getBody(), response.getStatusCode()); - LOGGER.warn(errorMessage); - throw new InternalServerErrorException(errorMessage); + if (statusCode == HttpStatus.SC_CONFLICT) { + throw new OptimisticLockingException(errorMessage); } - return modified; - } catch (Exception ex) { - throw new InternalServerErrorException( - format("Failed to find and update Instance by id - %s : %s", id, ex)); + throw new InternalServerErrorException(errorMessage); } + return modified; } private Instance modifyInstance(Instance existingInstance, JsonObject instance) { diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/MarcBibUpdateKafkaHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/MarcBibUpdateKafkaHandlerTest.java index 55c2a435c..af1cb429d 100644 --- a/src/test/java/org/folio/inventory/dataimport/handlers/MarcBibUpdateKafkaHandlerTest.java +++ b/src/test/java/org/folio/inventory/dataimport/handlers/MarcBibUpdateKafkaHandlerTest.java @@ -13,7 +13,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -32,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; import net.mguenther.kafka.junit.EmbeddedKafkaCluster; import net.mguenther.kafka.junit.ObserveKeyValues; import net.mguenther.kafka.junit.ReadKeyValues; @@ -116,6 +116,7 @@ public static void tearDownClass(TestContext context) { } @Before + @SneakyThrows public void setUp() throws IOException { JsonObject mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH)); instance = Instance.fromJson(new JsonObject(TestUtil.readFileFromPath(INSTANCE_PATH))); @@ -164,16 +165,13 @@ public void shouldReturnSucceededFutureWithObtainedRecordKey(TestContext context context.assertTrue(ar.succeeded()); context.assertEquals(expectedKafkaRecordKey, ar.result()); - context.verify(vo -> { - verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString()); - verify(mockedStorage).getInstanceCollection(any()); - verify(mockedInstanceCollection).findByIdAndUpdate(anyString(), any(), any()); - }); + context.verify(vo -> verify(vo, 1)); async.complete(); }); } @Test + @SneakyThrows public void shouldReturnSucceededFutureAfterHandlingOptimisticLockingError(TestContext context) { // given Async async = context.async(); @@ -195,9 +193,7 @@ public void shouldReturnSucceededFutureAfterHandlingOptimisticLockingError(TestC // then future.onComplete(ar -> { - verify(mappingMetadataCache, times(2)).getByRecordTypeBlocking(anyString(), any(Context.class), anyString()); - verify(mockedStorage, times(2)).getInstanceCollection(any()); - verify(mockedInstanceCollection, times(2)).findByIdAndUpdate(anyString(), any(), any()); + verify(null, 2); async.complete(); }); @@ -225,7 +221,7 @@ public void shouldReturnFailedFutureWhenMappingRulesNotFound(TestContext context context.assertTrue(ar.failed()); context.assertTrue(ar.cause().getMessage().contains("MappingParameters and mapping rules snapshots were not found by jobId")); verifyNoInteractions(mockedInstanceCollection); - verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString()); + Mockito.verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString()); async.complete(); }); } @@ -351,4 +347,11 @@ public void shouldSendFailedLinkReportEvent() throws InterruptedException { Assert.assertEquals("Can't find Instance by id: " + record.getId(), report.getFailCause()); }); } + + @SneakyThrows + private void verify(Void vo, int n) { + Mockito.verify(mappingMetadataCache, times(n)).getByRecordTypeBlocking(anyString(), any(Context.class), anyString()); + Mockito.verify(mockedStorage, times(n)).getInstanceCollection(any()); + Mockito.verify(mockedInstanceCollection, times(n)).findByIdAndUpdate(anyString(), any(), any()); + } } From 69d65d0f07cd8d13c1f42be580fa1ec07ca1738c Mon Sep 17 00:00:00 2001 From: Mukhiddin Yusupov Date: Wed, 8 Jan 2025 20:03:52 +0500 Subject: [PATCH 2/3] MODINV-1125: increase default max retry count --- .../dataimport/consumers/MarcBibUpdateKafkaHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java index ce523527c..7fcc90351 100644 --- a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java @@ -57,7 +57,7 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler Date: Thu, 9 Jan 2025 16:35:23 +0500 Subject: [PATCH 3/3] MODINV-1125: update NEWS.md --- NEWS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/NEWS.md b/NEWS.md index 72e46c928..e6d87bdf3 100644 --- a/NEWS.md +++ b/NEWS.md @@ -2,6 +2,7 @@ * Provide consistent handling with concurrency two or more Marc Bib Update events for the same bib record [MODINV-1100](https://folio-org.atlassian.net/browse/MODINV-1100) * Enable system user for data-import processes [MODINV-1115](https://folio-org.atlassian.net/browse/MODINV-1115) * Missing x-okapi-user-id header in communications with inventory-storage [MODINV-1134](https://folio-org.atlassian.net/browse/MODINV-1134) +* Fix handling optimistic locking behavior for instance update when consuming Marc Bib update event [MODINV-1125](https://folio-org.atlassian.net/browse/MODINV-1125) ## 21.0.0 2024-10-29 * Existing "035" field is not retained the original position in imported record [MODINV-1049](https://folio-org.atlassian.net/browse/MODINV-1049)