MODINV-1125: Fix handling Optimistic Locking error and retry mechanism on Instance update (#799)

* MODINV-1125: fix handling optimistic locking error and retry mechanism on instance update
mukhiddin-yusuf committed Jan 23, 2025
1 parent 27ac8e3 commit 2dd95f7
Showing 6 changed files with 120 additions and 109 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
@@ -1,3 +1,6 @@
## 21.0.9 2025-01-23
* Fix handling optimistic locking behavior for instance update when consuming Marc Bib update event [MODINV-1125](https://folio-org.atlassian.net/browse/MODINV-1125)

## 21.0.8 2025-01-23
* Make dependencies on circulation interfaces optional [MODINV-1159](https://folio-org.atlassian.net/browse/MODINV-1159)

MarcBibUpdateKafkaHandler.java
@@ -5,9 +5,11 @@
import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_REQUEST_ID;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_USER_ID;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
@@ -35,12 +37,10 @@
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil;
import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
import org.folio.inventory.domain.instances.Instance;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.kafka.SimpleKafkaProducerManager;
import org.folio.kafka.services.KafkaProducerRecordBuilder;
import org.folio.processing.exceptions.EventProcessingException;
@@ -57,7 +57,7 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler<String, String> {
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final String CURRENT_RETRY_NUMBER = "CURRENT_RETRY_NUMBER";
private static final int MAX_RETRIES_COUNT = Integer.parseInt(System.getenv().getOrDefault("inventory.di.ol.retry.number", "1"));
private static final int MAX_RETRIES_COUNT = Integer.parseInt(System.getenv().getOrDefault("inventory.di.ol.retry.number", "3"));

private final InstanceUpdateDelegate instanceUpdateDelegate;
private final MappingMetadataCache mappingMetadataCache;
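
Note on the pair of MAX_RETRIES_COUNT lines above: they are the before/after of the flattened diff, and the default retry cap rises from 1 to 3. Since processOLError (below) re-enters only while the counter is strictly below the cap, a cap of 3 allows at most four update attempts in total: one initial attempt plus three retries. A minimal sketch of how an operator-supplied value is resolved, using the variable name exactly as it appears in the diff:

    // Sketch: the cap is read once, at class-load time; absent the (unusually
    // dotted) environment variable, it falls back to 3 as of this commit.
    int cap = Integer.parseInt(
        System.getenv().getOrDefault("inventory.di.ol.retry.number", "3"));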
@@ -78,11 +78,9 @@ public MarcBibUpdateKafkaHandler(Vertx vertx, int maxDistributionNumber, KafkaConfig
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
try {
Promise<Instance> promise = Promise.promise();
MarcBibUpdate instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class);
Map<String, String> headersMap = KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers());
HashMap<String, String> metaDataPayload = new HashMap<>();
var jobId = instanceEvent.getJobId();
var instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class);
var headers = kafkaHeadersToMap(consumerRecord.headers());
Map<String, String> metaDataPayload = new HashMap<>();

LOGGER.info("Event payload has been received with event type: {} by jobId: {}", instanceEvent.getType(), instanceEvent.getJobId());

@@ -91,52 +89,59 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
LOGGER.error(message);
return Future.failedFuture(message);
}
Context context = EventHandlingUtil.constructContext(instanceEvent.getTenant(), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER),
headersMap.get(OKAPI_USER_ID), headersMap.get(OKAPI_REQUEST_ID));
Record marcBibRecord = instanceEvent.getRecord();

io.vertx.core.Context vertxContext = Vertx.currentContext();

if(vertxContext == null) {
return Future.failedFuture("handle:: operation must be executed by a Vertx thread");
}

vertxContext.owner().executeBlocking(
() -> {
var mappingMetadataDto =
mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE)
.orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId)));
ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto);
return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context);
},
r -> {
if (r.failed()) {
LOGGER.warn("handle:: Error during instance update", r.cause());
promise.fail(r.cause());
} else {
LOGGER.debug("handle:: Instance update was successful");
promise.complete(r.result());
}
}
);

Promise<String> finalPromise = Promise.promise();
promise.future()
.onComplete(ar -> processUpdateResult(ar, metaDataPayload, finalPromise, consumerRecord, instanceEvent));
processEvent(instanceEvent, headers, metaDataPayload)
.onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, metaDataPayload, finalPromise));
return finalPromise.future();
} catch (Exception ex) {
LOGGER.error(format("Failed to process kafka record from topic %s", consumerRecord.topic()), ex);
return Future.failedFuture(ex);
}
}

private Future<Instance> processEvent(MarcBibUpdate instanceEvent, Map<String, String> headers, Map<String, String> metaDataPayload) {
Context context = constructContext(instanceEvent.getTenant(), headers.get(OKAPI_TOKEN_HEADER), headers.get(OKAPI_URL_HEADER),
headers.get(OKAPI_USER_ID), headers.get(OKAPI_REQUEST_ID));
Record marcBibRecord = instanceEvent.getRecord();
var jobId = instanceEvent.getJobId();
Promise<Instance> promise = Promise.promise();
io.vertx.core.Context vertxContext = Vertx.currentContext();

if(vertxContext == null) {
return Future.failedFuture("handle:: operation must be executed by a Vertx thread");
}

vertxContext.owner().executeBlocking(
() -> {
var mappingMetadataDto =
mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE)
.orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId)));
ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto);
return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context);
},
r -> {
if (r.failed()) {
LOGGER.warn("handle:: Error during instance update", r.cause());
promise.fail(r.cause());
} else {
LOGGER.debug("handle:: Instance update was successful");
promise.complete(r.result());
}
}
);

return promise.future();
}
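
processEvent now isolates the slow path (the mapping-metadata cache lookup plus the synchronous read-modify-write in handleBlocking) on a worker thread, so the consumer's event loop never blocks; anything thrown inside the blocking lambda, including the OptimisticLockingException raised on HTTP 409, surfaces as a failed AsyncResult and fails the returned future. A reduced sketch of that hop, assuming a Vert.x version (4.5+) with the Callable-style executeBlocking the diff itself relies on:

    // Sketch: the callable (java.util.concurrent.Callable) runs on a worker
    // pool; its return value or thrown exception comes back as an AsyncResult.
    static <T> Future<T> runBlocking(io.vertx.core.Context ctx, Callable<T> work) {
      return ctx.owner().executeBlocking(work);
    }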

private void processUpdateResult(AsyncResult<Instance> result,
HashMap<String, String> eventPayload,
Promise<String> promise,
KafkaConsumerRecord<String, String> consumerRecord,
MarcBibUpdate instanceEvent) {
MarcBibUpdate instanceEvent,
Map<String, String> eventPayload,
Promise<String> promise) {
if (result.failed() && result.cause() instanceof OptimisticLockingException) {
processOLError(consumerRecord, promise, eventPayload, instanceEvent.getJobId(), result);
var headers = kafkaHeadersToMap(consumerRecord.headers());
processOLError(consumerRecord, instanceEvent, promise, eventPayload, headers);
return;
}

@@ -147,38 +152,37 @@ private void processUpdateResult(AsyncResult<Instance> result,
} else {
var errorCause = result.cause();
LOGGER.error("Failed to update instance by jobId {}:{}", instanceEvent.getJobId(), errorCause);
eventPayload.remove(CURRENT_RETRY_NUMBER);
linkUpdateReport = mapToLinkReport(instanceEvent, errorCause.getMessage());
promise.fail(errorCause);
}

sendEventToKafka(linkUpdateReport, consumerRecord.headers());
}

private void processOLError(KafkaConsumerRecord<String, String> value,
private void processOLError(KafkaConsumerRecord<String, String> consumerRecord,
MarcBibUpdate instanceEvent,
Promise<String> promise,
HashMap<String, String> eventPayload,
String jobId,
AsyncResult<Instance> ar) {
int currentRetryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER))
.map(Integer::parseInt).orElse(0);
if (currentRetryNumber < MAX_RETRIES_COUNT) {
eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(currentRetryNumber + 1));
LOGGER.warn("Error updating Instance: {}, jobId: {}, Retry MarcBibUpdateKafkaHandler handler...",
ar.cause().getMessage(), jobId);
handle(value).onComplete(result -> {
if (result.succeeded()) {
promise.complete(value.key());
} else {
promise.fail(result.cause());
}
});
} else {
eventPayload.remove(CURRENT_RETRY_NUMBER);
String errMessage = format("Current retry number %s exceeded given number %s for the Instance update", MAX_RETRIES_COUNT, currentRetryNumber);
LOGGER.error(errMessage);
promise.fail(new OptimisticLockingException(errMessage));
Map<String, String> eventPayload,
Map<String, String> headers) {
int retryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER))
.map(Integer::parseInt)
.orElse(0);
if (retryNumber < MAX_RETRIES_COUNT) {
eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(retryNumber + 1));
LOGGER.warn("Optimistic Locking Error on updating Instance, jobId: {}, Retry Instance update", instanceEvent.getJobId());

processEvent(instanceEvent, headers, eventPayload)
.onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, eventPayload, promise));
return;
}

eventPayload.remove(CURRENT_RETRY_NUMBER);
String errMessage = format("Optimistic Locking Error, current retry number: %s exceeded the given max retry attempt of %s for Instance update", retryNumber, MAX_RETRIES_COUNT);
LOGGER.error(errMessage);
promise.fail(errMessage);

var linkUpdateReport = mapToLinkReport(instanceEvent, errMessage);
sendEventToKafka(linkUpdateReport, consumerRecord.headers());
}
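
processUpdateResult and processOLError now form a bounded, mutually recursive retry loop: the optimistic-locking failure routes to processOLError, which bumps CURRENT_RETRY_NUMBER inside the shared payload and re-enters processEvent with the same headers. Unlike the previous version, which re-invoked the whole handle() and re-parsed the Kafka record on every retry, the loop now repeats only the mapping and update work. A self-contained simulation of just the counting rule (update() is a hypothetical stand-in):

    // Illustrative only: the bounded-retry counting rule, flattened into a loop.
    Map<String, String> payload = new HashMap<>();
    while (true) {
      int retry = Optional.ofNullable(payload.get("CURRENT_RETRY_NUMBER"))
          .map(Integer::parseInt).orElse(0);
      if (update()) break;                       // hypothetical: the instance update
      if (retry < 3) {                           // 3 = MAX_RETRIES_COUNT default
        payload.put("CURRENT_RETRY_NUMBER", String.valueOf(retry + 1));
      } else {
        payload.remove("CURRENT_RETRY_NUMBER");  // exhausted: fail + FAIL report
        break;
      }
    }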

private void sendEventToKafka(LinkUpdateReport linkUpdateReport, List<KafkaHeader> kafkaHeaders) {
@@ -216,7 +220,7 @@ private static String formatTopicName(String env, String tenant, String eventType) {
return String.join(".", env, tenant, eventType);
}

private void ensureEventPayloadWithMappingMetadata(HashMap<String, String> eventPayload, MappingMetadataDto mappingMetadataDto) {
private void ensureEventPayloadWithMappingMetadata(Map<String, String> eventPayload, MappingMetadataDto mappingMetadataDto) {
eventPayload.put(MAPPING_RULES_KEY, mappingMetadataDto.getMappingRules());
eventPayload.put(MAPPING_PARAMS_KEY, mappingMetadataDto.getMappingParams());
}
InstanceUpdateDelegate.java
@@ -59,7 +59,7 @@ public Future<Instance> handle(Map<String, String> eventPayload, Record marcRecord, Context context) {
}
}

public Instance handleBlocking(Map<String, String> eventPayload, Record marcRecord, Context context) {
public Instance handleBlocking(Map<String, String> eventPayload, Record marcRecord, Context context) throws Exception {
logParametersUpdateDelegate(LOGGER, eventPayload, marcRecord, context);
try {
JsonObject mappingRules = new JsonObject(eventPayload.get(MAPPING_RULES_KEY));
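
The only change here is the new throws Exception clause: because findByIdAndUpdate (further down) no longer wraps every failure in InternalServerErrorException, handleBlocking must let exceptions cross the blocking boundary with their concrete type, which is exactly what lets the Kafka handler match on OptimisticLockingException. A hypothetical caller illustrating the propagation:

    // Hypothetical caller (sketch): concrete exception types survive the call.
    try {
      Instance updated = instanceUpdateDelegate.handleBlocking(eventPayload, marcRecord, context);
    } catch (OptimisticLockingException e) {
      // bounded retry, as in MarcBibUpdateKafkaHandler.processOLError
    } // any other Exception propagates to the executeBlocking result handler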
SynchronousCollection.java
@@ -5,5 +5,5 @@

public interface SynchronousCollection<T> {

T findByIdAndUpdate(String id, JsonObject entity, Context context);
T findByIdAndUpdate(String id, JsonObject entity, Context context) throws Exception;
}
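
Widening the interface method with throws Exception keeps implementations free to surface failures unwrapped. If the interface stays single-method, a test double could be as small as (hypothetical):

    // Hypothetical stub (sketch), assuming SynchronousCollection is functional.
    SynchronousCollection<Instance> alwaysConflicts = (id, entity, context) -> {
      throw new OptimisticLockingException("simulated 409 for " + id);
    };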
ExternalStorageModuleInstanceCollection.java
@@ -24,6 +24,7 @@
import org.folio.inventory.common.Context;
import org.folio.inventory.common.domain.Failure;
import org.folio.inventory.common.domain.Success;
import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;
import org.folio.inventory.domain.BatchResult;
import org.folio.inventory.domain.Metadata;
import org.folio.inventory.domain.instances.Instance;
@@ -128,40 +129,40 @@ private boolean isBatchResponse(Response response) {
}

@Override
public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) {
try {
SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext);
var url = individualRecordLocation(id);
var response = client.get(url);
var responseBody = response.getBody();

if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
LOGGER.warn("Instance not found by id - {} : {}", id, responseBody);
throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody));
} else if (response.getStatusCode() != HttpStatus.SC_OK) {
LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}",
id, responseBody, response.getStatusCode());
throw new ExternalResourceFetchException("Failed to fetch Instance record",
responseBody, response.getStatusCode(), null);
}
public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) throws Exception {
SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext);
var url = individualRecordLocation(id);
var response = client.get(url);
var responseBody = response.getBody();

if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
LOGGER.warn("Instance not found by id - {} : {}", id, responseBody);
throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody));
} else if (response.getStatusCode() != HttpStatus.SC_OK) {
LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}",
id, responseBody, response.getStatusCode());
throw new ExternalResourceFetchException("Failed to fetch Instance record",
responseBody, response.getStatusCode(), null);
}

var jsonInstance = new JsonObject(responseBody);
var existingInstance = mapFromJson(jsonInstance);
var modified = modifyInstance(existingInstance, instance);
var modifiedInstance = mapToRequest(modified);

response = client.put(url, modifiedInstance);
var statusCode = response.getStatusCode();
if (statusCode != HttpStatus.SC_NO_CONTENT) {
var errorMessage = format("Failed to update Instance by id : %s, error : %s, status code %s",
id, response.getBody(), response.getStatusCode());
LOGGER.error(errorMessage);

var jsonInstance = new JsonObject(responseBody);
var existingInstance = mapFromJson(jsonInstance);
var modified = modifyInstance(existingInstance, instance);
var modifiedInstance = mapToRequest(modified);

response = client.put(url, modifiedInstance);
if (response.getStatusCode() != 204) {
var errorMessage = format("Failed to update Instance by id - %s : %s, %s",
id, response.getBody(), response.getStatusCode());
LOGGER.warn(errorMessage);
throw new InternalServerErrorException(errorMessage);
if (statusCode == HttpStatus.SC_CONFLICT) {
throw new OptimisticLockingException(errorMessage);
}
return modified;
} catch (Exception ex) {
throw new InternalServerErrorException(
format("Failed to find and update Instance by id - %s : %s", id, ex));
throw new InternalServerErrorException(errorMessage);
}
return modified;
}

private Instance modifyInstance(Instance existingInstance, JsonObject instance) {
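
The SC_CONFLICT branch above is what drives the new retry loop: FOLIO storage enforces optimistic locking through the record's version, so a concurrent writer landing between this method's GET and PUT yields HTTP 409, now surfaced as OptimisticLockingException rather than being swallowed by the old catch-all InternalServerErrorException. The reworked contract, as a commented outline (names from the diff):

    // findByIdAndUpdate, reduced to its outcomes:
    //   GET  url -> 404        => NotFoundException
    //            -> non-200    => ExternalResourceFetchException
    //   merge    -> modified = modifyInstance(mapFromJson(body), instance)
    //   PUT  url, mapToRequest(modified)
    //            -> 204        => return modified
    //            -> 409        => OptimisticLockingException   (retryable)
    //            -> otherwise  => InternalServerErrorException (terminal)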
MarcBibUpdateKafkaHandlerTest.java
@@ -13,7 +13,6 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

@@ -32,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import lombok.SneakyThrows;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.ReadKeyValues;
@@ -116,6 +116,7 @@ public static void tearDownClass(TestContext context) {
}

@Before
@SneakyThrows
public void setUp() throws IOException {
JsonObject mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
instance = Instance.fromJson(new JsonObject(TestUtil.readFileFromPath(INSTANCE_PATH)));
@@ -164,16 +165,13 @@ public void shouldReturnSucceededFutureWithObtainedRecordKey(TestContext context) {
context.assertTrue(ar.succeeded());
context.assertEquals(expectedKafkaRecordKey, ar.result());

context.verify(vo -> {
verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
verify(mockedStorage).getInstanceCollection(any());
verify(mockedInstanceCollection).findByIdAndUpdate(anyString(), any(), any());
});
context.verify(vo -> verify(vo, 1));
async.complete();
});
}

@Test
@SneakyThrows
public void shouldReturnSucceededFutureAfterHandlingOptimisticLockingError(TestContext context) {
// given
Async async = context.async();
@@ -195,9 +193,7 @@ public void shouldReturnSucceededFutureAfterHandlingOptimisticLockingError(TestContext context) {

// then
future.onComplete(ar -> {
verify(mappingMetadataCache, times(2)).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
verify(mockedStorage, times(2)).getInstanceCollection(any());
verify(mockedInstanceCollection, times(2)).findByIdAndUpdate(anyString(), any(), any());
verify(null, 2);

async.complete();
});
@@ -225,7 +221,7 @@ public void shouldReturnFailedFutureWhenMappingRulesNotFound(TestContext context) {
context.assertTrue(ar.failed());
context.assertTrue(ar.cause().getMessage().contains("MappingParameters and mapping rules snapshots were not found by jobId"));
verifyNoInteractions(mockedInstanceCollection);
verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
Mockito.verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
async.complete();
});
}
@@ -351,4 +347,11 @@ public void shouldSendFailedLinkReportEvent() throws InterruptedException {
Assert.assertEquals("Can't find Instance by id: " + record.getId(), report.getFailCause());
});
}

@SneakyThrows
private void verify(Void vo, int n) {
Mockito.verify(mappingMetadataCache, times(n)).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
Mockito.verify(mockedStorage, times(n)).getInstanceCollection(any());
Mockito.verify(mockedInstanceCollection, times(n)).findByIdAndUpdate(anyString(), any(), any());
}
}
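
One subtlety in the test refactor: the new private helper is itself named verify, so inside this class it shadows Mockito's statically imported verify (whose import was removed in the first hunk of this file), and @SneakyThrows absorbs the checked Exception that findByIdAndUpdate now declares. Call sites that need the raw Mockito method must qualify it, which is why shouldReturnFailedFutureWhenMappingRulesNotFound says Mockito.verify. For reference:

    // Resolution inside this class (sketch):
    //   verify(null, 2);                      -> the private helper (two pipeline passes)
    //   Mockito.verify(mappingMetadataCache)  -> Mockito's own verify, fully qualified
    //       .getByRecordTypeBlocking(anyString(), any(Context.class), anyString());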
