diff --git a/mod-source-record-storage-server/pom.xml b/mod-source-record-storage-server/pom.xml
index bb19aefa8..9f7cc45e8 100644
--- a/mod-source-record-storage-server/pom.xml
+++ b/mod-source-record-storage-server/pom.xml
@@ -179,6 +179,12 @@
       <version>5.1.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+      <version>1.10.0</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.folio</groupId>
       <artifactId>data-import-processing-core</artifactId>
@@ -280,7 +286,7 @@
     1.18.3
     ${project.parent.basedir}
     ${project.parent.basedir}/ramls
-    3.16.19
+    3.16.23
     6.5.5
     1.9.19
     42.7.2
@@ -488,12 +494,23 @@
     org.jooq.meta.postgres.PostgresDatabase
     <includes>.*</includes>
-    <excludes>databasechangelog|databasechangeloglock|marc_indexers.*</excludes>
+    <excludes>databasechangelog|databasechangeloglock|marc_indexers_.*</excludes>
     <inputSchema>public</inputSchema>
     true
     false
-    <forcedTypes/>
+    <forcedTypes>
+      <forcedType>
+        <userType>io.vertx.core.json.JsonObject</userType>
+        <includeExpression>upsert_marc_record.content</includeExpression>
+        <converter>io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter</converter>
+      </forcedType>
+      <forcedType>
+        <userType>io.vertx.core.json.JsonObject</userType>
+        <includeExpression>update_marc_record.new_content</includeExpression>
+        <converter>io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter</converter>
+      </forcedType>
+    </forcedTypes>
     true
diff --git a/mod-source-record-storage-server/src/main/java/io/github/jklingsporn/vertx/jooq/classic/reactivepg/CustomReactiveQueryExecutor.java b/mod-source-record-storage-server/src/main/java/io/github/jklingsporn/vertx/jooq/classic/reactivepg/CustomReactiveQueryExecutor.java
new file mode 100644
index 000000000..1283b98ce
--- /dev/null
+++ b/mod-source-record-storage-server/src/main/java/io/github/jklingsporn/vertx/jooq/classic/reactivepg/CustomReactiveQueryExecutor.java
@@ -0,0 +1,72 @@
+package io.github.jklingsporn.vertx.jooq.classic.reactivepg;
+
+
+import io.vertx.core.Future;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.RowSet;
+import io.vertx.sqlclient.SqlClient;
+import io.vertx.sqlclient.Transaction;
+import org.jooq.Configuration;
+import org.jooq.Query;
+import org.jooq.SQLDialect;
+import org.jooq.conf.ParamType;
+
+import java.util.function.Function;
+
+/**
+ * This class was moved to this package so that we can access the constructor that accepts a transaction. The constructor
+ * is only accessible from this package.
+ */
+public class CustomReactiveQueryExecutor extends ReactiveClassicGenericQueryExecutor {
+  private final String tenantId;
+  private static final String pattern = "(?<!:):(?!:)";
+
+  public CustomReactiveQueryExecutor(Configuration configuration, SqlClient delegate, String tenantId) {
+    super(configuration, delegate);
+    this.tenantId = tenantId;
+  }
+
+  public CustomReactiveQueryExecutor(Configuration configuration, SqlClient delegate, Transaction transaction, String tenantId) {
+    super(configuration, delegate, transaction);
+    this.tenantId = tenantId;
+  }
+
+  @Override
+  protected Function<Transaction, ? extends ReactiveClassicGenericQueryExecutor> newInstance(SqlClient connection) {
+    return transaction -> new CustomReactiveQueryExecutor(configuration(), connection, transaction, tenantId);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <U> Future<U> customTransaction(Function<CustomReactiveQueryExecutor, Future<U>> transaction){
+    return this.transaction((Function<ReactiveClassicGenericQueryExecutor, Future<U>>) transaction);
+  }
+
+  /**
+   * This method was copied from the super class. The only difference is the pattern used to replace characters
+   * in the named query. The pattern in the super class did not handle some cases.
+   */
+  @Override
+  public String toPreparedQuery(Query query) {
+    if (SQLDialect.POSTGRES.supports(configuration().dialect())) {
+      String namedQuery = query.getSQL(ParamType.NAMED);
+      return namedQuery.replaceAll(pattern, "\\$");
+    }
+    // mysql works with the standard string
+    return query.getSQL();
+  }
+
+  /**
+   * This is a hack to expose the underlying vertx sql client because vertx-jooq does not support batch operations.
+   */
+  public Future<RowSet<Row>> getDelegate(Function<SqlClient, Future<RowSet<Row>>> delegateFunction){
+    return delegateFunction.apply(delegate);
+  }
+}
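Reviewer note, not part of the diff: a minimal usage sketch of the two entry points added above, assuming the PostgresClientFactory wiring patched below; the tenant id and the SQL are illustrative only.

    import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor;
    import io.vertx.core.Future;
    import io.vertx.sqlclient.Row;
    import io.vertx.sqlclient.RowSet;
    import io.vertx.sqlclient.Tuple;
    import java.util.List;
    import java.util.UUID;

    class UsageSketch {
      Future<RowSet<Row>> demo(org.folio.dao.PostgresClientFactory factory) {
        // "diku" is a sample tenant id, not taken from this diff
        CustomReactiveQueryExecutor executor = factory.getQueryExecutor("diku");

        // customTransaction keeps the transaction-bound executor typed as
        // CustomReactiveQueryExecutor, so helpers such as getDelegate stay reachable.
        Future<Integer> inTx = executor.customTransaction(txQE ->
          txQE.execute(dsl -> dsl.query("select 1")));

        // getDelegate exposes the raw Vert.x SqlClient for prepared-statement
        // batching, which vertx-jooq does not support on its own.
        return inTx.compose(v -> executor.getDelegate(sqlClient ->
          sqlClient.preparedQuery("select id from marc_records_lb where id = $1")
            .executeBatch(List.of(Tuple.of(UUID.randomUUID())))));
      }
    }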
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java
index 6c7017e82..39fa0b50e 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java
@@ -2,6 +2,7 @@
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor;
 import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
 import io.vertx.core.json.JsonObject;
 import io.vertx.pgclient.PgConnectOptions;
@@ -72,7 +73,7 @@ public class PostgresClientFactory {

   private final Vertx vertx;

-  private static Class<? extends ReactiveClassicGenericQueryExecutor> reactiveClassicGenericQueryExecutorProxyClass;
+  private static Class<? extends CustomReactiveQueryExecutor> reactiveClassicGenericQueryExecutorProxyClass;

   @Value("${srs.db.reactive.numRetries:3}")
   private Integer numOfRetries;
@@ -123,13 +124,13 @@ public void close() {
    * @param tenantId tenant id
    * @return reactive query executor
    */
-  public ReactiveClassicGenericQueryExecutor getQueryExecutor(String tenantId) {
+  public CustomReactiveQueryExecutor getQueryExecutor(String tenantId) {
     if (reactiveClassicGenericQueryExecutorProxyClass == null) setupProxyExecutorClass();
-    ReactiveClassicGenericQueryExecutor queryExecutorProxy;
+    CustomReactiveQueryExecutor queryExecutorProxy;
     try {
       queryExecutorProxy = reactiveClassicGenericQueryExecutorProxyClass
-        .getDeclaredConstructor(Configuration.class, SqlClient.class)
-        .newInstance(configuration, getCachedPool(this.vertx, tenantId).getDelegate());
+        .getDeclaredConstructor(Configuration.class, SqlClient.class, String.class)
+        .newInstance(configuration, getCachedPool(this.vertx, tenantId).getDelegate(), tenantId);
     } catch (Exception e) {
       throw new RuntimeException("Something happened while creating proxied reactiveClassicGenericQueryExecutor", e);
     }
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/QueryExecutorInterceptor.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/QueryExecutorInterceptor.java
index f671af329..62192975b 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/dao/QueryExecutorInterceptor.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/QueryExecutorInterceptor.java
@@ -1,6 +1,6 @@
 package org.folio.dao;

-import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
+import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor;
 import io.vertx.core.Context;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
@@ -56,13 +56,15 @@ public static void setRetryDelay(long delay) {
    *
    * @return the generated subclass
    */
-  public static Class<? extends ReactiveClassicGenericQueryExecutor> generateClass() {
+  public static Class<? extends CustomReactiveQueryExecutor> generateClass() {
     return new ByteBuddy()
-      .subclass(ReactiveClassicGenericQueryExecutor.class)
+      .subclass(CustomReactiveQueryExecutor.class)
       .constructor(ElementMatchers.any()) // Match all constructors
      .intercept(SuperMethodCall.INSTANCE) // Call the original constructor
       .method(ElementMatchers.named("transaction")) // For transaction method
       .intercept(MethodDelegation.to(QueryExecutorInterceptor.class))
+      .method(ElementMatchers.named("customTransaction")) // For customTransaction method
+      .intercept(MethodDelegation.to(QueryExecutorInterceptor.class))
       .method(ElementMatchers.named("query")) // For query method
       .intercept(MethodDelegation.to(QueryExecutorInterceptor.class))
       .make()
@@ -80,7 +82,7 @@ public static Class<? extends ReactiveClassicGenericQueryExecutor> generateClass
   public static Future<?> query(
     @net.bytebuddy.implementation.bind.annotation.SuperCall Callable<Future<?>> superCall
   ) {
-    LOGGER.trace("query method of ReactiveClassicGenericQueryExecutor proxied");
+    LOGGER.trace("query method of CustomReactiveQueryExecutor proxied");
     return retryOf(() -> {
       try {
         return superCall.call();
@@ -101,7 +103,28 @@ public static Future<?> query(
   public static Future<?> transaction(
     @net.bytebuddy.implementation.bind.annotation.SuperCall Callable<Future<?>> superCall
   ) {
-    LOGGER.trace("transaction method of ReactiveClassicGenericQueryExecutor proxied");
+    LOGGER.trace("transaction method of CustomReactiveQueryExecutor proxied");
+    return retryOf(() -> {
+      try {
+        return superCall.call();
+      } catch (Throwable e) {
+        LOGGER.error("Something happened while attempting to make proxied call for transaction method", e);
+        return Future.failedFuture(e);
+      }
+    }, numRetries);
+  }
+
+  /**
+   * Interceptor for the customTransaction method, with retry functionality.
+   *
+   * @param superCall the original method call
+   * @return the result of the transaction operation
+   */
+  @SuppressWarnings("unused")
+  public static Future<?> customTransaction(
+    @net.bytebuddy.implementation.bind.annotation.SuperCall Callable<Future<?>> superCall
+  ) {
+    LOGGER.trace("customTransaction method of CustomReactiveQueryExecutor proxied");
     return retryOf(() -> {
       try {
         return superCall.call();
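Reviewer note, not part of the diff: the interceptors above delegate to a retryOf helper that this hunk does not show. Below is a hypothetical sketch of what such a helper could look like, assuming a Vert.x context timer drives the delay; the names retryDelay and numRetries mirror the fields referenced above, everything else is an assumption rather than the module's actual code.

    import io.vertx.core.Future;
    import io.vertx.core.Promise;
    import io.vertx.core.Vertx;
    import java.util.function.Supplier;

    final class RetrySketch {
      private static long retryDelay = 1000L; // see setRetryDelay above

      // Re-runs the async operation until it succeeds or the retry budget is spent.
      static <T> Future<T> retryOf(Supplier<Future<T>> operation, int retriesLeft) {
        return operation.get().recover(err -> {
          if (retriesLeft <= 1) {
            return Future.failedFuture(err);
          }
          // assumes we are running on a Vert.x context; schedule the next attempt after a delay
          Promise<T> promise = Promise.promise();
          Vertx.currentContext().owner().setTimer(retryDelay,
            id -> retryOf(operation, retriesLeft - 1).onComplete(promise));
          return promise.future();
        });
      }
    }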
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java
index 49412d6ee..468ac6ae4 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java
@@ -5,6 +5,7 @@
 import java.util.Optional;
 import java.util.function.Function;

+import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor;
 import io.vertx.sqlclient.Row;
 import org.folio.dao.util.IdType;
 import org.folio.dao.util.RecordType;
@@ -194,13 +195,13 @@ Future getMatchedRecordsIdentifiers(MatchField mat
   Future<Record> saveRecord(Record record, String tenantId);

   /**
-   * Saves {@link Record} to the db using {@link ReactiveClassicGenericQueryExecutor}
+   * Saves {@link Record} to the db using {@link CustomReactiveQueryExecutor}
    *
    * @param txQE   query executor
    * @param record Record to save
    * @return future with saved Record
    */
-  Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record);
+  Future<Record> saveRecord(CustomReactiveQueryExecutor txQE, Record record);

   /**
    * Saves {@link RecordCollection} to the db
@@ -372,7 +373,7 @@ Future getMatchedRecordsIdentifiers(MatchField mat
    * @param oldRecord old Record that has to be marked as "old"
    * @return future with new "updated" Record
    */
-  Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord);
+  Future<Record> saveUpdatedRecord(CustomReactiveQueryExecutor txQE, Record newRecord, Record oldRecord);

   /**
    * Change suppress from discovery flag for record by external relation id
@@ -393,7 +394,7 @@ Future getMatchedRecordsIdentifiers(MatchField mat
    * @param tenantId tenant id
    * @return future with generic type
    */
-  <T> Future<T> executeInTransaction(Function<ReactiveClassicGenericQueryExecutor, Future<T>> action, String tenantId);
+  <T> Future<T> executeInTransaction(Function<CustomReactiveQueryExecutor, Future<T>>
action, String tenantId); /** * Search for non-existent mark bib ids in the system diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index f13e773a4..14980a1e9 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -35,6 +35,7 @@ import static org.jooq.impl.DSL.trueCondition; import com.google.common.collect.Lists; +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor; import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; import io.github.jklingsporn.vertx.jooq.shared.internal.QueryResult; import io.reactivex.Flowable; @@ -66,6 +67,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.text.StrSubstitutor; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.dao.util.ErrorRecordDaoUtil; @@ -223,8 +225,8 @@ public RecordDaoImpl(final PostgresClientFactory postgresClientFactory) { } @Override - public Future executeInTransaction(Function> action, String tenantId) { - return getQueryExecutor(tenantId).transaction(action); + public Future executeInTransaction(Function> action, String tenantId) { + return getQueryExecutor(tenantId).customTransaction(action); } @Override @@ -617,11 +619,11 @@ public Future> getRecordByCondition(ReactiveClassicGenericQuery @Override public Future saveRecord(Record record, String tenantId) { LOG.trace("saveRecord:: Saving {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); - return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record)); + return getQueryExecutor(tenantId).customTransaction(txQE -> saveRecord(txQE, record)); } @Override - public Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) { + public Future saveRecord(CustomReactiveQueryExecutor txQE, Record record) { LOG.trace("saveRecord:: Saving {} record {}", record.getRecordType(), record.getId()); return insertOrUpdateRecord(txQE, record); } @@ -801,6 +803,11 @@ public Future saveRecords(RecordCollection recordCollectio .fieldsCorresponding() .execute(); + // update marc_indexers if record type is MARC_RECORDS_LB + ParsedRecordDaoUtil.updateMarcIndexersTableSync(dsl, + recordType, + dbParsedRecords.stream().collect(Collectors.toMap(Record2::value1, Record2::value2))); + if (!dbErrorRecords.isEmpty()) { // batch insert error records dsl.loadInto(ERROR_RECORDS_LB) @@ -843,7 +850,7 @@ public Future saveRecords(RecordCollection recordCollectio @Override public Future updateRecord(Record record, String tenantId) { LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); - return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId()) + return getQueryExecutor(tenantId).customTransaction(txQE -> getRecordById(txQE, record.getId()) .compose(optionalRecord -> optionalRecord .map(r -> saveRecord(txQE, record)) .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId())))))); @@ -953,7 +960,7 @@ public Future calculateGeneration(ReactiveClassicGenericQueryExecutor t @Override public Future updateParsedRecord(Record record, String tenantId) { 
LOG.trace("updateParsedRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); - return getQueryExecutor(tenantId).transaction(txQE -> GenericCompositeFuture.all(Lists.newArrayList( + return getQueryExecutor(tenantId).customTransaction(txQE -> GenericCompositeFuture.all(Lists.newArrayList( updateExternalIdsForRecord(txQE, record), ParsedRecordDaoUtil.update(txQE, record.getParsedRecord(), ParsedRecordDaoUtil.toRecordType(record)) )).map(res -> record.getParsedRecord())); @@ -975,6 +982,7 @@ public Future updateParsedRecords(RecordCollection r List> recordUpdates = new ArrayList<>(); List> parsedRecordUpdates = new ArrayList<>(); + Map parsedMarcIndexersInput = new HashMap<>(); // used to insert marc_indexers Field prtId = field(name(ID), UUID.class); Field prtContent = field(name(CONTENT), JSONB.class); @@ -1047,13 +1055,16 @@ public Future updateParsedRecords(RecordCollection r try { RecordType recordType = toRecordType(record.getRecordType().name()); recordType.formatRecord(record); - + UUID id = UUID.fromString(record.getParsedRecord().getId()); + JSONB content = JSONB.valueOf(ParsedRecordDaoUtil.normalizeContent(record.getParsedRecord())); parsedRecordUpdates.add( DSL.update(table(name(recordType.getTableName()))) - .set(prtContent, JSONB.valueOf(ParsedRecordDaoUtil.normalizeContent(record.getParsedRecord()))) - .where(prtId.eq(UUID.fromString(record.getParsedRecord().getId()))) + .set(prtContent, content) + .where(prtId.eq(id)) ); + parsedMarcIndexersInput.put(record.getParsedRecord().getId(), content); + } catch (Exception e) { errorMessages.add(format(INVALID_PARSED_RECORD_MESSAGE_TEMPLATE, record.getId(), e.getMessage())); // if invalid parsed record, set id to null to filter out @@ -1095,6 +1106,15 @@ public Future updateParsedRecords(RecordCollection r } } + // update MARC_INDEXERS + Map parsedRecordMap = parsedRecordsUpdated.stream() + .map(rec -> Pair.of(UUID.fromString(rec.getId()), parsedMarcIndexersInput.get(rec.getId()))) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + Optional recordTypeOptional = recordTypes.stream().findFirst(); + if (recordTypeOptional.isPresent()) { + ParsedRecordDaoUtil.updateMarcIndexersTableSync(dsl, toRecordType(recordTypeOptional.get()), parsedRecordMap); + } + blockingPromise.complete(new ParsedRecordsBatchResponse() .withErrorMessages(errorMessages) .withParsedRecords(parsedRecordsUpdated) @@ -1172,7 +1192,7 @@ private MarcBibCollection toMarcBibCollection(QueryResult result) { } @Override - public Future saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord) { + public Future saveUpdatedRecord(CustomReactiveQueryExecutor txQE, Record newRecord, Record oldRecord) { LOG.trace("saveUpdatedRecord:: Saving updated record {}", newRecord.getId()); return insertOrUpdateRecord(txQE, oldRecord).compose(r -> insertOrUpdateRecord(txQE, newRecord)); } @@ -1289,7 +1309,7 @@ public Future updateRecordsState(String matchedId, RecordState state, Reco public Future updateMarcAuthorityRecordsStateAsDeleted(String matchedId, String tenantId) { Condition condition = RECORDS_LB.MATCHED_ID.eq(UUID.fromString(matchedId)); - return getQueryExecutor(tenantId).transaction(txQE -> getRecords(condition, RecordType.MARC_AUTHORITY, new ArrayList<>(), 0, RECORDS_LIMIT, tenantId) + return getQueryExecutor(tenantId).customTransaction(txQE -> getRecords(condition, RecordType.MARC_AUTHORITY, new ArrayList<>(), 0, RECORDS_LIMIT, tenantId) .compose(recordCollection -> { List> futures = 
recordCollection.getRecords().stream() .map(recordToUpdate -> updateMarcAuthorityRecordWithDeletedState(txQE, ensureRecordForeignKeys(recordToUpdate))) @@ -1308,7 +1328,7 @@ public Future updateMarcAuthorityRecordsStateAsDeleted(String matchedId, S })); } - private Future updateMarcAuthorityRecordWithDeletedState(ReactiveClassicGenericQueryExecutor txQE, Record record) { + private Future updateMarcAuthorityRecordWithDeletedState(CustomReactiveQueryExecutor txQE, Record record) { record.withState(Record.State.DELETED); if (Objects.nonNull(record.getParsedRecord())) { record.getParsedRecord().setId(record.getId()); @@ -1323,7 +1343,7 @@ private Future updateMarcAuthorityRecordWithDeletedState(ReactiveClassic } - private ReactiveClassicGenericQueryExecutor getQueryExecutor(String tenantId) { + private CustomReactiveQueryExecutor getQueryExecutor(String tenantId) { return postgresClientFactory.getQueryExecutor(tenantId); } @@ -1369,7 +1389,7 @@ private Future lookupAssociatedRecords(ReactiveClassicGenericQueryExecut return GenericCompositeFuture.all(futures).map(res -> record); } - private Future insertOrUpdateRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) { + private Future insertOrUpdateRecord(CustomReactiveQueryExecutor txQE, Record record) { return RawRecordDaoUtil.save(txQE, record.getRawRecord()) .compose(rawRecord -> { if (Objects.nonNull(record.getParsedRecord())) { @@ -1397,7 +1417,7 @@ private Future insertOrUpdateRecord(ReactiveClassicGenericQueryExecutor }); } - private Future insertOrUpdateParsedRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) { + private Future insertOrUpdateParsedRecord(CustomReactiveQueryExecutor txQE, Record record) { try { LOG.trace("insertOrUpdateParsedRecord:: Inserting or updating {} parsed record", record.getRecordType()); // attempt to format record to validate diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java index 167160881..c25fa8f3e 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java @@ -1,31 +1,54 @@ package org.folio.dao.util; -import static java.lang.String.format; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.name; -import static org.jooq.impl.DSL.table; - -import java.util.Objects; -import java.util.Optional; -import java.util.UUID; - -import javax.ws.rs.NotFoundException; - +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor; +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; +import io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter; +import io.vertx.core.Future; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.Tuple; import org.apache.commons.lang3.StringUtils; import org.folio.rest.jaxrs.model.ErrorRecord; import org.folio.rest.jaxrs.model.ParsedRecord; import org.folio.rest.jaxrs.model.Record; +import org.folio.rest.jooq.routines.UpdateMarcRecord; +import org.folio.rest.jooq.routines.UpsertMarcRecord; import org.folio.rest.jooq.tables.records.EdifactRecordsLbRecord; +import org.folio.rest.jooq.tables.records.MarcIndexersRecord; import org.folio.rest.jooq.tables.records.MarcRecordsLbRecord; +import 
org.jooq.DSLContext; import org.jooq.Field; import org.jooq.JSONB; +import org.jooq.Record1; +import org.jooq.Table; +import org.jooq.TableField; +import org.jooq.exception.DataAccessException; +import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; +import org.jooq.impl.TableImpl; -import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; -import io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter; -import io.vertx.core.Future; -import io.vertx.core.json.JsonObject; -import io.vertx.sqlclient.Row; +import javax.ws.rs.NotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.folio.rest.jooq.Tables.MARC_INDEXERS; +import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.select; +import static org.jooq.impl.DSL.table; /** * Utility class for managing {@link ParsedRecord} @@ -37,12 +60,20 @@ public final class ParsedRecordDaoUtil { private static final String LEADER = "leader"; private static final int LEADER_STATUS_SUBFIELD_POSITION = 5; - private static final Field ID_FIELD = field(name(ID), UUID.class); - private static final Field CONTENT_FIELD = field(name(CONTENT), SQLDataType.JSONB.asConvertedDataType(new JSONBToJsonObjectConverter())); - + public static final Field ID_FIELD = field(name(ID), UUID.class); + public static final Field CONTENT_FIELD = field(name(CONTENT), SQLDataType.JSONB.asConvertedDataType(new JSONBToJsonObjectConverter())); + private static final Field MARC_ID_FIELD = field(name("marc_id"), UUID.class); public static final String PARSED_RECORD_NOT_FOUND_TEMPLATE = "Parsed Record with id '%s' was not found"; - public static final String PARSED_RECORD_CONTENT = "parsed_record_content"; + private static final MarcIndexersUpdatedIds UPDATE_MARC_INDEXERS_TEMP_TABLE = new MarcIndexersUpdatedIds(); + + public static class MarcIndexersUpdatedIds extends TableImpl> { + public final TableField, UUID> MARC_ID = createField(DSL.name("marc_id"), SQLDataType.UUID); + + private MarcIndexersUpdatedIds() { + super(DSL.name("marc_indexers_updated_ids")); + } + } private ParsedRecordDaoUtil() { } @@ -71,17 +102,25 @@ public static Future> findById(ReactiveClassicGenericQuer * @param recordType record type to save * @return future with updated ParsedRecord */ - public static Future save(ReactiveClassicGenericQueryExecutor queryExecutor, + public static Future save(CustomReactiveQueryExecutor queryExecutor, ParsedRecord parsedRecord, RecordType recordType) { UUID id = UUID.fromString(parsedRecord.getId()); JsonObject content = normalize(parsedRecord.getContent()); - return queryExecutor.executeAny(dsl -> dsl.insertInto(table(name(recordType.getTableName()))) - .set(ID_FIELD, id) - .set(CONTENT_FIELD, content) - .onConflict(ID_FIELD) - .doUpdate() - .set(CONTENT_FIELD, content) - .returning()) + UpsertMarcRecord upsertMarcRecord = new UpsertMarcRecord(); + upsertMarcRecord.setRecordId(id); + upsertMarcRecord.setContent(content); + return queryExecutor.executeAny(dsl -> dsl.select(upsertMarcRecord.asField())) + .compose(res -> { + Row row = 
res.iterator().next(); + if(row == null) { + return Future.failedFuture( + String.format("save:: a version was not returned upon upsert of a marc record marRecordId=%s", id)); + } + Integer version = row.getInteger(0); + return updateMarcIndexersTableAsync(queryExecutor, recordType, id, content, version) + .compose(ar -> Future.succeededFuture(res)); + } + ) .map(res -> parsedRecord .withContent(content.getMap())); } @@ -95,13 +134,24 @@ public static Future save(ReactiveClassicGenericQueryExecutor quer * @param recordType record type to update * @return future of updated ParsedRecord */ - public static Future update(ReactiveClassicGenericQueryExecutor queryExecutor, + public static Future update(CustomReactiveQueryExecutor queryExecutor, ParsedRecord parsedRecord, RecordType recordType) { UUID id = UUID.fromString(parsedRecord.getId()); JsonObject content = normalize(parsedRecord.getContent()); - return queryExecutor.executeAny(dsl -> dsl.update(table(name(recordType.getTableName()))) - .set(CONTENT_FIELD, content) - .where(ID_FIELD.eq(id))) + UpdateMarcRecord updateMarcRecord = new UpdateMarcRecord(); + updateMarcRecord.setRecordId(id); + updateMarcRecord.setNewContent(content); + return queryExecutor.executeAny(dsl -> dsl.select(updateMarcRecord.asField())) + .compose(res -> { + Row row = res.iterator().next(); + if (row != null) { + Integer version = row.getInteger(0); + return updateMarcIndexersTableAsync(queryExecutor, recordType, id, content, version) + .map(res); + } + return Future.succeededFuture(res); + } + ) .map(update -> { if (update.rowCount() > 0) { return parsedRecord @@ -112,6 +162,189 @@ public static Future update(ReactiveClassicGenericQueryExecutor qu }); } + /** + * Synchronously updates the MARC indexers table based on the parsed records provided. This method first validates + * if the provided record type matches 'MARC_BIB'. If not, it terminates without performing any operation. + * Otherwise, it seeds the temporary table with MARC record IDs, retrieves their versions and inserts new indexers + * based on the parsed records data. + * + * @param dsl The DSLContext instance used to create and execute SQL queries. + * @param recordType The type of the record to be updated. + * @param parsedRecords A map containing the UUIDs of the records as keys and the parsed JSONB content as values. + * + * @throws DataAccessException if the execution of any of the SQL queries fails. 
+ */ + public static void updateMarcIndexersTableSync(DSLContext dsl, RecordType recordType, Map parsedRecords) throws IOException { + if (!recordType.getTableName().equals(RecordType.MARC_BIB.getTableName())) { + return; + } + + // Seed marc records identifiers before getting their versions + dsl.createTemporaryTableIfNotExists(UPDATE_MARC_INDEXERS_TEMP_TABLE) + .column(UPDATE_MARC_INDEXERS_TEMP_TABLE.MARC_ID) + .onCommitDrop() + .execute(); + + List> tempIds = parsedRecords.keySet().stream().map(k -> { + Record1 uuidRecord1 = dsl.newRecord(UPDATE_MARC_INDEXERS_TEMP_TABLE.MARC_ID); + uuidRecord1.set(UPDATE_MARC_INDEXERS_TEMP_TABLE.MARC_ID, k); + return uuidRecord1; + }).collect(Collectors.toList()); + + dsl.loadInto(UPDATE_MARC_INDEXERS_TEMP_TABLE) + .batchAfter(250) + .onErrorAbort() + .loadRecords(tempIds) + .fieldsCorresponding() + .execute(); + + // Get marc records versions + Table> subQuery = select(UPDATE_MARC_INDEXERS_TEMP_TABLE.MARC_ID) + .from(UPDATE_MARC_INDEXERS_TEMP_TABLE).asTable("subquery"); + var query = dsl + .select(MARC_RECORDS_TRACKING.MARC_ID, MARC_RECORDS_TRACKING.VERSION) + .from(MARC_RECORDS_TRACKING) + .where(MARC_RECORDS_TRACKING.MARC_ID.in(select(MARC_ID_FIELD).from(subQuery))); + var marcIndexersVersions = query.fetch(); + + // Insert indexers + List indexers = marcIndexersVersions.stream() + .map(record -> { + JSONB jsonb = parsedRecords.get(record.value1()); + if (jsonb != null) { + return createMarcIndexerRecords(dsl, record.value1(), new JsonObject(jsonb.data()), record.value2()); + } + return Collections.emptyList(); + }) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + dsl.loadInto(MARC_INDEXERS) + .batchAfter(250) + .onErrorAbort() + .loadRecords(indexers) + .fieldsCorresponding() + .execute(); + } + + /** + * Updates the MARC indexers table asynchronously. This method gets the MARC versions based on the provided object id + * and if the record is marked as dirty, it creates and inserts new MARC indexer records into the MARC indexers table. + * If the record type does not match 'MARC_BIB', or if the record is not marked as dirty, the method immediately + * completes with a 'false' result. + * + * @param queryExecutor The executor to run the database queries. + * @param recordType The type of the record to be updated. + * @param objectId The unique identifier of the object to be updated. + * @param content The content to be used for creating new MARC indexer records. + * + * @return A Future containing 'true' if the MARC indexer records are created and inserted successfully, 'false' + * otherwise. If the MARC record with the provided id cannot be found or there are multiple such records, + * a RuntimeException is thrown. + * + * @throws RuntimeException if the MARC record with the provided id cannot be found or there are multiple such records. 
+   */
+  public static Future<Boolean> updateMarcIndexersTableAsync(CustomReactiveQueryExecutor queryExecutor,
+                                                             RecordType recordType,
+                                                             UUID objectId,
+                                                             JsonObject content,
+                                                             Integer version) {
+    if (!recordType.getTableName().equals(RecordType.MARC_BIB.getTableName())) {
+      return Future.succeededFuture(false);
+    }
+
+    DSLContext dslContext = DSL.using(queryExecutor.configuration());
+    Collection<MarcIndexersRecord> marcIndexerRecords =
+      createMarcIndexerRecords(dslContext, objectId, content, version);
+    return queryExecutor.getDelegate(sqlClient -> {
+      List<Tuple> batch = new ArrayList<>();
+      marcIndexerRecords.stream()
+        .forEach(r -> batch.add(Tuple.of(r.value1(),
+          r.value2(),
+          r.value3(),
+          r.value4(),
+          r.value5(),
+          r.value6(),
+          r.value7())));
+
+      String sql = queryExecutor.toPreparedQuery(
+        dslContext.
+          insertInto(MARC_INDEXERS, MARC_INDEXERS.fields())
+          .values(MARC_INDEXERS.newRecord().intoArray()));
+
+      // Execute the prepared batch
+      return sqlClient
+        .preparedQuery(sql)
+        .executeBatch(batch);
+    }).map(true);
+  }
+
+  /**
+   * Converts a parsed record into rows for the MARC_INDEXERS table.
+   *
+   * The difference between this Java version and the SQL version is as follows:
+   * - all values are trimmed
+   */
+  protected static Collection<MarcIndexersRecord>
+  createMarcIndexerRecords(DSLContext dsl, UUID marcId, JsonObject content, int version) {
+    JsonArray fieldsArray = content.getJsonArray("fields");
+    if (fieldsArray == null) {
+      throw new IllegalArgumentException("Content does not contain 'fields' property");
+    }
+    Set<MarcIndexersRecord> indexerRecords = new HashSet<>();
+
+    for (int i = 0; i < fieldsArray.size(); i++) {
+      JsonObject field = fieldsArray.getJsonObject(i);
+
+      for (String fieldNo : field.fieldNames()) {
+        Object fieldValueObj = field.getValue(fieldNo);
+        JsonObject fieldValue = fieldValueObj instanceof JsonObject ? (JsonObject) fieldValueObj : null;
+        String ind1 = fieldValue != null && fieldValue.containsKey("ind1") && !StringUtils.isBlank(fieldValue.getString("ind1"))
+          ? fieldValue.getString("ind1").trim() : "#";
+        String ind2 = fieldValue != null && fieldValue.containsKey("ind2") && !StringUtils.isBlank(fieldValue.getString("ind2"))
+          ?
fieldValue.getString("ind2").trim() : "#"; + + if (fieldValue != null && fieldValue.containsKey("subfields")) { + JsonArray subfieldsArray = fieldValue.getJsonArray("subfields"); + for (int j = 0; j < subfieldsArray.size(); j++) { + JsonObject subfield = subfieldsArray.getJsonObject(j); + for (String subfieldNo : subfield.fieldNames()) { + String subfieldValue = subfield.getString(subfieldNo); + subfieldValue = trimQuotes(subfieldValue.trim()); + MarcIndexersRecord record = createMarcIndexersRecord(dsl, marcId, version, fieldNo, ind1, ind2, subfieldNo, subfieldValue); + indexerRecords.add(record); + } + } + } else { + String value = trimQuotes(fieldValueObj.toString().trim()); + MarcIndexersRecord record = createMarcIndexersRecord(dsl, marcId, version, fieldNo, ind1, ind2, "0", value); + indexerRecords.add(record); + } + } + } + + return indexerRecords; + } + + private static MarcIndexersRecord createMarcIndexersRecord(DSLContext dsl, UUID marcId, int version, String fieldNo, String ind1, String ind2, String subfieldNo, String value) { + MarcIndexersRecord record = dsl.newRecord(MARC_INDEXERS); + record.setFieldNo(fieldNo); + record.setInd1(ind1); + record.setInd2(ind2); + record.setSubfieldNo(subfieldNo); + record.setValue(value); + record.setMarcId(marcId); + record.setVersion(version); + return record; + } + + private static String trimQuotes(String value) { + if (value.startsWith("\"") && value.endsWith("\"")) { + return value.substring(1, value.length() - 1); + } + return value; + } + /** * Convert database query result {@link Row} to {@link ParsedRecord} * diff --git a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java index 6c4205e9a..854c91cdc 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java @@ -10,6 +10,8 @@ import io.vertx.core.spi.VerticleFactory; import java.util.List; import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.config.ApplicationConfig; @@ -34,6 +36,7 @@ public class InitAPIImpl implements InitAPI { private static final String SPRING_CONTEXT = "springContext"; private static final Logger LOGGER = LogManager.getLogger(); + private static final AtomicBoolean shouldInit = new AtomicBoolean(true); @Autowired private KafkaConfig kafkaConfig; @@ -64,21 +67,26 @@ public void init(Vertx vertx, Context context, Handler> han try { SpringContextUtil.init(vertx, context, ApplicationConfig.class); SpringContextUtil.autowireDependencies(this, context); - AbstractApplicationContext springContext = vertx.getOrCreateContext().get(SPRING_CONTEXT); - VerticleFactory verticleFactory = springContext.getBean(SpringVerticleFactory.class); - vertx.registerVerticleFactory(verticleFactory); - - EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber); - - registerEventHandlers(); - deployMarcIndexersVersionDeletionVerticle(vertx, verticleFactory); - deployConsumerVerticles(vertx, verticleFactory).onComplete(ar -> { - if (ar.succeeded()) { - handler.handle(Future.succeededFuture(true)); - } else { - handler.handle(Future.failedFuture(ar.cause())); - } - }); + if(shouldInit.compareAndSet(true, false)) { + // only one verticle should perform the actions below + AbstractApplicationContext 
springContext = vertx.getOrCreateContext().get(SPRING_CONTEXT); + VerticleFactory verticleFactory = springContext.getBean(SpringVerticleFactory.class); + vertx.registerVerticleFactory(verticleFactory); + + EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber); + + registerEventHandlers(); + deployMarcIndexersVersionDeletionVerticle(vertx, verticleFactory); + deployConsumerVerticles(vertx, verticleFactory).onComplete(ar -> { + if (ar.succeeded()) { + handler.handle(Future.succeededFuture(true)); + } else { + handler.handle(Future.failedFuture(ar.cause())); + } + }); + } else { + handler.handle(Future.succeededFuture(true)); + } } catch (Throwable th) { LOGGER.error("init:: Failed to init module", th); handler.handle(Future.failedFuture(th)); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java index 1b971d89a..9f6a03c48 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java @@ -26,10 +26,13 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.ws.rs.BadRequestException; import javax.ws.rs.NotFoundException; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import io.reactivex.Flowable; import io.vertx.core.AsyncResult; import io.vertx.core.Future; @@ -94,12 +97,18 @@ public class RecordServiceImpl implements RecordService { public static final String UPDATE_RECORD_DUPLICATE_EXCEPTION = "Incoming record could be a duplicate, incoming record generation should not be the same as matched record generation and the execution of job should be started after of creating the previous record generation"; public static final char SUBFIELD_S = 's'; public static final char INDICATOR = 'f'; + private record SnapshotCacheKey(String tenantId, String snapshotId){} private final RecordDao recordDao; + private final Cache validSnapshotCache; @Autowired public RecordServiceImpl(final RecordDao recordDao) { this.recordDao = recordDao; + this.validSnapshotCache = Caffeine.newBuilder() + .expireAfterWrite(10, TimeUnit.MINUTES) + .maximumSize(100) + .build(); } @Override @@ -123,32 +132,42 @@ public Future saveRecord(Record record, String tenantId) { LOG.debug("saveRecord:: Saving record with id: {} for tenant: {}", record.getId(), tenantId); ensureRecordHasId(record); ensureRecordHasSuppressDiscovery(record); - return recordDao.executeInTransaction(txQE -> SnapshotDaoUtil.findById(txQE, record.getSnapshotId()) - .map(optionalSnapshot -> optionalSnapshot - .orElseThrow(() -> new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, record.getSnapshotId())))) - .compose(snapshot -> { - if (Objects.isNull(snapshot.getProcessingStartedDate())) { - return Future.failedFuture(new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.getStatus()))); - } + SnapshotCacheKey snapshotCacheKey = new SnapshotCacheKey(tenantId, record.getSnapshotId()); + Snapshot validSnapshot = validSnapshotCache.getIfPresent(snapshotCacheKey); + return recordDao.executeInTransaction(txQE -> Future.succeededFuture() + .compose(notUsed -> { + if (validSnapshot == null) { + return SnapshotDaoUtil.findById(txQE, record.getSnapshotId()) + 
.map(optionalSnapshot -> optionalSnapshot + .orElseThrow(() -> new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, record.getSnapshotId())))) + .compose(snapshot -> { + if (Objects.isNull(snapshot.getProcessingStartedDate())) { + return Future.failedFuture(new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.getStatus()))); + } + validSnapshotCache.put(snapshotCacheKey, snapshot); + return Future.succeededFuture(); + }); + } else { return Future.succeededFuture(); - }) - .compose(v -> setMatchedIdForRecord(record, tenantId)) - .compose(r -> { - if (Objects.isNull(r.getGeneration())) { - return recordDao.calculateGeneration(txQE, r); - } - return Future.succeededFuture(r.getGeneration()); - }) - .compose(generation -> { - if (generation > 0) { - return recordDao.getRecordByMatchedId(txQE, record.getMatchedId()) - .compose(optionalMatchedRecord -> optionalMatchedRecord - .map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), matchedRecord.withState(Record.State.OLD))) - .orElseGet(() -> recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))))); - } else { - return recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))); - } - }), tenantId) + } + }) + .compose(v -> setMatchedIdForRecord(record, tenantId)) + .compose(r -> { + if (Objects.isNull(r.getGeneration())) { + return recordDao.calculateGeneration(txQE, r); + } + return Future.succeededFuture(r.getGeneration()); + }) + .compose(generation -> { + if (generation > 0) { + return recordDao.getRecordByMatchedId(txQE, record.getMatchedId()) + .compose(optionalMatchedRecord -> optionalMatchedRecord + .map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), matchedRecord.withState(Record.State.OLD))) + .orElseGet(() -> recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))))); + } else { + return recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))); + } + }), tenantId) .recover(RecordServiceImpl::mapToDuplicateExceptionIfNeeded); } diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml index b498662ef..40415b980 100644 --- a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml @@ -70,5 +70,7 @@ + + diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2023-06-26--16-00-update-fill-in-trigger.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2023-06-26--16-00-update-fill-in-trigger.xml new file mode 100644 index 000000000..f59e76d16 --- /dev/null +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2023-06-26--16-00-update-fill-in-trigger.xml @@ -0,0 +1,45 @@ + + + + + + create or replace function ${database.defaultSchemaName}.fill_in_marc_indexers(p_marc_id uuid, p_marc_content jsonb, p_version integer) + returns void + as + $fill_in_marc_indexers$ + begin + -- This section used to contain an insert statement that would generate rows for marc_indexers table. + -- The logic has been moved up into mod-source-record-storage for reliability & performance reasons. 
+ -- MODSOURCE-664 +-- + insert into ${database.defaultSchemaName}.marc_indexers_leader(p_00_04, p_05, p_06, p_07, p_08, p_09, p_10, p_11, p_12_16, p_17,p_18, p_19, p_20, p_21, p_22, marc_id) + (select substring(value from 1 for 5) p_00_04, + substring(value from 6 for 1) p_05, + substring(value from 7 for 1) p_06, + substring(value from 8 for 1) p_07, + substring(value from 9 for 1) p_08, + substring(value from 10 for 1) p_09, + substring(value from 11 for 1) p_10, + substring(value from 12 for 1) p_11, + substring(value from 13 for 5) p_12_16, + substring(value from 18 for 1) p_17, + substring(value from 19 for 1) p_18, + substring(value from 20 for 1) p_19, + substring(value from 21 for 1) p_20, + substring(value from 22 for 1) p_21, + substring(value from 23 for 1) p_22, + marc_id + from (select replace(lower(trim(both '"' from value::text)), ' ', '#') as value, + p_marc_id marc_id + from jsonb_each(p_marc_content) x + where key = 'leader') y); + end; + $fill_in_marc_indexers$ language plpgsql; + + + + diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml new file mode 100644 index 000000000..15daa8c51 --- /dev/null +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml @@ -0,0 +1,76 @@ + + + +CREATE OR REPLACE FUNCTION ${database.defaultSchemaName}.upsert_marc_record( + record_id UUID, + content JSONB +) +RETURNS INTEGER AS $$ +DECLARE + v_version INTEGER; + v_was_updated BOOLEAN; +BEGIN + -- Perform the UPSERT operation and determine if it was an update + WITH upserted AS ( + INSERT INTO ${database.defaultSchemaName}.marc_records_lb (id, content) + VALUES (record_id, content) + ON CONFLICT (id) DO UPDATE + SET content = EXCLUDED.content + RETURNING (xmax <> 0) AS was_updated -- TRUE if it was an update, FALSE for insert + ) + SELECT was_updated INTO v_was_updated FROM upserted; -- Get the was_updated value + + -- Decide what to return based on whether it was an insert or an update + IF v_was_updated THEN + SELECT version INTO v_version + FROM ${database.defaultSchemaName}.marc_records_tracking + WHERE marc_id = record_id; + RETURN v_version; + ELSE + RETURN 0; + END IF; +END; +$$ LANGUAGE plpgsql; + + + + +CREATE OR REPLACE FUNCTION ${database.defaultSchemaName}.update_marc_record( + record_id UUID, + new_content JSONB +) +RETURNS INTEGER AS +$$ +DECLARE + affected_rows INTEGER; + curr_version INTEGER; + BEGIN + -- Perform the UPDATE statement + UPDATE ${database.defaultSchemaName}.marc_records_lb + SET content = new_content + WHERE id = record_id; + + -- Get the number of affected rows + GET DIAGNOSTICS affected_rows = ROW_COUNT; + + IF affected_rows > 0 THEN + -- Retrieve the version from the tracking table + SELECT version + INTO curr_version + FROM ${database.defaultSchemaName}.marc_records_tracking + WHERE marc_id = record_id; + ELSE + curr_version := NULL; + END IF; + + -- Return the version or NULL if no rows were affected + RETURN curr_version; + END; +$$ LANGUAGE plpgsql; + + + diff --git a/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java b/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java new file mode 100644 index 000000000..48c9edfc7 --- /dev/null +++ 
b/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java @@ -0,0 +1,118 @@ +package org.folio.dao.util; + +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.folio.TestUtil; +import org.folio.rest.jooq.tables.records.MarcIndexersRecord; +import org.jooq.DSLContext; +import org.jooq.Record7; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.FileReader; +import java.io.Reader; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import static org.apache.commons.csv.CSVFormat.EXCEL; +import static org.folio.rest.jooq.Tables.MARC_INDEXERS; +import static org.junit.Assert.assertEquals; + +@RunWith(VertxUnitRunner.class) +public class ParsedRecordDaoUtilTest { + + private static final String PARSED_MARC_RECORD_SAMPLE_PATH = + "src/test/resources/org/folio/dao/util/parsedMarcRecord.json"; + private static final String PARSED_MARC_RECORD_BAD_SAMPLE_PATH = + "src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json"; + private static final String MARC_INDEXER_SAMPLE_PATH = + "src/test/resources/org/folio/dao/util/marc_indexer_row.csv"; + private static final DSLContext DSL_CONTEXT = DSL.using(SQLDialect.POSTGRES); + private static final UUID MARC_ID = UUID.fromString("9a4db741-2acb-4ad8-9e66-ab6d17dcbe68"); + private static final Integer VERSION = 1; + + /** + * A single record import was performed with SRS prior to MODSOURCE-664. The parsed record in MARC_RECORDS_LB and the + * MARC_INDEXERS row are saved in this test suite for comparison to the marc indexer generator developed as a result + * of MODSOURCE-664 . A small difference in the MARC_INDEXERS rows saved in this test suite + * is that the MARC_INDEXERS rows have the 010 field's value trimmed. The new marc indexer generator trims its values + * while the SQL version did not. 
+ */ + @Test + public void createMarcIndexerRecords() throws Exception { + String content = TestUtil.readFileFromPath(PARSED_MARC_RECORD_SAMPLE_PATH); + Collection> expected = parseCSV(MARC_INDEXER_SAMPLE_PATH); + + Collection records = + ParsedRecordDaoUtil.createMarcIndexerRecords(DSL_CONTEXT, MARC_ID, new JsonObject(content), VERSION); + + assertEquals(expected, records); + } + + @Test(expected = IllegalArgumentException.class) + public void badParsedRecord() { + String content = TestUtil.readFileFromPath(PARSED_MARC_RECORD_BAD_SAMPLE_PATH); + + ParsedRecordDaoUtil.createMarcIndexerRecords(DSL_CONTEXT, MARC_ID, new JsonObject(content), VERSION); + } + + @Test(expected = io.vertx.core.json.DecodeException.class) + public void notJsonContent() { + String content = "This is a not a parsed record"; + + ParsedRecordDaoUtil.createMarcIndexerRecords(DSL_CONTEXT, MARC_ID, new JsonObject(content), VERSION); + } + + private Collection> parseCSV(String filePath) throws Exception { + Set> records = new HashSet<>(); + + try (Reader in = new FileReader(filePath)) { + Iterable csvRecords = CSVFormat.Builder.create(EXCEL) + .setHeader() + .setSkipHeaderRecord(true) + .build() + .parse(in); + + for (CSVRecord csvRecord : csvRecords) { + Record7 record = createRecord( + csvRecord.get(0), + csvRecord.get(1), + csvRecord.get(2), + csvRecord.get(3), + csvRecord.get(4), + UUID.fromString(csvRecord.get(5)), + Integer.parseInt(csvRecord.get(6)) + ); + records.add(record); + } + } + + return records; + } + + private Record7 createRecord( + String col1, + String col2, + String col3, + String col4, + String col5, + UUID col6, + Integer col7 + ) { + return DSL_CONTEXT.newRecord( + MARC_INDEXERS.FIELD_NO, + MARC_INDEXERS.IND1, + MARC_INDEXERS.IND2, + MARC_INDEXERS.SUBFIELD_NO, + MARC_INDEXERS.VALUE, + MARC_INDEXERS.MARC_ID, + MARC_INDEXERS.VERSION) + .values(col1, col2, col3, col4, col5, col6, col7); + } +} diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java index ce8d9a661..492102d57 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java @@ -16,12 +16,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.json.JsonArray; import org.apache.http.HttpStatus; import org.folio.TestMocks; import org.folio.TestUtil; @@ -50,7 +53,6 @@ import io.restassured.RestAssured; import io.restassured.response.Response; -import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; @@ -889,8 +891,11 @@ public void shouldUpdateParsedRecordsWithJsonContent(TestContext testContext) { Record createdRecord = createResponse.body().as(Record.class); async.complete(); + Map contentObject = new HashMap<>(); + contentObject.put("leader", "01542ccm a2200361 4500"); + contentObject.put("fields", new ArrayList<>()); ParsedRecord parsedRecordJson = new ParsedRecord().withId(createdRecord.getParsedRecord().getId()) - .withContent(new JsonObject().put("leader", "01542ccm a2200361 4500").put("fields", new JsonArray())); 
+ .withContent(contentObject); RecordCollection recordCollection = new RecordCollection() .withRecords(Collections.singletonList(createdRecord.withParsedRecord(parsedRecordJson))) diff --git a/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/marc_indexer_row.csv b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/marc_indexer_row.csv new file mode 100644 index 000000000..ebe0ca692 --- /dev/null +++ b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/marc_indexer_row.csv @@ -0,0 +1,61 @@ +field_no,ind1,ind2,subfield_no,value,marc_id,version +001,#,#,0,in00000000001,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +005,#,#,0,20230627082130.8,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +008,#,#,0,750907c19509999enkqr p 0 a0eng d,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +010,#,#,a,58020553,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +022,#,#,a,0022-0469,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +035,#,#,a,366832,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +035,#,#,a,(CStRLIN)NYCX1604275S,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +035,#,#,a,(NIC)notisABP6388,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +035,#,#,a,(OCoLC)1604275,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +040,#,#,d,CStRLIN,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +040,#,#,d,CtY,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +040,#,#,d,MBTI,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +040,#,#,d,NIC,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +050,0,#,a,BR140,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +050,0,#,b,.J6,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +082,#,#,a,270.05,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +222,0,4,a,The Journal of ecclesiastical history,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +245,0,4,a,The Journal of ecclesiastical history.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +260,#,#,a,"London,",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +260,#,#,b,Cambridge University Press [etc.],9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +265,#,#,a,"32 East 57th St., New York, 10022",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +300,#,#,a,v.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +300,#,#,b,25 cm.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +310,#,#,a,"Quarterly,",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +310,#,#,b,1970-,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +321,#,#,a,"Semiannual,",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +321,#,#,b,1950-69,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +362,0,#,a,v. 1- Apr. 1950-,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +570,#,#,a,Editor: C. W. Dugmore.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,0,a,Church history,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,0,x,Periodicals.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,7,0,(OCoLC)fst00860740,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,7,2,fast,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,7,a,Church history,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +655,#,7,0,(OCoLC)fst01411641,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +655,#,7,2,fast,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +655,#,7,a,Periodicals,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +700,1,#,a,"Dugmore, C. 
W.",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +700,1,#,e,ed.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +700,1,#,q,"(Clifford William),",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +853,0,3,8,1,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +853,0,3,a,v.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +853,0,3,i,(year),9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +863,4,0,8,1,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +863,4,0,a,1-49,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +863,4,0,i,1950-1998,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +902,#,#,a,pfnd,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +902,#,#,b,Lintz,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +905,#,#,a,19890510120000.0,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,a,20141106,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,b,m,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,d,batch,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,e,lts,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,x,addfast,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +950,#,#,a,BR140,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +950,#,#,b,.J86,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +950,#,#,h,01/01/01 N,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +950,#,#,l,OLIN,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +999,f,f,i,65ad2158-687a-406e-9799-aa4427dd9d4a,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +999,f,f,s,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 diff --git a/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecord.json b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecord.json new file mode 100644 index 000000000..77427ab1b --- /dev/null +++ b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecord.json @@ -0,0 +1,426 @@ +{ + "fields": [ + { + "001": "in00000000001" + }, + { + "008": "750907c19509999enkqr p 0 a0eng d" + }, + { + "005": "20230627082130.8" + }, + { + "010": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": " 58020553 " + } + ] + } + }, + { + "022": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "0022-0469" + } + ] + } + }, + { + "035": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "(CStRLIN)NYCX1604275S" + } + ] + } + }, + { + "035": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "(NIC)notisABP6388" + } + ] + } + }, + { + "035": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "366832" + } + ] + } + }, + { + "035": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "(OCoLC)1604275" + } + ] + } + }, + { + "040": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "d": "CtY" + }, + { + "d": "MBTI" + }, + { + "d": "CtY" + }, + { + "d": "MBTI" + }, + { + "d": "NIC" + }, + { + "d": "CStRLIN" + }, + { + "d": "NIC" + } + ] + } + }, + { + "050": { + "ind1": "0", + "ind2": " ", + "subfields": [ + { + "a": "BR140" + }, + { + "b": ".J6" + } + ] + } + }, + { + "082": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "270.05" + } + ] + } + }, + { + "222": { + "ind1": "0", + "ind2": "4", + "subfields": [ + { + "a": "The Journal of ecclesiastical history" + } + ] + } + }, + { + "245": { + "ind1": "0", + "ind2": "4", + "subfields": [ + { + "a": "The Journal of ecclesiastical history." 
+ } + ] + } + }, + { + "260": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "London," + }, + { + "b": "Cambridge University Press [etc.]" + } + ] + } + }, + { + "265": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "32 East 57th St., New York, 10022" + } + ] + } + }, + { + "300": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "v." + }, + { + "b": "25 cm." + } + ] + } + }, + { + "310": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "Quarterly," + }, + { + "b": "1970-" + } + ] + } + }, + { + "321": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "Semiannual," + }, + { + "b": "1950-69" + } + ] + } + }, + { + "362": { + "ind1": "0", + "ind2": " ", + "subfields": [ + { + "a": "v. 1- Apr. 1950-" + } + ] + } + }, + { + "570": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "Editor: C. W. Dugmore." + } + ] + } + }, + { + "650": { + "ind1": " ", + "ind2": "0", + "subfields": [ + { + "a": "Church history" + }, + { + "x": "Periodicals." + } + ] + } + }, + { + "650": { + "ind1": " ", + "ind2": "7", + "subfields": [ + { + "a": "Church history" + }, + { + "2": "fast" + }, + { + "0": "(OCoLC)fst00860740" + } + ] + } + }, + { + "655": { + "ind1": " ", + "ind2": "7", + "subfields": [ + { + "a": "Periodicals" + }, + { + "2": "fast" + }, + { + "0": "(OCoLC)fst01411641" + } + ] + } + }, + { + "700": { + "ind1": "1", + "ind2": " ", + "subfields": [ + { + "a": "Dugmore, C. W." + }, + { + "q": "(Clifford William)," + }, + { + "e": "ed." + } + ] + } + }, + { + "853": { + "ind1": "0", + "ind2": "3", + "subfields": [ + { + "8": "1" + }, + { + "a": "v." + }, + { + "i": "(year)" + } + ] + } + }, + { + "863": { + "ind1": "4", + "ind2": "0", + "subfields": [ + { + "8": "1" + }, + { + "a": "1-49" + }, + { + "i": "1950-1998" + } + ] + } + }, + { + "902": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "pfnd" + }, + { + "b": "Lintz" + } + ] + } + }, + { + "905": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "19890510120000.0" + } + ] + } + }, + { + "948": { + "ind1": "2", + "ind2": " ", + "subfields": [ + { + "a": "20141106" + }, + { + "b": "m" + }, + { + "d": "batch" + }, + { + "e": "lts" + }, + { + "x": "addfast" + } + ] + } + }, + { + "950": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "l": "OLIN" + }, + { + "a": "BR140" + }, + { + "b": ".J86" + }, + { + "h": "01/01/01 N" + } + ] + } + }, + { + "999": { + "ind1": "f", + "ind2": "f", + "subfields": [ + { + "s": "9a4db741-2acb-4ad8-9e66-ab6d17dcbe68" + }, + { + "i": "65ad2158-687a-406e-9799-aa4427dd9d4a" + } + ] + } + } + ], + "leader": "01338cas a2200409 4500" +} diff --git a/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json new file mode 100644 index 000000000..ef5e73a3a --- /dev/null +++ b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json @@ -0,0 +1,14 @@ +{ + "not_fields": [ + { + "001": "in00000000001" + }, + { + "008": "750907c19509999enkqr p 0 a0eng d" + }, + { + "005": "20230627082130.8" + } + ], + "leader": "01338cas a2200409 4500" +}
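End-of-diff note for reviewers, not part of the patch: a minimal sketch of how the two fixtures above exercise the new generator, mirroring ParsedRecordDaoUtilTest; the paths, the marc id, and the expected row come straight from the fixtures, and the fragment assumes the test class's imports.

    // Sketch mirroring ParsedRecordDaoUtilTest; runs in the org.folio.dao.util test package.
    String content = TestUtil.readFileFromPath(
      "src/test/resources/org/folio/dao/util/parsedMarcRecord.json");
    Collection<MarcIndexersRecord> rows = ParsedRecordDaoUtil.createMarcIndexerRecords(
      DSL.using(SQLDialect.POSTGRES),
      UUID.fromString("9a4db741-2acb-4ad8-9e66-ab6d17dcbe68"),
      new JsonObject(content),
      1);
    // One MARC_INDEXERS row per subfield; e.g. the 022 field above becomes
    // field_no=022, ind1=#, ind2=#, subfield_no=a, value=0022-0469 (cf. marc_indexer_row.csv).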