diff --git a/gradle.properties b/gradle.properties index f184a04..f061faa 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ fabric8Version=5.12.4 -horizonParentVersion=4.3.0 \ No newline at end of file +horizonParentVersion=0.0.0-feature-offset-based-sse-SNAPSHOT \ No newline at end of file diff --git a/src/main/java/de/telekom/horizon/pulsar/api/SseController.java b/src/main/java/de/telekom/horizon/pulsar/api/SseController.java index 843fb0c..df49b0c 100644 --- a/src/main/java/de/telekom/horizon/pulsar/api/SseController.java +++ b/src/main/java/de/telekom/horizon/pulsar/api/SseController.java @@ -8,6 +8,7 @@ import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.service.SseService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -74,6 +75,7 @@ public ResponseEntity getSseStream(@PathVariable String env @RequestParam(defaultValue = "0") int maxNumber, @RequestParam(defaultValue = "0") int maxMinutes, @RequestParam(defaultValue = "0") int maxBytes, + @RequestHeader(value = "Last-Event-ID", required = false) String offset, @RequestHeader(value = HttpHeaders.ACCEPT, required = false) String accept) throws SubscriberDoesNotMatchSubscriptionException { sseService.validateSubscriberIdForSubscription(environment, subscriptionId); @@ -83,7 +85,7 @@ public ResponseEntity getSseStream(@PathVariable String env accept = APPLICATION_STREAM_JSON_VALUE; } - var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, includeHttpHeaders, StreamLimit.of(maxNumber, maxMinutes, maxBytes)); + var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, StringUtils.isNotEmpty(offset) || includeHttpHeaders, offset, StreamLimit.of(maxNumber, maxMinutes, maxBytes)); var responseHeaders = new HttpHeaders(); responseHeaders.add(HttpHeaders.CONTENT_TYPE, accept); diff --git a/src/main/java/de/telekom/horizon/pulsar/helper/EventMessageContext.java b/src/main/java/de/telekom/horizon/pulsar/helper/EventMessageContext.java index 814de75..708dae8 100644 --- a/src/main/java/de/telekom/horizon/pulsar/helper/EventMessageContext.java +++ b/src/main/java/de/telekom/horizon/pulsar/helper/EventMessageContext.java @@ -30,6 +30,8 @@ public class EventMessageContext { private Boolean includeHttpHeaders; @Getter private StreamLimit streamLimit; + @Getter + private boolean ignoreDeduplication; private Span span; private Tracer.SpanInScope spanInScope; diff --git a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java index d3df8da..0418c73 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java @@ -14,6 +14,7 @@ import de.telekom.eni.pandora.horizon.model.event.StatusMessage; import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage; import de.telekom.eni.pandora.horizon.model.meta.EventRetentionTime; +import de.telekom.eni.pandora.horizon.mongo.model.MessageStateMongoDocument; import de.telekom.eni.pandora.horizon.mongo.repository.MessageStateMongoRepo; import de.telekom.eni.pandora.horizon.tracing.HorizonTracer; import de.telekom.horizon.pulsar.config.PulsarConfig; @@ -25,6 +26,7 @@ import de.telekom.horizon.pulsar.utils.KafkaPicker; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; @@ -32,6 +34,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -62,16 +65,19 @@ public class EventMessageSupplier implements Supplier { private final HorizonTracer tracingHelper; private final ConcurrentLinkedQueue messageStates = new ConcurrentLinkedQueue<>(); private Instant lastPoll; + private String currentOffset; private final ObjectMapper objectMapper = new ObjectMapper(); /** * Constructs an instance of {@code EventMessageSupplier}. * - * @param subscriptionId The subscriptionId for which messages are fetched. - * @param factory The {@link SseTaskFactory} used for obtaining related components. + * @param subscriptionId The subscriptionId for which messages are fetched. + * @param factory The {@link SseTaskFactory} used for obtaining related components. * @param includeHttpHeaders Boolean flag indicating whether to include HTTP headers in the generated {@code EventMessageContext}. + * @param startingOffset Enables offset based streaming. Specifies the offset (message id) of the last received event message. + * @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early. */ - public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders, StreamLimit streamLimit) { + public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders, String startingOffset, StreamLimit streamLimit) { this.subscriptionId = subscriptionId; this.pulsarConfig = factory.getPulsarConfig(); @@ -80,6 +86,7 @@ public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boole this.eventWriter = factory.getEventWriter(); this.tracingHelper = factory.getTracingHelper(); this.includeHttpHeaders = includeHttpHeaders; + this.currentOffset = startingOffset; this.streamLimit = streamLimit; } @@ -98,6 +105,7 @@ public EventMessageContext get() { if (!messageStates.isEmpty()) { var state = messageStates.poll(); + var ignoreDeduplication = StringUtils.isNotEmpty(currentOffset); // TODO: these spans get duplicated cause of the vortex latency - will be resolved DHEI-13764 @@ -120,12 +128,13 @@ public EventMessageContext get() { var errorMessage = String.format("Event message %s did not match subscriptionId %s", state.getUuid(), state.getSubscriptionId()); throw new SubscriberDoesNotMatchSubscriptionException(errorMessage); } - } - return new EventMessageContext(message, includeHttpHeaders, streamLimit, span, spanInScope); + Optional.ofNullable(message.getHttpHeaders()).ifPresent(headers -> headers.put("x-pubsub-offset-id", new ArrayList<>(List.of(state.getUuid())))); + } + return new EventMessageContext(message, includeHttpHeaders, streamLimit, ignoreDeduplication, span, spanInScope); } catch (CouldNotPickMessageException | SubscriberDoesNotMatchSubscriptionException e) { handleException(state, e); - return new EventMessageContext(null, includeHttpHeaders, streamLimit, span, spanInScope); + return new EventMessageContext(null, includeHttpHeaders, streamLimit, ignoreDeduplication, span, spanInScope); } finally { pickSpan.finish(); } @@ -171,16 +180,42 @@ private void pollMessageStates() { Pageable pageable = PageRequest.of(0, pulsarConfig.getSseBatchSize(), Sort.by(Sort.Direction.ASC, "timestamp")); - var list = messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc( - List.of(Status.PROCESSED), - DeliveryType.SERVER_SENT_EVENT, - subscriptionId, - pageable - ).stream() - .filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets - .toList(); + Optional offsetMsg = Optional.empty(); + if (StringUtils.isNoneEmpty(currentOffset)) { + offsetMsg = messageStateMongoRepo.findById(currentOffset); + } + + if (offsetMsg.isPresent()) { + var offsetTimestamp = offsetMsg.get().getTimestamp(); + + var list = messageStateMongoRepo.findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc( + DeliveryType.SERVER_SENT_EVENT, + subscriptionId, + offsetTimestamp, + pageable + ).stream() + .filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets + .toList(); - messageStates.addAll(list); + messageStates.addAll(list); + + if (!list.isEmpty()) { + currentOffset = list.getLast().getUuid(); + } + } else { + currentOffset = null; + + var list = messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc( + List.of(Status.PROCESSED), + DeliveryType.SERVER_SENT_EVENT, + subscriptionId, + pageable + ).stream() + .filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets + .toList(); + + messageStates.addAll(list); + } lastPoll = Instant.now(); } diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseService.java b/src/main/java/de/telekom/horizon/pulsar/service/SseService.java index 8234fca..87c5dcd 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseService.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseService.java @@ -90,16 +90,18 @@ public void validateSubscriberIdForSubscription(String environment, String subsc /** * Starts emitting events for the specified subscription. * - * @param environment The environment associated with the subscription. - * @param subscriptionId The ID of the subscription for which events should be emitted. - * @param contentType The content type for the events. - * @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the emitted events. - * @return The {@link SseTaskStateContainer} representing the state of the emitted events. + * @param environment The environment associated with the subscription. + * @param subscriptionId The ID of the subscription for which events should be emitted. + * @param contentType The content type for the events. + * @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the emitted events. + * @param offset Enables offset based streaming. Specifies the offset (message id) of the last received event message. + * @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early. + * @return The {@link SseTaskStateContainer} representing the state of the emitted events. */ - public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders, StreamLimit streamLimit) { + public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders, String offset, StreamLimit streamLimit) { var responseContainer = new SseTaskStateContainer(); - taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders, streamLimit)); + taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders, offset, streamLimit)); responseContainer.setReady(pulsarConfig.getSseTimeout()); diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java index 0d8fcad..1b023f9 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java @@ -277,18 +277,20 @@ private void emitEventMessage(EventMessageContext context) { return; } - String msgUuidOrNull = deDuplicationService.get(msg); - boolean isDuplicate = Objects.nonNull(msgUuidOrNull); - if (isDuplicate) { - if(Objects.equals(msg.getUuid(), msgUuidOrNull)) { - log.debug("Message with id {} was found in the deduplication cache with the same UUID. Message will be ignored, because status will probably set to DELIVERED in the next minutes.", msg.getUuid()); - } else { - log.debug("Message with id {} was found in the deduplication cache with another UUID. Message will be set to DUPLICATE to prevent event being stuck at PROCESSED.", msg.getUuid()); - pushMetadata(msg, Status.DUPLICATE, null); - } + if (!context.isIgnoreDeduplication()) { + String msgUuidOrNull = deDuplicationService.get(msg); + boolean isDuplicate = Objects.nonNull(msgUuidOrNull); + if (isDuplicate) { + if(Objects.equals(msg.getUuid(), msgUuidOrNull)) { + log.debug("Message with id {} was found in the deduplication cache with the same UUID. Message will be ignored, because status will probably set to DELIVERED in the next minutes.", msg.getUuid()); + } else { + log.debug("Message with id {} was found in the deduplication cache with another UUID. Message will be set to DUPLICATE to prevent event being stuck at PROCESSED.", msg.getUuid()); + pushMetadata(msg, Status.DUPLICATE, null); + } - context.finishSpan(); - return; + context.finishSpan(); + return; + } } try { diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java index 493d846..554fe88 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java @@ -81,11 +81,12 @@ public SseTaskFactory( * @param contentType The content type for the SSE task. * @param sseTaskStateContainer The {@link SseTaskStateContainer} represents the state of the SSE task. * @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the SSE task. + * @param offset Enables offset based streaming. Specifies the offset (message id) of the last received event message. * @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early. - * @return The newly created {@link SseTask}. + * @return The newly created {@link SseTask}. */ - public SseTask createNew(String environment, String subscriptionId, String contentType, SseTaskStateContainer sseTaskStateContainer, boolean includeHttpHeaders, StreamLimit streamLimit) { - var eventMessageSupplier = new EventMessageSupplier(subscriptionId, this, includeHttpHeaders, streamLimit); + public SseTask createNew(String environment, String subscriptionId, String contentType, SseTaskStateContainer sseTaskStateContainer, boolean includeHttpHeaders, String offset, StreamLimit streamLimit) { + var eventMessageSupplier = new EventMessageSupplier(subscriptionId, this, includeHttpHeaders, offset, streamLimit); var connection = connectionGaugeCache.getOrCreateGaugeForSubscription(environment, subscriptionId); var task = new SseTask(sseTaskStateContainer, eventMessageSupplier, connection, this); diff --git a/src/test/java/de/telekom/horizon/pulsar/api/SseControllerSecurityEnabledTest.java b/src/test/java/de/telekom/horizon/pulsar/api/SseControllerSecurityEnabledTest.java index 93ee900..19a2a32 100644 --- a/src/test/java/de/telekom/horizon/pulsar/api/SseControllerSecurityEnabledTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/api/SseControllerSecurityEnabledTest.java @@ -88,7 +88,7 @@ void getEventsViaSSESuccessfullyWithOk() throws Exception { .andExpect(status().isOk()); verify(sseService, times(1)).validateSubscriberIdForSubscription(eq(env), eq(subscriptionId)); - verify(sseService, times(1)).startEmittingEvents(eq(env), eq(subscriptionId), any(String.class), eq(true), any(StreamLimit.class)); + verify(sseService, times(1)).startEmittingEvents(eq(env), eq(subscriptionId), any(String.class), eq(true), any(), any(StreamLimit.class)); } private Jwt getJwt(String subscriberId) { diff --git a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java index 847b2c7..f0cd1b8 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java @@ -7,14 +7,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.model.db.State; import de.telekom.eni.pandora.horizon.model.event.DeliveryType; import de.telekom.eni.pandora.horizon.model.event.Status; import de.telekom.eni.pandora.horizon.model.event.StatusMessage; import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; @@ -31,6 +29,8 @@ import org.springframework.test.util.ReflectionTestUtils; import java.time.Duration; +import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.Optional; @@ -47,28 +47,22 @@ class EventMessageSupplierTest { void setupEventMessageSupplierTest() { MockHelper.init(); - eventMessageSupplier = new EventMessageSupplier(MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.sseTaskFactory, false, new StreamLimit()); + eventMessageSupplier = new EventMessageSupplier(MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.sseTaskFactory, false, null, new StreamLimit()); } @ParameterizedTest - @ValueSource(ints = {10}) - void testGetEventMessageContext(int polls) { + @ValueSource(booleans = {false, true}) + void testGetEventMessageContext(boolean isOffsetMode) { final var objectMapper = new ObjectMapper(); var pageableCaptor = ArgumentCaptor.forClass(Pageable.class); + var messagesCount = 15; // We create a list of some test state documents similar as we would get from MongoDB - var states = MockHelper.createMessageStateMongoDocumentsForTesting(MockHelper.pulsarConfig.getSseBatchSize(), MockHelper.TEST_ENVIRONMENT, Status.PROCESSED, false); - - // We mock the request to MongoDB and return our dummy state documents instead - // We also capture the pageable argument to check whether is has been used correctly, later - when(MockHelper.messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(eq(List.of(Status.PROCESSED)), - eq(DeliveryType.SERVER_SENT_EVENT), - eq(MockHelper.TEST_SUBSCRIPTION_ID), - pageableCaptor.capture())).thenReturn(new SliceImpl<>(states)); + var states = MockHelper.createMessageStateMongoDocumentsForTesting(messagesCount, MockHelper.TEST_ENVIRONMENT, Status.PROCESSED, false); // We create a new SubscriptionEventMessage for testing - var subscriptionEventMessage = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.CALLBACK, false); + var subscriptionEventMessage = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.CALLBACK); // We mock the picked message from Kafka ConsumerRecord record = Mockito.mock(ConsumerRecord.class); @@ -81,14 +75,49 @@ void testGetEventMessageContext(int polls) { fail(e); } - // EventWriter ha private access and will be created in the constructor of the SseTaskFactory + var offsetIndex = 0; + // We mock the request to MongoDB and return our dummy state documents instead + // We also capture the pageable argument to check whether it has been used correctly, later + if (isOffsetMode) { + offsetIndex = 4; + var offsetMessage = states.get(offsetIndex); + + eventMessageSupplier = new EventMessageSupplier(MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.sseTaskFactory, false, offsetMessage.getUuid(), new StreamLimit()); + + when(MockHelper.messageStateMongoRepo.findById(anyString())).thenAnswer(invocation -> { + String uuid = invocation.getArgument(0); + return states.stream().filter(s -> s.getUuid().equals(uuid)).findAny(); + }); + when(MockHelper.messageStateMongoRepo.findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(eq(DeliveryType.SERVER_SENT_EVENT), + eq(MockHelper.TEST_SUBSCRIPTION_ID), + any(Date.class), + pageableCaptor.capture())).thenAnswer(invocation -> { + Date timestamp = invocation.getArgument(2); + var slice = states.stream().filter(s -> s.getTimestamp().toInstant().isAfter(timestamp.toInstant())).sorted(Comparator.comparing(State::getTimestamp)).limit(MockHelper.pulsarConfig.getSseBatchSize()).toList(); + return new SliceImpl<>(slice); + }); + } else { + when(MockHelper.messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(eq(List.of(Status.PROCESSED)), + eq(DeliveryType.SERVER_SENT_EVENT), + eq(MockHelper.TEST_SUBSCRIPTION_ID), + pageableCaptor.capture())).thenAnswer(invocation -> { + var slice = states.stream().filter(a -> a.getStatus() == Status.PROCESSED).sorted(Comparator.comparing(State::getTimestamp)).limit(MockHelper.pulsarConfig.getSseBatchSize()).toList(); + return new SliceImpl<>(slice); + }); + } + + // EventWriter has private access and will be created in the constructor of the SseTaskFactory // Since we want to check invocations with it, we will overwrite the EventWriter in EventMessageSupplier via reflections var eventWriterMock = mock(EventWriter.class); ReflectionTestUtils.setField(eventMessageSupplier, "eventWriter", eventWriterMock, EventWriter.class); + var startAt = 0; + if (isOffsetMode) { + startAt = offsetIndex +1; + } // We do multiple calls to EventMessageSupplier.get() in order to test // that each call will fetch the next event message in the queue - for (int i = 0; i < polls; i++) { + for(int i = startAt; i < states.size(); i++) { // We mock the actual picking of a message from Kafka here when(MockHelper.kafkaTemplate.receive(eq(MockHelper.TEST_TOPIC), eq(states.get(i).getCoordinates().partition()), eq(states.get(i).getCoordinates().offset()), eq(Duration.ofMillis(30000)))).thenReturn(record); @@ -96,6 +125,9 @@ void testGetEventMessageContext(int polls) { var result = eventMessageSupplier.get(); assertNotNull(result); + // mock Vortex setting event to DELIVERED + states.get(i).setStatus(Status.DELIVERED); + // Check that we fetch batches from MongoDB correctly var pageable = pageableCaptor.getValue(); assertEquals(0, pageable.getOffset()); @@ -129,7 +161,7 @@ void testGetEventMessageContextWithSubscriberDoesNotMatchSubscriptionException() // We create a new SubscriptionEventMessage for testing, // and we overwrite the subscriptionId so that it doesn't match the standard testing subscriptionId // in the state messages anymore - var subscriptionEventMessage = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.CALLBACK, false); + var subscriptionEventMessage = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.CALLBACK); subscriptionEventMessage.setSubscriptionId("something different"); // We mock the picked message from Kafka diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseServiceTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseServiceTest.java index 27d5fbe..bc2e3f4 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseServiceTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseServiceTest.java @@ -122,7 +122,7 @@ void testStartEmittingEvents() { var connectionCacheSpy = Mockito.spy(connectionCache); when(sseTaskFactoryMock.getConnectionCache()).thenReturn(connectionCacheSpy); - when(sseTaskFactoryMock.createNew(eq(MockHelper.TEST_ENVIRONMENT), eq(MockHelper.TEST_SUBSCRIPTION_ID), eq(MockHelper.TEST_CONTENT_TYPE), sseTaskStateContainerCaptor.capture(), eq(false), any(StreamLimit.class))).thenReturn(sseTaskSpy); + when(sseTaskFactoryMock.createNew(eq(MockHelper.TEST_ENVIRONMENT), eq(MockHelper.TEST_SUBSCRIPTION_ID), eq(MockHelper.TEST_CONTENT_TYPE), sseTaskStateContainerCaptor.capture(), eq(false), any(), any(StreamLimit.class))).thenReturn(sseTaskSpy); // The mocked task should trigger the termination condition of SseTaskStateContainer.setReady(long timeout) immediately // otherwise startEmittingEvents() would run until the timeout is reached, since setReady() is not called asynchronously @@ -147,7 +147,7 @@ void testStartEmittingEvents() { }).start(); // PUBLIC METHOD WE WANT TO TEST - var responseContainer = sseService.startEmittingEvents(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.TEST_CONTENT_TYPE, false, new StreamLimit()); + var responseContainer = sseService.startEmittingEvents(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.TEST_CONTENT_TYPE, false, null, new StreamLimit()); latch.countDown(); diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java index 2b63732..0a78437 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java @@ -63,7 +63,7 @@ void testCreateNew () throws JsonCacheException { when(MockHelper.connectionGaugeCache.getOrCreateGaugeForSubscription(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID)).thenReturn(new AtomicInteger(1)); // PUBLIC METHOD WE WANT TO TEST - var task = sseTaskFactorySpy.createNew(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.TEST_CONTENT_TYPE, sseTaskStateContainer, false, new StreamLimit()); + var task = sseTaskFactorySpy.createNew(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.TEST_CONTENT_TYPE, sseTaskStateContainer, false, null, new StreamLimit()); assertNotNull(task); // Let's verify that connectionGaugeCache.getOrCreateGaugeForSubscription(String environment, String subscriptionId) is called diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java index fc8a327..fd9ad37 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java @@ -25,6 +25,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; @@ -36,6 +38,7 @@ import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; @@ -66,8 +69,9 @@ void setupSseServiceTest() { sseTaskSpy = spy(sseTask); } - @Test - void testRun() throws IOException, InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testRun(boolean ignoreDeduplication) throws IOException { final var o = new ObjectMapper(); // For this test we set the timeout to 10s to simulate the automatic timeout if there are no new events flowing final var timeout = 10000L; @@ -75,8 +79,15 @@ void testRun() throws IOException, InterruptedException { // We create a new SubscriptionEventMessage queue and add some test messages var itemQueue = new ConcurrentLinkedQueue(); - itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false)); - itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false)); + itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT)); + var item = MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT); + itemQueue.add(item); + + // add duplicates + var duplicatesNumber = 2; + for (var i = 0; i < duplicatesNumber; i++) { + itemQueue.add(item); + } final var itemQueueInitialSize = itemQueue.size(); @@ -99,7 +110,7 @@ void testRun() throws IOException, InterruptedException { // We mock the EventMessageSupplier since it's tested in a separate test when(eventMessageSupplierMock.get()).thenAnswer(i -> { await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); - return new EventMessageContext(itemQueue.poll(), false, new StreamLimit(), Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + return new EventMessageContext(itemQueue.poll(), false, new StreamLimit(), ignoreDeduplication, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service @@ -117,6 +128,19 @@ void testRun() throws IOException, InterruptedException { when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); when(metricsHelper.getRegistry()).thenReturn(registryMock); + var mockedDeduplicationCache = new HashMap(); + when(MockHelper.deDuplicationService.track(any(SubscriptionEventMessage.class))).thenAnswer(invocation -> { + SubscriptionEventMessage msg = invocation.getArgument(0); + return mockedDeduplicationCache.put(msg.getUuid(), msg.getUuid()); + }); + + if (!ignoreDeduplication) { + when(MockHelper.deDuplicationService.get(any(SubscriptionEventMessage.class))).thenAnswer(invocation -> { + SubscriptionEventMessage msg = invocation.getArgument(0); + return mockedDeduplicationCache.get(msg.getUuid()); + }); + } + // PUBLIC METHOD WE WANT TO TEST sseTaskSpy.run(); @@ -133,17 +157,28 @@ void testRun() throws IOException, InterruptedException { verify(emitterMock, never()).completeWithError(any(ConnectionCutOutException.class)); // Verify that ResponseBodyEmitter emits data ArgumentCaptor jsonStringCaptor = ArgumentCaptor.forClass(String.class); - verify(emitterMock, times(itemQueueInitialSize)).send(jsonStringCaptor.capture()); + // Verify that for each emitted event a DELIVERED status will be written + ArgumentCaptor statusMessageCaptor = ArgumentCaptor.forClass(StatusMessage.class); + + if (ignoreDeduplication) { + verify(emitterMock, times(itemQueueInitialSize)).send(jsonStringCaptor.capture()); + verify(eventWriterMock, times(itemQueueInitialSize)).send(anyString(), statusMessageCaptor.capture(), any(HorizonTracer.class)); + // Verify that de-duplication logic is bypassed + verify(MockHelper.deDuplicationService, times(itemQueueInitialSize)).track(any()); + } else { + verify(emitterMock, times(itemQueueInitialSize - duplicatesNumber)).send(jsonStringCaptor.capture()); + verify(eventWriterMock, times(itemQueueInitialSize - duplicatesNumber)).send(anyString(), statusMessageCaptor.capture(), any(HorizonTracer.class)); + // Verify that for each emitted event a de-duplication entry will be written + verify(MockHelper.deDuplicationService, times(itemQueueInitialSize - duplicatesNumber)).track(any()); + } + assertNotNull(jsonStringCaptor.getValue()); var event = o.readValue(jsonStringCaptor.getValue(), Event.class); assertNotNull(event); - // Verify that for each emitted event a DELIVERED status will be written - ArgumentCaptor statusMessageCaptor = ArgumentCaptor.forClass(StatusMessage.class); - verify(eventWriterMock, times(itemQueueInitialSize)).send(anyString(), statusMessageCaptor.capture(), any(HorizonTracer.class)); + assertNotNull(statusMessageCaptor.getValue()); assertEquals(Status.DELIVERED, statusMessageCaptor.getValue().getStatus()); - // Verify that for each emitted event a de-duplication entry will be written - verify(MockHelper.deDuplicationService, times(itemQueueInitialSize)).track(any()); + // Verify that ResponseBodyEmitter finishes with a ConnectionTimeoutException verify(emitterMock, times(1)).completeWithError(any(ConnectionTimeoutException.class)); // Verify that emitter has completed @@ -191,7 +226,7 @@ void testTerminateConnection() throws InterruptedException, IOException { // an endless stream when(eventMessageSupplierMock.get()).thenAnswer(i -> { await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); - return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service @@ -249,7 +284,7 @@ void testTerminateConnectionThroughMaxNumberStreamLimit() throws IOException { // an endless stream when(eventMessageSupplierMock.get()).thenAnswer(i -> { await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); - return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service @@ -304,7 +339,7 @@ void testTerminateConnectionThroughMaxByteStreamLimit() throws IOException { // an endless stream when(eventMessageSupplierMock.get()).thenAnswer(i -> { await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); - return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service @@ -366,7 +401,7 @@ void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException { when(eventMessageSupplierMock.get()).thenAnswer(i -> { await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); - return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT), false, streamLimit, false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service @@ -397,8 +432,8 @@ void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException { void testSendEventFailed() throws IOException { // We create a new SubscriptionEventMessage queue and add some test messages var itemQueue = new ConcurrentLinkedQueue(); - itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false)); - itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false)); + itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT)); + itemQueue.add(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT)); final var itemQueueInitialSize = itemQueue.size(); @@ -425,7 +460,7 @@ void testSendEventFailed() throws IOException { // We mock the EventMessageSupplier since it's tested in a separate test when(eventMessageSupplierMock.get()).thenAnswer(i -> { await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); - return new EventMessageContext(itemQueue.poll(), false, new StreamLimit(), Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + return new EventMessageContext(itemQueue.poll(), false, new StreamLimit(), false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service diff --git a/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java b/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java index adf3c86..776ae11 100644 --- a/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java +++ b/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java @@ -37,10 +37,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; @@ -64,12 +61,7 @@ public class MockHelper { public static ResponseBodyEmitter emitter; public static TokenService tokenService; public static HorizonMetricsHelper metricsHelper; - public static DeDuplicationService deDuplicationService; - - - - public static String TEST_EVENT_ID = "abc123-def456-ghi789"; public static String TEST_ENVIRONMENT = "bond"; public static String TEST_SUBSCRIPTION_ID = "1-2-3"; public static String TEST_SUBSCRIBER_ID = "eni-pan-dora"; @@ -105,14 +97,14 @@ public static void init() { sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, metricsHelper, tracingHelper); } - public static SubscriptionEventMessage createSubscriptionEventMessageForTesting(DeliveryType deliveryType, boolean withAdditionalFields) { + public static SubscriptionEventMessage createSubscriptionEventMessageForTesting(DeliveryType deliveryType) { var subscriptionEventMessageForTesting = new SubscriptionEventMessage(); var event = new Event(); event.setId(RandomStringUtils.random(12, true, true)); event.setData(Map.of("message", "foobar")); - subscriptionEventMessageForTesting.setUuid(TEST_EVENT_ID); + subscriptionEventMessageForTesting.setUuid(UUID.randomUUID().toString()); subscriptionEventMessageForTesting.setEvent(event); subscriptionEventMessageForTesting.setEnvironment(TEST_ENVIRONMENT); subscriptionEventMessageForTesting.setSubscriptionId(TEST_SUBSCRIPTION_ID); @@ -173,6 +165,11 @@ private static State createStateForTesting(String environment, Status status, bo public static List createStatesForTesting (int count, String environment, Status status, boolean randomSubscriptionId) { var states = new ArrayList(); for (int i = 0; i < count; i++) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } states.add(createStateForTesting(environment, status, randomSubscriptionId)); } return states;