MODSOURCE-832 - In handling Authority-Instance link events, make updating Bib fields consistent when two or more event consumers are running. (#665)

* MODSOURCE-832: add consistent authority-instance link event handling and updating of the same Marc Bib records by two or more consumers (module instances); the keying idea is sketched below

* [maven-release-plugin] prepare release v5.9.6

* [maven-release-plugin] prepare for next development iteration
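For context, the core of this change is that outgoing MARC Bib update events are now grouped by `instanceId`, and that id is used as the Kafka record key, so every update for a given instance lands on the same partition and is consumed in order even when two or more module instances consume the topic. A minimal, self-contained sketch of the grouping-and-keying idea, using simplified stand-in types (the `BibUpdate` record and `groupByInstanceId` helper are illustrative, not module APIs):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Minimal, self-contained sketch of the keying idea behind this commit.
// BibUpdate is a simplified stand-in for a MARC Bib update event; it is not
// a type from mod-source-record-storage.
public class InstanceKeyedEventsSketch {

  record BibUpdate(String instanceId, String jobId, String payload) {}

  // Group events by instanceId; the instanceId then serves as the Kafka record key,
  // so all updates for one instance hash to the same partition and keep their order.
  static Map<String, List<BibUpdate>> groupByInstanceId(List<BibUpdate> events) {
    return events.stream().collect(Collectors.groupingBy(BibUpdate::instanceId));
  }

  public static void main(String[] args) {
    var events = List.of(
        new BibUpdate("inst-1", "job-1", "first update"),
        new BibUpdate("inst-2", "job-1", "other instance"),
        new BibUpdate("inst-1", "job-1", "second update"));

    groupByInstanceId(events).forEach((instanceId, updates) ->
        // In the real handler the key goes into KafkaProducerRecord.create(topic, key, value);
        // here we only print what would be produced.
        System.out.printf("key=%s -> %d event(s)%n", instanceId, updates.size()));
  }
}
```

In contrast, the previous round-robin key (still used as a fallback in `createKafkaProducerRecord` when no record key is given) spreads load across partitions but does not tie one instance's updates to a single partition.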
mukhiddin-yusuf authored Jan 23, 2025
1 parent 84dd449 commit ce32959
Showing 9 changed files with 250 additions and 100 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
@@ -1,3 +1,6 @@
## 2025-01-23 5.9.6
* [MODSOURCE-832](https://folio-org.atlassian.net/browse/MODSOURCE-832) Add consistent handling and updating of the same Marc Bib records linked to an Authority when two or more consumers process them

## 2025-01-09 5.9.5
* [MODSOURCE-847](https://folio-org.atlassian.net/browse/MODSOURCE-847) Second update of the same MARC authority / MARC holdings record completes with errors

4 changes: 3 additions & 1 deletion README.md
@@ -11,6 +11,7 @@ Version 2.0. See the file "[LICENSE](LICENSE)" for more information.
* [Docker](#docker)
* [Installing the module](#installing-the-module)
* [Deploying the module](#deploying-the-module)
* [Interaction with Kafka](#interaction-with-kafka)
* [Database schemas](#database-schemas)
* [How to fill module with data for testing purposes](https://wiki.folio.org/x/G6bc)

@@ -98,11 +99,12 @@ curl -w '\n' -X POST -D - \

## Interaction with Kafka


There are several properties that should be set for modules that interact with Kafka: **KAFKA_HOST, KAFKA_PORT, OKAPI_URL, ENV** (unique env ID).
After setup, it is good to check logs in all related modules for errors. Data import consumers and producers work in separate verticles that are set up in RMB's InitAPI for each module. That would be the first place to check deploy/install logs.
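As a rough illustration of how such settings are usually supplied to a module process, they can be read from the environment with fallbacks. The variable names below match this section; the default values and the `envOrDefault` helper are placeholders for illustration, not values prescribed by the module:

```java
import java.util.Optional;

// Illustrative only: read the Kafka-related settings named above from the
// environment, falling back to placeholder defaults when they are unset.
public class KafkaEnvSketch {

  static String envOrDefault(String name, String defaultValue) {
    return Optional.ofNullable(System.getenv(name)).orElse(defaultValue);
  }

  public static void main(String[] args) {
    var kafkaHost = envOrDefault("KAFKA_HOST", "localhost");
    var kafkaPort = envOrDefault("KAFKA_PORT", "9092");
    var okapiUrl = envOrDefault("OKAPI_URL", "http://localhost:9130");
    var env = envOrDefault("ENV", "folio"); // unique env ID shared by cooperating modules

    System.out.printf("Kafka at %s:%s, Okapi at %s, env=%s%n", kafkaHost, kafkaPort, okapiUrl, env);
  }
}
```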

**Environment variables** that can be adjusted for this module and default values:
* Relevant from the **Ramsons** release, module versions from 5.9.6:
* "AUTHORITY_TO_BIB_LINK_CHANGE_HANDLER_RETRY_COUNT": 5
* Relevant from the **Iris** release, module versions from 5.0.0:
* "_srs.kafka.ParsedMarcChunkConsumer.instancesNumber_": 1
* "_srs.kafka.DataImportConsumer.instancesNumber_": 1
2 changes: 1 addition & 1 deletion mod-source-record-storage-client/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.folio</groupId>
<artifactId>mod-source-record-storage</artifactId>
<version>5.9.6-SNAPSHOT</version>
<version>5.9.7-SNAPSHOT</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion mod-source-record-storage-server/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.folio</groupId>
<artifactId>mod-source-record-storage</artifactId>
<version>5.9.6-SNAPSHOT</version>
<version>5.9.7-SNAPSHOT</version>
</parent>

<dependencies>
AuthorityLinkChunkKafkaHandler.java
@@ -20,11 +20,13 @@
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;

import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -71,6 +73,9 @@ public class AuthorityLinkChunkKafkaHandler implements AsyncRecordHandler<String
@Value("${srs.kafka.AuthorityLinkChunkKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;

@Value("${AUTHORITY_TO_BIB_LINK_CHANGE_HANDLER_RETRY_COUNT:5}")
private int maxBibSaveRetryCount;

public AuthorityLinkChunkKafkaHandler(RecordService recordService, KafkaConfig kafkaConfig,
SnapshotService snapshotService) {
this.kafkaConfig = kafkaConfig;
@@ -95,11 +100,15 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
RecordsModifierOperator recordsModifier = recordsCollection ->
this.mapRecordFieldsChanges(linksUpdate, recordsCollection, userId);

return recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, recordsModifier, okapiHeaders)
return recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, recordsModifier, okapiHeaders, maxBibSaveRetryCount)
.compose(recordsBatchResponse -> {
sendReports(recordsBatchResponse, linksUpdate, consumerRecord.headers());
var marcBibUpdateStats = mapRecordsToBibUpdateEvents(recordsBatchResponse, linksUpdate);
return sendEvents(marcBibUpdateStats, linksUpdate, consumerRecord);
var marcBibUpdateStats = mapRecordsToBibUpdateEventsByInstanceId(recordsBatchResponse, linksUpdate);
return Future.all(marcBibUpdateStats.entrySet().stream()
.map(entry -> sendEvents(entry.getValue(), linksUpdate, entry.getKey(), consumerRecord))
.toList()
)
.map(unused -> consumerRecord.key());
});
});

@@ -228,8 +237,8 @@ private List<String> extractUpdateTargetFieldCodesForInstance(BibAuthorityLinksU
.toList();
}

private List<MarcBibUpdate> mapRecordsToBibUpdateEvents(RecordsBatchResponse batchResponse,
BibAuthorityLinksUpdate event) {
private Map<String, List<MarcBibUpdate>> mapRecordsToBibUpdateEventsByInstanceId(RecordsBatchResponse batchResponse,
BibAuthorityLinksUpdate event) {
LOGGER.debug("Updated {} bibs for jobId {}, authorityId {}",
batchResponse.getTotalRecords(), event.getJobId(), event.getAuthorityId());

@@ -241,27 +250,36 @@ private List<MarcBibUpdate> mapRecordsToBibUpdateEvents(RecordsBatchResponse bat
batchResponse.getTotalRecords(), batchResponse.getRecords().size(), errors);
}

return toMarcBibUpdateEvents(batchResponse, event);
return toMarcBibUpdateEventsByInstanceId(batchResponse, event);
}

private List<MarcBibUpdate> toMarcBibUpdateEvents(RecordsBatchResponse batchResponse,
BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
private Map<String, List<MarcBibUpdate>> toMarcBibUpdateEventsByInstanceId(RecordsBatchResponse batchResponse,
BibAuthorityLinksUpdate bibAuthorityLinksUpdate) {
var instanceIdToLinkIds = bibAuthorityLinksUpdate.getUpdateTargets().stream()
.flatMap(updateTarget -> updateTarget.getLinks().stream())
.collect(Collectors.groupingBy(Link::getInstanceId, Collectors.mapping(Link::getLinkId, Collectors.toList())));

return batchResponse.getRecords().stream()
.map(bibRecord -> {
var instanceId = bibRecord.getExternalIdsHolder().getInstanceId();
return new MarcBibUpdate()
var marcBibUpdate = new MarcBibUpdate()
.withJobId(bibAuthorityLinksUpdate.getJobId())
.withLinkIds(instanceIdToLinkIds.get(instanceId))
.withTenant(bibAuthorityLinksUpdate.getTenant())
.withType(MarcBibUpdate.Type.UPDATE)
.withTs(bibAuthorityLinksUpdate.getTs())
.withRecord(bibRecord);
return Map.entry(instanceId, marcBibUpdate);
})
.toList();
.collect(Collectors.groupingBy(this::entryKey, Collectors.mapping(this::entryValue, Collectors.toList())));
}

private String entryKey(Map.Entry<String, MarcBibUpdate> entry) {
return entry.getKey();
}

private MarcBibUpdate entryValue(Map.Entry<String, MarcBibUpdate> entry) {
return entry.getValue();
}

private List<LinkUpdateReport> toFailedLinkUpdateReports(List<Record> errorRecords,
@@ -303,32 +321,34 @@ private void sendReports(RecordsBatchResponse batchResponse, BibAuthorityLinksUp
errorRecords.size(), event.getJobId(), event.getAuthorityId());

toFailedLinkUpdateReports(errorRecords, event).forEach(report ->
sendEventToKafka(LINKS_STATS, report.getTenant(), report.getJobId(), report, headers));
sendEventToKafka(LINKS_STATS, report.getTenant(), report.getJobId(), null, report, headers));
}
}

private Future<String> sendEvents(List<MarcBibUpdate> marcBibUpdateEvents, BibAuthorityLinksUpdate event,
private Future<String> sendEvents(List<MarcBibUpdate> marcBibUpdateEvents,
BibAuthorityLinksUpdate event,
String instanceId,
KafkaConsumerRecord<String, String> consumerRecord) {
LOGGER.info("Sending {} bib update events for jobId {}, authorityId {}",
marcBibUpdateEvents.size(), event.getJobId(), event.getAuthorityId());

return Future.fromCompletionStage(
CompletableFuture.allOf(
marcBibUpdateEvents.stream()
.map(marcBibUpdate -> sendEventToKafka(MARC_BIB, marcBibUpdate.getTenant(), marcBibUpdate.getJobId(),
marcBibUpdate, consumerRecord.headers()))
.map(bibUpdateEvent -> sendEventToKafka(MARC_BIB, bibUpdateEvent.getTenant(), bibUpdateEvent.getJobId(),
instanceId, bibUpdateEvent, consumerRecord.headers()))
.map(Future::toCompletionStage)
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new)
).minimalCompletionStage()
).map(unused -> consumerRecord.key());
}

private Future<Boolean> sendEventToKafka(KafkaTopic topic, String tenant, String jobId, Object marcRecord,
List<KafkaHeader> kafkaHeaders) {
private Future<Boolean> sendEventToKafka(KafkaTopic topic, String tenant, String jobId, String recordKey,
Object marcRecord, List<KafkaHeader> kafkaHeaders) {
var promise = Promise.<Boolean>promise();
try {
var kafkaRecord = createKafkaProducerRecord(topic, tenant, marcRecord, kafkaHeaders);
var kafkaRecord = createKafkaProducerRecord(topic, tenant, recordKey, marcRecord, kafkaHeaders);
producers.get(topic).write(kafkaRecord, ar -> {
if (ar.succeeded()) {
LOGGER.debug("Event with type {}, jobId {} was sent to kafka", topic.topicName(), jobId);
@@ -348,10 +368,10 @@ private Future<Boolean> sendEventToKafka(KafkaTopic topic, String tenant, String
}

private KafkaProducerRecord<String, String> createKafkaProducerRecord(KafkaTopic topic, String tenant,
Object marcRecord,
String recordKey, Object marcRecord,
List<KafkaHeader> kafkaHeaders) {
var topicName = topic.fullTopicName(kafkaConfig, tenant);
var key = String.valueOf(INDEXER.incrementAndGet() % maxDistributionNum);
var key = Optional.ofNullable(recordKey).orElse(String.valueOf(INDEXER.incrementAndGet() % maxDistributionNum));
var kafkaRecord = KafkaProducerRecord.create(topicName, key, Json.encode(marcRecord));
kafkaHeaders.removeIf(kafkaHeader -> !StringUtils.startsWithIgnoreCase(kafkaHeader.key(), "x-okapi-"));
kafkaRecord.addHeaders(kafkaHeaders);
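For readers following the handler diff above: the new `maxBibSaveRetryCount`, injected from `AUTHORITY_TO_BIB_LINK_CHANGE_HANDLER_RETRY_COUNT` (default 5), is passed into `recordService.saveRecordsByExternalIds`; judging from the commit message, it bounds how many times the bib save is retried when concurrent consumers update the same records. A minimal sketch of such a bounded retry, assuming retries are driven by a save-time conflict; the `ConflictException` and `saveBatch` callable are illustrative, not the module's API:

```java
import java.util.concurrent.Callable;

// Illustrative bounded-retry loop; not the module's implementation.
public class BoundedRetrySketch {

  static class ConflictException extends RuntimeException {
    ConflictException(String message) { super(message); }
  }

  // Runs saveBatch at most maxRetryCount + 1 times, rethrowing the last conflict.
  static <T> T saveWithRetry(Callable<T> saveBatch, int maxRetryCount) throws Exception {
    ConflictException last = null;
    for (int attempt = 0; attempt <= maxRetryCount; attempt++) {
      try {
        return saveBatch.call();
      } catch (ConflictException e) {
        last = e; // another consumer updated the same records first; try again
      }
    }
    throw last;
  }

  public static void main(String[] args) throws Exception {
    int maxBibSaveRetryCount = Integer.parseInt(
        System.getenv().getOrDefault("AUTHORITY_TO_BIB_LINK_CHANGE_HANDLER_RETRY_COUNT", "5"));
    System.out.println(saveWithRetry(() -> "saved", maxBibSaveRetryCount));
  }
}
```

The real service presumably performs the retry asynchronously on the Vert.x event loop; this sketch only illustrates the bounded-attempts contract that the environment variable controls.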
(The remaining 4 changed files are not shown.)
