diff --git a/gradle.properties b/gradle.properties index c85b76f..f184a04 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ fabric8Version=5.12.4 -horizonParentVersion=4.1.0 \ No newline at end of file +horizonParentVersion=4.3.0 \ No newline at end of file diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java index 6d18bdc..0d8fcad 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.model.event.Status; import de.telekom.eni.pandora.horizon.model.event.StatusMessage; import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage; @@ -40,6 +41,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +import static de.telekom.eni.pandora.horizon.metrics.HorizonMetricsConstants.METRIC_SENT_SSE_EVENTS; + /** * Represents a Server-Sent Events (SSE) task responsible for emitting events to clients. * @@ -61,6 +64,7 @@ public class SseTask implements Runnable { private final EventWriter eventWriter; private final DeDuplicationService deDuplicationService; private final HorizonTracer tracingHelper; + private final HorizonMetricsHelper metricsHelper; @Setter private String contentType = APPLICATION_STREAM_JSON_VALUE; @@ -99,6 +103,7 @@ public SseTask(SseTaskStateContainer sseTaskStateContainer, this.eventWriter = factory.getEventWriter(); this.deDuplicationService = factory.getDeDuplicationService(); this.tracingHelper = factory.getTracingHelper(); + this.metricsHelper = factory.getMetricsHelper(); } /** @@ -316,6 +321,8 @@ private void sendEvent(SubscriptionEventMessage msg, boolean includeHttpHeaders) bytesConsumed.addAndGet(eventJson.getBytes(StandardCharsets.UTF_8).length); numberConsumed.incrementAndGet(); + + metricsHelper.getRegistry().counter(METRIC_SENT_SSE_EVENTS, metricsHelper.buildTagsFromSubscriptionEventMessage(msg)).increment(); } catch (JsonProcessingException e) { var err = String.format("Error occurred while emitting the event: %s", e.getMessage()); log.info(err, e); diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java index 83253c2..493d846 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java @@ -6,6 +6,7 @@ import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.mongo.repository.MessageStateMongoRepo; import de.telekom.eni.pandora.horizon.tracing.HorizonTracer; import de.telekom.horizon.pulsar.cache.ConnectionCache; @@ -36,6 +37,7 @@ public class SseTaskFactory { private final DeDuplicationService deDuplicationService; private final HorizonTracer tracingHelper; private final EventWriter eventWriter; + private final HorizonMetricsHelper metricsHelper; /** * Constructs an instance of {@code SseTaskFactory}. @@ -57,6 +59,7 @@ public SseTaskFactory( KafkaPicker kafkaPicker, MessageStateMongoRepo messageStateMongoRepo, DeDuplicationService deDuplicationService, + HorizonMetricsHelper metricsHelper, HorizonTracer tracingHelper) { this.pulsarConfig = pulsarConfig; @@ -66,8 +69,8 @@ public SseTaskFactory( this.tracingHelper = tracingHelper; this.connectionCache = connectionCache; this.deDuplicationService = deDuplicationService; - this.eventWriter = eventWriter; + this.metricsHelper = metricsHelper; } /** diff --git a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java index fa7fbdc..847b2c7 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java @@ -12,6 +12,9 @@ import de.telekom.eni.pandora.horizon.model.event.StatusMessage; import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; @@ -83,7 +86,6 @@ void testGetEventMessageContext(int polls) { var eventWriterMock = mock(EventWriter.class); ReflectionTestUtils.setField(eventMessageSupplier, "eventWriter", eventWriterMock, EventWriter.class); - // We do multiple calls to EventMessageSupplier.get() in order to test // that each call will fetch the next event message in the queue for (int i = 0; i < polls; i++) { diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java index ee21908..2b63732 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java @@ -38,7 +38,7 @@ class SseTaskFactoryTest { void setupSseTaskFactoryTest() { MockHelper.init(); - sseTaskFactorySpy = spy(new SseTaskFactory(MockHelper.pulsarConfig, MockHelper.connectionCache, MockHelper.connectionGaugeCache, MockHelper.eventWriter, MockHelper.kafkaPicker, MockHelper.messageStateMongoRepo, MockHelper.deDuplicationService, MockHelper.tracingHelper)); + sseTaskFactorySpy = spy(new SseTaskFactory(MockHelper.pulsarConfig, MockHelper.connectionCache, MockHelper.connectionGaugeCache, MockHelper.eventWriter, MockHelper.kafkaPicker, MockHelper.messageStateMongoRepo, MockHelper.deDuplicationService, MockHelper.metricsHelper, MockHelper.tracingHelper)); } SubscriptionResource createSubscriptionResource() { diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java index 539cbbd..fc8a327 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java @@ -18,6 +18,9 @@ import de.telekom.horizon.pulsar.helper.SseTaskStateContainer; import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static de.telekom.horizon.pulsar.testutils.MockHelper.metricsHelper; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -59,7 +63,6 @@ void setupSseServiceTest() { MockHelper.init(); var sseTask = new SseTask(sseTaskStateContainerMock, eventMessageSupplierMock, MockHelper.openConnectionGaugeValue, MockHelper.sseTaskFactory); - sseTaskSpy = spy(sseTask); } @@ -108,6 +111,12 @@ void testRun() throws IOException, InterruptedException { // Used for verifying the timout worked var started = Instant.now(); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + // PUBLIC METHOD WE WANT TO TEST sseTaskSpy.run(); @@ -191,6 +200,12 @@ void testTerminateConnection() throws InterruptedException, IOException { var succeededFuture = CompletableFuture.completedFuture(sendResult); when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + sseTaskSpy.run(); var reachedZero = latch.await(10000, TimeUnit.MILLISECONDS); @@ -243,6 +258,12 @@ void testTerminateConnectionThroughMaxNumberStreamLimit() throws IOException { var succeededFuture = CompletableFuture.completedFuture(sendResult); when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + sseTaskSpy.run(); verify(emitterMock, times(maxNumberLimit)).send(anyString()); @@ -292,6 +313,12 @@ void testTerminateConnectionThroughMaxByteStreamLimit() throws IOException { var succeededFuture = CompletableFuture.completedFuture(sendResult); when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + sseTaskSpy.run(); ArgumentCaptor jsonCaptor = ArgumentCaptor.forClass(String.class); @@ -348,6 +375,12 @@ void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException { var succeededFuture = CompletableFuture.completedFuture(sendResult); when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + sseTaskSpy.run(); assertEquals(maxMinutes, ChronoUnit.MINUTES.between(sseTaskSpy.getStartTime(), sseTaskSpy.getStopTime())); diff --git a/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java b/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java index 30e8cd4..adf3c86 100644 --- a/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java +++ b/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java @@ -10,6 +10,7 @@ import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService; import de.telekom.eni.pandora.horizon.kafka.config.KafkaProperties; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.model.db.Coordinates; import de.telekom.eni.pandora.horizon.model.db.PartialEvent; import de.telekom.eni.pandora.horizon.model.db.State; @@ -62,6 +63,7 @@ public class MockHelper { public static Environment environment; public static ResponseBodyEmitter emitter; public static TokenService tokenService; + public static HorizonMetricsHelper metricsHelper; public static DeDuplicationService deDuplicationService; @@ -85,6 +87,7 @@ public static void init() { subscriberCache = mock(SubscriberCache.class); pulsarConfig = mock(PulsarConfig.class); messageStateMongoRepo = mock(MessageStateMongoRepo.class); + metricsHelper = mock(HorizonMetricsHelper.class); tracingHelper = mock(HorizonTracer.class); environment = mock(Environment.class); emitter = mock(ResponseBodyEmitter.class); @@ -99,7 +102,7 @@ public static void init() { lenient().when(pulsarConfig.getQueueCapacity()).thenReturn(100); kafkaPicker = new KafkaPicker(kafkaTemplate); - sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, tracingHelper); + sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, metricsHelper, tracingHelper); } public static SubscriptionEventMessage createSubscriptionEventMessageForTesting(DeliveryType deliveryType, boolean withAdditionalFields) {