Skip to content

Commit

Permalink
MODINV-986: unit test + fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jun 7, 2024
1 parent 9e3b8cd commit 4c8d7bf
Show file tree
Hide file tree
Showing 12 changed files with 491 additions and 93 deletions.
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* Fix mod-inventory OOM issue [MODINV-1023](https://folio-org.atlassian.net/browse/MODINV-1023)
* Replace GET with POST request for fetching instances and holdings on /items endpoint to omit 414 error [MODINV-943](https://folio-org.atlassian.net/browse/MODINV-943)
* Requires `holdings-storage 2.0 3.0 4.0 5.0 6.0 7.0`
* InstanceIngress events consumption [MODINV-986](https://folio-org.atlassian.net/browse/MODINV-986)
* InstanceIngress create events consumption [MODINV-986](https://folio-org.atlassian.net/browse/MODINV-986)

## 20.2.0 2023-03-20
* Inventory cannot process Holdings with virtual fields ([MODINV-941](https://issues.folio.org/browse/MODINV-941))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
return future;
}

private String getInstanceId(Record record) {
protected String getInstanceId(Record record) {
String subfield999ffi = ParsedRecordUtil.getAdditionalSubfieldValue(record.getParsedRecord(), ParsedRecordUtil.AdditionalSubfields.I);
return isEmpty(subfield999ffi) ? UUID.randomUUID().toString() : subfield999ffi;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public final class AdditionalFieldsUtil {
private static final char TAG_035_IND = ' ';
private static final String ANY_STRING = "*";
private static final char INDICATOR = 'f';
public static final char SUBFIELD_B = 'b';
public static final char SUBFIELD_I = 'i';
private static final String HR_ID_FIELD = "hrid";
private static final CacheLoader<String, org.marc4j.marc.Record> parsedRecordContentCacheLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import static java.lang.String.format;
import static java.util.Objects.isNull;
import static java.util.Optional.ofNullable;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_B;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_035;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_035_SUB;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.reorderMarcRecordFields;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
Expand All @@ -19,9 +23,10 @@
import io.vertx.core.json.JsonObject;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.HttpStatus;
import org.folio.MappingMetadataDto;
Expand All @@ -37,20 +42,24 @@
import org.folio.inventory.instanceingress.InstanceIngressEventConsumer;
import org.folio.inventory.services.IdStorageService;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.exception.DuplicateEventException;
import org.folio.processing.exceptions.EventProcessingException;
import org.folio.processing.mapping.defaultmapper.RecordMapper;
import org.folio.processing.mapping.defaultmapper.RecordMapperBuilder;
import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;
import org.folio.rest.jaxrs.model.ParsedRecord;
import org.folio.rest.jaxrs.model.RawRecord;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.Snapshot;

public class CreateInstanceIngressEventHandler extends CreateInstanceEventHandler implements InstanceIngressEventHandler {

private static final Logger LOGGER = LogManager.getLogger(CreateInstanceIngressEventHandler.class);
public static final String BIBFRAME_ID = "bibframeId";
private static final Logger LOGGER = getLogger(CreateInstanceIngressEventHandler.class);
private static final String BIBFRAME = "(bibframe) ";
private static final String FAILURE = "Failed to process InstanceIngressEvent with id {}";
private final Context context;
private final InstanceCollection instanceCollection;

Expand All @@ -66,76 +75,91 @@ public CreateInstanceIngressEventHandler(PrecedingSucceedingTitlesHelper precedi
}

@Override
public CompletableFuture<InstanceIngressEvent> handle(InstanceIngressEvent event) {
public CompletableFuture<Instance> handle(InstanceIngressEvent event) {
try {
LOGGER.info("Processing InstanceIngressEvent with id '{}' for instance creation", event.getId());
var future = new CompletableFuture<Instance>();
if (eventContainsNoData(event)) {
var message = format("InstanceIngressEvent message does not contain required data to create Instance for eventId: '%s'", event.getId());
LOGGER.error(message);
return CompletableFuture.failedFuture(new EventProcessingException(message));
}
super.getMappingMetadataCache().getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE)
.map(metadataOptional -> metadataOptional.orElseThrow(() -> new EventProcessingException("MappingMetadata was not found for marc-bib record type")))
.compose(mappingMetadataDto -> prepareAndExecuteMapping(mappingMetadataDto, event))
.compose(instance -> validateInstance(instance, event))
.compose(instance -> saveInstance(instance, event));

return CompletableFuture.completedFuture(event);
var targetRecord = constructMarcBibRecord(event.getEventPayload());
var instanceId = ofNullable(event.getEventPayload().getSourceRecordIdentifier()).orElseGet(() -> getInstanceId(targetRecord));
idStorageService.store(targetRecord.getId(), instanceId, context.getTenantId())
.compose(res -> super.getMappingMetadataCache().getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE))
.compose(metadataOptional -> metadataOptional.map(metadata -> prepareAndExecuteMapping(metadata, targetRecord, event, instanceId))
.orElseGet(() -> Future.failedFuture("MappingMetadata was not found for marc-bib record type")))
.compose(instance -> validateInstance(instance, event))
.compose(instance -> saveInstance(instance, event))
.onFailure(e -> {
if (!(e instanceof DuplicateEventException)) {
LOGGER.error(FAILURE, event.getId(), e);
}
future.completeExceptionally(e);
})
.onComplete(ar -> future.complete(ar.result()));
return future;
} catch (Exception e) {
LOGGER.error("Failed to process InstanceIngressEvent with id {}", event.getId(), e);
LOGGER.error(FAILURE, event.getId(), e);
return CompletableFuture.failedFuture(e);
}
}

private static boolean eventContainsNoData(InstanceIngressEvent event) {
return isNull(event.getEventPayload()) || isNull(event.getEventPayload().getSourceRecordObject());
private boolean eventContainsNoData(InstanceIngressEvent event) {
return isNull(event.getEventPayload())
|| isNull(event.getEventPayload().getSourceRecordObject())
|| isNull(event.getEventPayload().getSourceType());
}

private Future<org.folio.Instance> prepareAndExecuteMapping(MappingMetadataDto mappingMetadata, InstanceIngressEvent event) {
private Future<org.folio.Instance> prepareAndExecuteMapping(MappingMetadataDto mappingMetadata,
Record targetRecord,
InstanceIngressEvent event,
String instanceId) {
return postSnapshotInSrsAndHandleResponse(event.getId())
.compose(snapshot -> {
try {
var marcBibRecord = constructMarcBibRecord(event);

LOGGER.info("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId());
var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class);
AdditionalFieldsUtil.updateLatestTransactionDate(marcBibRecord, mappingParameters);
AdditionalFieldsUtil.move001To035(marcBibRecord);
AdditionalFieldsUtil.normalize035(marcBibRecord);
AdditionalFieldsUtil.addFieldToMarcRecord(marcBibRecord, TAG_035, TAG_035_SUB,
BIBFRAME + event.getEventPayload().getSourceRecordIdentifier());
AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters);
AdditionalFieldsUtil.move001To035(targetRecord);
AdditionalFieldsUtil.normalize035(targetRecord);
if (event.getEventPayload().getAdditionalProperties().containsKey(BIBFRAME_ID)) {
AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB,
BIBFRAME + event.getEventPayload().getAdditionalProperties().get(BIBFRAME_ID));
}

LOGGER.info("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId());
var parsedRecord = new JsonObject((String) marcBibRecord.getParsedRecord().getContent());
var parsedRecord = new JsonObject((String) targetRecord.getParsedRecord().getContent());
RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT);
var instance = recordMapper.mapRecord(parsedRecord, mappingParameters, new JsonObject(mappingMetadata.getMappingRules()));
instance.setId(instanceId);
instance.setSource(event.getEventPayload().getSourceType().value());
LOGGER.info("Mapped Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance);
return super.idStorageService.store(marcBibRecord.getId(), instance.getId(), context.getTenantId())
.map(r -> instance)
.onFailure(e -> LOGGER.error("Error creating relationship of inventory recordId '{} and instanceId '{}'", event.getId(), instance.getId()));
return Future.succeededFuture(instance);
} catch (Exception e) {
LOGGER.warn("Error during preparing and executing mapping:", e);
return Future.failedFuture(e);
}
});
}

private Record constructMarcBibRecord(InstanceIngressEvent event) {
LOGGER.info("Constructing a Record from InstanceIngressEvent with id '{}'", event.getId());
private Record constructMarcBibRecord(InstanceIngressPayload eventPayload) {
var recordId = UUID.randomUUID().toString();
var marcBibRecord = new org.folio.rest.jaxrs.model.Record()
.withId(event.getId())
.withId(recordId)
.withRecordType(MARC_BIB)
.withSnapshotId(event.getId())
.withSnapshotId(recordId)
.withRawRecord(new RawRecord()
.withId(event.getId())
.withContent(event.getEventPayload().getSourceRecordObject())
.withId(recordId)
.withContent(eventPayload.getSourceRecordObject())
)
.withParsedRecord(new ParsedRecord()
.withId(event.getId())
.withContent(event.getEventPayload().getSourceRecordObject())
.withId(recordId)
.withContent(eventPayload.getSourceRecordObject())
);
event.getEventPayload()
eventPayload
.withAdditionalProperty(MARC_BIBLIOGRAPHIC.value(), marcBibRecord);
return marcBibRecord;
}
Expand All @@ -149,40 +173,46 @@ private Future<Instance> validateInstance(org.folio.Instance instance, InstanceI
.orElseGet(() -> {
var mappedInstance = Instance.fromJson(instanceAsJson);
var uuidErrors = ValidationUtil.validateUUIDs(mappedInstance);
return failIfErrors(uuidErrors, event.getId())
.orElseGet(() -> Future.succeededFuture(mappedInstance));
return failIfErrors(uuidErrors, event.getId()).orElseGet(() -> Future.succeededFuture(mappedInstance));
});
} catch (Exception e) {
return Future.failedFuture(e);
}
}

private static Optional<Future<Instance>> failIfErrors(List<String> errors, String eventId) {
private Optional<Future<Instance>> failIfErrors(List<String> errors, String eventId) {
if (errors.isEmpty()) {
return Optional.empty();
}
var msg = format("Mapped Instance is invalid: %s, from InstanceIngressEvent with id '%s' ", errors, eventId);
var msg = format("Mapped Instance is invalid: %s, from InstanceIngressEvent with id '%s'", errors, eventId);
LOGGER.warn(msg);
return Optional.of(Future.failedFuture(msg));
}

private Future<Void> saveInstance(Instance instance, InstanceIngressEvent event) {
private Future<Instance> saveInstance(Instance instance, InstanceIngressEvent event) {
LOGGER.info("Saving Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance);
var targetRecord = (Record) event.getEventPayload().getAdditionalProperties().get(MARC_BIBLIOGRAPHIC.value());
var sourceContent = targetRecord.getParsedRecord().getContent().toString();
super.addInstance(instance, instanceCollection)
return super.addInstance(instance, instanceCollection)
.compose(createdInstance -> getPrecedingSucceedingTitlesHelper().createPrecedingSucceedingTitles(instance, context).map(createdInstance))
.compose(createdInstance -> executeFieldsManipulation(createdInstance, targetRecord))
.compose(createdInstance -> executeFieldsManipulation(createdInstance, targetRecord, event.getEventPayload().getAdditionalProperties()))
.compose(createdInstance -> {
var targetContent = targetRecord.getParsedRecord().getContent().toString();
var content = reorderMarcRecordFields(sourceContent, targetContent);
targetRecord.setParsedRecord(targetRecord.getParsedRecord().withContent(content));
return saveRecordInSrsAndHandleResponse(event, targetRecord, createdInstance);
});
return Future.succeededFuture();
}

protected Future<Instance> saveRecordInSrsAndHandleResponse(InstanceIngressEvent event, Record srcRecord, Instance instance) {
private Future<Instance> executeFieldsManipulation(Instance instance, Record srcRecord,
Map<String, Object> eventProperties) {
if (eventProperties.containsKey(BIBFRAME_ID)) {
AdditionalFieldsUtil.addFieldToMarcRecord(srcRecord, TAG_999, SUBFIELD_B, (String) eventProperties.get(BIBFRAME_ID));
}
return super.executeFieldsManipulation(instance, srcRecord);
}

private Future<Instance> saveRecordInSrsAndHandleResponse(InstanceIngressEvent event, Record srcRecord, Instance instance) {
LOGGER.info("Saving record in SRS and handling a response for an Instance with id '{}':", instance.getId());
Promise<Instance> promise = Promise.promise();
getSourceStorageRecordsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId())
Expand All @@ -209,7 +239,8 @@ private Future<Snapshot> postSnapshotInSrsAndHandleResponse(String id) {
.withJobExecutionId(id)
.withProcessingStartedDate(new Date())
.withStatus(PROCESSING_IN_PROGRESS);
return super.postSnapshotInSrsAndHandleResponse(context.getOkapiLocation(), context.getToken(), snapshot, context.getTenantId());
return super.postSnapshotInSrsAndHandleResponse(context.getOkapiLocation(),
context.getToken(), snapshot, context.getTenantId());
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.folio.inventory.instanceingress.handler;

import java.util.concurrent.CompletableFuture;
import org.folio.inventory.domain.instances.Instance;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;

public interface InstanceIngressEventHandler {

CompletableFuture<InstanceIngressEvent> handle(InstanceIngressEvent instanceIngressEvent);
CompletableFuture<Instance> handle(InstanceIngressEvent instanceIngressEvent);

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.folio.inventory.instanceingress.handler;

import java.util.concurrent.CompletableFuture;
import org.folio.inventory.domain.instances.Instance;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;

public class InstanceIngressUpdateEventHandler implements InstanceIngressEventHandler {

@Override
public CompletableFuture<InstanceIngressEvent> handle(InstanceIngressEvent instanceIngressEvent) {
public CompletableFuture<Instance> handle(InstanceIngressEvent instanceIngressEvent) {
// to be implemented in MODINV-1008
return CompletableFuture.failedFuture(new UnsupportedOperationException());
}
Expand Down
7 changes: 7 additions & 0 deletions src/test/java/org/folio/inventory/TestUtil.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.folio.inventory;

import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.impl.HttpResponseImpl;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -12,4 +14,9 @@ public final class TestUtil {
public static String readFileFromPath(String path) throws IOException {
return Files.readString(Path.of(path));
}

public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(Buffer buffer, int httpStatus) {
return new HttpResponseImpl<>(null, httpStatus, "",
null, null, null, buffer, null);
}
}
Loading

0 comments on commit 4c8d7bf

Please sign in to comment.