diff --git a/src/main/java/de/telekom/horizon/pulsar/api/RestResponseEntityExceptionHandler.java b/src/main/java/de/telekom/horizon/pulsar/api/RestResponseEntityExceptionHandler.java index da02593..06254dd 100644 --- a/src/main/java/de/telekom/horizon/pulsar/api/RestResponseEntityExceptionHandler.java +++ b/src/main/java/de/telekom/horizon/pulsar/api/RestResponseEntityExceptionHandler.java @@ -5,10 +5,7 @@ package de.telekom.horizon.pulsar.api; import de.telekom.eni.pandora.horizon.model.common.ProblemMessage; -import de.telekom.horizon.pulsar.exception.ConnectionCutOutException; -import de.telekom.horizon.pulsar.exception.ConnectionTimeoutException; -import de.telekom.horizon.pulsar.exception.QueueWaitTimeoutException; -import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException; +import de.telekom.horizon.pulsar.exception.*; import lombok.extern.slf4j.Slf4j; import org.apache.catalina.connector.ClientAbortException; import org.springframework.beans.factory.annotation.Autowired; @@ -64,6 +61,7 @@ public RestResponseEntityExceptionHandler(ApplicationContext applicationContext) */ @ExceptionHandler(value = { ConnectionCutOutException.class, + StreamLimitExceededException.class }) @ResponseStatus(HttpStatus.OK) protected ResponseEntity handleCutOut(Exception e, WebRequest request) { diff --git a/src/main/java/de/telekom/horizon/pulsar/api/SseController.java b/src/main/java/de/telekom/horizon/pulsar/api/SseController.java index fb85e3f..28aa880 100644 --- a/src/main/java/de/telekom/horizon/pulsar/api/SseController.java +++ b/src/main/java/de/telekom/horizon/pulsar/api/SseController.java @@ -5,6 +5,7 @@ package de.telekom.horizon.pulsar.api; import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException; +import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.service.SseService; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpHeaders; @@ -56,10 +57,13 @@ public ResponseEntity headRequest(@PathVariable String environment) { /** * Retrieves SSE stream for the specified subscriptionId. * - * @param environment The environment path variable. - * @param subscriptionId The subscriptionId path variable. + * @param environment The environment path variable. + * @param subscriptionId The subscriptionId path variable. * @param includeHttpHeaders Whether to include HTTP headers in the response. - * @param accept The value of the "Accept" header in the request. + * @param maxNumber Whether to terminate after a certain number of consumed events. + * @param maxMinutes Whether to terminate after a certain time (in minutes). + * @param maxBytes Whether to terminate after a certain number of bytes consumed. + * @param accept The value of the "Accept" header in the request. * @return A response containing a {@code ResponseBodyEmitter} for SSE streaming. * @throws SubscriberDoesNotMatchSubscriptionException If the subscriber does not match the specified subscription. */ @@ -67,6 +71,9 @@ public ResponseEntity headRequest(@PathVariable String environment) { public ResponseEntity getSseStream(@PathVariable String environment, @PathVariable String subscriptionId, @RequestParam(defaultValue = "false") boolean includeHttpHeaders, + @RequestParam(defaultValue = "0") int maxNumber, + @RequestParam(defaultValue = "0") int maxMinutes, + @RequestParam(defaultValue = "0") int maxBytes, @RequestHeader(value = HttpHeaders.ACCEPT, required = false) String accept) throws SubscriberDoesNotMatchSubscriptionException { sseService.validateSubscriberIdForSubscription(environment, subscriptionId); @@ -76,7 +83,7 @@ public ResponseEntity getSseStream(@PathVariable String env accept = APPLICATION_STREAM_JSON_VALUE; } - var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, includeHttpHeaders); + var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, includeHttpHeaders, StreamLimit.of(maxNumber, maxMinutes, maxBytes)); var responseHeaders = new HttpHeaders(); responseHeaders.add(HttpHeaders.CONTENT_TYPE, accept); @@ -86,4 +93,20 @@ public ResponseEntity getSseStream(@PathVariable String env return new ResponseEntity<>(responseContainer.getEmitter(), responseHeaders, HttpStatus.OK); } + + /** + * Stops an active SSE stream for the specified subscriptionId. + * + * @param environment The environment path variable. + * @param subscriptionId The subscriptionId path variable. + * @throws SubscriberDoesNotMatchSubscriptionException If the subscriber does not match the specified subscription. + */ + @PostMapping(value = "/sse/{subscriptionId}/terminate", produces = {MediaType.ALL_VALUE, APPLICATION_STREAM_JSON_VALUE, MediaType.TEXT_EVENT_STREAM_VALUE}) + public ResponseEntity terminateSseStream(@PathVariable String environment, @PathVariable String subscriptionId) throws SubscriberDoesNotMatchSubscriptionException { + + sseService.validateSubscriberIdForSubscription(environment, subscriptionId); + sseService.stopEmittingEvents(subscriptionId); + + return ResponseEntity.status(HttpStatus.OK).build(); + } } diff --git a/src/main/java/de/telekom/horizon/pulsar/exception/StreamLimitExceededException.java b/src/main/java/de/telekom/horizon/pulsar/exception/StreamLimitExceededException.java new file mode 100644 index 0000000..2f819e4 --- /dev/null +++ b/src/main/java/de/telekom/horizon/pulsar/exception/StreamLimitExceededException.java @@ -0,0 +1,11 @@ +// Copyright 2024 Deutsche Telekom IT GmbH +// +// SPDX-License-Identifier: Apache-2.0 + +package de.telekom.horizon.pulsar.exception; + +public class StreamLimitExceededException extends HorizonPulsarException { + public StreamLimitExceededException() { + super(); + } +} diff --git a/src/main/java/de/telekom/horizon/pulsar/helper/EventMessageContext.java b/src/main/java/de/telekom/horizon/pulsar/helper/EventMessageContext.java index c482fe9..814de75 100644 --- a/src/main/java/de/telekom/horizon/pulsar/helper/EventMessageContext.java +++ b/src/main/java/de/telekom/horizon/pulsar/helper/EventMessageContext.java @@ -17,7 +17,7 @@ * Context class representing the context of an event message in a subscription. * * This class encapsulates information related to a subscription event message, including - * whether to include HTTP headers, and optional tracing components such as a span and span in scope. + * whether to include HTTP headers or stream limits, and optional tracing components such as a span and span in scope. * It provides a method to finish the span and span in scope if they are present. */ @@ -28,6 +28,9 @@ public class EventMessageContext { private SubscriptionEventMessage subscriptionEventMessage; @Getter private Boolean includeHttpHeaders; + @Getter + private StreamLimit streamLimit; + private Span span; private Tracer.SpanInScope spanInScope; public void finishSpan() { diff --git a/src/main/java/de/telekom/horizon/pulsar/helper/StreamLimit.java b/src/main/java/de/telekom/horizon/pulsar/helper/StreamLimit.java new file mode 100644 index 0000000..dc291b9 --- /dev/null +++ b/src/main/java/de/telekom/horizon/pulsar/helper/StreamLimit.java @@ -0,0 +1,33 @@ +// Copyright 2024 Deutsche Telekom IT GmbH +// +// SPDX-License-Identifier: Apache-2.0 + +package de.telekom.horizon.pulsar.helper; + + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * Class that contains any streaming limits provided by the customer. + * + * This class encapsulates all possible streaming limits that have been provided by the customer when + * requesting a new stream. The streaming limits will ensure that a active stream terminates early on when exceeded. + * Currently, a customer can specify that the stream should terminate when a specific number of events have been consumed + * or after a certain time (in minutes) or after exceeding a certain number of bytes which have been consumed. + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class StreamLimit { + private int maxNumber; + private int maxMinutes; + private long maxBytes; + + public static StreamLimit of(int maxNumber, int maxMinutes, int maxBytes) { + return new StreamLimit(maxNumber, maxMinutes, maxBytes); + } +} diff --git a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java index f342def..d3df8da 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java @@ -21,6 +21,7 @@ import de.telekom.horizon.pulsar.exception.CouldNotPickMessageException; import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException; import de.telekom.horizon.pulsar.helper.EventMessageContext; +import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.utils.KafkaPicker; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -54,6 +55,7 @@ public class EventMessageSupplier implements Supplier { @Getter private final String subscriptionId; private final Boolean includeHttpHeaders; + private final StreamLimit streamLimit; private final KafkaPicker kafkaPicker; private final EventWriter eventWriter; private final MessageStateMongoRepo messageStateMongoRepo; @@ -69,7 +71,7 @@ public class EventMessageSupplier implements Supplier { * @param factory The {@link SseTaskFactory} used for obtaining related components. * @param includeHttpHeaders Boolean flag indicating whether to include HTTP headers in the generated {@code EventMessageContext}. */ - public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders) { + public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders, StreamLimit streamLimit) { this.subscriptionId = subscriptionId; this.pulsarConfig = factory.getPulsarConfig(); @@ -78,6 +80,7 @@ public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boole this.eventWriter = factory.getEventWriter(); this.tracingHelper = factory.getTracingHelper(); this.includeHttpHeaders = includeHttpHeaders; + this.streamLimit = streamLimit; } /** @@ -119,10 +122,10 @@ public EventMessageContext get() { } } - return new EventMessageContext(message, includeHttpHeaders, span, spanInScope); + return new EventMessageContext(message, includeHttpHeaders, streamLimit, span, spanInScope); } catch (CouldNotPickMessageException | SubscriberDoesNotMatchSubscriptionException e) { handleException(state, e); - return new EventMessageContext(null, includeHttpHeaders, span, spanInScope); + return new EventMessageContext(null, includeHttpHeaders, streamLimit, span, spanInScope); } finally { pickSpan.finish(); } diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseService.java b/src/main/java/de/telekom/horizon/pulsar/service/SseService.java index 35fb4bd..8234fca 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseService.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseService.java @@ -4,11 +4,11 @@ package de.telekom.horizon.pulsar.service; -import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService; import de.telekom.horizon.pulsar.cache.SubscriberCache; import de.telekom.horizon.pulsar.config.PulsarConfig; import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException; import de.telekom.horizon.pulsar.helper.SseTaskStateContainer; +import de.telekom.horizon.pulsar.helper.StreamLimit; import lombok.extern.slf4j.Slf4j; import org.apache.logging.log4j.util.Strings; import org.springframework.beans.factory.annotation.Autowired; @@ -42,14 +42,12 @@ public class SseService { * @param sseTaskFactory The {@link SseTaskFactory} for creating Server-Sent Event tasks. * @param subscriberCache The {@link SubscriberCache} for caching subscriber information. * @param pulsarConfig The {@link PulsarConfig} for Pulsar-related configuration. - * @param deDuplicationService The {@link DeDuplicationService} for handling message deduplication. */ @Autowired public SseService(TokenService tokenService, SseTaskFactory sseTaskFactory, SubscriberCache subscriberCache, - PulsarConfig pulsarConfig, - DeDuplicationService deDuplicationService) { + PulsarConfig pulsarConfig) { this.tokenService = tokenService; this.sseTaskFactory = sseTaskFactory; @@ -98,14 +96,22 @@ public void validateSubscriberIdForSubscription(String environment, String subsc * @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the emitted events. * @return The {@link SseTaskStateContainer} representing the state of the emitted events. */ - public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders) { + public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders, StreamLimit streamLimit) { var responseContainer = new SseTaskStateContainer(); - taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders)); + taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders, streamLimit)); responseContainer.setReady(pulsarConfig.getSseTimeout()); return responseContainer; } + /** + * Stops emitting events for an existing active stream. + * + * @param subscriptionId The ID of the subscription for which events are being emitted. + */ + public void stopEmittingEvents(String subscriptionId) { + sseTaskFactory.getConnectionCache().removeConnectionForSubscription(subscriptionId); + } } diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java index 8849514..6d18bdc 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java @@ -16,9 +16,11 @@ import de.telekom.horizon.pulsar.config.PulsarConfig; import de.telekom.horizon.pulsar.exception.ConnectionCutOutException; import de.telekom.horizon.pulsar.exception.ConnectionTimeoutException; +import de.telekom.horizon.pulsar.exception.StreamLimitExceededException; import de.telekom.horizon.pulsar.helper.EventMessageContext; import de.telekom.horizon.pulsar.helper.SseResponseWrapper; import de.telekom.horizon.pulsar.helper.SseTaskStateContainer; +import de.telekom.horizon.pulsar.helper.StreamLimit; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -28,11 +30,14 @@ import org.springframework.lang.Nullable; import org.springframework.web.server.UnsupportedMediaTypeStatusException; +import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; /** @@ -47,6 +52,9 @@ public class SseTask implements Runnable { private static final String TIMEOUT_MESSAGE = "No events received for more than %s ms"; private final SseTaskStateContainer sseTaskStateContainer; + + private final EventMessageSupplier eventMessageSupplier; + @Getter private final AtomicInteger openConnectionGaugeValue; private final PulsarConfig pulsarConfig; @@ -56,8 +64,14 @@ public class SseTask implements Runnable { @Setter private String contentType = APPLICATION_STREAM_JSON_VALUE; - private final Stream stream; + + @Getter + private Instant startTime; + @Getter + private Instant stopTime; private Instant lastEventMessage = Instant.now(); + private AtomicLong bytesConsumed = new AtomicLong(0); + private AtomicLong numberConsumed = new AtomicLong(0); private final AtomicBoolean isCutOut = new AtomicBoolean(false); private final AtomicBoolean isEmitterCompleted = new AtomicBoolean(false); @@ -78,14 +92,13 @@ public SseTask(SseTaskStateContainer sseTaskStateContainer, SseTaskFactory factory) { this.sseTaskStateContainer = sseTaskStateContainer; + this.eventMessageSupplier = eventMessageSupplier; this.openConnectionGaugeValue = openConnectionGaugeValue; this.pulsarConfig = factory.getPulsarConfig(); this.eventWriter = factory.getEventWriter(); this.deDuplicationService = factory.getDeDuplicationService(); this.tracingHelper = factory.getTracingHelper(); - - this.stream = createStreamTopology(eventMessageSupplier); } /** @@ -116,6 +129,8 @@ private Stream createStreamTopology(EventMessageSupplier ev */ @Override public void run() { + var stream = createStreamTopology(eventMessageSupplier); + if (sseTaskStateContainer.getCanceled().get()) { return; } @@ -124,6 +139,8 @@ public void run() { sseTaskStateContainer.getEmitter().onCompletion(() -> isEmitterCompleted.compareAndExchange(false, true)); sseTaskStateContainer.getEmitter().onError(e -> isEmitterCompleted.compareAndExchange(false, true)); + startTime = Instant.now(); + // Mark the task as running and increment the open connection gauge value. sseTaskStateContainer.getRunning().compareAndExchange(false, true); openConnectionGaugeValue.getAndSet(1); @@ -136,6 +153,7 @@ public void run() { sseTaskStateContainer.getEmitter().completeWithError(e); } finally { openConnectionGaugeValue.getAndSet(0); + stopTime = Instant.now(); } } @@ -172,6 +190,19 @@ private boolean applyStreamEndFilter(EventMessageContext context) { return false; } + final StreamLimit streamLimit = context.getStreamLimit(); + if (streamLimit != null) { + final boolean maxNumberExceeded = streamLimit.getMaxNumber() > 0 && numberConsumed.get() >= streamLimit.getMaxNumber(); + final boolean maxMinutesExceeded = streamLimit.getMaxMinutes() > 0 && ChronoUnit.MINUTES.between(startTime, Instant.now()) >= streamLimit.getMaxMinutes(); + final boolean maxBytesExceeded = streamLimit.getMaxBytes() > 0 && bytesConsumed.get() >= streamLimit.getMaxBytes(); + + if (maxNumberExceeded || maxMinutesExceeded || maxBytesExceeded) { + sseTaskStateContainer.getEmitter().completeWithError(new StreamLimitExceededException()); + context.finishSpan(); + return false; + } + } + return true; } @@ -258,6 +289,7 @@ private void emitEventMessage(EventMessageContext context) { try { sendEvent(msg, context.getIncludeHttpHeaders()); } finally { + context.finishSpan(); } } @@ -277,9 +309,13 @@ private void sendEvent(SubscriptionEventMessage msg, boolean includeHttpHeaders) try { var eventJson = serializeEvent(new SseResponseWrapper(msg, includeHttpHeaders)); + sseTaskStateContainer.getEmitter().send(eventJson); pushMetadata(msg, Status.DELIVERED, null); + + bytesConsumed.addAndGet(eventJson.getBytes(StandardCharsets.UTF_8).length); + numberConsumed.incrementAndGet(); } catch (JsonProcessingException e) { var err = String.format("Error occurred while emitting the event: %s", e.getMessage()); log.info(err, e); diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java index 14a2772..83253c2 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java @@ -12,6 +12,7 @@ import de.telekom.horizon.pulsar.cache.ConnectionGaugeCache; import de.telekom.horizon.pulsar.config.PulsarConfig; import de.telekom.horizon.pulsar.helper.SseTaskStateContainer; +import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.utils.KafkaPicker; import lombok.Getter; import org.springframework.stereotype.Component; @@ -75,12 +76,13 @@ public SseTaskFactory( * @param environment The environment associated with the subscription. * @param subscriptionId The ID of the subscription for which the task is created. * @param contentType The content type for the SSE task. - * @param sseTaskStateContainer The {@link SseTaskStateContainer} representing the state of the SSE task. + * @param sseTaskStateContainer The {@link SseTaskStateContainer} represents the state of the SSE task. * @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the SSE task. + * @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early. * @return The newly created {@link SseTask}. */ - public SseTask createNew(String environment, String subscriptionId, String contentType, SseTaskStateContainer sseTaskStateContainer, boolean includeHttpHeaders) { - var eventMessageSupplier = new EventMessageSupplier(subscriptionId, this, includeHttpHeaders); + public SseTask createNew(String environment, String subscriptionId, String contentType, SseTaskStateContainer sseTaskStateContainer, boolean includeHttpHeaders, StreamLimit streamLimit) { + var eventMessageSupplier = new EventMessageSupplier(subscriptionId, this, includeHttpHeaders, streamLimit); var connection = connectionGaugeCache.getOrCreateGaugeForSubscription(environment, subscriptionId); var task = new SseTask(sseTaskStateContainer, eventMessageSupplier, connection, this); diff --git a/src/test/java/de/telekom/horizon/pulsar/api/SseControllerSecurityEnabledTest.java b/src/test/java/de/telekom/horizon/pulsar/api/SseControllerSecurityEnabledTest.java index 7a74f6d..93ee900 100644 --- a/src/test/java/de/telekom/horizon/pulsar/api/SseControllerSecurityEnabledTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/api/SseControllerSecurityEnabledTest.java @@ -6,6 +6,7 @@ import de.telekom.horizon.pulsar.cache.SubscriberCache; import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException; +import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.service.SseService; import de.telekom.horizon.pulsar.utils.AbstractIntegrationTest; import de.telekom.horizon.pulsar.utils.MongoTestServerConfiguration; @@ -87,7 +88,7 @@ void getEventsViaSSESuccessfullyWithOk() throws Exception { .andExpect(status().isOk()); verify(sseService, times(1)).validateSubscriberIdForSubscription(eq(env), eq(subscriptionId)); - verify(sseService, times(1)).startEmittingEvents(eq(env), eq(subscriptionId), any(String.class), eq(true)); + verify(sseService, times(1)).startEmittingEvents(eq(env), eq(subscriptionId), any(String.class), eq(true), any(StreamLimit.class)); } private Jwt getJwt(String subscriberId) { diff --git a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java index d2c6a8b..fa7fbdc 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java @@ -10,6 +10,7 @@ import de.telekom.eni.pandora.horizon.model.event.DeliveryType; import de.telekom.eni.pandora.horizon.model.event.Status; import de.telekom.eni.pandora.horizon.model.event.StatusMessage; +import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -43,7 +44,7 @@ class EventMessageSupplierTest { void setupEventMessageSupplierTest() { MockHelper.init(); - eventMessageSupplier = new EventMessageSupplier(MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.sseTaskFactory, false); + eventMessageSupplier = new EventMessageSupplier(MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.sseTaskFactory, false, new StreamLimit()); } @ParameterizedTest diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseServiceTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseServiceTest.java index 4f3978d..27d5fbe 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseServiceTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseServiceTest.java @@ -4,8 +4,14 @@ package de.telekom.horizon.pulsar.service; +import com.hazelcast.cluster.Cluster; +import com.hazelcast.cluster.Member; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.topic.ITopic; +import de.telekom.horizon.pulsar.cache.ConnectionCache; import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException; import de.telekom.horizon.pulsar.helper.SseTaskStateContainer; +import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -18,6 +24,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.test.util.ReflectionTestUtils; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -38,7 +46,7 @@ class SseServiceTest { void setupSseServiceTest() { MockHelper.init(); - sseService = spy(new SseService(MockHelper.tokenService, sseTaskFactoryMock, MockHelper.subscriberCache, MockHelper.pulsarConfig, MockHelper.deDuplicationService)); + sseService = spy(new SseService(MockHelper.tokenService, sseTaskFactoryMock, MockHelper.subscriberCache, MockHelper.pulsarConfig)); } @Test @@ -88,10 +96,33 @@ void testStartEmittingEvents() { verify(taskExecutorSpy).setQueueCapacity(MockHelper.pulsarConfig.getQueueCapacity()); verify(taskExecutorSpy).afterPropertiesSet(); - // We are mocking the actual task here - var sseTaskMock = Mockito.mock(SseTask.class); + var sseTask = new SseTask(Mockito.mock(SseTaskStateContainer.class), Mockito.mock(EventMessageSupplier.class), MockHelper.openConnectionGaugeValue, sseTaskFactoryMock); - when(sseTaskFactoryMock.createNew(eq(MockHelper.TEST_ENVIRONMENT), eq(MockHelper.TEST_SUBSCRIPTION_ID), eq(MockHelper.TEST_CONTENT_TYPE), sseTaskStateContainerCaptor.capture(), eq(false))).thenReturn(sseTaskMock); + // We are spying on the actual task here + var sseTaskSpy= Mockito.spy(sseTask); + + var hazelcastInstanceMock = Mockito.mock(HazelcastInstance.class); + + var cacheClusterMock = Mockito.mock(Cluster.class); + var cacheMemberMock = Mockito.mock(Member.class); + + when(hazelcastInstanceMock.getCluster()).thenReturn(cacheClusterMock); + when(cacheClusterMock.getLocalMember()).thenReturn(cacheMemberMock); + when(cacheMemberMock.getUuid()).thenReturn(UUID.fromString("477bf3c9-ef1f-41de-9574-419a2ab61131")); + + var cacheWorkers = Mockito.mock(ITopic.class); + when(hazelcastInstanceMock.getTopic("workers")).thenReturn(cacheWorkers); + + var connectionCache = new ConnectionCache(hazelcastInstanceMock); + + var map = new ConcurrentHashMap<>(); + map.put(MockHelper.TEST_SUBSCRIPTION_ID, sseTaskSpy); + ReflectionTestUtils.setField(connectionCache, "map", map, ConcurrentHashMap.class); + + var connectionCacheSpy = Mockito.spy(connectionCache); + + when(sseTaskFactoryMock.getConnectionCache()).thenReturn(connectionCacheSpy); + when(sseTaskFactoryMock.createNew(eq(MockHelper.TEST_ENVIRONMENT), eq(MockHelper.TEST_SUBSCRIPTION_ID), eq(MockHelper.TEST_CONTENT_TYPE), sseTaskStateContainerCaptor.capture(), eq(false), any(StreamLimit.class))).thenReturn(sseTaskSpy); // The mocked task should trigger the termination condition of SseTaskStateContainer.setReady(long timeout) immediately // otherwise startEmittingEvents() would run until the timeout is reached, since setReady() is not called asynchronously @@ -116,13 +147,17 @@ void testStartEmittingEvents() { }).start(); // PUBLIC METHOD WE WANT TO TEST - var responseContainer = sseService.startEmittingEvents(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.TEST_CONTENT_TYPE, false); + var responseContainer = sseService.startEmittingEvents(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.TEST_CONTENT_TYPE, false, new StreamLimit()); latch.countDown(); assertNotNull(responseContainer); + verify(taskExecutorSpy).submit(sseTaskSpy); + + sseService.stopEmittingEvents(MockHelper.TEST_SUBSCRIPTION_ID); - verify(taskExecutorSpy).submit(sseTaskMock); + verify(sseTaskSpy).terminate(); + verify(connectionCacheSpy).removeConnectionForSubscription(MockHelper.TEST_SUBSCRIPTION_ID); } } diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java index b2f87cf..ee21908 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java @@ -10,6 +10,7 @@ import de.telekom.eni.pandora.horizon.kubernetes.resource.SubscriptionResource; import de.telekom.eni.pandora.horizon.kubernetes.resource.SubscriptionResourceSpec; import de.telekom.horizon.pulsar.helper.SseTaskStateContainer; +import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -62,7 +63,7 @@ void testCreateNew () throws JsonCacheException { when(MockHelper.connectionGaugeCache.getOrCreateGaugeForSubscription(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID)).thenReturn(new AtomicInteger(1)); // PUBLIC METHOD WE WANT TO TEST - var task = sseTaskFactorySpy.createNew(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.TEST_CONTENT_TYPE, sseTaskStateContainer, false); + var task = sseTaskFactorySpy.createNew(MockHelper.TEST_ENVIRONMENT, MockHelper.TEST_SUBSCRIPTION_ID, MockHelper.TEST_CONTENT_TYPE, sseTaskStateContainer, false, new StreamLimit()); assertNotNull(task); // Let's verify that connectionGaugeCache.getOrCreateGaugeForSubscription(String environment, String subscriptionId) is called diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java index 1a69dbb..539cbbd 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java @@ -13,8 +13,10 @@ import de.telekom.eni.pandora.horizon.tracing.HorizonTracer; import de.telekom.horizon.pulsar.exception.ConnectionCutOutException; import de.telekom.horizon.pulsar.exception.ConnectionTimeoutException; +import de.telekom.horizon.pulsar.exception.StreamLimitExceededException; import de.telekom.horizon.pulsar.helper.EventMessageContext; import de.telekom.horizon.pulsar.helper.SseTaskStateContainer; +import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; @@ -30,12 +32,12 @@ import java.io.IOException; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Stream; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.*; @@ -75,11 +77,6 @@ void testRun() throws IOException, InterruptedException { final var itemQueueInitialSize = itemQueue.size(); - // We check whether a stream has been created successfully - // Since it is not accessible, we use ReflectionTestUtils - var stream = (Stream) ReflectionTestUtils.getField(sseTaskSpy,"stream"); - assertNotNull(stream); - // We check whether an EventWriter has been created successfully // since it is not accessible, we use ReflectionTestUtils. // We then overwrite it with a mock, so that we can do checks on it @@ -97,7 +94,10 @@ void testRun() throws IOException, InterruptedException { var emitterMock = mock(ResponseBodyEmitter.class); when(sseTaskStateContainerMock.getEmitter()).thenReturn(emitterMock); // We mock the EventMessageSupplier since it's tested in a separate test - when(eventMessageSupplierMock.get()).thenAnswer(i -> new EventMessageContext(itemQueue.poll(), false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class))); + when(eventMessageSupplierMock.get()).thenAnswer(i -> { + await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); + return new EventMessageContext(itemQueue.poll(), false, new StreamLimit(), Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service // which is done in the callback of the eventwriter @@ -146,7 +146,7 @@ void testRun() throws IOException, InterruptedException { } @Test - void testTerminateConnection() throws InterruptedException, JsonProcessingException { + void testTerminateConnection() throws InterruptedException, IOException { // We check whether an EventWriter has been created successfully // since it is not accessible, we use ReflectionTestUtils. // We then overwrite it with a mock, so that we can do checks on it @@ -176,9 +176,14 @@ void testTerminateConnection() throws InterruptedException, JsonProcessingExcept latch.countDown(); }).start(); + var streamLimit = new StreamLimit(); // default values + // We mock the EventMessageSupplier and let the mock always answer with a new event message to simulate // an endless stream - when(eventMessageSupplierMock.get()).thenAnswer(i -> new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class))); + when(eventMessageSupplierMock.get()).thenAnswer(i -> { + await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); + return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service // which is done in the callback of the eventwriter @@ -191,8 +196,8 @@ void testTerminateConnection() throws InterruptedException, JsonProcessingExcept var reachedZero = latch.await(10000, TimeUnit.MILLISECONDS); assertTrue(reachedZero); - // Verify that cut out happened - verify(emitterMock, times(1)).completeWithError(any(ConnectionCutOutException.class)); + verify(emitterMock, atLeast(1)).send(anyString()); + // Verify that ResponseBodyEmitter finishes with a ConnectionCutOutException verify(emitterMock, times(1)).completeWithError(any(ConnectionCutOutException.class)); // Verify that emitter has completed @@ -201,6 +206,160 @@ void testTerminateConnection() throws InterruptedException, JsonProcessingExcept verify(MockHelper.openConnectionGaugeValue, times(1)).getAndSet(0); } + @Test + void testTerminateConnectionThroughMaxNumberStreamLimit() throws IOException { + // We check whether an EventWriter has been created successfully + // since it is not accessible, we use ReflectionTestUtils. + // We then overwrite it with a mock, so that we can do checks on it + var eventWriter = (EventWriter) ReflectionTestUtils.getField(sseTaskSpy,"eventWriter"); + assertNotNull(eventWriter); + var eventWriterMock = mock(EventWriter.class); + ReflectionTestUtils.setField(sseTaskSpy,"eventWriter", eventWriterMock); + + // We assume the task has not been canceled + when(sseTaskStateContainerMock.getCanceled()).thenReturn(new AtomicBoolean(false)); + // We assume the task is not running in the beginning + var taskRunningSpy = spy(new AtomicBoolean(false)); + when(sseTaskStateContainerMock.getRunning()).thenReturn(taskRunningSpy); + + // Mock the ResponseBodyEmitter so that we can verify data is actually sent + var emitterMock = mock(ResponseBodyEmitter.class); + when(sseTaskStateContainerMock.getEmitter()).thenReturn(emitterMock); + + var maxNumberLimit = 5; + + var streamLimit = StreamLimit.of(maxNumberLimit, 0, 0); + + // We mock the EventMessageSupplier and let the mock always answer with a new event message to simulate + // an endless stream + when(eventMessageSupplierMock.get()).thenAnswer(i -> { + await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); + return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + }); + when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); + // The following checks that we track a DELIVERED event with the de-duplication service + // which is done in the callback of the eventwriter + SendResult sendResult = mock(SendResult.class); + var succeededFuture = CompletableFuture.completedFuture(sendResult); + when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + + sseTaskSpy.run(); + + verify(emitterMock, times(maxNumberLimit)).send(anyString()); + + // Verify that ResponseBodyEmitter finishes with a ConnectionCutOutException + verify(emitterMock, times(1)).completeWithError(any(StreamLimitExceededException.class)); + // Verify that emitter has completed + verify(emitterMock, times(1)).onCompletion(any()); + // Verify that metrics for a closed connection are handled + verify(MockHelper.openConnectionGaugeValue, times(1)).getAndSet(0); + } + + @Test + void testTerminateConnectionThroughMaxByteStreamLimit() throws IOException { + // We check whether an EventWriter has been created successfully + // since it is not accessible, we use ReflectionTestUtils. + // We then overwrite it with a mock, so that we can do checks on it + var eventWriter = (EventWriter) ReflectionTestUtils.getField(sseTaskSpy,"eventWriter"); + assertNotNull(eventWriter); + var eventWriterMock = mock(EventWriter.class); + ReflectionTestUtils.setField(sseTaskSpy,"eventWriter", eventWriterMock); + + // We assume the task has not been canceled + when(sseTaskStateContainerMock.getCanceled()).thenReturn(new AtomicBoolean(false)); + // We assume the task is not running in the beginning + var taskRunningSpy = spy(new AtomicBoolean(false)); + when(sseTaskStateContainerMock.getRunning()).thenReturn(taskRunningSpy); + + // Mock the ResponseBodyEmitter so that we can verify data is actually sent + var emitterMock = mock(ResponseBodyEmitter.class); + when(sseTaskStateContainerMock.getEmitter()).thenReturn(emitterMock); + + var maxBytesLimit = 2000; + + var streamLimit = StreamLimit.of(0, 0, maxBytesLimit); + + // We mock the EventMessageSupplier and let the mock always answer with a new event message to simulate + // an endless stream + when(eventMessageSupplierMock.get()).thenAnswer(i -> { + await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); + return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + }); + when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); + // The following checks that we track a DELIVERED event with the de-duplication service + // which is done in the callback of the eventwriter + SendResult sendResult = mock(SendResult.class); + var succeededFuture = CompletableFuture.completedFuture(sendResult); + when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + + sseTaskSpy.run(); + + ArgumentCaptor jsonCaptor = ArgumentCaptor.forClass(String.class); + + verify(emitterMock, atLeast(1)).send(jsonCaptor.capture()); + + var bytesSum = jsonCaptor.getAllValues().stream().mapToInt(x -> x.getBytes().length).sum(); + + assertTrue(bytesSum >= maxBytesLimit); + + // Verify that ResponseBodyEmitter finishes with a ConnectionCutOutException + verify(emitterMock, times(1)).completeWithError(any(StreamLimitExceededException.class)); + // Verify that emitter has completed + verify(emitterMock, times(1)).onCompletion(any()); + // Verify that metrics for a closed connection are handled + verify(MockHelper.openConnectionGaugeValue, times(1)).getAndSet(0); + } + + @Test + void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException { + // We check whether an EventWriter has been created successfully + // since it is not accessible, we use ReflectionTestUtils. + // We then overwrite it with a mock, so that we can do checks on it + var eventWriter = (EventWriter) ReflectionTestUtils.getField(sseTaskSpy,"eventWriter"); + assertNotNull(eventWriter); + var eventWriterMock = mock(EventWriter.class); + ReflectionTestUtils.setField(sseTaskSpy,"eventWriter", eventWriterMock); + + // We assume the task has not been canceled + when(sseTaskStateContainerMock.getCanceled()).thenReturn(new AtomicBoolean(false)); + // We assume the task is not running in the beginning + var taskRunningSpy = spy(new AtomicBoolean(false)); + when(sseTaskStateContainerMock.getRunning()).thenReturn(taskRunningSpy); + + // Mock the ResponseBodyEmitter so that we can verify data is actually sent + var emitterMock = mock(ResponseBodyEmitter.class); + when(sseTaskStateContainerMock.getEmitter()).thenReturn(emitterMock); + + var maxMinutes = 1; + + var streamLimit = StreamLimit.of(0, maxMinutes, 0); + + // We mock the EventMessageSupplier and let the mock always answer with a new event message to simulate + // an endless stream + when(eventMessageSupplierMock.get()).thenAnswer(i -> { + await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); + + return new EventMessageContext(MockHelper.createSubscriptionEventMessageForTesting(DeliveryType.SERVER_SENT_EVENT, false), false, streamLimit, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + }); + when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); + // The following checks that we track a DELIVERED event with the de-duplication service + // which is done in the callback of the eventwriter + SendResult sendResult = mock(SendResult.class); + var succeededFuture = CompletableFuture.completedFuture(sendResult); + when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + + sseTaskSpy.run(); + + assertEquals(maxMinutes, ChronoUnit.MINUTES.between(sseTaskSpy.getStartTime(), sseTaskSpy.getStopTime())); + + // Verify that ResponseBodyEmitter finishes with a ConnectionCutOutException + verify(emitterMock, times(1)).completeWithError(any(StreamLimitExceededException.class)); + // Verify that emitter has completed + verify(emitterMock, times(1)).onCompletion(any()); + // Verify that metrics for a closed connection are handled + verify(MockHelper.openConnectionGaugeValue, times(1)).getAndSet(0); + } + @Test void testSendEventFailed() throws IOException { // We create a new SubscriptionEventMessage queue and add some test messages @@ -231,7 +390,10 @@ void testSendEventFailed() throws IOException { var emitterMock = mock(ResponseBodyEmitter.class); when(sseTaskStateContainerMock.getEmitter()).thenReturn(emitterMock); // We mock the EventMessageSupplier since it's tested in a separate test - when(eventMessageSupplierMock.get()).thenAnswer(i -> new EventMessageContext(itemQueue.poll(), false, Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class))); + when(eventMessageSupplierMock.get()).thenAnswer(i -> { + await().pollDelay(10, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); + return new EventMessageContext(itemQueue.poll(), false, new StreamLimit(), Mockito.mock(Span.class), Mockito.mock(Tracer.SpanInScope.class)); + }); when(eventMessageSupplierMock.getSubscriptionId()).thenReturn(MockHelper.TEST_SUBSCRIPTION_ID); // The following checks that we track a DELIVERED event with the de-duplication service // which is done in the callback of the eventwriter