From ece6f354360593e91d6a85e4c18acabff712ccad Mon Sep 17 00:00:00 2001 From: Olamide Kolawole Date: Tue, 27 Jun 2023 16:09:31 -0500 Subject: [PATCH 1/8] MODSOURCE-664 move flattening of parsed records into the module --- mod-source-record-storage-server/pom.xml | 6 + .../java/org/folio/dao/RecordDaoImpl.java | 25 +- .../folio/dao/util/ParsedRecordDaoUtil.java | 294 +++++++++++- .../resources/liquibase/tenant/changelog.xml | 1 + ...23-06-26--16-00-update-fill-in-trigger.xml | 45 ++ .../dao/util/ParsedRecordDaoUtilTest.java | 115 +++++ .../org/folio/dao/util/marc_indexer_row.csv | 61 +++ .../org/folio/dao/util/parsedMarcRecord.json | 426 ++++++++++++++++++ .../folio/dao/util/parsedMarcRecordBad.json | 14 + 9 files changed, 964 insertions(+), 23 deletions(-) create mode 100644 mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2023-06-26--16-00-update-fill-in-trigger.xml create mode 100644 mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java create mode 100644 mod-source-record-storage-server/src/test/resources/org/folio/dao/util/marc_indexer_row.csv create mode 100644 mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecord.json create mode 100644 mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json diff --git a/mod-source-record-storage-server/pom.xml b/mod-source-record-storage-server/pom.xml index 5206f6c5c..b2d68d9bc 100644 --- a/mod-source-record-storage-server/pom.xml +++ b/mod-source-record-storage-server/pom.xml @@ -170,6 +170,12 @@ 5.1.1 test + + org.apache.commons + commons-csv + 1.10.0 + test + org.folio data-import-processing-core diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index 4f7a1d68e..4ba33a086 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -13,6 +13,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.text.StrSubstitutor; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.dao.util.ErrorRecordDaoUtil; @@ -624,6 +625,11 @@ public Future saveRecords(RecordCollection recordCollectio .fieldsCorresponding() .execute(); + // update marc_indexers if record type is MARC_RECORDS_LB + ParsedRecordDaoUtil.updateMarcIndexersTableSync(dsl, + recordType, + dbParsedRecords.stream().collect(Collectors.toMap(Record2::value1, Record2::value2))); + if (!dbErrorRecords.isEmpty()) { // batch insert error records dsl.loadInto(ERROR_RECORDS_LB) @@ -800,6 +806,7 @@ public Future updateParsedRecords(RecordCollection r List> recordUpdates = new ArrayList<>(); List> parsedRecordUpdates = new ArrayList<>(); + Map parsedMarcIndexersInput = new HashMap<>(); // used to insert marc_indexers Field prtId = field(name(ID), UUID.class); Field prtContent = field(name(CONTENT), JSONB.class); @@ -872,13 +879,16 @@ public Future updateParsedRecords(RecordCollection r try { RecordType recordType = toRecordType(record.getRecordType().name()); recordType.formatRecord(record); - + UUID id = UUID.fromString(record.getParsedRecord().getId()); + JSONB content = JSONB.valueOf(ParsedRecordDaoUtil.normalizeContent(record.getParsedRecord())); 
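+ // Capture the id and normalized content once so the same JSONB value feeds both the
+ // parsed-record UPDATE below and the marc_indexers input map (MODSOURCE-664).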
parsedRecordUpdates.add( DSL.update(table(name(recordType.getTableName()))) - .set(prtContent, JSONB.valueOf(ParsedRecordDaoUtil.normalizeContent(record.getParsedRecord()))) - .where(prtId.eq(UUID.fromString(record.getParsedRecord().getId()))) + .set(prtContent, content) + .where(prtId.eq(id)) ); + parsedMarcIndexersInput.put(record.getParsedRecord().getId(), content); + } catch (Exception e) { errorMessages.add(format(INVALID_PARSED_RECORD_MESSAGE_TEMPLATE, record.getId(), e.getMessage())); // if invalid parsed record, set id to null to filter out @@ -920,6 +930,15 @@ public Future updateParsedRecords(RecordCollection r } } + // update MARC_INDEXERS + Map parsedRecordMap = parsedRecordsUpdated.stream() + .map(rec -> Pair.of(UUID.fromString(rec.getId()), parsedMarcIndexersInput.get(rec.getId()))) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + Optional recordTypeOptional = recordTypes.stream().findFirst(); + if (recordTypeOptional.isPresent()) { + ParsedRecordDaoUtil.updateMarcIndexersTableSync(dsl, toRecordType(recordTypeOptional.get()), parsedRecordMap); + } + blockingPromise.complete(new ParsedRecordsBatchResponse() .withErrorMessages(errorMessages) .withParsedRecords(parsedRecordsUpdated) diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java index 167160881..8cbc3527f 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java @@ -1,31 +1,51 @@ package org.folio.dao.util; -import static java.lang.String.format; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.name; -import static org.jooq.impl.DSL.table; - -import java.util.Objects; -import java.util.Optional; -import java.util.UUID; - -import javax.ws.rs.NotFoundException; - +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; +import io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter; +import io.vertx.core.Future; +import io.vertx.core.json.JsonObject; +import io.vertx.sqlclient.Row; import org.apache.commons.lang3.StringUtils; import org.folio.rest.jaxrs.model.ErrorRecord; import org.folio.rest.jaxrs.model.ParsedRecord; import org.folio.rest.jaxrs.model.Record; import org.folio.rest.jooq.tables.records.EdifactRecordsLbRecord; import org.folio.rest.jooq.tables.records.MarcRecordsLbRecord; +import org.jooq.DSLContext; import org.jooq.Field; +import org.jooq.InsertValuesStepN; import org.jooq.JSONB; +import org.jooq.Record1; +import org.jooq.Table; +import org.jooq.TableField; +import org.jooq.exception.DataAccessException; +import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; +import org.jooq.impl.TableImpl; -import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; -import io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter; -import io.vertx.core.Future; -import io.vertx.core.json.JsonObject; -import io.vertx.sqlclient.Row; +import javax.ws.rs.NotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import 
java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.select; +import static org.jooq.impl.DSL.table; /** * Utility class for managing {@link ParsedRecord} @@ -37,12 +57,41 @@ public final class ParsedRecordDaoUtil { private static final String LEADER = "leader"; private static final int LEADER_STATUS_SUBFIELD_POSITION = 5; - private static final Field ID_FIELD = field(name(ID), UUID.class); - private static final Field CONTENT_FIELD = field(name(CONTENT), SQLDataType.JSONB.asConvertedDataType(new JSONBToJsonObjectConverter())); - + public static final Field ID_FIELD = field(name(ID), UUID.class); + public static final Field CONTENT_FIELD = field(name(CONTENT), SQLDataType.JSONB.asConvertedDataType(new JSONBToJsonObjectConverter())); + private static final Field MARC_ID_FIELD = field(name("marc_id"), UUID.class); public static final String PARSED_RECORD_NOT_FOUND_TEMPLATE = "Parsed Record with id '%s' was not found"; - public static final String PARSED_RECORD_CONTENT = "parsed_record_content"; + private static final MarcIndexersUpdatedIds UPDATE_MARC_INDEXERS_TEMP_TABLE = new MarcIndexersUpdatedIds(); + public static final MarcIndexers MARC_INDEXERS_TABLE = new MarcIndexers(); + + public static class MarcIndexersUpdatedIds extends TableImpl> { + public final TableField, UUID> MARC_ID = createField(DSL.name("marc_id"), SQLDataType.UUID); + + private MarcIndexersUpdatedIds() { + super(DSL.name("marc_indexers_updated_ids")); + } + } + + public static class MarcIndexers extends TableImpl { + public final TableField MARC_ID = createField(DSL.name("marc_id"), SQLDataType.UUID); + public final TableField FIELD_NO = + createField(name("field_no"), SQLDataType.VARCHAR); + public final TableField IND1 = + createField(name("ind1"), SQLDataType.VARCHAR); + public final TableField IND2 = + createField(name("ind2"), SQLDataType.VARCHAR); + public final TableField SUBFIELD_NO = + createField(name("subfield_no"), SQLDataType.VARCHAR); + public final TableField VALUE = + createField(name("value"), SQLDataType.VARCHAR); + public final TableField VERSION = + createField(name("version"), SQLDataType.INTEGER); + + private MarcIndexers() { + super(DSL.name("marc_indexers")); + } + } private ParsedRecordDaoUtil() { } @@ -82,6 +131,10 @@ public static Future save(ReactiveClassicGenericQueryExecutor quer .doUpdate() .set(CONTENT_FIELD, content) .returning()) + .compose(res -> + updateMarcIndexersTableAsync(queryExecutor, recordType, id, JSONB.valueOf(content.encode())) + .compose(ar -> Future.succeededFuture(res)) + ) .map(res -> parsedRecord .withContent(content.getMap())); } @@ -102,6 +155,10 @@ public static Future update(ReactiveClassicGenericQueryExecutor qu return queryExecutor.executeAny(dsl -> dsl.update(table(name(recordType.getTableName()))) .set(CONTENT_FIELD, content) .where(ID_FIELD.eq(id))) + .compose(res -> + updateMarcIndexersTableAsync(queryExecutor, recordType, id, JSONB.valueOf(content.encode())) + .compose(ar -> Future.succeededFuture(res)) + ) .map(update -> { if (update.rowCount() > 0) { return parsedRecord @@ -112,6 +169,203 @@ public static Future update(ReactiveClassicGenericQueryExecutor qu }); } + /** + * Synchronously updates the MARC indexers table 
based on the parsed records provided. This method first validates + * if the provided record type matches 'MARC_BIB'. If not, it terminates without performing any operation. + * Otherwise, it seeds the temporary table with MARC record IDs, retrieves their versions and inserts new indexers + * based on the parsed records data. + * + * @param dsl The DSLContext instance used to create and execute SQL queries. + * @param recordType The type of the record to be updated. + * @param parsedRecords A map containing the UUIDs of the records as keys and the parsed JSONB content as values. + * + * @throws DataAccessException if the execution of any of the SQL queries fails. + */ + public static void updateMarcIndexersTableSync(DSLContext dsl, RecordType recordType, Map parsedRecords) throws IOException { + if (!recordType.getTableName().equals(RecordType.MARC_BIB.getTableName())) { + return; + } + + // Seed marc records identifiers before getting their versions + dsl.createTemporaryTableIfNotExists(UPDATE_MARC_INDEXERS_TEMP_TABLE) + .column(UPDATE_MARC_INDEXERS_TEMP_TABLE.MARC_ID) + .onCommitDrop() + .execute(); + + List> tempIds = parsedRecords.keySet().stream().map(k -> { + Record1 uuidRecord1 = dsl.newRecord(UPDATE_MARC_INDEXERS_TEMP_TABLE.MARC_ID); + uuidRecord1.set(UPDATE_MARC_INDEXERS_TEMP_TABLE.MARC_ID, k); + return uuidRecord1; + }).collect(Collectors.toList()); + + dsl.loadInto(UPDATE_MARC_INDEXERS_TEMP_TABLE) + .batchAfter(250) + .onErrorAbort() + .loadRecords(tempIds) + .fieldsCorresponding() + .execute(); + + // Get marc records versions + Table> subQuery = select(UPDATE_MARC_INDEXERS_TEMP_TABLE.MARC_ID) + .from(UPDATE_MARC_INDEXERS_TEMP_TABLE).asTable("subquery"); + var query = dsl + .select(MARC_RECORDS_TRACKING.MARC_ID, MARC_RECORDS_TRACKING.VERSION) + .from(MARC_RECORDS_TRACKING) + .where(MARC_RECORDS_TRACKING.MARC_ID.in(select(MARC_ID_FIELD).from(subQuery))); + var marcIndexersVersions = query.fetch(); + + // Insert indexers + List indexers = marcIndexersVersions.stream() + .map(record -> { + JSONB jsonb = parsedRecords.get(record.value1()); + if (jsonb != null) { + return createMarcIndexerRecord(dsl, record.value1(), jsonb.data(), record.value2()); + } + return Collections.emptyList(); + }) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + dsl.loadInto(MARC_INDEXERS_TABLE) + .batchAfter(250) + .onErrorAbort() + .loadRecords(indexers) + .fieldsCorresponding() + .execute(); + } + + /** + * Updates the MARC indexers table asynchronously. This method gets the MARC versions based on the provided object id + * and if the record is marked as dirty, it creates and inserts new MARC indexer records into the MARC indexers table. + * If the record type does not match 'MARC_BIB', or if the record is not marked as dirty, the method immediately + * completes with a 'false' result. + * + * @param queryExecutor The executor to run the database queries. + * @param recordType The type of the record to be updated. + * @param objectId The unique identifier of the object to be updated. + * @param content The content to be used for creating new MARC indexer records. + * + * @return A Future containing 'true' if the MARC indexer records are created and inserted successfully, 'false' + * otherwise. If the MARC record with the provided id cannot be found or there are multiple such records, + * a RuntimeException is thrown. + * + * @throws RuntimeException if the MARC record with the provided id cannot be found or there are multiple such records. 
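+ * + * <p>Illustrative call (a sketch; {@code txQE}, {@code recordId}, {@code contentJson} and {@code LOG} are placeholder names): + * <pre>{@code + * updateMarcIndexersTableAsync(txQE, RecordType.MARC_BIB, recordId, JSONB.valueOf(contentJson)) + * .onSuccess(inserted -> LOG.debug("marc_indexers rows written: {}", inserted)); + * }</pre>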
+ */ + public static Future updateMarcIndexersTableAsync(ReactiveClassicGenericQueryExecutor queryExecutor, + RecordType recordType, + UUID objectId, + JSONB content) { + if (!recordType.getTableName().equals(RecordType.MARC_BIB.getTableName())) { + return Future.succeededFuture(false); + } + return queryExecutor.query(dsl -> + // get marc versions + dsl + .select(MARC_RECORDS_TRACKING.MARC_ID, MARC_RECORDS_TRACKING.VERSION, MARC_RECORDS_TRACKING.IS_DIRTY) + .from(MARC_RECORDS_TRACKING) + .where(MARC_RECORDS_TRACKING.MARC_ID.eq(objectId))) + .compose(ar -> { + if (ar.stream().count() != 1) { + throw new RuntimeException("Could not get version for marc record with id=" + objectId); + } + UUID marcId = ar.get(0, UUID.class); + Integer marcVersion = ar.get(1, Integer.class); + boolean isDirty = ar.get(2, Boolean.class); + if(!isDirty) + { + return Future.succeededFuture(false); + } + + // insert marc indexers records + return queryExecutor.execute(dsl -> { + Collection marcIndexerRecords = + createMarcIndexerRecord(dsl, marcId, content.data(), marcVersion); + InsertValuesStepN insertStep = null; + + for (var record : marcIndexerRecords) { + if (insertStep == null) { + insertStep = dsl.insertInto(MARC_INDEXERS_TABLE) + .values(record.intoArray()); + continue; + } + insertStep = insertStep.values(record); + } + return insertStep; + }).map(true); + }) + .compose(Future::succeededFuture); + } + + /** + * Convert a parsed record into rows for MARC_INDEXERS table + * + * difference between this java version and the sql version are as follows: + * - all valued are trimmed + */ + protected static Set + createMarcIndexerRecord(DSLContext dsl, UUID marcId, String content, int version) { + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonObject = null; + try { + jsonObject = objectMapper.readTree(content); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Error while parsing some content to generate marc_indexers records", e); + } + + JsonNode fieldsArray = jsonObject.get("fields"); + if (fieldsArray == null) { + throw new IllegalArgumentException("Content does not contain 'fields' property"); + } + Set indexerRecords = new HashSet<>(); + + for (JsonNode field : fieldsArray) { + Iterator> fieldIterator = field.fields(); + while (fieldIterator.hasNext()) { + Map.Entry fieldEntry = fieldIterator.next(); + String fieldNo = fieldEntry.getKey().toLowerCase(); + JsonNode fieldValue = fieldEntry.getValue(); + String ind1 = fieldValue.has("ind1") && !fieldValue.get("ind1").asText().trim().isEmpty() + ? fieldValue.get("ind1").asText().trim() : "#"; + String ind2 = fieldValue.has("ind2") && !fieldValue.get("ind2").asText().trim().isEmpty() + ? 
fieldValue.get("ind2").asText().trim() : "#"; + + if (fieldValue.has("subfields")) { + JsonNode subfieldsArray = fieldValue.get("subfields"); + for (JsonNode subfield : subfieldsArray) { + Iterator> subfieldIterator = subfield.fields(); + while (subfieldIterator.hasNext()) { + Map.Entry subfieldEntry = subfieldIterator.next(); + String subfieldNo = subfieldEntry.getKey(); + String subfieldValue = subfieldEntry.getValue().asText().trim().replaceAll("\"", ""); + var record = dsl.newRecord(MARC_INDEXERS_TABLE); + record.setValue(MARC_INDEXERS_TABLE.FIELD_NO, fieldNo); + record.setValue(MARC_INDEXERS_TABLE.IND1, ind1); + record.setValue(MARC_INDEXERS_TABLE.IND2, ind2); + record.setValue(MARC_INDEXERS_TABLE.SUBFIELD_NO, subfieldNo); + record.setValue(MARC_INDEXERS_TABLE.VALUE, subfieldValue); + record.setValue(MARC_INDEXERS_TABLE.MARC_ID, marcId); + record.setValue(MARC_INDEXERS_TABLE.VERSION, version); + indexerRecords.add(record); + } + } + } else { + String value = fieldValue.textValue().trim().replaceAll("\"", ""); + var record = dsl.newRecord(MARC_INDEXERS_TABLE); + record.setValue(MARC_INDEXERS_TABLE.FIELD_NO, fieldNo); + record.setValue(MARC_INDEXERS_TABLE.IND1, ind1); + record.setValue(MARC_INDEXERS_TABLE.IND2, ind2); + record.setValue(MARC_INDEXERS_TABLE.SUBFIELD_NO, "0"); + record.setValue(MARC_INDEXERS_TABLE.VALUE, value); + record.setValue(MARC_INDEXERS_TABLE.MARC_ID, marcId); + record.setValue(MARC_INDEXERS_TABLE.VERSION, version); + indexerRecords.add(record); + } + } + } + + return indexerRecords; + } + /** * Convert database query result {@link Row} to {@link ParsedRecord} * diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml index fe598d861..6ecc319b9 100644 --- a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml @@ -68,5 +68,6 @@ + diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2023-06-26--16-00-update-fill-in-trigger.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2023-06-26--16-00-update-fill-in-trigger.xml new file mode 100644 index 000000000..f59e76d16 --- /dev/null +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2023-06-26--16-00-update-fill-in-trigger.xml @@ -0,0 +1,45 @@ + + + + + + create or replace function ${database.defaultSchemaName}.fill_in_marc_indexers(p_marc_id uuid, p_marc_content jsonb, p_version integer) + returns void + as + $fill_in_marc_indexers$ + begin + -- This section used to contain an insert statement that would generate rows for marc_indexers table. + -- The logic has been moved up into mod-source-record-storage for reliability & performance reasons. 
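+ -- For reference, the Java generator now produces the equivalent of: + -- insert into marc_indexers (field_no, ind1, ind2, subfield_no, value, marc_id, version) values (...); + -- with '#' substituted for blank indicators and every value trimmed.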
+ -- MODSOURCE-664 +-- + insert into ${database.defaultSchemaName}.marc_indexers_leader(p_00_04, p_05, p_06, p_07, p_08, p_09, p_10, p_11, p_12_16, p_17,p_18, p_19, p_20, p_21, p_22, marc_id) + (select substring(value from 1 for 5) p_00_04, + substring(value from 6 for 1) p_05, + substring(value from 7 for 1) p_06, + substring(value from 8 for 1) p_07, + substring(value from 9 for 1) p_08, + substring(value from 10 for 1) p_09, + substring(value from 11 for 1) p_10, + substring(value from 12 for 1) p_11, + substring(value from 13 for 5) p_12_16, + substring(value from 18 for 1) p_17, + substring(value from 19 for 1) p_18, + substring(value from 20 for 1) p_19, + substring(value from 21 for 1) p_20, + substring(value from 22 for 1) p_21, + substring(value from 23 for 1) p_22, + marc_id + from (select replace(lower(trim(both '"' from value::text)), ' ', '#') as value, + p_marc_id marc_id + from jsonb_each(p_marc_content) x + where key = 'leader') y); + end; + $fill_in_marc_indexers$ language plpgsql; + + + + diff --git a/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java b/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java new file mode 100644 index 000000000..72b4a32bd --- /dev/null +++ b/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java @@ -0,0 +1,115 @@ +package org.folio.dao.util; + +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.folio.TestUtil; +import org.jooq.DSLContext; +import org.jooq.Record7; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.FileReader; +import java.io.Reader; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import static org.apache.commons.csv.CSVFormat.EXCEL; +import static org.folio.dao.util.ParsedRecordDaoUtil.MARC_INDEXERS_TABLE; +import static org.junit.Assert.assertEquals; + +@RunWith(VertxUnitRunner.class) +public class ParsedRecordDaoUtilTest { + + private static final String PARSED_MARC_RECORD_SAMPLE_PATH = + "src/test/resources/org/folio/dao/util/parsedMarcRecord.json"; + private static final String PARSED_MARC_RECORD_BAD_SAMPLE_PATH = + "src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json"; + private static final String MARC_INDEXER_SAMPLE_PATH = + "src/test/resources/org/folio/dao/util/marc_indexer_row.csv"; + private static final DSLContext DSL_CONTEXT = DSL.using(SQLDialect.POSTGRES); + private static final UUID MARC_ID = UUID.fromString("9a4db741-2acb-4ad8-9e66-ab6d17dcbe68"); + private static final Integer VERSION = 1; + + /** + * A single record import was performed with SRS prior to MODSOURCE-664. The parsed record in MARC_RECORDS_LB and the + * MARC_INDEXERS row are saved in this test suite for comparison to the marc indexer generator developed as a result + * of MODSOURCE-664 . A small difference in the MARC_INDEXERS rows saved in this test suite + * is that the MARC_INDEXERS rows have the 010 field's value trimmed. The new marc indexer generator trims its values + * while the SQL version did not. 
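+ * + * <p>For example, the 010 $a value {@code " 58020553 "} in parsedMarcRecord.json is expected as the + * trimmed row {@code 010,#,#,a,58020553,...} in marc_indexer_row.csv.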
+ */ + @Test + public void createMarcIndexerRecord() throws Exception { + String content = TestUtil.readFileFromPath(PARSED_MARC_RECORD_SAMPLE_PATH); + Set> expected = parseCSV(MARC_INDEXER_SAMPLE_PATH); + + Set records = + ParsedRecordDaoUtil.createMarcIndexerRecord(DSL_CONTEXT, MARC_ID, content, VERSION); + + assertEquals(expected, records); + } + + @Test(expected = IllegalArgumentException.class) + public void badParsedRecord() throws Exception { + String content = TestUtil.readFileFromPath(PARSED_MARC_RECORD_BAD_SAMPLE_PATH); + + ParsedRecordDaoUtil.createMarcIndexerRecord(DSL_CONTEXT, MARC_ID, content, VERSION); + } + + @Test(expected = IllegalArgumentException.class) + public void notJsonContent() throws Exception { + String content = "This is a not a parsed record"; + + ParsedRecordDaoUtil.createMarcIndexerRecord(DSL_CONTEXT, MARC_ID, content, VERSION); + } + + private Set> parseCSV(String filePath) throws Exception { + Set> records = new HashSet<>(); + + try (Reader in = new FileReader(filePath)) { + Iterable csvRecords = CSVFormat.Builder.create(EXCEL) + .setHeader() + .setSkipHeaderRecord(true) + .build() + .parse(in); + + for (CSVRecord csvRecord : csvRecords) { + Record7 record = createRecord( + csvRecord.get(0), + csvRecord.get(1), + csvRecord.get(2), + csvRecord.get(3), + csvRecord.get(4), + UUID.fromString(csvRecord.get(5)), + Integer.parseInt(csvRecord.get(6)) + ); + records.add(record); + } + } + + return records; + } + + private Record7 createRecord( + String col1, + String col2, + String col3, + String col4, + String col5, + UUID col6, + Integer col7 + ) { + return DSL_CONTEXT.newRecord( + MARC_INDEXERS_TABLE.FIELD_NO, + MARC_INDEXERS_TABLE.IND1, + MARC_INDEXERS_TABLE.IND2, + MARC_INDEXERS_TABLE.SUBFIELD_NO, + MARC_INDEXERS_TABLE.VALUE, + MARC_INDEXERS_TABLE.MARC_ID, + MARC_INDEXERS_TABLE.VERSION) + .values(col1, col2, col3, col4, col5, col6, col7); + } +} diff --git a/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/marc_indexer_row.csv b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/marc_indexer_row.csv new file mode 100644 index 000000000..ebe0ca692 --- /dev/null +++ b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/marc_indexer_row.csv @@ -0,0 +1,61 @@ +field_no,ind1,ind2,subfield_no,value,marc_id,version +001,#,#,0,in00000000001,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +005,#,#,0,20230627082130.8,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +008,#,#,0,750907c19509999enkqr p 0 a0eng d,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +010,#,#,a,58020553,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +022,#,#,a,0022-0469,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +035,#,#,a,366832,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +035,#,#,a,(CStRLIN)NYCX1604275S,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +035,#,#,a,(NIC)notisABP6388,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +035,#,#,a,(OCoLC)1604275,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +040,#,#,d,CStRLIN,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +040,#,#,d,CtY,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +040,#,#,d,MBTI,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +040,#,#,d,NIC,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +050,0,#,a,BR140,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +050,0,#,b,.J6,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +082,#,#,a,270.05,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +222,0,4,a,The Journal of ecclesiastical history,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +245,0,4,a,The Journal of ecclesiastical history.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 
+260,#,#,a,"London,",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +260,#,#,b,Cambridge University Press [etc.],9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +265,#,#,a,"32 East 57th St., New York, 10022",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +300,#,#,a,v.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +300,#,#,b,25 cm.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +310,#,#,a,"Quarterly,",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +310,#,#,b,1970-,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +321,#,#,a,"Semiannual,",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +321,#,#,b,1950-69,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +362,0,#,a,v. 1- Apr. 1950-,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +570,#,#,a,Editor: C. W. Dugmore.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,0,a,Church history,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,0,x,Periodicals.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,7,0,(OCoLC)fst00860740,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,7,2,fast,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +650,#,7,a,Church history,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +655,#,7,0,(OCoLC)fst01411641,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +655,#,7,2,fast,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +655,#,7,a,Periodicals,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +700,1,#,a,"Dugmore, C. W.",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +700,1,#,e,ed.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +700,1,#,q,"(Clifford William),",9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +853,0,3,8,1,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +853,0,3,a,v.,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +853,0,3,i,(year),9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +863,4,0,8,1,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +863,4,0,a,1-49,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +863,4,0,i,1950-1998,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +902,#,#,a,pfnd,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +902,#,#,b,Lintz,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +905,#,#,a,19890510120000.0,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,a,20141106,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,b,m,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,d,batch,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,e,lts,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +948,2,#,x,addfast,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +950,#,#,a,BR140,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +950,#,#,b,.J86,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +950,#,#,h,01/01/01 N,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +950,#,#,l,OLIN,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +999,f,f,i,65ad2158-687a-406e-9799-aa4427dd9d4a,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 +999,f,f,s,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,9a4db741-2acb-4ad8-9e66-ab6d17dcbe68,1 diff --git a/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecord.json b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecord.json new file mode 100644 index 000000000..77427ab1b --- /dev/null +++ b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecord.json @@ -0,0 +1,426 @@ +{ + "fields": [ + { + "001": "in00000000001" + }, + { + "008": "750907c19509999enkqr p 0 a0eng d" + }, + { + "005": "20230627082130.8" + }, + { + "010": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": " 58020553 " + } + ] + } + }, + { + "022": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "0022-0469" + } + ] + } + }, + { + "035": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "(CStRLIN)NYCX1604275S" + } + ] + } + }, + { + "035": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "(NIC)notisABP6388" + } + ] + } + }, 
+ { + "035": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "366832" + } + ] + } + }, + { + "035": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "(OCoLC)1604275" + } + ] + } + }, + { + "040": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "d": "CtY" + }, + { + "d": "MBTI" + }, + { + "d": "CtY" + }, + { + "d": "MBTI" + }, + { + "d": "NIC" + }, + { + "d": "CStRLIN" + }, + { + "d": "NIC" + } + ] + } + }, + { + "050": { + "ind1": "0", + "ind2": " ", + "subfields": [ + { + "a": "BR140" + }, + { + "b": ".J6" + } + ] + } + }, + { + "082": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "270.05" + } + ] + } + }, + { + "222": { + "ind1": "0", + "ind2": "4", + "subfields": [ + { + "a": "The Journal of ecclesiastical history" + } + ] + } + }, + { + "245": { + "ind1": "0", + "ind2": "4", + "subfields": [ + { + "a": "The Journal of ecclesiastical history." + } + ] + } + }, + { + "260": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "London," + }, + { + "b": "Cambridge University Press [etc.]" + } + ] + } + }, + { + "265": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "32 East 57th St., New York, 10022" + } + ] + } + }, + { + "300": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "v." + }, + { + "b": "25 cm." + } + ] + } + }, + { + "310": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "Quarterly," + }, + { + "b": "1970-" + } + ] + } + }, + { + "321": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "Semiannual," + }, + { + "b": "1950-69" + } + ] + } + }, + { + "362": { + "ind1": "0", + "ind2": " ", + "subfields": [ + { + "a": "v. 1- Apr. 1950-" + } + ] + } + }, + { + "570": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "Editor: C. W. Dugmore." + } + ] + } + }, + { + "650": { + "ind1": " ", + "ind2": "0", + "subfields": [ + { + "a": "Church history" + }, + { + "x": "Periodicals." + } + ] + } + }, + { + "650": { + "ind1": " ", + "ind2": "7", + "subfields": [ + { + "a": "Church history" + }, + { + "2": "fast" + }, + { + "0": "(OCoLC)fst00860740" + } + ] + } + }, + { + "655": { + "ind1": " ", + "ind2": "7", + "subfields": [ + { + "a": "Periodicals" + }, + { + "2": "fast" + }, + { + "0": "(OCoLC)fst01411641" + } + ] + } + }, + { + "700": { + "ind1": "1", + "ind2": " ", + "subfields": [ + { + "a": "Dugmore, C. W." + }, + { + "q": "(Clifford William)," + }, + { + "e": "ed." + } + ] + } + }, + { + "853": { + "ind1": "0", + "ind2": "3", + "subfields": [ + { + "8": "1" + }, + { + "a": "v." 
+ }, + { + "i": "(year)" + } + ] + } + }, + { + "863": { + "ind1": "4", + "ind2": "0", + "subfields": [ + { + "8": "1" + }, + { + "a": "1-49" + }, + { + "i": "1950-1998" + } + ] + } + }, + { + "902": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "pfnd" + }, + { + "b": "Lintz" + } + ] + } + }, + { + "905": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "a": "19890510120000.0" + } + ] + } + }, + { + "948": { + "ind1": "2", + "ind2": " ", + "subfields": [ + { + "a": "20141106" + }, + { + "b": "m" + }, + { + "d": "batch" + }, + { + "e": "lts" + }, + { + "x": "addfast" + } + ] + } + }, + { + "950": { + "ind1": " ", + "ind2": " ", + "subfields": [ + { + "l": "OLIN" + }, + { + "a": "BR140" + }, + { + "b": ".J86" + }, + { + "h": "01/01/01 N" + } + ] + } + }, + { + "999": { + "ind1": "f", + "ind2": "f", + "subfields": [ + { + "s": "9a4db741-2acb-4ad8-9e66-ab6d17dcbe68" + }, + { + "i": "65ad2158-687a-406e-9799-aa4427dd9d4a" + } + ] + } + } + ], + "leader": "01338cas a2200409 4500" +} diff --git a/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json new file mode 100644 index 000000000..ef5e73a3a --- /dev/null +++ b/mod-source-record-storage-server/src/test/resources/org/folio/dao/util/parsedMarcRecordBad.json @@ -0,0 +1,14 @@ +{ + "not_fields": [ + { + "001": "in00000000001" + }, + { + "008": "750907c19509999enkqr p 0 a0eng d" + }, + { + "005": "20230627082130.8" + } + ], + "leader": "01338cas a2200409 4500" +} From c246519a67201f6b3ffd4fcb0d1b1199e01e4cf3 Mon Sep 17 00:00:00 2001 From: Olamide Kolawole Date: Wed, 28 Jun 2023 14:10:18 -0500 Subject: [PATCH 2/8] fix tests --- .../folio/dao/util/ParsedRecordDaoUtil.java | 3 +- .../rest/impl/RecordsGenerationTest.java | 44 ++++++++++--------- .../folio/rest/impl/SourceRecordApiTest.java | 25 ++++++----- .../rest/impl/SourceStorageBatchApiTest.java | 8 +++- 4 files changed, 46 insertions(+), 34 deletions(-) diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java index 8cbc3527f..70a9b0c9f 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java @@ -74,7 +74,6 @@ private MarcIndexersUpdatedIds() { } public static class MarcIndexers extends TableImpl { - public final TableField MARC_ID = createField(DSL.name("marc_id"), SQLDataType.UUID); public final TableField FIELD_NO = createField(name("field_no"), SQLDataType.VARCHAR); public final TableField IND1 = @@ -85,6 +84,8 @@ public static class MarcIndexers extends TableImpl { createField(name("subfield_no"), SQLDataType.VARCHAR); public final TableField VALUE = createField(name("value"), SQLDataType.VARCHAR); + public final TableField MARC_ID = + createField(DSL.name("marc_id"), SQLDataType.UUID); public final TableField VERSION = createField(name("version"), SQLDataType.INTEGER); diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/RecordsGenerationTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/RecordsGenerationTest.java index 5c634a372..4537e112c 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/RecordsGenerationTest.java +++ 
b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/RecordsGenerationTest.java @@ -1,17 +1,13 @@ package org.folio.rest.impl; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; - +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; - +import io.restassured.RestAssured; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; import org.apache.http.HttpStatus; import org.folio.TestUtil; import org.folio.dao.PostgresClientFactory; @@ -26,12 +22,15 @@ import org.junit.Test; import org.junit.runner.RunWith; -import io.restassured.RestAssured; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.unit.Async; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; @RunWith(VertxUnitRunner.class) public class RecordsGenerationTest extends AbstractRestVerticleTest { @@ -355,7 +354,7 @@ public void shouldReturnNotFoundOnGetFormattedByInstanceIdWhenRecordDoesNotExist } @Test - public void shouldReturnSameRecordOnGetByIdAndGetBySRSId(TestContext testContext) { + public void shouldReturnSameRecordOnGetByIdAndGetBySRSId(TestContext testContext) throws JsonProcessingException { Async async = testContext.async(); RestAssured.given() .spec(spec) @@ -368,11 +367,14 @@ public void shouldReturnSameRecordOnGetByIdAndGetBySRSId(TestContext testContext async = testContext.async(); String srsId = UUID.randomUUID().toString(); + String contentString = new JsonObject().put("leader", "01542ccm a2200361 4500") + .put("fields", new JsonArray().add(new JsonObject().put("999", new JsonObject() + .put("subfields", new JsonArray().add(new JsonObject().put("s", srsId)))))) + .encode(); + Map contentObj = new ObjectMapper().readValue(contentString, Map.class); ParsedRecord parsedRecord = new ParsedRecord().withId(srsId) - .withContent(new JsonObject().put("leader", "01542ccm a2200361 4500") - .put("fields", new JsonArray().add(new JsonObject().put("999", new JsonObject() - .put("subfields", new JsonArray().add(new JsonObject().put("s", srsId))))))); + .withContent(contentObj); Record newRecord = new Record() .withId(srsId) diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceRecordApiTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceRecordApiTest.java index 1fd8a37e0..bac843150 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceRecordApiTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceRecordApiTest.java @@ -15,10 +15,12 @@ import java.util.Arrays; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; +import com.fasterxml.jackson.core.JsonProcessingException; import 
com.fasterxml.jackson.databind.ObjectMapper; import io.restassured.RestAssured; import io.restassured.response.Response; @@ -852,17 +854,20 @@ public void shouldReturnSourceRecordsForPeriod(TestContext testContext) { } @Test - public void shouldReturnSourceRecordsByListOfId(TestContext testContext) { + public void shouldReturnSourceRecordsByListOfId(TestContext testContext) throws JsonProcessingException { postSnapshots(testContext, snapshot_1, snapshot_2); String firstSrsId = UUID.randomUUID().toString(); String firstInstanceId = UUID.randomUUID().toString(); + String contentString = new JsonObject().put("leader", "01542dcm a2200361 4500") + .put("fields", new JsonArray().add(new JsonObject().put("999", new JsonObject() + .put("subfields", + new JsonArray().add(new JsonObject().put("s", firstSrsId)).add(new JsonObject().put("i", firstInstanceId)))))) + .encode(); + Map contentObj = new ObjectMapper().readValue(contentString, Map.class); ParsedRecord parsedRecord = new ParsedRecord().withId(firstSrsId) - .withContent(new JsonObject().put("leader", "01542dcm a2200361 4500") - .put("fields", new JsonArray().add(new JsonObject().put("999", new JsonObject() - .put("subfields", - new JsonArray().add(new JsonObject().put("s", firstSrsId)).add(new JsonObject().put("i", firstInstanceId))))))); + .withContent(contentObj); Record deleted_record_1 = new Record() .withId(firstSrsId) @@ -908,7 +913,7 @@ public void shouldReturnSourceRecordsByListOfId(TestContext testContext) { .post(SOURCE_STORAGE_SOURCE_RECORDS_PATH + "?idType=RECORD&deleted=false") .then() .statusCode(HttpStatus.SC_OK) - .body("sourceRecords.size()", is(3)) + .body("sourceRecords.size()", is(4)) .body("totalRecords", is(4)) .body("sourceRecords*.deleted", everyItem(is(false))); async.complete(); @@ -921,7 +926,7 @@ public void shouldReturnSourceRecordsByListOfId(TestContext testContext) { .post(SOURCE_STORAGE_SOURCE_RECORDS_PATH + "?idType=RECORD&deleted=true") .then() .statusCode(HttpStatus.SC_OK) - .body("sourceRecords.size()", is(4)) + .body("sourceRecords.size()", is(5)) .body("totalRecords", is(5)); async.complete(); @@ -938,7 +943,7 @@ public void shouldReturnSourceRecordsByListOfId(TestContext testContext) { .post(SOURCE_STORAGE_SOURCE_RECORDS_PATH + "?idType=INSTANCE&deleted=false") .then() .statusCode(HttpStatus.SC_OK) - .body("sourceRecords.size()", is(3)) + .body("sourceRecords.size()", is(4)) .body("totalRecords", is(4)) .body("sourceRecords*.deleted", everyItem(is(false))); async.complete(); @@ -951,7 +956,7 @@ public void shouldReturnSourceRecordsByListOfId(TestContext testContext) { .post(SOURCE_STORAGE_SOURCE_RECORDS_PATH + "?idType=INSTANCE&deleted=true") .then() .statusCode(HttpStatus.SC_OK) - .body("sourceRecords.size()", is(4)) + .body("sourceRecords.size()", is(5)) .body("totalRecords", is(5)); async.complete(); @@ -963,7 +968,7 @@ public void shouldReturnSourceRecordsByListOfId(TestContext testContext) { .post(SOURCE_STORAGE_SOURCE_RECORDS_PATH + "?idType=RECORD") .then() .statusCode(HttpStatus.SC_OK) - .body("sourceRecords.size()", is(3)) + .body("sourceRecords.size()", is(4)) .body("totalRecords", is(4)); async.complete(); } diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java index 380ffbccc..74e859962 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java +++ 
b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java @@ -16,7 +16,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; @@ -50,7 +52,6 @@ import io.restassured.RestAssured; import io.restassured.response.Response; -import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; @@ -882,8 +883,11 @@ public void shouldUpdateParsedRecordsWithJsonContent(TestContext testContext) { Record createdRecord = createResponse.body().as(Record.class); async.complete(); + Map contentObject = new HashMap<>(); + contentObject.put("leader", "01542ccm a2200361 4500"); + contentObject.put("fields", new ArrayList<>()); ParsedRecord parsedRecordJson = new ParsedRecord().withId(createdRecord.getParsedRecord().getId()) - .withContent(new JsonObject().put("leader", "01542ccm a2200361 4500").put("fields", new JsonArray())); + .withContent(contentObject); RecordCollection recordCollection = new RecordCollection() .withRecords(Collections.singletonList(createdRecord.withParsedRecord(parsedRecordJson))) From c43ce0dd978a532b044f835507971a3cd616f3f7 Mon Sep 17 00:00:00 2001 From: Olamide Kolawole Date: Thu, 25 Jan 2024 18:20:12 -0600 Subject: [PATCH 3/8] MODSOURCE-664 missing import --- .../test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java index 29c320060..b45106361 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/SourceStorageBatchApiTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.json.JsonArray; import org.apache.http.HttpStatus; import org.folio.TestMocks; import org.folio.TestUtil; From 0f98006b5684d6f98d8a67e9cb9412885d1a22e7 Mon Sep 17 00:00:00 2001 From: Olamide Kolawole Date: Mon, 1 Jul 2024 13:52:42 -0500 Subject: [PATCH 4/8] MODSOURCE-664 Optimize load of marc_indexers table --- mod-source-record-storage-server/pom.xml | 12 +- .../CustomReactiveQueryExecutor.java | 72 ++++++ .../org/folio/dao/PostgresClientFactory.java | 11 +- .../folio/dao/QueryExecutorInterceptor.java | 33 ++- .../main/java/org/folio/dao/RecordDao.java | 9 +- .../java/org/folio/dao/RecordDaoImpl.java | 25 +- .../folio/dao/util/ParsedRecordDaoUtil.java | 244 ++++++++---------- .../java/org/folio/rest/impl/InitAPIImpl.java | 38 +-- .../org/folio/services/RecordServiceImpl.java | 69 +++-- .../resources/liquibase/tenant/changelog.xml | 1 + ...2024-06-26--16-00-upsert-marc-indexers.xml | 23 ++ .../dao/util/ParsedRecordDaoUtilTest.java | 39 +-- 12 files changed, 356 insertions(+), 220 deletions(-) create mode 100644 mod-source-record-storage-server/src/main/java/io/github/jklingsporn/vertx/jooq/classic/reactivepg/CustomReactiveQueryExecutor.java create mode 100644 mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml diff --git a/mod-source-record-storage-server/pom.xml b/mod-source-record-storage-server/pom.xml index 2146affcf..4db53a326 100644 
--- a/mod-source-record-storage-server/pom.xml +++ b/mod-source-record-storage-server/pom.xml @@ -286,7 +286,7 @@ 1.18.3 ${project.parent.basedir} ${project.parent.basedir}/ramls - 3.16.19 + 3.16.23 6.5.5 1.9.19 42.7.2 @@ -494,12 +494,18 @@ org.jooq.meta.postgres.PostgresDatabase .* - databasechangelog|databasechangeloglock|marc_indexers.* + databasechangelog|databasechangeloglock|marc_indexers_.* public true false - + + + io.vertx.core.json.JsonObject + upsert_marc_record.content + io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter + + true diff --git a/mod-source-record-storage-server/src/main/java/io/github/jklingsporn/vertx/jooq/classic/reactivepg/CustomReactiveQueryExecutor.java b/mod-source-record-storage-server/src/main/java/io/github/jklingsporn/vertx/jooq/classic/reactivepg/CustomReactiveQueryExecutor.java new file mode 100644 index 000000000..1283b98ce --- /dev/null +++ b/mod-source-record-storage-server/src/main/java/io/github/jklingsporn/vertx/jooq/classic/reactivepg/CustomReactiveQueryExecutor.java @@ -0,0 +1,72 @@ +package io.github.jklingsporn.vertx.jooq.classic.reactivepg; + + +import io.vertx.core.Future; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.SqlClient; +import io.vertx.sqlclient.Transaction; +import org.jooq.Configuration; +import org.jooq.Query; +import org.jooq.SQLDialect; +import org.jooq.conf.ParamType; + +import java.util.function.Function; + +/** + * This class was moved to this package so that we can access the constructor that accepts a transaction. The constructor + * is only accessible from this package. + */ +public class CustomReactiveQueryExecutor extends ReactiveClassicGenericQueryExecutor { + private final String tenantId; + private static final String pattern = "(? newInstance(SqlClient connection) { + return transaction -> new CustomReactiveQueryExecutor(configuration(), connection, transaction, tenantId); + } + + @SuppressWarnings("unchecked") + public Future customTransaction(Function> transaction){ + return this.transaction((Function>) transaction); + } + + /** + * This method was copied from the super class. The only difference is the pattern used to replace characters + * in the named query. The pattern in the super class did not handle some cases. + */ + @Override + public String toPreparedQuery(Query query) { + if (SQLDialect.POSTGRES.supports(configuration().dialect())) { + String namedQuery = query.getSQL(ParamType.NAMED); + return namedQuery.replaceAll(pattern, "\\$"); + } + // mysql works with the standard string + return query.getSQL(); + } + + + + /** + * This is a hack to expose the underlying vertx sql client because vertx-jooq does not support batch operations. 
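+ * + * <p>Sketch of the intended use (the SQL string and tuple list are placeholders): + * <pre>{@code + * queryExecutor.getDelegate(client -> client.preparedQuery(upsertSql).executeBatch(tuples)); + * }</pre>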
+ */ + public Future> getDelegate(Function>> delegateFunction){ + return delegateFunction.apply(delegate); + } +} diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java index 6c7017e82..39fa0b50e 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java @@ -2,6 +2,7 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor; import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; import io.vertx.core.json.JsonObject; import io.vertx.pgclient.PgConnectOptions; @@ -72,7 +73,7 @@ public class PostgresClientFactory { private final Vertx vertx; - private static Class reactiveClassicGenericQueryExecutorProxyClass; + private static Class reactiveClassicGenericQueryExecutorProxyClass; @Value("${srs.db.reactive.numRetries:3}") private Integer numOfRetries; @@ -123,13 +124,13 @@ public void close() { * @param tenantId tenant id * @return reactive query executor */ - public ReactiveClassicGenericQueryExecutor getQueryExecutor(String tenantId) { + public CustomReactiveQueryExecutor getQueryExecutor(String tenantId) { if (reactiveClassicGenericQueryExecutorProxyClass == null) setupProxyExecutorClass(); - ReactiveClassicGenericQueryExecutor queryExecutorProxy; + CustomReactiveQueryExecutor queryExecutorProxy; try { queryExecutorProxy = reactiveClassicGenericQueryExecutorProxyClass - .getDeclaredConstructor(Configuration.class, SqlClient.class) - .newInstance(configuration, getCachedPool(this.vertx, tenantId).getDelegate()); + .getDeclaredConstructor(Configuration.class, SqlClient.class, String.class) + .newInstance(configuration, getCachedPool(this.vertx, tenantId).getDelegate(), tenantId); } catch (Exception e) { throw new RuntimeException("Something happened while creating proxied reactiveClassicGenericQueryExecutor", e); } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/QueryExecutorInterceptor.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/QueryExecutorInterceptor.java index f671af329..62192975b 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/QueryExecutorInterceptor.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/QueryExecutorInterceptor.java @@ -1,6 +1,6 @@ package org.folio.dao; -import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Promise; @@ -56,13 +56,15 @@ public static void setRetryDelay(long delay) { * * @return the generated subclass */ - public static Class generateClass() { + public static Class generateClass() { return new ByteBuddy() - .subclass(ReactiveClassicGenericQueryExecutor.class) + .subclass(CustomReactiveQueryExecutor.class) .constructor(ElementMatchers.any()) // Match all constructors .intercept(SuperMethodCall.INSTANCE) // Call the original constructor .method(ElementMatchers.named("transaction")) // For transaction method .intercept(MethodDelegation.to(QueryExecutorInterceptor.class)) + .method(ElementMatchers.named("customTransaction")) // For custom 
transaction method + .intercept(MethodDelegation.to(QueryExecutorInterceptor.class)) .method(ElementMatchers.named("query")) // For query method .intercept(MethodDelegation.to(QueryExecutorInterceptor.class)) .make() @@ -80,7 +82,7 @@ public static Class generateClass public static Future query( @net.bytebuddy.implementation.bind.annotation.SuperCall Callable> superCall ) { - LOGGER.trace("query method of ReactiveClassicGenericQueryExecutor proxied"); + LOGGER.trace("query method of CustomReactiveQueryExecutor proxied"); return retryOf(() -> { try { return superCall.call(); @@ -101,7 +103,28 @@ public static Future query( public static Future transaction( @net.bytebuddy.implementation.bind.annotation.SuperCall Callable> superCall ) { - LOGGER.trace("transaction method of ReactiveClassicGenericQueryExecutor proxied"); + LOGGER.trace("transaction method of CustomReactiveQueryExecutor proxied"); + return retryOf(() -> { + try { + return superCall.call(); + } catch (Throwable e) { + LOGGER.error("Something happened while attempting to make proxied call for transaction method", e); + return Future.failedFuture(e); + } + }, numRetries); + } + + /** + * Interceptor for the transaction method, with retry functionality. + * + * @param superCall the original method call + * @return the result of the transaction operation + */ + @SuppressWarnings("unused") + public static Future customTransaction( + @net.bytebuddy.implementation.bind.annotation.SuperCall Callable> superCall + ) { + LOGGER.trace("customTransaction method of CustomReactiveQueryExecutor proxied"); return retryOf(() -> { try { return superCall.call(); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java index 49412d6ee..468ac6ae4 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java @@ -5,6 +5,7 @@ import java.util.Optional; import java.util.function.Function; +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor; import io.vertx.sqlclient.Row; import org.folio.dao.util.IdType; import org.folio.dao.util.RecordType; @@ -194,13 +195,13 @@ Future getMatchedRecordsIdentifiers(MatchField mat Future saveRecord(Record record, String tenantId); /** - * Saves {@link Record} to the db using {@link ReactiveClassicGenericQueryExecutor} + * Saves {@link Record} to the db using {@link CustomReactiveQueryExecutor} * * @param txQE query executor * @param record Record to save * @return future with saved Record */ - Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record); + Future saveRecord(CustomReactiveQueryExecutor txQE, Record record); /** * Saves {@link RecordCollection} to the db @@ -372,7 +373,7 @@ Future getMatchedRecordsIdentifiers(MatchField mat * @param oldRecord old Record that has to be marked as "old" * @return future with new "updated" Record */ - Future saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord); + Future saveUpdatedRecord(CustomReactiveQueryExecutor txQE, Record newRecord, Record oldRecord); /** * Change suppress from discovery flag for record by external relation id @@ -393,7 +394,7 @@ Future getMatchedRecordsIdentifiers(MatchField mat * @param tenantId tenant id * @return future with generic type */ - Future executeInTransaction(Function> action, String tenantId); + Future executeInTransaction(Function> 
action, String tenantId); /** * Search for non-existent mark bib ids in the system diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index 7bdb1d760..14980a1e9 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -35,6 +35,7 @@ import static org.jooq.impl.DSL.trueCondition; import com.google.common.collect.Lists; +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor; import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; import io.github.jklingsporn.vertx.jooq.shared.internal.QueryResult; import io.reactivex.Flowable; @@ -224,8 +225,8 @@ public RecordDaoImpl(final PostgresClientFactory postgresClientFactory) { } @Override - public Future executeInTransaction(Function> action, String tenantId) { - return getQueryExecutor(tenantId).transaction(action); + public Future executeInTransaction(Function> action, String tenantId) { + return getQueryExecutor(tenantId).customTransaction(action); } @Override @@ -618,11 +619,11 @@ public Future> getRecordByCondition(ReactiveClassicGenericQuery @Override public Future saveRecord(Record record, String tenantId) { LOG.trace("saveRecord:: Saving {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); - return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record)); + return getQueryExecutor(tenantId).customTransaction(txQE -> saveRecord(txQE, record)); } @Override - public Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) { + public Future saveRecord(CustomReactiveQueryExecutor txQE, Record record) { LOG.trace("saveRecord:: Saving {} record {}", record.getRecordType(), record.getId()); return insertOrUpdateRecord(txQE, record); } @@ -849,7 +850,7 @@ public Future saveRecords(RecordCollection recordCollectio @Override public Future updateRecord(Record record, String tenantId) { LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); - return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId()) + return getQueryExecutor(tenantId).customTransaction(txQE -> getRecordById(txQE, record.getId()) .compose(optionalRecord -> optionalRecord .map(r -> saveRecord(txQE, record)) .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId())))))); @@ -959,7 +960,7 @@ public Future calculateGeneration(ReactiveClassicGenericQueryExecutor t @Override public Future updateParsedRecord(Record record, String tenantId) { LOG.trace("updateParsedRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); - return getQueryExecutor(tenantId).transaction(txQE -> GenericCompositeFuture.all(Lists.newArrayList( + return getQueryExecutor(tenantId).customTransaction(txQE -> GenericCompositeFuture.all(Lists.newArrayList( updateExternalIdsForRecord(txQE, record), ParsedRecordDaoUtil.update(txQE, record.getParsedRecord(), ParsedRecordDaoUtil.toRecordType(record)) )).map(res -> record.getParsedRecord())); @@ -1191,7 +1192,7 @@ private MarcBibCollection toMarcBibCollection(QueryResult result) { } @Override - public Future saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord) { + public Future 
saveUpdatedRecord(CustomReactiveQueryExecutor txQE, Record newRecord, Record oldRecord) { LOG.trace("saveUpdatedRecord:: Saving updated record {}", newRecord.getId()); return insertOrUpdateRecord(txQE, oldRecord).compose(r -> insertOrUpdateRecord(txQE, newRecord)); } @@ -1308,7 +1309,7 @@ public Future updateRecordsState(String matchedId, RecordState state, Reco public Future updateMarcAuthorityRecordsStateAsDeleted(String matchedId, String tenantId) { Condition condition = RECORDS_LB.MATCHED_ID.eq(UUID.fromString(matchedId)); - return getQueryExecutor(tenantId).transaction(txQE -> getRecords(condition, RecordType.MARC_AUTHORITY, new ArrayList<>(), 0, RECORDS_LIMIT, tenantId) + return getQueryExecutor(tenantId).customTransaction(txQE -> getRecords(condition, RecordType.MARC_AUTHORITY, new ArrayList<>(), 0, RECORDS_LIMIT, tenantId) .compose(recordCollection -> { List> futures = recordCollection.getRecords().stream() .map(recordToUpdate -> updateMarcAuthorityRecordWithDeletedState(txQE, ensureRecordForeignKeys(recordToUpdate))) @@ -1327,7 +1328,7 @@ public Future updateMarcAuthorityRecordsStateAsDeleted(String matchedId, S })); } - private Future updateMarcAuthorityRecordWithDeletedState(ReactiveClassicGenericQueryExecutor txQE, Record record) { + private Future updateMarcAuthorityRecordWithDeletedState(CustomReactiveQueryExecutor txQE, Record record) { record.withState(Record.State.DELETED); if (Objects.nonNull(record.getParsedRecord())) { record.getParsedRecord().setId(record.getId()); @@ -1342,7 +1343,7 @@ private Future updateMarcAuthorityRecordWithDeletedState(ReactiveClassic } - private ReactiveClassicGenericQueryExecutor getQueryExecutor(String tenantId) { + private CustomReactiveQueryExecutor getQueryExecutor(String tenantId) { return postgresClientFactory.getQueryExecutor(tenantId); } @@ -1388,7 +1389,7 @@ private Future lookupAssociatedRecords(ReactiveClassicGenericQueryExecut return GenericCompositeFuture.all(futures).map(res -> record); } - private Future insertOrUpdateRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) { + private Future insertOrUpdateRecord(CustomReactiveQueryExecutor txQE, Record record) { return RawRecordDaoUtil.save(txQE, record.getRawRecord()) .compose(rawRecord -> { if (Objects.nonNull(record.getParsedRecord())) { @@ -1416,7 +1417,7 @@ private Future insertOrUpdateRecord(ReactiveClassicGenericQueryExecutor }); } - private Future insertOrUpdateParsedRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) { + private Future insertOrUpdateParsedRecord(CustomReactiveQueryExecutor txQE, Record record) { try { LOG.trace("insertOrUpdateParsedRecord:: Inserting or updating {} parsed record", record.getRecordType()); // attempt to format record to validate diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java index 70a9b0c9f..b4354c269 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java @@ -1,22 +1,23 @@ package org.folio.dao.util; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor; import 
io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor; import io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter; import io.vertx.core.Future; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.Tuple; import org.apache.commons.lang3.StringUtils; import org.folio.rest.jaxrs.model.ErrorRecord; import org.folio.rest.jaxrs.model.ParsedRecord; import org.folio.rest.jaxrs.model.Record; +import org.folio.rest.jooq.routines.UpsertMarcRecord; import org.folio.rest.jooq.tables.records.EdifactRecordsLbRecord; +import org.folio.rest.jooq.tables.records.MarcIndexersRecord; import org.folio.rest.jooq.tables.records.MarcRecordsLbRecord; import org.jooq.DSLContext; import org.jooq.Field; -import org.jooq.InsertValuesStepN; import org.jooq.JSONB; import org.jooq.Record1; import org.jooq.Table; @@ -28,10 +29,10 @@ import javax.ws.rs.NotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -41,6 +42,7 @@ import java.util.stream.Collectors; import static java.lang.String.format; +import static org.folio.rest.jooq.Tables.MARC_INDEXERS; import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING; import static org.jooq.impl.DSL.field; import static org.jooq.impl.DSL.name; @@ -63,7 +65,6 @@ public final class ParsedRecordDaoUtil { public static final String PARSED_RECORD_NOT_FOUND_TEMPLATE = "Parsed Record with id '%s' was not found"; public static final String PARSED_RECORD_CONTENT = "parsed_record_content"; private static final MarcIndexersUpdatedIds UPDATE_MARC_INDEXERS_TEMP_TABLE = new MarcIndexersUpdatedIds(); - public static final MarcIndexers MARC_INDEXERS_TABLE = new MarcIndexers(); public static class MarcIndexersUpdatedIds extends TableImpl> { public final TableField, UUID> MARC_ID = createField(DSL.name("marc_id"), SQLDataType.UUID); @@ -73,27 +74,6 @@ private MarcIndexersUpdatedIds() { } } - public static class MarcIndexers extends TableImpl { - public final TableField FIELD_NO = - createField(name("field_no"), SQLDataType.VARCHAR); - public final TableField IND1 = - createField(name("ind1"), SQLDataType.VARCHAR); - public final TableField IND2 = - createField(name("ind2"), SQLDataType.VARCHAR); - public final TableField SUBFIELD_NO = - createField(name("subfield_no"), SQLDataType.VARCHAR); - public final TableField VALUE = - createField(name("value"), SQLDataType.VARCHAR); - public final TableField MARC_ID = - createField(DSL.name("marc_id"), SQLDataType.UUID); - public final TableField VERSION = - createField(name("version"), SQLDataType.INTEGER); - - private MarcIndexers() { - super(DSL.name("marc_indexers")); - } - } - private ParsedRecordDaoUtil() { } /** @@ -121,20 +101,24 @@ public static Future> findById(ReactiveClassicGenericQuer * @param recordType record type to save * @return future with updated ParsedRecord */ - public static Future save(ReactiveClassicGenericQueryExecutor queryExecutor, + public static Future save(CustomReactiveQueryExecutor queryExecutor, ParsedRecord parsedRecord, RecordType recordType) { UUID id = UUID.fromString(parsedRecord.getId()); JsonObject content = normalize(parsedRecord.getContent()); - return queryExecutor.executeAny(dsl -> dsl.insertInto(table(name(recordType.getTableName()))) - .set(ID_FIELD, id) - 
.set(CONTENT_FIELD, content) - .onConflict(ID_FIELD) - .doUpdate() - .set(CONTENT_FIELD, content) - .returning()) - .compose(res -> - updateMarcIndexersTableAsync(queryExecutor, recordType, id, JSONB.valueOf(content.encode())) - .compose(ar -> Future.succeededFuture(res)) + UpsertMarcRecord upsertMarcRecord = new UpsertMarcRecord(); + upsertMarcRecord.setRecordId(id); + upsertMarcRecord.setContent(content); + return queryExecutor.executeAny(dsl -> dsl.select(upsertMarcRecord.asField())) + .compose(res -> { + Row row = res.iterator().next(); + if (row == null) { + return Future.failedFuture( + String.format("save:: a version was not returned upon upsert of a marc record marcRecordId=%s", id)); + } + Integer version = row.getInteger(0); + return updateMarcIndexersTableAsync(queryExecutor, recordType, id, content, version) + .compose(ar -> Future.succeededFuture(res)); + } ) .map(res -> parsedRecord .withContent(content.getMap())); @@ -149,16 +133,24 @@ public static Future save(ReactiveClassicGenericQueryExecutor quer * @param recordType record type to update * @return future of updated ParsedRecord */ - public static Future update(ReactiveClassicGenericQueryExecutor queryExecutor, + public static Future update(CustomReactiveQueryExecutor queryExecutor, ParsedRecord parsedRecord, RecordType recordType) { UUID id = UUID.fromString(parsedRecord.getId()); JsonObject content = normalize(parsedRecord.getContent()); - return queryExecutor.executeAny(dsl -> dsl.update(table(name(recordType.getTableName()))) - .set(CONTENT_FIELD, content) - .where(ID_FIELD.eq(id))) - .compose(res -> - updateMarcIndexersTableAsync(queryExecutor, recordType, id, JSONB.valueOf(content.encode())) - .compose(ar -> Future.succeededFuture(res)) + UpsertMarcRecord upsertMarcRecord = new UpsertMarcRecord(); + upsertMarcRecord.setRecordId(id); + upsertMarcRecord.setContent(content); + return queryExecutor.executeAny(dsl -> dsl.select(upsertMarcRecord.asField())) + .compose(res -> { + Row row = res.iterator().next(); + if (row == null) { + return Future.failedFuture( + String.format("update:: a version was not returned upon upsert of a marc record marcRecordId=%s", id)); + } + Integer version = row.getInteger(0); + return updateMarcIndexersTableAsync(queryExecutor, recordType, id, content, version) + .compose(ar -> Future.succeededFuture(res)); + } ) .map(update -> { if (update.rowCount() > 0) { @@ -220,14 +212,14 @@ public static void updateMarcIndexersTableSync(DSLContext dsl, RecordType record .map(record -> { JSONB jsonb = parsedRecords.get(record.value1()); if (jsonb != null) { - return createMarcIndexerRecord(dsl, record.value1(), jsonb.data(), record.value2()); + return createMarcIndexerRecords(dsl, record.value1(), new JsonObject(jsonb.data()), record.value2()); } return Collections.emptyList(); }) .flatMap(Collection::stream) .collect(Collectors.toList()); - dsl.loadInto(MARC_INDEXERS_TABLE) + dsl.loadInto(MARC_INDEXERS) .batchAfter(250) .onErrorAbort() .loadRecords(indexers) @@ -252,49 +244,39 @@ public static void updateMarcIndexersTableSync(DSLContext dsl, RecordType record * * @throws RuntimeException if the MARC record with the provided id cannot be found or there are multiple such records.
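* * <p>For illustration only (example values, not taken from this patch): a field {"035": {"ind1": " ", "ind2": " ", "subfields": [{"a": "(OCoLC)12345"}]}} yields one marc_indexers row (field_no='035', ind1='#', ind2='#', subfield_no='a', value='(OCoLC)12345') for the given marc_id and version, while a control field such as {"001": "in001"} yields (field_no='001', ind1='#', ind2='#', subfield_no='0', value='in001').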
*/ - public static Future updateMarcIndexersTableAsync(ReactiveClassicGenericQueryExecutor queryExecutor, + public static Future updateMarcIndexersTableAsync(CustomReactiveQueryExecutor queryExecutor, RecordType recordType, UUID objectId, - JSONB content) { + JsonObject content, + Integer version) { if (!recordType.getTableName().equals(RecordType.MARC_BIB.getTableName())) { return Future.succeededFuture(false); } - return queryExecutor.query(dsl -> - // get marc versions - dsl - .select(MARC_RECORDS_TRACKING.MARC_ID, MARC_RECORDS_TRACKING.VERSION, MARC_RECORDS_TRACKING.IS_DIRTY) - .from(MARC_RECORDS_TRACKING) - .where(MARC_RECORDS_TRACKING.MARC_ID.eq(objectId))) - .compose(ar -> { - if (ar.stream().count() != 1) { - throw new RuntimeException("Could not get version for marc record with id=" + objectId); - } - UUID marcId = ar.get(0, UUID.class); - Integer marcVersion = ar.get(1, Integer.class); - boolean isDirty = ar.get(2, Boolean.class); - if(!isDirty) - { - return Future.succeededFuture(false); - } - // insert marc indexers records - return queryExecutor.execute(dsl -> { - Collection marcIndexerRecords = - createMarcIndexerRecord(dsl, marcId, content.data(), marcVersion); - InsertValuesStepN insertStep = null; - - for (var record : marcIndexerRecords) { - if (insertStep == null) { - insertStep = dsl.insertInto(MARC_INDEXERS_TABLE) - .values(record.intoArray()); - continue; - } - insertStep = insertStep.values(record); - } - return insertStep; - }).map(true); - }) - .compose(Future::succeededFuture); + DSLContext dslContext = DSL.using(queryExecutor.configuration()); + Collection marcIndexerRecords = + createMarcIndexerRecords(dslContext, objectId, content, version); + return queryExecutor.getDelegate(sqlClient -> { + List batch = new ArrayList<>(); + marcIndexerRecords.stream() + .forEach(r -> batch.add(Tuple.of(r.value1(), + r.value2(), + r.value3(), + r.value4(), + r.value5(), + r.value6(), + r.value7()))); + + String sql = queryExecutor.toPreparedQuery( + dslContext. 
+ insertInto(MARC_INDEXERS, MARC_INDEXERS.fields()) + .values(MARC_INDEXERS.newRecord().intoArray())); + + // Execute the prepared batch + return sqlClient + .preparedQuery(sql) + .executeBatch(batch); + }).map(true); } /** @@ -303,62 +285,39 @@ public static Future updateMarcIndexersTableAsync(ReactiveClassicGeneri * difference between this java version and the sql version are as follows: * - all valued are trimmed */ - protected static Set - createMarcIndexerRecord(DSLContext dsl, UUID marcId, String content, int version) { - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode jsonObject = null; - try { - jsonObject = objectMapper.readTree(content); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException("Error while parsing some content to generate marc_indexers records", e); - } - - JsonNode fieldsArray = jsonObject.get("fields"); + protected static Collection + createMarcIndexerRecords(DSLContext dsl, UUID marcId, JsonObject content, int version) { + JsonArray fieldsArray = content.getJsonArray("fields"); if (fieldsArray == null) { throw new IllegalArgumentException("Content does not contain 'fields' property"); } - Set indexerRecords = new HashSet<>(); - - for (JsonNode field : fieldsArray) { - Iterator> fieldIterator = field.fields(); - while (fieldIterator.hasNext()) { - Map.Entry fieldEntry = fieldIterator.next(); - String fieldNo = fieldEntry.getKey().toLowerCase(); - JsonNode fieldValue = fieldEntry.getValue(); - String ind1 = fieldValue.has("ind1") && !fieldValue.get("ind1").asText().trim().isEmpty() - ? fieldValue.get("ind1").asText().trim() : "#"; - String ind2 = fieldValue.has("ind2") && !fieldValue.get("ind2").asText().trim().isEmpty() - ? fieldValue.get("ind2").asText().trim() : "#"; - - if (fieldValue.has("subfields")) { - JsonNode subfieldsArray = fieldValue.get("subfields"); - for (JsonNode subfield : subfieldsArray) { - Iterator> subfieldIterator = subfield.fields(); - while (subfieldIterator.hasNext()) { - Map.Entry subfieldEntry = subfieldIterator.next(); - String subfieldNo = subfieldEntry.getKey(); - String subfieldValue = subfieldEntry.getValue().asText().trim().replaceAll("\"", ""); - var record = dsl.newRecord(MARC_INDEXERS_TABLE); - record.setValue(MARC_INDEXERS_TABLE.FIELD_NO, fieldNo); - record.setValue(MARC_INDEXERS_TABLE.IND1, ind1); - record.setValue(MARC_INDEXERS_TABLE.IND2, ind2); - record.setValue(MARC_INDEXERS_TABLE.SUBFIELD_NO, subfieldNo); - record.setValue(MARC_INDEXERS_TABLE.VALUE, subfieldValue); - record.setValue(MARC_INDEXERS_TABLE.MARC_ID, marcId); - record.setValue(MARC_INDEXERS_TABLE.VERSION, version); + Set indexerRecords = new HashSet<>(); + + for (int i = 0; i < fieldsArray.size(); i++) { + JsonObject field = fieldsArray.getJsonObject(i); + + for (String fieldNo : field.fieldNames()) { + Object fieldValueObj = field.getValue(fieldNo); + JsonObject fieldValue = fieldValueObj instanceof JsonObject ? (JsonObject) fieldValueObj: null; + String ind1 = fieldValue != null && fieldValue.containsKey("ind1") && !StringUtils.isBlank(fieldValue.getString("ind1")) + ? fieldValue.getString("ind1").trim() : "#"; + String ind2 = fieldValue != null && fieldValue.containsKey("ind2") && !StringUtils.isBlank(fieldValue.getString("ind2")) + ? 
fieldValue.getString("ind2").trim() : "#"; + + if (fieldValue != null && fieldValue.containsKey("subfields")) { + JsonArray subfieldsArray = fieldValue.getJsonArray("subfields"); + for (int j = 0; j < subfieldsArray.size(); j++) { + JsonObject subfield = subfieldsArray.getJsonObject(j); + for (String subfieldNo : subfield.fieldNames()) { + String subfieldValue = subfield.getString(subfieldNo); + subfieldValue = trimQuotes(subfieldValue.trim()); + MarcIndexersRecord record = createMarcIndexersRecord(dsl, marcId, version, fieldNo, ind1, ind2, subfieldNo, subfieldValue); indexerRecords.add(record); } } } else { - String value = fieldValue.textValue().trim().replaceAll("\"", ""); - var record = dsl.newRecord(MARC_INDEXERS_TABLE); - record.setValue(MARC_INDEXERS_TABLE.FIELD_NO, fieldNo); - record.setValue(MARC_INDEXERS_TABLE.IND1, ind1); - record.setValue(MARC_INDEXERS_TABLE.IND2, ind2); - record.setValue(MARC_INDEXERS_TABLE.SUBFIELD_NO, "0"); - record.setValue(MARC_INDEXERS_TABLE.VALUE, value); - record.setValue(MARC_INDEXERS_TABLE.MARC_ID, marcId); - record.setValue(MARC_INDEXERS_TABLE.VERSION, version); + String value = trimQuotes(fieldValueObj.toString().trim()); + MarcIndexersRecord record = createMarcIndexersRecord(dsl, marcId, version, fieldNo, ind1, ind2, "0", value); indexerRecords.add(record); } } @@ -367,6 +326,25 @@ var record = dsl.newRecord(MARC_INDEXERS_TABLE); return indexerRecords; } + private static MarcIndexersRecord createMarcIndexersRecord(DSLContext dsl, UUID marcId, int version, String fieldNo, String ind1, String ind2, String subfieldNo, String value) { + MarcIndexersRecord record = dsl.newRecord(MARC_INDEXERS); + record.setFieldNo(fieldNo); + record.setInd1(ind1); + record.setInd2(ind2); + record.setSubfieldNo(subfieldNo); + record.setValue(value); + record.setMarcId(marcId); + record.setVersion(version); + return record; + } + + private static String trimQuotes(String value) { + if (value.startsWith("\"") && value.endsWith("\"")) { + return value.substring(1, value.length() - 1); + } + return value; + } + /** * Convert database query result {@link Row} to {@link ParsedRecord} * diff --git a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java index 6c4205e9a..854c91cdc 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/rest/impl/InitAPIImpl.java @@ -10,6 +10,8 @@ import io.vertx.core.spi.VerticleFactory; import java.util.List; import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.config.ApplicationConfig; @@ -34,6 +36,7 @@ public class InitAPIImpl implements InitAPI { private static final String SPRING_CONTEXT = "springContext"; private static final Logger LOGGER = LogManager.getLogger(); + private static final AtomicBoolean shouldInit = new AtomicBoolean(true); @Autowired private KafkaConfig kafkaConfig; @@ -64,21 +67,26 @@ public void init(Vertx vertx, Context context, Handler> han try { SpringContextUtil.init(vertx, context, ApplicationConfig.class); SpringContextUtil.autowireDependencies(this, context); - AbstractApplicationContext springContext = vertx.getOrCreateContext().get(SPRING_CONTEXT); - VerticleFactory verticleFactory = springContext.getBean(SpringVerticleFactory.class); - 
vertx.registerVerticleFactory(verticleFactory); - - EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber); - - registerEventHandlers(); - deployMarcIndexersVersionDeletionVerticle(vertx, verticleFactory); - deployConsumerVerticles(vertx, verticleFactory).onComplete(ar -> { - if (ar.succeeded()) { - handler.handle(Future.succeededFuture(true)); - } else { - handler.handle(Future.failedFuture(ar.cause())); - } - }); + if(shouldInit.compareAndSet(true, false)) { + // only one verticle should perform the actions below + AbstractApplicationContext springContext = vertx.getOrCreateContext().get(SPRING_CONTEXT); + VerticleFactory verticleFactory = springContext.getBean(SpringVerticleFactory.class); + vertx.registerVerticleFactory(verticleFactory); + + EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber); + + registerEventHandlers(); + deployMarcIndexersVersionDeletionVerticle(vertx, verticleFactory); + deployConsumerVerticles(vertx, verticleFactory).onComplete(ar -> { + if (ar.succeeded()) { + handler.handle(Future.succeededFuture(true)); + } else { + handler.handle(Future.failedFuture(ar.cause())); + } + }); + } else { + handler.handle(Future.succeededFuture(true)); + } } catch (Throwable th) { LOGGER.error("init:: Failed to init module", th); handler.handle(Future.failedFuture(th)); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java index 1b971d89a..9f6a03c48 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java @@ -26,10 +26,13 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.ws.rs.BadRequestException; import javax.ws.rs.NotFoundException; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import io.reactivex.Flowable; import io.vertx.core.AsyncResult; import io.vertx.core.Future; @@ -94,12 +97,18 @@ public class RecordServiceImpl implements RecordService { public static final String UPDATE_RECORD_DUPLICATE_EXCEPTION = "Incoming record could be a duplicate, incoming record generation should not be the same as matched record generation and the execution of job should be started after of creating the previous record generation"; public static final char SUBFIELD_S = 's'; public static final char INDICATOR = 'f'; + private record SnapshotCacheKey(String tenantId, String snapshotId){} private final RecordDao recordDao; + private final Cache validSnapshotCache; @Autowired public RecordServiceImpl(final RecordDao recordDao) { this.recordDao = recordDao; + this.validSnapshotCache = Caffeine.newBuilder() + .expireAfterWrite(10, TimeUnit.MINUTES) + .maximumSize(100) + .build(); } @Override @@ -123,32 +132,42 @@ public Future saveRecord(Record record, String tenantId) { LOG.debug("saveRecord:: Saving record with id: {} for tenant: {}", record.getId(), tenantId); ensureRecordHasId(record); ensureRecordHasSuppressDiscovery(record); - return recordDao.executeInTransaction(txQE -> SnapshotDaoUtil.findById(txQE, record.getSnapshotId()) - .map(optionalSnapshot -> optionalSnapshot - .orElseThrow(() -> new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, record.getSnapshotId())))) - .compose(snapshot 
-> { - if (Objects.isNull(snapshot.getProcessingStartedDate())) { - return Future.failedFuture(new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.getStatus()))); - } + SnapshotCacheKey snapshotCacheKey = new SnapshotCacheKey(tenantId, record.getSnapshotId()); + Snapshot validSnapshot = validSnapshotCache.getIfPresent(snapshotCacheKey); + return recordDao.executeInTransaction(txQE -> Future.succeededFuture() + .compose(notUsed -> { + if (validSnapshot == null) { + return SnapshotDaoUtil.findById(txQE, record.getSnapshotId()) + .map(optionalSnapshot -> optionalSnapshot + .orElseThrow(() -> new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, record.getSnapshotId())))) + .compose(snapshot -> { + if (Objects.isNull(snapshot.getProcessingStartedDate())) { + return Future.failedFuture(new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.getStatus()))); + } + validSnapshotCache.put(snapshotCacheKey, snapshot); + return Future.succeededFuture(); + }); + } else { return Future.succeededFuture(); - }) - .compose(v -> setMatchedIdForRecord(record, tenantId)) - .compose(r -> { - if (Objects.isNull(r.getGeneration())) { - return recordDao.calculateGeneration(txQE, r); - } - return Future.succeededFuture(r.getGeneration()); - }) - .compose(generation -> { - if (generation > 0) { - return recordDao.getRecordByMatchedId(txQE, record.getMatchedId()) - .compose(optionalMatchedRecord -> optionalMatchedRecord - .map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), matchedRecord.withState(Record.State.OLD))) - .orElseGet(() -> recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))))); - } else { - return recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))); - } - }), tenantId) + } + }) + .compose(v -> setMatchedIdForRecord(record, tenantId)) + .compose(r -> { + if (Objects.isNull(r.getGeneration())) { + return recordDao.calculateGeneration(txQE, r); + } + return Future.succeededFuture(r.getGeneration()); + }) + .compose(generation -> { + if (generation > 0) { + return recordDao.getRecordByMatchedId(txQE, record.getMatchedId()) + .compose(optionalMatchedRecord -> optionalMatchedRecord + .map(matchedRecord -> recordDao.saveUpdatedRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation)), matchedRecord.withState(Record.State.OLD))) + .orElseGet(() -> recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))))); + } else { + return recordDao.saveRecord(txQE, ensureRecordForeignKeys(record.withGeneration(generation))); + } + }), tenantId) .recover(RecordServiceImpl::mapToDuplicateExceptionIfNeeded); } diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml index 18f545e45..40415b980 100644 --- a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/changelog.xml @@ -71,5 +71,6 @@ + diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml new file mode 100644 index 000000000..7471ae0a2 --- /dev/null +++ 
b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml @@ -0,0 +1,23 @@ + + + + CREATE OR REPLACE FUNCTION ${database.defaultSchemaName}.upsert_marc_record( + record_id UUID, + content JSONB + ) + RETURNS INTEGER AS $$ + INSERT INTO ${database.defaultSchemaName}.marc_records_lb (id, content) + VALUES (record_id, content) + ON CONFLICT (id) DO UPDATE SET content = EXCLUDED.content; + + SELECT version + FROM ${database.defaultSchemaName}.marc_records_tracking + WHERE marc_id = record_id; + $$ LANGUAGE sql; + + + diff --git a/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java b/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java index 72b4a32bd..48c9edfc7 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/dao/util/ParsedRecordDaoUtilTest.java @@ -1,9 +1,11 @@ package org.folio.dao.util; +import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; import org.folio.TestUtil; +import org.folio.rest.jooq.tables.records.MarcIndexersRecord; import org.jooq.DSLContext; import org.jooq.Record7; import org.jooq.SQLDialect; @@ -13,12 +15,13 @@ import java.io.FileReader; import java.io.Reader; +import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.UUID; import static org.apache.commons.csv.CSVFormat.EXCEL; -import static org.folio.dao.util.ParsedRecordDaoUtil.MARC_INDEXERS_TABLE; +import static org.folio.rest.jooq.Tables.MARC_INDEXERS; import static org.junit.Assert.assertEquals; @RunWith(VertxUnitRunner.class) @@ -42,31 +45,31 @@ public class ParsedRecordDaoUtilTest { * while the SQL version did not. 
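* For example, a subfield value of " 123 " is stored as '123' by the Java implementation, whereas the SQL version kept the surrounding whitespace (illustrative values, not taken from the test fixtures).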
*/ @Test - public void createMarcIndexerRecord() throws Exception { + public void createMarcIndexerRecords() throws Exception { String content = TestUtil.readFileFromPath(PARSED_MARC_RECORD_SAMPLE_PATH); - Set> expected = parseCSV(MARC_INDEXER_SAMPLE_PATH); + Collection> expected = parseCSV(MARC_INDEXER_SAMPLE_PATH); - Set records = - ParsedRecordDaoUtil.createMarcIndexerRecord(DSL_CONTEXT, MARC_ID, content, VERSION); + Collection records = + ParsedRecordDaoUtil.createMarcIndexerRecords(DSL_CONTEXT, MARC_ID, new JsonObject(content), VERSION); assertEquals(expected, records); } @Test(expected = IllegalArgumentException.class) - public void badParsedRecord() throws Exception { + public void badParsedRecord() { String content = TestUtil.readFileFromPath(PARSED_MARC_RECORD_BAD_SAMPLE_PATH); - ParsedRecordDaoUtil.createMarcIndexerRecord(DSL_CONTEXT, MARC_ID, content, VERSION); + ParsedRecordDaoUtil.createMarcIndexerRecords(DSL_CONTEXT, MARC_ID, new JsonObject(content), VERSION); } - @Test(expected = IllegalArgumentException.class) - public void notJsonContent() throws Exception { + @Test(expected = io.vertx.core.json.DecodeException.class) + public void notJsonContent() { String content = "This is a not a parsed record"; - ParsedRecordDaoUtil.createMarcIndexerRecord(DSL_CONTEXT, MARC_ID, content, VERSION); + ParsedRecordDaoUtil.createMarcIndexerRecords(DSL_CONTEXT, MARC_ID, new JsonObject(content), VERSION); } - private Set> parseCSV(String filePath) throws Exception { + private Collection> parseCSV(String filePath) throws Exception { Set> records = new HashSet<>(); try (Reader in = new FileReader(filePath)) { @@ -103,13 +106,13 @@ private Record7 createRec Integer col7 ) { return DSL_CONTEXT.newRecord( - MARC_INDEXERS_TABLE.FIELD_NO, - MARC_INDEXERS_TABLE.IND1, - MARC_INDEXERS_TABLE.IND2, - MARC_INDEXERS_TABLE.SUBFIELD_NO, - MARC_INDEXERS_TABLE.VALUE, - MARC_INDEXERS_TABLE.MARC_ID, - MARC_INDEXERS_TABLE.VERSION) + MARC_INDEXERS.FIELD_NO, + MARC_INDEXERS.IND1, + MARC_INDEXERS.IND2, + MARC_INDEXERS.SUBFIELD_NO, + MARC_INDEXERS.VALUE, + MARC_INDEXERS.MARC_ID, + MARC_INDEXERS.VERSION) .values(col1, col2, col3, col4, col5, col6, col7); } } From cad85166fdb499435055fc08cdec9be06b752a61 Mon Sep 17 00:00:00 2001 From: Olamide Kolawole Date: Mon, 1 Jul 2024 18:42:45 -0500 Subject: [PATCH 5/8] MODSOURCE-664 Fix update method to use different path --- mod-source-record-storage-server/pom.xml | 5 +++ .../folio/dao/util/ParsedRecordDaoUtil.java | 20 +++++------ ...2024-06-26--16-00-upsert-marc-indexers.xml | 36 +++++++++++++++++++ 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/mod-source-record-storage-server/pom.xml b/mod-source-record-storage-server/pom.xml index 4db53a326..138c5f935 100644 --- a/mod-source-record-storage-server/pom.xml +++ b/mod-source-record-storage-server/pom.xml @@ -505,6 +505,11 @@ upsert_marc_record.content io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter + + io.vertx.core.json.JsonObject + update_marc_record.content + io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter + diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java index b4354c269..fee2c33b3 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java @@ -12,6 +12,7 @@ import 
org.folio.rest.jaxrs.model.ErrorRecord; import org.folio.rest.jaxrs.model.ParsedRecord; import org.folio.rest.jaxrs.model.Record; +import org.folio.rest.jooq.routines.UpdateMarcRecord; import org.folio.rest.jooq.routines.UpsertMarcRecord; import org.folio.rest.jooq.tables.records.EdifactRecordsLbRecord; import org.folio.rest.jooq.tables.records.MarcIndexersRecord; import org.folio.rest.jooq.tables.records.MarcRecordsLbRecord; @@ -137,19 +138,18 @@ public static Future update(CustomReactiveQueryExecutor queryExecu ParsedRecord parsedRecord, RecordType recordType) { UUID id = UUID.fromString(parsedRecord.getId()); JsonObject content = normalize(parsedRecord.getContent()); - UpsertMarcRecord upsertMarcRecord = new UpsertMarcRecord(); - upsertMarcRecord.setRecordId(id); - upsertMarcRecord.setContent(content); - return queryExecutor.executeAny(dsl -> dsl.select(upsertMarcRecord.asField())) + UpdateMarcRecord updateMarcRecord = new UpdateMarcRecord(); + updateMarcRecord.setRecordId(id); + updateMarcRecord.setContent(content); + return queryExecutor.executeAny(dsl -> dsl.select(updateMarcRecord.asField())) .compose(res -> { Row row = res.iterator().next(); - if (row == null) { - return Future.failedFuture( - String.format("update:: a version was not returned upon upsert of a marc record marcRecordId=%s", id)); + if (row != null) { + Integer version = row.getInteger(0); + return updateMarcIndexersTableAsync(queryExecutor, recordType, id, content, version) + .map(res); } - Integer version = row.getInteger(0); - return updateMarcIndexersTableAsync(queryExecutor, recordType, id, content, version) - .compose(ar -> Future.succeededFuture(res)); + return Future.succeededFuture(res); } ) .map(update -> { diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml index 7471ae0a2..bc1d7508c 100644 --- a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml @@ -20,4 +20,40 @@ $$ LANGUAGE sql; + + +CREATE OR REPLACE FUNCTION ${database.defaultSchemaName}.update_marc_record( + record_id UUID, + content JSONB +) +RETURNS INTEGER AS +$$ +DECLARE + affected_rows INTEGER; + version INTEGER; + BEGIN + -- Perform the UPDATE statement + UPDATE ${database.defaultSchemaName}.marc_records_lb + SET content = content + WHERE id = record_id; + + -- Get the number of affected rows + GET DIAGNOSTICS affected_rows = ROW_COUNT; + + IF affected_rows > 0 THEN + -- Retrieve the version from the tracking table + SELECT version + INTO version + FROM ${database.defaultSchemaName}.marc_records_tracking + WHERE marc_id = record_id; + ELSE + version := NULL; + END IF; + + -- Return the version or NULL if no rows were affected + RETURN version; + END; +$$ LANGUAGE plpgsql; + + From 1fe720760650f6d32c5da724d518e5c986bf709f Mon Sep 17 00:00:00 2001 From: Olamide Kolawole Date: Thu, 25 Jul 2024 12:47:24 -0500 Subject: [PATCH 6/8] MODSOURCE-664 Handle update scenario --- mod-source-record-storage-server/pom.xml | 2 +- .../folio/dao/util/ParsedRecordDaoUtil.java | 2 +- .../2024-06-26--16-00-upsert-marc-indexers.xml | 18 +++++++++--------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/mod-source-record-storage-server/pom.xml b/mod-source-record-storage-server/pom.xml index
138c5f935..9f7cc45e8 100644 --- a/mod-source-record-storage-server/pom.xml +++ b/mod-source-record-storage-server/pom.xml @@ -507,7 +507,7 @@ io.vertx.core.json.JsonObject - update_marc_record.content + update_marc_record.new_content io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java index fee2c33b3..c25fa8f3e 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java @@ -140,7 +140,7 @@ public static Future update(CustomReactiveQueryExecutor queryExecu JsonObject content = normalize(parsedRecord.getContent()); UpdateMarcRecord updateMarcRecord = new UpdateMarcRecord(); updateMarcRecord.setRecordId(id); - updateMarcRecord.setContent(content); + updateMarcRecord.setNewContent(content); return queryExecutor.executeAny(dsl -> dsl.select(updateMarcRecord.asField())) .compose(res -> { Row row = res.iterator().next(); diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml index bc1d7508c..4db9c4d7b 100644 --- a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml @@ -24,17 +24,17 @@ CREATE OR REPLACE FUNCTION ${database.defaultSchemaName}.update_marc_record( record_id UUID, - content JSONB + new_content JSONB ) RETURNS INTEGER AS $$ DECLARE - affected_rows INTEGER; - version INTEGER; - BEGIN + affected_rows INTEGER; + curr_version INTEGER; + BEGIN -- Perform the UPDATE statement UPDATE ${database.defaultSchemaName}.marc_records_lb - SET content = content + SET content = new_content WHERE id = record_id; -- Get the number of affected rows @@ -43,16 +43,16 @@ DECLARE IF affected_rows > 0 THEN -- Retrieve the version from the tracking table SELECT version - INTO version + INTO curr_version FROM ${database.defaultSchemaName}.marc_records_tracking WHERE marc_id = record_id; ELSE - version := NULL; + curr_version := NULL; END IF; -- Return the version or NULL if no rows were affected - RETURN version; - END; + RETURN curr_version; + END; $$ LANGUAGE plpgsql; From 5e748dfe4afa7bef1c2cfba852b2ff0ed41d72c0 Mon Sep 17 00:00:00 2001 From: Olamide Kolawole Date: Fri, 26 Jul 2024 09:39:47 -0500 Subject: [PATCH 7/8] MODSOURCE-664 don't query tracking table when inserting --- ...2024-06-26--16-00-upsert-marc-indexers.xml | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml index 4db9c4d7b..8326c74e1 100644 --- a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml +++
b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml @@ -5,19 +5,36 @@ http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.0.xsd"> - CREATE OR REPLACE FUNCTION ${database.defaultSchemaName}.upsert_marc_record( - record_id UUID, - content JSONB - ) - RETURNS INTEGER AS $$ - INSERT INTO ${database.defaultSchemaName}.marc_records_lb (id, content) - VALUES (record_id, content) - ON CONFLICT (id) DO UPDATE SET content = EXCLUDED.content; +CREATE OR REPLACE FUNCTION ${database.defaultSchemaName}.upsert_marc_record( + record_id UUID, + content JSONB +) +RETURNS INTEGER AS $$ +DECLARE + v_version INTEGER; + v_was_updated BOOLEAN; +BEGIN + -- Perform the UPSERT operation and determine if it was an update + WITH upserted AS ( + INSERT INTO ${database.defaultSchemaName}.marc_records_lb (id, content) + VALUES (record_id, content) + ON CONFLICT (id) DO UPDATE + SET content = EXCLUDED.content + RETURNING (xmax <> 0) AS was_updated -- TRUE if it was an update, FALSE for insert + ) + SELECT was_updated INTO v_was_updated FROM upserted; -- Get the was_updated value - SELECT version - FROM ${database.defaultSchemaName}.marc_records_tracking - WHERE marc_id = record_id; - $$ LANGUAGE sql; + -- Decide what to return based on whether it was an insert or an update + IF was_updated THEN + SELECT version INTO v_version + FROM ${database.defaultSchemaName}.marc_records_tracking + WHERE marc_id = record_id; + RETURN v_version; + ELSE + RETURN 0; + END IF; +END; +$$ LANGUAGE plpgsql; From 28a8f18da30028c53a9a972134a2f47acec7bf9d Mon Sep 17 00:00:00 2001 From: Olamide Kolawole Date: Tue, 30 Jul 2024 14:15:15 -0500 Subject: [PATCH 8/8] MODSOURCE-664 missed variable --- .../scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml index 8326c74e1..15daa8c51 100644 --- a/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml +++ b/mod-source-record-storage-server/src/main/resources/liquibase/tenant/scripts/v-5.7.0/2024-06-26--16-00-upsert-marc-indexers.xml @@ -25,7 +25,7 @@ BEGIN SELECT was_updated INTO v_was_updated FROM upserted; -- Get the was_updated value -- Decide what to return based on whether it was an insert or an update - IF was_updated THEN + IF v_was_updated THEN SELECT version INTO v_version FROM ${database.defaultSchemaName}.marc_records_tracking WHERE marc_id = record_id;
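For reference, a sketch of how the finished upsert_marc_record function behaves once patches 7 and 8 are applied; the schema name, UUID, and record content below are placeholders rather than values taken from these patches:

-- First call: no row with this id exists, so the INSERT path is taken,
-- xmax = 0 in the RETURNING clause (was_updated is FALSE), and the
-- function returns 0.
SELECT diku_mod_source_record_storage.upsert_marc_record(
  '7a1f4c1e-0000-4000-8000-000000000001'::uuid,
  '{"leader": "...", "fields": []}'::jsonb);

-- Second call with the same id: ON CONFLICT fires, content is replaced,
-- was_updated is TRUE, and the function returns the current version
-- recorded for that marc_id in marc_records_tracking.
SELECT diku_mod_source_record_storage.upsert_marc_record(
  '7a1f4c1e-0000-4000-8000-000000000001'::uuid,
  '{"leader": "...", "fields": [{"001": "in001"}]}'::jsonb);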