Skip to content

Commit

Permalink
Merge pull request #15 from telekom/feature-new-sse-sent-metric
Browse files Browse the repository at this point in the history
feat: add new metric to monitor deliveries
  • Loading branch information
julian-spierefka authored Sep 18, 2024
2 parents 08d707d + 09e5ae6 commit df21d8c
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 6 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
fabric8Version=5.12.4
horizonParentVersion=4.1.0
horizonParentVersion=4.3.0
7 changes: 7 additions & 0 deletions src/main/java/de/telekom/horizon/pulsar/service/SseTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService;
import de.telekom.eni.pandora.horizon.kafka.event.EventWriter;
import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper;
import de.telekom.eni.pandora.horizon.model.event.Status;
import de.telekom.eni.pandora.horizon.model.event.StatusMessage;
import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage;
Expand Down Expand Up @@ -40,6 +41,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static de.telekom.eni.pandora.horizon.metrics.HorizonMetricsConstants.METRIC_SENT_SSE_EVENTS;

/**
* Represents a Server-Sent Events (SSE) task responsible for emitting events to clients.
*
Expand All @@ -61,6 +64,7 @@ public class SseTask implements Runnable {
private final EventWriter eventWriter;
private final DeDuplicationService deDuplicationService;
private final HorizonTracer tracingHelper;
private final HorizonMetricsHelper metricsHelper;

@Setter
private String contentType = APPLICATION_STREAM_JSON_VALUE;
Expand Down Expand Up @@ -99,6 +103,7 @@ public SseTask(SseTaskStateContainer sseTaskStateContainer,
this.eventWriter = factory.getEventWriter();
this.deDuplicationService = factory.getDeDuplicationService();
this.tracingHelper = factory.getTracingHelper();
this.metricsHelper = factory.getMetricsHelper();
}

/**
Expand Down Expand Up @@ -316,6 +321,8 @@ private void sendEvent(SubscriptionEventMessage msg, boolean includeHttpHeaders)

bytesConsumed.addAndGet(eventJson.getBytes(StandardCharsets.UTF_8).length);
numberConsumed.incrementAndGet();

metricsHelper.getRegistry().counter(METRIC_SENT_SSE_EVENTS, metricsHelper.buildTagsFromSubscriptionEventMessage(msg)).increment();
} catch (JsonProcessingException e) {
var err = String.format("Error occurred while emitting the event: %s", e.getMessage());
log.info(err, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService;
import de.telekom.eni.pandora.horizon.kafka.event.EventWriter;
import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper;
import de.telekom.eni.pandora.horizon.mongo.repository.MessageStateMongoRepo;
import de.telekom.eni.pandora.horizon.tracing.HorizonTracer;
import de.telekom.horizon.pulsar.cache.ConnectionCache;
Expand Down Expand Up @@ -36,6 +37,7 @@ public class SseTaskFactory {
private final DeDuplicationService deDuplicationService;
private final HorizonTracer tracingHelper;
private final EventWriter eventWriter;
private final HorizonMetricsHelper metricsHelper;

/**
* Constructs an instance of {@code SseTaskFactory}.
Expand All @@ -57,6 +59,7 @@ public SseTaskFactory(
KafkaPicker kafkaPicker,
MessageStateMongoRepo messageStateMongoRepo,
DeDuplicationService deDuplicationService,
HorizonMetricsHelper metricsHelper,
HorizonTracer tracingHelper) {

this.pulsarConfig = pulsarConfig;
Expand All @@ -66,8 +69,8 @@ public SseTaskFactory(
this.tracingHelper = tracingHelper;
this.connectionCache = connectionCache;
this.deDuplicationService = deDuplicationService;

this.eventWriter = eventWriter;
this.metricsHelper = metricsHelper;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import de.telekom.eni.pandora.horizon.model.event.StatusMessage;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.testutils.MockHelper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -83,7 +86,6 @@ void testGetEventMessageContext(int polls) {
var eventWriterMock = mock(EventWriter.class);
ReflectionTestUtils.setField(eventMessageSupplier, "eventWriter", eventWriterMock, EventWriter.class);


// We do multiple calls to EventMessageSupplier.get() in order to test
// that each call will fetch the next event message in the queue
for (int i = 0; i < polls; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SseTaskFactoryTest {
void setupSseTaskFactoryTest() {
MockHelper.init();

sseTaskFactorySpy = spy(new SseTaskFactory(MockHelper.pulsarConfig, MockHelper.connectionCache, MockHelper.connectionGaugeCache, MockHelper.eventWriter, MockHelper.kafkaPicker, MockHelper.messageStateMongoRepo, MockHelper.deDuplicationService, MockHelper.tracingHelper));
sseTaskFactorySpy = spy(new SseTaskFactory(MockHelper.pulsarConfig, MockHelper.connectionCache, MockHelper.connectionGaugeCache, MockHelper.eventWriter, MockHelper.kafkaPicker, MockHelper.messageStateMongoRepo, MockHelper.deDuplicationService, MockHelper.metricsHelper, MockHelper.tracingHelper));
}

SubscriptionResource createSubscriptionResource() {
Expand Down
35 changes: 34 additions & 1 deletion src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import de.telekom.horizon.pulsar.helper.SseTaskStateContainer;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.testutils.MockHelper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -39,6 +42,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static de.telekom.horizon.pulsar.testutils.MockHelper.metricsHelper;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
Expand All @@ -59,7 +63,6 @@ void setupSseServiceTest() {
MockHelper.init();

var sseTask = new SseTask(sseTaskStateContainerMock, eventMessageSupplierMock, MockHelper.openConnectionGaugeValue, MockHelper.sseTaskFactory);

sseTaskSpy = spy(sseTask);
}

Expand Down Expand Up @@ -108,6 +111,12 @@ void testRun() throws IOException, InterruptedException {
// Used for verifying the timout worked
var started = Instant.now();

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

// PUBLIC METHOD WE WANT TO TEST
sseTaskSpy.run();

Expand Down Expand Up @@ -191,6 +200,12 @@ void testTerminateConnection() throws InterruptedException, IOException {
var succeededFuture = CompletableFuture.completedFuture(sendResult);
when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture);

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

sseTaskSpy.run();

var reachedZero = latch.await(10000, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -243,6 +258,12 @@ void testTerminateConnectionThroughMaxNumberStreamLimit() throws IOException {
var succeededFuture = CompletableFuture.completedFuture(sendResult);
when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture);

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

sseTaskSpy.run();

verify(emitterMock, times(maxNumberLimit)).send(anyString());
Expand Down Expand Up @@ -292,6 +313,12 @@ void testTerminateConnectionThroughMaxByteStreamLimit() throws IOException {
var succeededFuture = CompletableFuture.completedFuture(sendResult);
when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture);

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

sseTaskSpy.run();

ArgumentCaptor<String> jsonCaptor = ArgumentCaptor.forClass(String.class);
Expand Down Expand Up @@ -348,6 +375,12 @@ void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException {
var succeededFuture = CompletableFuture.completedFuture(sendResult);
when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture);

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

sseTaskSpy.run();

assertEquals(maxMinutes, ChronoUnit.MINUTES.between(sseTaskSpy.getStartTime(), sseTaskSpy.getStopTime()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService;
import de.telekom.eni.pandora.horizon.kafka.config.KafkaProperties;
import de.telekom.eni.pandora.horizon.kafka.event.EventWriter;
import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper;
import de.telekom.eni.pandora.horizon.model.db.Coordinates;
import de.telekom.eni.pandora.horizon.model.db.PartialEvent;
import de.telekom.eni.pandora.horizon.model.db.State;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class MockHelper {
public static Environment environment;
public static ResponseBodyEmitter emitter;
public static TokenService tokenService;
public static HorizonMetricsHelper metricsHelper;

public static DeDuplicationService deDuplicationService;

Expand All @@ -85,6 +87,7 @@ public static void init() {
subscriberCache = mock(SubscriberCache.class);
pulsarConfig = mock(PulsarConfig.class);
messageStateMongoRepo = mock(MessageStateMongoRepo.class);
metricsHelper = mock(HorizonMetricsHelper.class);
tracingHelper = mock(HorizonTracer.class);
environment = mock(Environment.class);
emitter = mock(ResponseBodyEmitter.class);
Expand All @@ -99,7 +102,7 @@ public static void init() {
lenient().when(pulsarConfig.getQueueCapacity()).thenReturn(100);

kafkaPicker = new KafkaPicker(kafkaTemplate);
sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, tracingHelper);
sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, metricsHelper, tracingHelper);
}

public static SubscriptionEventMessage createSubscriptionEventMessageForTesting(DeliveryType deliveryType, boolean withAdditionalFields) {
Expand Down

0 comments on commit df21d8c

Please sign in to comment.