diff --git a/NEWS.md b/NEWS.md index 1be24137..7f1ff7b3 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,8 @@ +## v3.1.2 2025-01-07 +### Bug fixes +* Shadow copies of deleted Shared MARC authority from Central tenant are not deleted from Member tenants after specified retention period ([MODELINKS-279](https://folio-org.atlassian.net/browse/MODELINKS-279)) +--- + ## v3.1.2 2024-12-17 ### Bug fixes * Close input stream on s3 file read ([MODELINKS-278](https://folio-org.atlassian.net/browse/MODELINKS-278)) diff --git a/src/main/java/org/folio/entlinks/controller/delegate/AuthorityArchiveServiceDelegate.java b/src/main/java/org/folio/entlinks/controller/delegate/AuthorityArchiveServiceDelegate.java index 245dcccc..2d07876e 100644 --- a/src/main/java/org/folio/entlinks/controller/delegate/AuthorityArchiveServiceDelegate.java +++ b/src/main/java/org/folio/entlinks/controller/delegate/AuthorityArchiveServiceDelegate.java @@ -29,6 +29,8 @@ @RequiredArgsConstructor public class AuthorityArchiveServiceDelegate { + private static final String CONSORTIUM_SOURCE_PREFIX = "CONSORTIUM-"; + private final AuthorityArchiveService authorityArchiveService; private final SettingsService settingsService; private final AuthorityArchiveRepository authorityArchiveRepository; @@ -60,7 +62,8 @@ public void expire() { } var tillDate = LocalDateTime.now().minusDays(retention.get()); - try (Stream archives = authorityArchiveRepository.streamByUpdatedTillDate(tillDate)) { + try (Stream archives = authorityArchiveRepository.streamByUpdatedTillDateAndSourcePrefix( + tillDate, CONSORTIUM_SOURCE_PREFIX)) { archives.forEach(this::process); } } diff --git a/src/main/java/org/folio/entlinks/domain/repository/AuthorityArchiveRepository.java b/src/main/java/org/folio/entlinks/domain/repository/AuthorityArchiveRepository.java index c6f2843b..fb674192 100644 --- a/src/main/java/org/folio/entlinks/domain/repository/AuthorityArchiveRepository.java +++ b/src/main/java/org/folio/entlinks/domain/repository/AuthorityArchiveRepository.java @@ -18,9 +18,10 @@ public interface AuthorityArchiveRepository extends JpaRepository, AuthorityArchiveCqlRepository { - @Query("select aa from AuthorityArchive aa where aa.updatedDate <= :tillDate") + @Query("select aa from AuthorityArchive aa where aa.updatedDate <= :tillDate and aa.source not like :sourcePrefix%") @QueryHints(@QueryHint(name = HibernateHints.HINT_FETCH_SIZE, value = "25")) - Stream streamByUpdatedTillDate(@Param("tillDate") LocalDateTime tillDate); + Stream streamByUpdatedTillDateAndSourcePrefix( + @Param("tillDate") LocalDateTime tillDate, @Param("sourcePrefix") String sourcePrefix); @Query("select a.id as id from AuthorityArchive a") Page findAllIds(Pageable pageable); diff --git a/src/test/java/org/folio/entlinks/controller/AuthorityControllerEcsIT.java b/src/test/java/org/folio/entlinks/controller/AuthorityControllerEcsIT.java new file mode 100644 index 00000000..07ce40b9 --- /dev/null +++ b/src/test/java/org/folio/entlinks/controller/AuthorityControllerEcsIT.java @@ -0,0 +1,212 @@ +package org.folio.entlinks.controller; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static org.folio.support.DatabaseHelper.AUTHORITY_ARCHIVE_TABLE; +import static org.folio.support.DatabaseHelper.AUTHORITY_DATA_STAT_TABLE; +import static org.folio.support.DatabaseHelper.AUTHORITY_SOURCE_FILE_CODE_TABLE; +import static org.folio.support.DatabaseHelper.AUTHORITY_SOURCE_FILE_TABLE; +import static org.folio.support.DatabaseHelper.AUTHORITY_TABLE; +import static org.folio.support.KafkaTestUtils.createAndStartTestConsumer; +import static org.folio.support.TestDataUtils.AUTHORITY_IDS; +import static org.folio.support.TestDataUtils.AuthorityTestData.authority; +import static org.folio.support.TestDataUtils.AuthorityTestData.authoritySourceFile; +import static org.folio.support.base.TestConstants.CENTRAL_TENANT_ID; +import static org.folio.support.base.TestConstants.TENANT_ID; +import static org.folio.support.base.TestConstants.authorityEndpoint; +import static org.folio.support.base.TestConstants.authorityExpireEndpoint; +import static org.folio.support.base.TestConstants.authorityTopic; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.sql.Timestamp; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.folio.entlinks.domain.dto.AuthorityDto; +import org.folio.entlinks.domain.entity.Authority; +import org.folio.entlinks.integration.dto.event.AuthorityDomainEvent; +import org.folio.spring.testing.extension.DatabaseCleanup; +import org.folio.spring.testing.type.IntegrationTest; +import org.folio.support.base.IntegrationTestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; + +@IntegrationTest +@DatabaseCleanup(tables = { + AUTHORITY_SOURCE_FILE_CODE_TABLE, + AUTHORITY_DATA_STAT_TABLE, + AUTHORITY_TABLE, + AUTHORITY_ARCHIVE_TABLE, + AUTHORITY_SOURCE_FILE_TABLE}, + tenants = {CENTRAL_TENANT_ID, TENANT_ID}) +class AuthorityControllerEcsIT extends IntegrationTestBase { + private KafkaMessageListenerContainer container; + private BlockingQueue> consumerRecords; + + @BeforeAll + static void prepare() { + setUpConsortium(CENTRAL_TENANT_ID, List.of(TENANT_ID), true); + } + + @BeforeEach + void setUp(@Autowired KafkaProperties kafkaProperties) { + consumerRecords = new LinkedBlockingQueue<>(); + container = createAndStartTestConsumer( + authorityTopic(), consumerRecords, kafkaProperties, AuthorityDomainEvent.class); + } + + @AfterEach + void tearDown() { + consumerRecords.clear(); + container.stop(); + } + + @ParameterizedTest + @CsvSource({"consortium, 0", "test, 1"}) + @DisplayName("DELETE: Should delete existing authority archives by retention in settings " + + "for Consortium and Member tenants") + void expireAuthorityArchives_positive_shouldExpireExistingArchivesForConsortiumAndMemberTenant( + String tenant, int expectedCount) { + + //mock retention period + mockFailedSettingsRequest(); + + //create authority records for consortium tenant + var authority = createAuthorityForConsortium(); + + var count = 1; + awaitUntilAsserted(() -> { + assertEquals(count, databaseHelper.countRows(AUTHORITY_TABLE, CENTRAL_TENANT_ID)); + assertEquals(count, databaseHelper.countRows(AUTHORITY_TABLE, TENANT_ID)); + }); + + //delete records from authority table + doDelete(authorityEndpoint(authority.getId()), tenantHeaders(CENTRAL_TENANT_ID)); + getConsumedEvent(); + + // wait for the archive to be created + var count1 = 1; + awaitUntilAsserted(() -> { + assertEquals(count1, databaseHelper.countRowsWhere(AUTHORITY_ARCHIVE_TABLE, CENTRAL_TENANT_ID, "deleted = true")); + assertEquals(count1, databaseHelper.countRowsWhere(AUTHORITY_ARCHIVE_TABLE, TENANT_ID, "deleted = true")); + }); + + awaitUntilAsserted(() -> assertEquals(0, databaseHelper.countRows(AUTHORITY_TABLE, CENTRAL_TENANT_ID))); + + // update AuthorityArchive updated_date field + var dateInPast = Timestamp.from(Instant.now().minus(8, ChronoUnit.DAYS)); + databaseHelper.updateAuthorityArchiveUpdateDate(CENTRAL_TENANT_ID, authority.getId(), dateInPast); + databaseHelper.updateAuthorityArchiveUpdateDate(TENANT_ID, authority.getId(), dateInPast); + + // trigger endpoint + doPost(authorityExpireEndpoint(), null, tenantHeaders(tenant)); + + //check the archive records count in Central and Member tenants + awaitUntilAsserted(() -> { + assertEquals(expectedCount, databaseHelper.countRowsWhere(AUTHORITY_ARCHIVE_TABLE, CENTRAL_TENANT_ID, + "deleted = true")); + assertEquals(expectedCount, databaseHelper.countRowsWhere(AUTHORITY_ARCHIVE_TABLE, TENANT_ID, "deleted = true")); + }); + } + + @ParameterizedTest + @CsvSource({"consortium, 0, 1", "test, 1, 1"}) + @DisplayName("DELETE: Should not delete existing local record in Member tenant from the authority archives " + + "by retention in settings") + void expireAuthorityArchives_positive_shouldExpireExistingArchivesWithLocalRecordForMemberTenant( + String tenant, int expectedConsortiumCount, int expectedMemberCount) { + + //mock retention period + mockFailedSettingsRequest(); + createSourceFile(); + //create authority record for consortium tenant + var authority1 = createAuthorityForConsortium(); + //create local authority record for Member tenant + var authority2 = createAuthority(); + + //delete records from authority table + doDelete(authorityEndpoint(authority1.getId()), tenantHeaders(CENTRAL_TENANT_ID)); + getConsumedEvent(); + doDelete(authorityEndpoint(authority2.getId()), tenantHeaders(TENANT_ID)); + getConsumedEvent(); + + // wait for the archive to be created + awaitUntilAsserted(() -> { + assertEquals(1, databaseHelper.countRowsWhere(AUTHORITY_ARCHIVE_TABLE, CENTRAL_TENANT_ID, "deleted = true")); + assertEquals(2, databaseHelper.countRowsWhere(AUTHORITY_ARCHIVE_TABLE, TENANT_ID, "deleted = true")); + }); + + awaitUntilAsserted(() -> { + assertEquals(0, databaseHelper.countRows(AUTHORITY_TABLE, CENTRAL_TENANT_ID)); + assertEquals(0, databaseHelper.countRows(AUTHORITY_TABLE, TENANT_ID)); + }); + + // update AuthorityArchive updated_date field + var dateInPast = Timestamp.from(Instant.now().minus(8, ChronoUnit.DAYS)); + databaseHelper.updateAuthorityArchiveUpdateDate(CENTRAL_TENANT_ID, authority1.getId(), dateInPast); + databaseHelper.updateAuthorityArchiveUpdateDate(TENANT_ID, authority1.getId(), dateInPast); + databaseHelper.updateAuthorityArchiveUpdateDate(TENANT_ID, authority2.getId(), dateInPast); + + // trigger endpoint + doPost(authorityExpireEndpoint(), null, tenantHeaders(tenant)); + + //check the archive records count in Central and Member tenants + awaitUntilAsserted(() -> { + assertEquals(expectedConsortiumCount, databaseHelper.countRowsWhere(AUTHORITY_ARCHIVE_TABLE, CENTRAL_TENANT_ID, + "deleted = true")); + assertEquals(expectedMemberCount, databaseHelper.countRowsWhere(AUTHORITY_ARCHIVE_TABLE, TENANT_ID, + "deleted = true")); + }); + } + + private void mockFailedSettingsRequest() { + okapi.wireMockServer().stubFor(get(urlPathEqualTo("/settings/entries")) + .withQueryParam("query", equalTo("(scope=authority-storage AND key=authority-archives-expiration)")) + .withQueryParam("limit", equalTo("10000")) + .willReturn(aResponse().withStatus(500))); + } + + private Authority createAuthority() { + var entity = authority(1, 0); + databaseHelper.saveAuthority(TENANT_ID, entity); + return entity; + } + + private AuthorityDto createAuthorityForConsortium() { + var dto = new AuthorityDto() + .id(AUTHORITY_IDS[0]) + .version(0) + .source("MARC") + .naturalId("ns123456") + .personalName("Nikola Tesla1"); + doPost(authorityEndpoint(), dto, tenantHeaders(CENTRAL_TENANT_ID)); + return dto; + } + + private void createSourceFile() { + var entity = authoritySourceFile(0); + databaseHelper.saveAuthoritySourceFile(TENANT_ID, entity); + + entity.getAuthoritySourceFileCodes().forEach(code -> + databaseHelper.saveAuthoritySourceFileCode(TENANT_ID, entity.getId(), code)); + } + + @SneakyThrows + private void getConsumedEvent() { + consumerRecords.poll(10, TimeUnit.SECONDS); + } +} diff --git a/src/test/java/org/folio/entlinks/controller/delegate/AuthorityArchiveServiceDelegateTest.java b/src/test/java/org/folio/entlinks/controller/delegate/AuthorityArchiveServiceDelegateTest.java index 8ea0500c..f0fc317b 100644 --- a/src/test/java/org/folio/entlinks/controller/delegate/AuthorityArchiveServiceDelegateTest.java +++ b/src/test/java/org/folio/entlinks/controller/delegate/AuthorityArchiveServiceDelegateTest.java @@ -5,6 +5,7 @@ import static org.folio.entlinks.integration.SettingsService.AUTHORITIES_EXPIRE_SETTING_KEY; import static org.folio.entlinks.integration.SettingsService.AUTHORITIES_EXPIRE_SETTING_SCOPE; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -44,6 +45,8 @@ @ExtendWith(MockitoExtension.class) class AuthorityArchiveServiceDelegateTest { + private static final String TENANT_ID = "tenantId"; + @Mock private AuthorityArchiveService service; @@ -87,8 +90,8 @@ void shouldRetrieveAuthorityCollection_idsOnly() { var dtoResult = (AuthorityIdDtoCollection) result; assertThat(dtoResult.getTotalRecords()).isEqualTo(total); assertThat(dtoResult.getAuthorities()) - .extracting(AuthorityIdDto::getId) - .containsExactlyElementsOf(page.getContent()); + .extracting(AuthorityIdDto::getId) + .containsExactlyElementsOf(page.getContent()); } @Test @@ -111,15 +114,16 @@ void shouldExpireAuthorityArchivesWithDefaultRetentionPeriod() { when(authorityMapper.toDto(archive)).thenReturn(dto); when(settingsService.getAuthorityExpireSetting()).thenReturn(Optional.empty()); when(authorityArchiveProperties.getRetentionPeriodInDays()).thenReturn(7); - when(authorityArchiveRepository.streamByUpdatedTillDate(any(LocalDateTime.class))).thenReturn(Stream.of(archive)); - when(context.getTenantId()).thenReturn("tenantId"); + when(authorityArchiveRepository.streamByUpdatedTillDateAndSourcePrefix(any(LocalDateTime.class), anyString())) + .thenReturn(Stream.of(archive)); + when(context.getTenantId()).thenReturn(TENANT_ID); delegate.expire(); verify(service).delete(archive); verify(eventPublisher).publishHardDeleteEvent(dto); verify(propagationService) - .propagate(archive, ConsortiumPropagationService.PropagationType.DELETE, "tenantId"); + .propagate(archive, ConsortiumPropagationService.PropagationType.DELETE, TENANT_ID); } @Test @@ -131,14 +135,15 @@ void shouldExpireAuthorityArchivesWithRetentionPeriodFromSettings() { archive.setUpdatedDate(Timestamp.from(Instant.now().minus(2, ChronoUnit.DAYS))); when(authorityMapper.toDto(archive)).thenReturn(dto); when(settingsService.getAuthorityExpireSetting()).thenReturn(Optional.of(setting)); - when(authorityArchiveRepository.streamByUpdatedTillDate(any(LocalDateTime.class))).thenReturn(Stream.of(archive)); - when(context.getTenantId()).thenReturn("tenantId"); + when(authorityArchiveRepository.streamByUpdatedTillDateAndSourcePrefix(any(LocalDateTime.class), anyString())) + .thenReturn(Stream.of(archive)); + when(context.getTenantId()).thenReturn(TENANT_ID); delegate.expire(); verify(service).delete(archive); verify(eventPublisher).publishHardDeleteEvent(dto); verify(propagationService) - .propagate(archive, ConsortiumPropagationService.PropagationType.DELETE, "tenantId"); + .propagate(archive, ConsortiumPropagationService.PropagationType.DELETE, TENANT_ID); } }