Skip to content

Commit

Permalink
Release v20.2.11: MODINV-1125 - Fix handling optimistic locking behav…
Browse files Browse the repository at this point in the history
…ior for instance update when consuming Marc Bib update event (#808)

* MODINV-1125: Fix handling Optimistic Locking error and retry mechanism on Instance update (#799)

* [maven-release-plugin] prepare release v20.2.11

* [maven-release-plugin] prepare for next development iteration
  • Loading branch information
mukhiddin-yusuf authored Jan 28, 2025
1 parent 72adc24 commit ff08492
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 110 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 20.2.11 2025-01-27
* [MODINV-1125](https://folio-org.atlassian.net/browse/MODINV-1125) Fix handling optimistic locking behavior for instance update when consuming Marc Bib update event

## 20.2.10 2025-01-16
* [MODINV-1136](https://folio-org.atlassian.net/browse/MODINV-1136) The "discoverySuppress" instance attribute is not created in database when instance created by default mapping profile

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>mod-inventory</artifactId>
<groupId>org.folio</groupId>
<version>20.2.11-SNAPSHOT</version>
<version>20.2.12-SNAPSHOT</version>
<licenses>
<license>
<name>Apache License 2.0</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import static java.lang.String.format;
import static java.util.Objects.isNull;
import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.OKAPI_USER_ID;
import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
Expand Down Expand Up @@ -33,12 +36,10 @@
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil;
import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
import org.folio.inventory.domain.instances.Instance;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.kafka.SimpleKafkaProducerManager;
import org.folio.kafka.services.KafkaProducerRecordBuilder;
import org.folio.processing.exceptions.EventProcessingException;
Expand All @@ -55,8 +56,7 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler<String, Str
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final String CURRENT_RETRY_NUMBER = "CURRENT_RETRY_NUMBER";
private static final String OKAPI_USER_ID = "x-okapi-user-id";
private static final int MAX_RETRIES_COUNT = Integer.parseInt(System.getenv().getOrDefault("inventory.di.ol.retry.number", "1"));
private static final int MAX_RETRIES_COUNT = Integer.parseInt(System.getenv().getOrDefault("inventory.di.ol.retry.number", "3"));

private final InstanceUpdateDelegate instanceUpdateDelegate;
private final MappingMetadataCache mappingMetadataCache;
Expand All @@ -77,11 +77,9 @@ public MarcBibUpdateKafkaHandler(Vertx vertx, int maxDistributionNumber, KafkaCo
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
try {
Promise<Instance> promise = Promise.promise();
MarcBibUpdate instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class);
Map<String, String> headersMap = KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers());
HashMap<String, String> metaDataPayload = new HashMap<>();
var jobId = instanceEvent.getJobId();
var instanceEvent = OBJECT_MAPPER.readValue(consumerRecord.value(), MarcBibUpdate.class);
var headers = kafkaHeadersToMap(consumerRecord.headers());
Map<String, String> metaDataPayload = new HashMap<>();

LOGGER.info("Event payload has been received with event type: {} by jobId: {}", instanceEvent.getType(), instanceEvent.getJobId());

Expand All @@ -90,51 +88,59 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
LOGGER.error(message);
return Future.failedFuture(message);
}
Context context = EventHandlingUtil.constructContext(instanceEvent.getTenant(), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER), headersMap.get(OKAPI_USER_ID));
Record marcBibRecord = instanceEvent.getRecord();

io.vertx.core.Context vertxContext = Vertx.currentContext();

if(vertxContext == null) {
return Future.failedFuture("handle:: operation must be executed by a Vertx thread");
}

vertxContext.owner().executeBlocking(
() -> {
var mappingMetadataDto =
mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE)
.orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId)));
ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto);
return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context);
},
r -> {
if (r.failed()) {
LOGGER.warn("handle:: Error during instance update", r.cause());
promise.fail(r.cause());
} else {
LOGGER.debug("handle:: Instance update was successful");
promise.complete(r.result());
}
}
);

Promise<String> finalPromise = Promise.promise();
promise.future()
.onComplete(ar -> processUpdateResult(ar, metaDataPayload, finalPromise, consumerRecord, instanceEvent));
processEvent(instanceEvent, headers, metaDataPayload)
.onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, metaDataPayload, finalPromise));
return finalPromise.future();
} catch (Exception ex) {
LOGGER.error(format("Failed to process kafka record from topic %s", consumerRecord.topic()), ex);
return Future.failedFuture(ex);
}
}

private Future<Instance> processEvent(MarcBibUpdate instanceEvent, Map<String, String> headers, Map<String, String> metaDataPayload) {
Context context = constructContext(instanceEvent.getTenant(), headers.get(OKAPI_TOKEN_HEADER), headers.get(OKAPI_URL_HEADER),
headers.get(OKAPI_USER_ID));
Record marcBibRecord = instanceEvent.getRecord();
var jobId = instanceEvent.getJobId();
Promise<Instance> promise = Promise.promise();
io.vertx.core.Context vertxContext = Vertx.currentContext();

if(vertxContext == null) {
return Future.failedFuture("handle:: operation must be executed by a Vertx thread");
}

vertxContext.owner().executeBlocking(
() -> {
var mappingMetadataDto =
mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE)
.orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId)));
ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto);
return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context);
},
r -> {
if (r.failed()) {
LOGGER.warn("handle:: Error during instance update", r.cause());
promise.fail(r.cause());
} else {
LOGGER.debug("handle:: Instance update was successful");
promise.complete(r.result());
}
}
);

return promise.future();
}

private void processUpdateResult(AsyncResult<Instance> result,
HashMap<String, String> eventPayload,
Promise<String> promise,
KafkaConsumerRecord<String, String> consumerRecord,
MarcBibUpdate instanceEvent) {
MarcBibUpdate instanceEvent,
Map<String, String> eventPayload,
Promise<String> promise) {
if (result.failed() && result.cause() instanceof OptimisticLockingException) {
processOLError(consumerRecord, promise, eventPayload, instanceEvent.getJobId(), result);
var headers = kafkaHeadersToMap(consumerRecord.headers());
processOLError(consumerRecord, instanceEvent, promise, eventPayload, headers);
return;
}

Expand All @@ -145,38 +151,37 @@ private void processUpdateResult(AsyncResult<Instance> result,
} else {
var errorCause = result.cause();
LOGGER.error("Failed to update instance by jobId {}:{}", instanceEvent.getJobId(), errorCause);
eventPayload.remove(CURRENT_RETRY_NUMBER);
linkUpdateReport = mapToLinkReport(instanceEvent, errorCause.getMessage());
promise.fail(errorCause);
}

sendEventToKafka(linkUpdateReport, consumerRecord.headers());
}

private void processOLError(KafkaConsumerRecord<String, String> value,
private void processOLError(KafkaConsumerRecord<String, String> consumerRecord,
MarcBibUpdate instanceEvent,
Promise<String> promise,
HashMap<String, String> eventPayload,
String jobId,
AsyncResult<Instance> ar) {
int currentRetryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER))
.map(Integer::parseInt).orElse(0);
if (currentRetryNumber < MAX_RETRIES_COUNT) {
eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(currentRetryNumber + 1));
LOGGER.warn("Error updating Instance: {}, jobId: {}, Retry MarcBibUpdateKafkaHandler handler...",
ar.cause().getMessage(), jobId);
handle(value).onComplete(result -> {
if (result.succeeded()) {
promise.complete(value.key());
} else {
promise.fail(result.cause());
}
});
} else {
eventPayload.remove(CURRENT_RETRY_NUMBER);
String errMessage = format("Current retry number %s exceeded given number %s for the Instance update", MAX_RETRIES_COUNT, currentRetryNumber);
LOGGER.error(errMessage);
promise.fail(new OptimisticLockingException(errMessage));
Map<String, String> eventPayload,
Map<String, String> headers) {
int retryNumber = Optional.ofNullable(eventPayload.get(CURRENT_RETRY_NUMBER))
.map(Integer::parseInt)
.orElse(0);
if (retryNumber < MAX_RETRIES_COUNT) {
eventPayload.put(CURRENT_RETRY_NUMBER, String.valueOf(retryNumber + 1));
LOGGER.warn("Optimistic Locking Error on updating Instance, jobId: {}, Retry Instance update", instanceEvent.getJobId());

processEvent(instanceEvent, headers, eventPayload)
.onComplete(ar -> processUpdateResult(ar, consumerRecord, instanceEvent, eventPayload, promise));
return;
}

eventPayload.remove(CURRENT_RETRY_NUMBER);
String errMessage = format("Optimistic Locking Error, current retry number: %s exceeded the given max retry attempt of %s for Instance update", retryNumber, MAX_RETRIES_COUNT);
LOGGER.error(errMessage);
promise.fail(errMessage);

var linkUpdateReport = mapToLinkReport(instanceEvent, errMessage);
sendEventToKafka(linkUpdateReport, consumerRecord.headers());
}

private void sendEventToKafka(LinkUpdateReport linkUpdateReport, List<KafkaHeader> kafkaHeaders) {
Expand Down Expand Up @@ -214,7 +219,7 @@ private static String formatTopicName(String env, String tenant, String eventTyp
return String.join(".", env, tenant, eventType);
}

private void ensureEventPayloadWithMappingMetadata(HashMap<String, String> eventPayload, MappingMetadataDto mappingMetadataDto) {
private void ensureEventPayloadWithMappingMetadata(Map<String, String> eventPayload, MappingMetadataDto mappingMetadataDto) {
eventPayload.put(MAPPING_RULES_KEY, mappingMetadataDto.getMappingRules());
eventPayload.put(MAPPING_PARAMS_KEY, mappingMetadataDto.getMappingParams());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Future<Instance> handle(Map<String, String> eventPayload, Record marcReco
}
}

public Instance handleBlocking(Map<String, String> eventPayload, Record marcRecord, Context context) {
public Instance handleBlocking(Map<String, String> eventPayload, Record marcRecord, Context context) throws Exception {
logParametersUpdateDelegate(LOGGER, eventPayload, marcRecord, context);
try {
JsonObject mappingRules = new JsonObject(eventPayload.get(MAPPING_RULES_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.vertx.core.json.JsonObject;

public final class EventHandlingUtil {
public static final String OKAPI_USER_ID = "x-okapi-user-id";
private static final String CENTRAL_TENANT_ID = "CENTRAL_TENANT_ID";

private EventHandlingUtil() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

public interface SynchronousCollection<T> {

T findByIdAndUpdate(String id, JsonObject entity, Context context);
T findByIdAndUpdate(String id, JsonObject entity, Context context) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.folio.inventory.common.Context;
import org.folio.inventory.common.domain.Failure;
import org.folio.inventory.common.domain.Success;
import org.folio.inventory.dataimport.exceptions.OptimisticLockingException;
import org.folio.inventory.domain.BatchResult;
import org.folio.inventory.domain.Metadata;
import org.folio.inventory.domain.instances.Instance;
Expand Down Expand Up @@ -126,40 +127,40 @@ private boolean isBatchResponse(Response response) {
}

@Override
public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) {
try {
SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext);
var url = individualRecordLocation(id);
var response = client.get(url);
var responseBody = response.getBody();

if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
LOGGER.warn("Instance not found by id - {} : {}", id, responseBody);
throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody));
} else if (response.getStatusCode() != HttpStatus.SC_OK) {
LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}",
id, responseBody, response.getStatusCode());
throw new ExternalResourceFetchException("Failed to fetch Instance record",
responseBody, response.getStatusCode(), null);
}
public Instance findByIdAndUpdate(String id, JsonObject instance, Context inventoryContext) throws Exception {
SynchronousHttpClient client = getSynchronousHttpClient(inventoryContext);
var url = individualRecordLocation(id);
var response = client.get(url);
var responseBody = response.getBody();

if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
LOGGER.warn("Instance not found by id - {} : {}", id, responseBody);
throw new NotFoundException(format("Instance not found by id - %s : %s", id, responseBody));
} else if (response.getStatusCode() != HttpStatus.SC_OK) {
LOGGER.warn("Failed to fetch Instance by id - {} : {}, {}",
id, responseBody, response.getStatusCode());
throw new ExternalResourceFetchException("Failed to fetch Instance record",
responseBody, response.getStatusCode(), null);
}

var jsonInstance = new JsonObject(responseBody);
var existingInstance = mapFromJson(jsonInstance);
var modified = modifyInstance(existingInstance, instance);
var modifiedInstance = mapToRequest(modified);

response = client.put(url, modifiedInstance);
var statusCode = response.getStatusCode();
if (statusCode != HttpStatus.SC_NO_CONTENT) {
var errorMessage = format("Failed to update Instance by id : %s, error : %s, status code %s",
id, response.getBody(), response.getStatusCode());
LOGGER.error(errorMessage);

var jsonInstance = new JsonObject(responseBody);
var existingInstance = mapFromJson(jsonInstance);
var modified = modifyInstance(existingInstance, instance);
var modifiedInstance = mapToRequest(modified);

response = client.put(url, modifiedInstance);
if (response.getStatusCode() != 204) {
var errorMessage = format("Failed to update Instance by id - %s : %s, %s",
id, response.getBody(), response.getStatusCode());
LOGGER.warn(errorMessage);
throw new InternalServerErrorException(errorMessage);
if (statusCode == HttpStatus.SC_CONFLICT) {
throw new OptimisticLockingException(errorMessage);
}
return modified;
} catch (Exception ex) {
throw new InternalServerErrorException(
format("Failed to find and update Instance by id - %s : %s", id, ex));
throw new InternalServerErrorException(errorMessage);
}
return modified;
}

private Instance modifyInstance(Instance existingInstance, JsonObject instance) {
Expand Down
Loading

0 comments on commit ff08492

Please sign in to comment.