Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODINV-1125: Fix handling Optimistic Locking error and retry mechanism on Instance update #799

Merged
merged 7 commits into from
Jan 22, 2025
Merged
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Provide consistent handling with concurrency two or more Marc Bib Update events for the same bib record [MODINV-1100](https://folio-org.atlassian.net/browse/MODINV-1100)
* Enable system user for data-import processes [MODINV-1115](https://folio-org.atlassian.net/browse/MODINV-1115)
* Missing x-okapi-user-id header in communications with inventory-storage [MODINV-1134](https://folio-org.atlassian.net/browse/MODINV-1134)
* Fix handling optimistic locking behavior for instance update when consuming Marc Bib update event [MODINV-1125](https://folio-org.atlassian.net/browse/MODINV-1125)

## 21.0.0 2024-10-29
* Existing "035" field is not retained the original position in imported record [MODINV-1049](https://folio-org.atlassian.net/browse/MODINV-1049)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_REQUEST_ID;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_USER_ID;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
Expand Down Expand Up @@ -35,12 +37,10 @@
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil;
import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
import org.folio.inventory.domain.instances.Instance;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.kafka.SimpleKafkaProducerManager;
import org.folio.kafka.services.KafkaProducerRecordBuilder;
import org.folio.processing.exceptions.EventProcessingException;
Expand All @@ -57,7 +57,7 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler<String, Str
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final String CURRENT_RETRY_NUMBER = "CURRENT_RETRY_NUMBER";
private static final int MAX_RETRIES_COUNT = Integer.parseInt(System.getenv().getOrDefault("inventory.di.ol.retry.number", "1"));
private static final int MAX_RETRIES_COUNT = Integer.parseInt(System.getenv().getOrDefault("inventory.di.ol.retry.number", "3"));

private final InstanceUpdateDelegate instanceUpdateDelegate;
private final MappingMetadataCache mappingMetadataCache;
Expand All @@ -78,11 +78,9 @@ public MarcBibUpdateKafkaHandler(Vertx vertx, int maxDistributionNumber, KafkaCo
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
try {
Promise<Instance> promise = Promise.promise();
MarcBibUpdate instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class);
Map<String, String> headersMap = KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers());
HashMap<String, String> metaDataPayload = new HashMap<>();
var jobId = instanceEvent.getJobId();
var instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class);
var headers = kafkaHeadersToMap(consumerRecord.headers());
Map<String, String> metaDataPayload = new HashMap<>();

LOGGER.info("Event payload has been received with event type: {} by jobId: {}", instanceEvent.getType(), instanceEvent.getJobId());

Expand All @@ -91,52 +89,59 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
LOGGER.error(message);
return Future.failedFuture(message);
}
Context context = EventHandlingUtil.constructContext(instanceEvent.getTenant(), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER),
headersMap.get(OKAPI_USER_ID), headersMap.get(OKAPI_REQUEST_ID));
Record marcBibRecord = instanceEvent.getRecord();

io.vertx.core.Context vertxContext = Vertx.currentContext();

if(vertxContext == null) {
return Future.failedFuture("handle:: operation must be executed by a Vertx thread");
}

vertxContext.owner().executeBlocking(
() -> {
var mappingMetadataDto =
mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE)
.orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId)));
ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto);
return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context);
},
r -> {
if (r.failed()) {
LOGGER.warn("handle:: Error during instance update", r.cause());
promise.fail(r.cause());
} else {
LOGGER.debug("handle:: Instance update was successful");
promise.complete(r.result());
}
}
);

Promise<String> finalPromise = Promise.promise();
promise.future()
.onComplete(ar -> processUpdateResult(ar, metaDataPayload, finalPromise, consumerRecord, instanceEvent));
processEvent(instanceEvent, headers, metaDataPayload)
.onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, metaDataPayload, finalPromise));
return finalPromise.future();
} catch (Exception ex) {
LOGGER.error(format("Failed to process kafka record from topic %s", consumerRecord.topic()), ex);
return Future.failedFuture(ex);
}
}

private Future<Instance> processEvent(MarcBibUpdate instanceEvent, Map<String, String> headers, Map<String, String> metaDataPayload) {
Context context = constructContext(instanceEvent.getTenant(), headers.get(OKAPI_TOKEN_HEADER), headers.get(OKAPI_URL_HEADER),
headers.get(OKAPI_USER_ID), headers.get(OKAPI_REQUEST_ID));
Record marcBibRecord = instanceEvent.getRecord();
var jobId = instanceEvent.getJobId();
Promise<Instance> promise = Promise.promise();
io.vertx.core.Context vertxContext = Vertx.currentContext();

if(vertxContext == null) {
return Future.failedFuture("handle:: operation must be executed by a Vertx thread");
VRohach marked this conversation as resolved.
Show resolved Hide resolved
}

vertxContext.owner().executeBlocking(
() -> {
var mappingMetadataDto =
mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE)
.orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId)));
ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto);
return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context);
},
r -> {
if (r.failed()) {
LOGGER.warn("handle:: Error during instance update", r.cause());
promise.fail(r.cause());
} else {
LOGGER.debug("handle:: Instance update was successful");
promise.complete(r.result());
}
}
);

return promise.future();
}

private void processUpdateResult(AsyncResult<Instance> result,
HashMap<String, String> eventPayload,
Promise<String> promise,
KafkaConsumerRecord<String, String> consumerRecord,
MarcBibUpdate instanceEvent) {
MarcBibUpdate instanceEvent,
Map<String, String> eventPayload,
Promise<String> promise) {
if (result.failed() && result.cause() instanceof OptimisticLockingException) {
processOLError(consumerRecord, promise, eventPayload, instanceEvent.getJobId(), result);
var headers = kafkaHeadersToMap(consumerRecord.headers());
processOLError(consumerRecord, instanceEvent, promise, eventPayload, headers);
return;
}

Expand All @@ -147,38 +152,37 @@ private void processUpdateResult(AsyncResult<Instance> result,
} else {
var errorCause = result.cause();
LOGGER.error("Failed to update instance by jobId {}:{}", instanceEvent.getJobId(), errorCause);
eventPayload.remove(CURRENT_RETRY_NUMBER);
linkUpdateReport = mapToLinkReport(instanceEvent, errorCause.getMessage());
promise.fail(errorCause);
}

sendEventToKafka(linkUpdateReport, consumerRecord.headers());
}

private void processOLError(KafkaConsumerRecord<String, String> value,
private void processOLError(KafkaConsumerRecord<String, String> consumerRecord,
MarcBibUpdate instanceEvent,
Promise<String> promise,
HashMap<String, String> eventPayload,
String jobId,
AsyncResult<Instance> ar) {
int currentRetryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER))
.map(Integer::parseInt).orElse(0);
if (currentRetryNumber < MAX_RETRIES_COUNT) {
eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(currentRetryNumber + 1));
LOGGER.warn("Error updating Instance: {}, jobId: {}, Retry MarcBibUpdateKafkaHandler handler...",
ar.cause().getMessage(), jobId);
handle(value).onComplete(result -> {
if (result.succeeded()) {
promise.complete(value.key());
} else {
promise.fail(result.cause());
}
});
} else {
eventPayload.remove(CURRENT_RETRY_NUMBER);
String errMessage = format("Current retry number %s exceeded given number %s for the Instance update", MAX_RETRIES_COUNT, currentRetryNumber);
LOGGER.error(errMessage);
promise.fail(new OptimisticLockingException(errMessage));
Map<String, String> eventPayload,
Map<String, String> headers) {
int retryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER))
.map(Integer::parseInt)
.orElse(0);
if (retryNumber < MAX_RETRIES_COUNT) {
eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(retryNumber + 1));
LOGGER.warn("Optimistic Locking Error on updating Instance, jobId: {}, Retry Instance update", instanceEvent.getJobId());
VRohach marked this conversation as resolved.
Show resolved Hide resolved

processEvent(instanceEvent, headers, eventPayload)
.onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, eventPayload, promise));
return;
}

eventPayload.remove(CURRENT_RETRY_NUMBER);
String errMessage = format("Optimistic Locking Error, current retry number: %s exceeded the given max retry attempt of %s for Instance update", retryNumber, MAX_RETRIES_COUNT);
LOGGER.error(errMessage);
promise.fail(errMessage);

var linkUpdateReport = mapToLinkReport(instanceEvent, errMessage);
sendEventToKafka(linkUpdateReport, consumerRecord.headers());
}

private void sendEventToKafka(LinkUpdateReport linkUpdateReport, List<KafkaHeader> kafkaHeaders) {
Expand Down Expand Up @@ -216,7 +220,7 @@ private static String formatTopicName(String env, String tenant, String eventTyp
return String.join(".", env, tenant, eventType);
}

private void ensureEventPayloadWithMappingMetadata(HashMap<String, String> eventPayload, MappingMetadataDto mappingMetadataDto) {
private void ensureEventPayloadWithMappingMetadata(Map<String, String> eventPayload, MappingMetadataDto mappingMetadataDto) {
eventPayload.put(MAPPING_RULES_KEY, mappingMetadataDto.getMappingRules());
eventPayload.put(MAPPING_PARAMS_KEY, mappingMetadataDto.getMappingParams());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Future<Instance> handle(Map<String, String> eventPayload, Record marcReco
}
}

public Instance handleBlocking(Map<String, String> eventPayload, Record marcRecord, Context context) {
public Instance handleBlocking(Map<String, String> eventPayload, Record marcRecord, Context context) throws Exception {
logParametersUpdateDelegate(LOGGER, eventPayload, marcRecord, context);
try {
JsonObject mappingRules = new JsonObject(eventPayload.get(MAPPING_RULES_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

public interface SynchronousCollection<T> {

T findByIdAndUpdate(String id, JsonObject entity, Context context);
T findByIdAndUpdate(String id, JsonObject entity, Context context) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.folio.inventory.common.Context;
import org.folio.inventory.common.domain.Failure;
import org.folio.inventory.common.domain.Success;
import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;
import org.folio.inventory.domain.BatchResult;
import org.folio.inventory.domain.Metadata;
import org.folio.inventory.domain.instances.Instance;
Expand Down Expand Up @@ -128,40 +129,40 @@ private boolean isBatchResponse(Response response) {
}

@Override
public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) {
try {
SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext);
var url = individualRecordLocation(id);
var response = client.get(url);
var responseBody = response.getBody();

if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
LOGGER.warn("Instance not found by id - {} : {}", id, responseBody);
throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody));
} else if (response.getStatusCode() != HttpStatus.SC_OK) {
LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}",
id, responseBody, response.getStatusCode());
throw new ExternalResourceFetchException("Failed to fetch Instance record",
responseBody, response.getStatusCode(), null);
}
public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) throws Exception {
SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext);
var url = individualRecordLocation(id);
var response = client.get(url);
var responseBody = response.getBody();

if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
LOGGER.warn("Instance not found by id - {} : {}", id, responseBody);
throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody));
} else if (response.getStatusCode() != HttpStatus.SC_OK) {
LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}",
id, responseBody, response.getStatusCode());
throw new ExternalResourceFetchException("Failed to fetch Instance record",
responseBody, response.getStatusCode(), null);
}

var jsonInstance = new JsonObject(responseBody);
var existingInstance = mapFromJson(jsonInstance);
var modified = modifyInstance(existingInstance, instance);
var modifiedInstance = mapToRequest(modified);

response = client.put(url, modifiedInstance);
var statusCode = response.getStatusCode();
if (statusCode != HttpStatus.SC_NO_CONTENT) {
var errorMessage = format("Failed to update Instance by id : %s, error : %s, status code %s",
id, response.getBody(), response.getStatusCode());
LOGGER.error(errorMessage);

var jsonInstance = new JsonObject(responseBody);
var existingInstance = mapFromJson(jsonInstance);
var modified = modifyInstance(existingInstance, instance);
var modifiedInstance = mapToRequest(modified);

response = client.put(url, modifiedInstance);
if (response.getStatusCode() != 204) {
var errorMessage = format("Failed to update Instance by id - %s : %s, %s",
id, response.getBody(), response.getStatusCode());
LOGGER.warn(errorMessage);
throw new InternalServerErrorException(errorMessage);
if (statusCode == HttpStatus.SC_CONFLICT) {
throw new OptimisticLockingException(errorMessage);
}
return modified;
} catch (Exception ex) {
throw new InternalServerErrorException(
format("Failed to find and update Instance by id - %s : %s", id, ex));
throw new InternalServerErrorException(errorMessage);
}
return modified;
}

private Instance modifyInstance(Instance existingInstance, JsonObject instance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

Expand All @@ -32,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import lombok.SneakyThrows;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.ReadKeyValues;
Expand Down Expand Up @@ -116,6 +116,7 @@ public static void tearDownClass(TestContext context) {
}

@Before
@SneakyThrows
public void setUp() throws IOException {
JsonObject mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
instance = Instance.fromJson(new JsonObject(TestUtil.readFileFromPath(INSTANCE_PATH)));
Expand Down Expand Up @@ -164,16 +165,13 @@ public void shouldReturnSucceededFutureWithObtainedRecordKey(TestContext context
context.assertTrue(ar.succeeded());
context.assertEquals(expectedKafkaRecordKey, ar.result());

context.verify(vo -> {
verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
verify(mockedStorage).getInstanceCollection(any());
verify(mockedInstanceCollection).findByIdAndUpdate(anyString(), any(), any());
});
context.verify(vo -> verify(vo, 1));
async.complete();
});
}

@Test
@SneakyThrows
public void shouldReturnSucceededFutureAfterHandlingOptimisticLockingError(TestContext context) {
// given
Async async = context.async();
Expand All @@ -195,9 +193,7 @@ public void shouldReturnSucceededFutureAfterHandlingOptimisticLockingError(TestC

// then
future.onComplete(ar -> {
verify(mappingMetadataCache, times(2)).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
verify(mockedStorage, times(2)).getInstanceCollection(any());
verify(mockedInstanceCollection, times(2)).findByIdAndUpdate(anyString(), any(), any());
verify(null, 2);

async.complete();
});
Expand Down Expand Up @@ -225,7 +221,7 @@ public void shouldReturnFailedFutureWhenMappingRulesNotFound(TestContext context
context.assertTrue(ar.failed());
context.assertTrue(ar.cause().getMessage().contains("MappingParameters and mapping rules snapshots were not found by jobId"));
verifyNoInteractions(mockedInstanceCollection);
verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
Mockito.verify(mappingMetadataCache).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
async.complete();
});
}
Expand Down Expand Up @@ -351,4 +347,11 @@ public void shouldSendFailedLinkReportEvent() throws InterruptedException {
Assert.assertEquals("Can't find Instance by id: " + record.getId(), report.getFailCause());
});
}

@SneakyThrows
private void verify(Void vo, int n) {
Mockito.verify(mappingMetadataCache, times(n)).getByRecordTypeBlocking(anyString(), any(Context.class), anyString());
Mockito.verify(mockedStorage, times(n)).getInstanceCollection(any());
Mockito.verify(mockedInstanceCollection, times(n)).findByIdAndUpdate(anyString(), any(), any());
}
}