Skip to content

Commit

Permalink
fix(batch-processing) Fix an issue with interrupting the batch events…
Browse files Browse the repository at this point in the history
… processing due to SystemUserAuthorizationException. (#718)

(cherry picked from commit 1204500)
  • Loading branch information
SvitlanaKovalova1 authored and psmagin committed Jan 8, 2025
1 parent ff6daa4 commit 15c988c
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 38 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v4.0.6 2025-01-08
### Tech Dept
* Recreate upload ranges each upload execution ([MSEARCH-934](https://folio-org.atlassian.net/browse/MSEARCH-934))
* Fix an issue with interrupting the batch event processing due to SystemUserAuthorizationException ([MSEARCH-925](https://folio-org.atlassian.net/browse/MSEARCH-925))

## v4.0.5 2025-01-03
### Bug fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -28,6 +29,7 @@
import org.folio.search.service.reindex.jdbc.MergeRangeRepository;
import org.folio.search.service.reindex.jdbc.ReindexJdbcRepository;
import org.folio.search.utils.SearchConverterUtils;
import org.folio.spring.exception.SystemUserAuthorizationException;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
Expand All @@ -36,6 +38,7 @@

@Order(Ordered.LOWEST_PRECEDENCE)
@Component
@Log4j2
public class PopulateInstanceBatchInterceptor implements BatchInterceptor<String, ResourceEvent> {

private final Map<ReindexEntityType, MergeRangeRepository> repositories;
Expand Down Expand Up @@ -74,50 +77,33 @@ public ConsumerRecords<String, ResourceEvent> intercept(ConsumerRecords<String,

private void populate(List<ResourceEvent> records) {
var batchByTenant = records.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
batchByTenant.forEach((tenant, batch) -> systemUserScopedExecutionService.executeSystemUserScoped(tenant,
() -> executionService.execute(() -> {
process(tenant, batch);
return null;
})));

batchByTenant.forEach((tenant, batch) -> {
try {
systemUserScopedExecutionService.executeSystemUserScoped(tenant, () -> executionService.execute(() -> {
process(tenant, batch);
return null;
}));
} catch (SystemUserAuthorizationException ex) {
log.warn("System user authorization failed. Skip processing batch for tenant {}: {}",
tenant, ex.getMessage(), ex);
}
});
}

private void process(String tenant, List<ResourceEvent> batch) {
var recordByResource = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getResourceName));
for (Map.Entry<String, List<ResourceEvent>> recordCollection : recordByResource.entrySet()) {
if (ResourceType.BOUND_WITH.getName().equals(recordCollection.getKey())) {
var repository = repositories.get(ReindexEntityType.INSTANCE);
for (ResourceEvent resourceEvent : recordCollection.getValue()) {
boolean bound = resourceEvent.getType() != ResourceEventType.DELETE;
var eventPayload = getEventPayload(resourceEvent);
var id = getString(eventPayload, INSTANCE_ID_FIELD);
repository.updateBoundWith(tenant, id, bound);
}
processBoundWithEvents(tenant, recordCollection);
continue;
}

var repository = repositories.get(ReindexEntityType.fromValue(recordCollection.getKey()));
if (repository != null) {
var recordByOperation = recordCollection.getValue().stream()
.filter(resourceEvent -> {
if (ResourceType.INSTANCE.getName().equals(resourceEvent.getResourceName())) {
return !startsWith(getResourceSource(resourceEvent), SOURCE_CONSORTIUM_PREFIX);
}
return true;
})
.collect(Collectors.groupingBy(resourceEvent -> resourceEvent.getType() != ResourceEventType.DELETE));
var resourceToSave = recordByOperation.getOrDefault(true, emptyList()).stream()
.map(SearchConverterUtils::getNewAsMap)
.toList();
if (!resourceToSave.isEmpty()) {
repository.saveEntities(tenant, resourceToSave);
}
var idsToDrop = recordByOperation.getOrDefault(false, emptyList()).stream()
.map(ResourceEvent::getId)
.toList();
if (!idsToDrop.isEmpty()) {
deleteEntities(tenant, recordCollection.getKey(), repository, idsToDrop);
}
var recordByOperation = getRecordByOperation(recordCollection);
saveEntities(tenant, recordByOperation, repository);
deleteEntities(tenant, recordCollection.getKey(), recordByOperation, repository);

if (ResourceType.INSTANCE.getName().equals(recordCollection.getKey())) {
var noShadowCopiesInstanceEvents = recordByOperation.values().stream().flatMap(Collection::stream).toList();
instanceChildrenResourceService.persistChildren(tenant, noShadowCopiesInstanceEvents);
Expand All @@ -127,11 +113,50 @@ private void process(String tenant, List<ResourceEvent> batch) {
}
}

private void deleteEntities(String tenant, String resourceType, MergeRangeRepository repository, List<String> ids) {
if (ResourceType.HOLDINGS.getName().equals(resourceType) || ResourceType.ITEM.getName().equals(resourceType)) {
repository.deleteEntitiesForTenant(ids, tenant);
} else {
repository.deleteEntities(ids);
private void processBoundWithEvents(String tenant, Map.Entry<String, List<ResourceEvent>> recordCollection) {
var repository = repositories.get(ReindexEntityType.INSTANCE);
for (ResourceEvent resourceEvent : recordCollection.getValue()) {
boolean bound = resourceEvent.getType() != ResourceEventType.DELETE;
var eventPayload = getEventPayload(resourceEvent);
var id = getString(eventPayload, INSTANCE_ID_FIELD);
repository.updateBoundWith(tenant, id, bound);
}
}

private Map<Boolean, List<ResourceEvent>> getRecordByOperation(
Map.Entry<String, List<ResourceEvent>> recordCollection) {

return recordCollection.getValue().stream()
.filter(resourceEvent -> {
if (ResourceType.INSTANCE.getName().equals(resourceEvent.getResourceName())) {
return !startsWith(getResourceSource(resourceEvent), SOURCE_CONSORTIUM_PREFIX);
}
return true;
})
.collect(Collectors.groupingBy(resourceEvent -> resourceEvent.getType() != ResourceEventType.DELETE));
}

private void saveEntities(String tenant, Map<Boolean, List<ResourceEvent>> recordByOperation,
MergeRangeRepository repository) {
var resourceToSave = recordByOperation.getOrDefault(true, emptyList()).stream()
.map(SearchConverterUtils::getNewAsMap)
.toList();
if (!resourceToSave.isEmpty()) {
repository.saveEntities(tenant, resourceToSave);
}
}

private void deleteEntities(String tenant, String resourceType,
Map<Boolean, List<ResourceEvent>> recordByOperation, MergeRangeRepository repository) {
var idsToDrop = recordByOperation.getOrDefault(false, emptyList()).stream()
.map(ResourceEvent::getId)
.toList();
if (!idsToDrop.isEmpty()) {
if (ResourceType.HOLDINGS.getName().equals(resourceType) || ResourceType.ITEM.getName().equals(resourceType)) {
repository.deleteEntitiesForTenant(idsToDrop, tenant);
} else {
repository.deleteEntities(idsToDrop);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.folio.search.integration.message.interceptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.service.InstanceChildrenResourceService;
import org.folio.search.service.consortium.ConsortiumTenantExecutor;
import org.folio.search.service.reindex.jdbc.ItemRepository;
import org.folio.spring.exception.SystemUserAuthorizationException;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class PopulateInstanceBatchInterceptorTest {

private static final String TENANT_ID = "tenantId";

@Mock
private ConsortiumTenantExecutor executionService;
@Mock
private SystemUserScopedExecutionService systemUserScopedExecutionService;
@Mock
private InstanceChildrenResourceService instanceChildrenResourceService;
@Mock
private ItemRepository itemRepository;
@Mock
private Consumer<String, ResourceEvent> consumer;

private PopulateInstanceBatchInterceptor populateInstanceBatchInterceptor;

@BeforeEach
void setUp() {
populateInstanceBatchInterceptor = new PopulateInstanceBatchInterceptor(
List.of(itemRepository),
executionService,
systemUserScopedExecutionService,
instanceChildrenResourceService
);
}

@Test
void shouldHandleSystemUserAuthorizationExceptionInIntercept() {
// Arrange
var resourceEvent = new ResourceEvent().tenant(TENANT_ID).resourceName("instance");
var consumerRecord = new ConsumerRecord<>("topic", 0, 0L, "key", resourceEvent);
var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 0), List.of(consumerRecord)));

doThrow(new SystemUserAuthorizationException("Authorization failed"))
.when(systemUserScopedExecutionService).executeSystemUserScoped(eq(TENANT_ID), any());

// Act
populateInstanceBatchInterceptor.intercept(records, consumer);

// Assert
verify(systemUserScopedExecutionService).executeSystemUserScoped(eq(TENANT_ID), any());
verify(executionService, never()).execute(any());
}

@Test
void shouldProcessRecordsSuccessfullyInIntercept() {
// Arrange
doAnswer(invocation -> {
Supplier<?> operation = invocation.getArgument(0);
return operation.get();
}).when(executionService).execute(any(Supplier.class));

doAnswer(invocation -> {
Callable<?> action = invocation.getArgument(1);
return action.call();
}).when(systemUserScopedExecutionService).executeSystemUserScoped(any(String.class), any(Callable.class));

var resourceEvent = new ResourceEvent().tenant(TENANT_ID).resourceName("instance");
var consumerRecord = new ConsumerRecord<>("topic", 0, 0L, "key", resourceEvent);
var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 0), List.of(consumerRecord)));

// Act
populateInstanceBatchInterceptor.intercept(records, consumer);

// Assert
verify(systemUserScopedExecutionService).executeSystemUserScoped(eq(TENANT_ID), any());
verify(executionService).execute(any());
}
}

0 comments on commit 15c988c

Please sign in to comment.