Skip to content

Commit

Permalink
feat: enable offset-based event streaming (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
mherwig authored Nov 28, 2024
1 parent df21d8c commit bfbf073
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 89 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
fabric8Version=5.12.4
horizonParentVersion=4.3.0
horizonParentVersion=0.0.0-feature-offset-based-sse-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
Expand Down Expand Up @@ -74,6 +75,7 @@ public ResponseEntity<ResponseBodyEmitter> getSseStream(@PathVariable String env
@RequestParam(defaultValue = "0") int maxNumber,
@RequestParam(defaultValue = "0") int maxMinutes,
@RequestParam(defaultValue = "0") int maxBytes,
@RequestHeader(value = "Last-Event-ID", required = false) String offset,
@RequestHeader(value = HttpHeaders.ACCEPT, required = false) String accept) throws SubscriberDoesNotMatchSubscriptionException {

sseService.validateSubscriberIdForSubscription(environment, subscriptionId);
Expand All @@ -83,7 +85,7 @@ public ResponseEntity<ResponseBodyEmitter> getSseStream(@PathVariable String env
accept = APPLICATION_STREAM_JSON_VALUE;
}

var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, includeHttpHeaders, StreamLimit.of(maxNumber, maxMinutes, maxBytes));
var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, StringUtils.isNotEmpty(offset) || includeHttpHeaders, offset, StreamLimit.of(maxNumber, maxMinutes, maxBytes));

var responseHeaders = new HttpHeaders();
responseHeaders.add(HttpHeaders.CONTENT_TYPE, accept);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class EventMessageContext {
private Boolean includeHttpHeaders;
@Getter
private StreamLimit streamLimit;
@Getter
private boolean ignoreDeduplication;

private Span span;
private Tracer.SpanInScope spanInScope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import de.telekom.eni.pandora.horizon.model.event.StatusMessage;
import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage;
import de.telekom.eni.pandora.horizon.model.meta.EventRetentionTime;
import de.telekom.eni.pandora.horizon.mongo.model.MessageStateMongoDocument;
import de.telekom.eni.pandora.horizon.mongo.repository.MessageStateMongoRepo;
import de.telekom.eni.pandora.horizon.tracing.HorizonTracer;
import de.telekom.horizon.pulsar.config.PulsarConfig;
Expand All @@ -25,13 +26,15 @@
import de.telekom.horizon.pulsar.utils.KafkaPicker;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -62,16 +65,19 @@ public class EventMessageSupplier implements Supplier<EventMessageContext> {
private final HorizonTracer tracingHelper;
private final ConcurrentLinkedQueue<State> messageStates = new ConcurrentLinkedQueue<>();
private Instant lastPoll;
private String currentOffset;
private final ObjectMapper objectMapper = new ObjectMapper();

/**
* Constructs an instance of {@code EventMessageSupplier}.
*
* @param subscriptionId The subscriptionId for which messages are fetched.
* @param factory The {@link SseTaskFactory} used for obtaining related components.
* @param subscriptionId The subscriptionId for which messages are fetched.
* @param factory The {@link SseTaskFactory} used for obtaining related components.
* @param includeHttpHeaders Boolean flag indicating whether to include HTTP headers in the generated {@code EventMessageContext}.
* @param startingOffset Enables offset based streaming. Specifies the offset (message id) of the last received event message.
* @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early.
*/
public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders, StreamLimit streamLimit) {
public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders, String startingOffset, StreamLimit streamLimit) {
this.subscriptionId = subscriptionId;

this.pulsarConfig = factory.getPulsarConfig();
Expand All @@ -80,6 +86,7 @@ public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boole
this.eventWriter = factory.getEventWriter();
this.tracingHelper = factory.getTracingHelper();
this.includeHttpHeaders = includeHttpHeaders;
this.currentOffset = startingOffset;
this.streamLimit = streamLimit;
}

Expand All @@ -98,6 +105,7 @@ public EventMessageContext get() {

if (!messageStates.isEmpty()) {
var state = messageStates.poll();
var ignoreDeduplication = StringUtils.isNotEmpty(currentOffset);

// TODO: these spans get duplicated cause of the vortex latency - will be resolved DHEI-13764

Expand All @@ -120,12 +128,13 @@ public EventMessageContext get() {
var errorMessage = String.format("Event message %s did not match subscriptionId %s", state.getUuid(), state.getSubscriptionId());
throw new SubscriberDoesNotMatchSubscriptionException(errorMessage);
}
}

return new EventMessageContext(message, includeHttpHeaders, streamLimit, span, spanInScope);
Optional.ofNullable(message.getHttpHeaders()).ifPresent(headers -> headers.put("x-pubsub-offset-id", new ArrayList<>(List.of(state.getUuid()))));
}
return new EventMessageContext(message, includeHttpHeaders, streamLimit, ignoreDeduplication, span, spanInScope);
} catch (CouldNotPickMessageException | SubscriberDoesNotMatchSubscriptionException e) {
handleException(state, e);
return new EventMessageContext(null, includeHttpHeaders, streamLimit, span, spanInScope);
return new EventMessageContext(null, includeHttpHeaders, streamLimit, ignoreDeduplication, span, spanInScope);
} finally {
pickSpan.finish();
}
Expand Down Expand Up @@ -171,16 +180,42 @@ private void pollMessageStates() {

Pageable pageable = PageRequest.of(0, pulsarConfig.getSseBatchSize(), Sort.by(Sort.Direction.ASC, "timestamp"));

var list = messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(
List.of(Status.PROCESSED),
DeliveryType.SERVER_SENT_EVENT,
subscriptionId,
pageable
).stream()
.filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets
.toList();
Optional<MessageStateMongoDocument> offsetMsg = Optional.empty();
if (StringUtils.isNoneEmpty(currentOffset)) {
offsetMsg = messageStateMongoRepo.findById(currentOffset);
}

if (offsetMsg.isPresent()) {
var offsetTimestamp = offsetMsg.get().getTimestamp();

var list = messageStateMongoRepo.findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(
DeliveryType.SERVER_SENT_EVENT,
subscriptionId,
offsetTimestamp,
pageable
).stream()
.filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets
.toList();

messageStates.addAll(list);
messageStates.addAll(list);

if (!list.isEmpty()) {
currentOffset = list.getLast().getUuid();
}
} else {
currentOffset = null;

var list = messageStateMongoRepo.findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(
List.of(Status.PROCESSED),
DeliveryType.SERVER_SENT_EVENT,
subscriptionId,
pageable
).stream()
.filter(m -> m.getCoordinates() != null) // we skip messages that refer to -1 partitions and offsets
.toList();

messageStates.addAll(list);
}

lastPoll = Instant.now();
}
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/de/telekom/horizon/pulsar/service/SseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,18 @@ public void validateSubscriberIdForSubscription(String environment, String subsc
/**
* Starts emitting events for the specified subscription.
*
* @param environment The environment associated with the subscription.
* @param subscriptionId The ID of the subscription for which events should be emitted.
* @param contentType The content type for the events.
* @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the emitted events.
* @return The {@link SseTaskStateContainer} representing the state of the emitted events.
* @param environment The environment associated with the subscription.
* @param subscriptionId The ID of the subscription for which events should be emitted.
* @param contentType The content type for the events.
* @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the emitted events.
* @param offset Enables offset based streaming. Specifies the offset (message id) of the last received event message.
* @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early.
* @return The {@link SseTaskStateContainer} representing the state of the emitted events.
*/
public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders, StreamLimit streamLimit) {
public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders, String offset, StreamLimit streamLimit) {
var responseContainer = new SseTaskStateContainer();

taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders, streamLimit));
taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders, offset, streamLimit));

responseContainer.setReady(pulsarConfig.getSseTimeout());

Expand Down
24 changes: 13 additions & 11 deletions src/main/java/de/telekom/horizon/pulsar/service/SseTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,20 @@ private void emitEventMessage(EventMessageContext context) {
return;
}

String msgUuidOrNull = deDuplicationService.get(msg);
boolean isDuplicate = Objects.nonNull(msgUuidOrNull);
if (isDuplicate) {
if(Objects.equals(msg.getUuid(), msgUuidOrNull)) {
log.debug("Message with id {} was found in the deduplication cache with the same UUID. Message will be ignored, because status will probably set to DELIVERED in the next minutes.", msg.getUuid());
} else {
log.debug("Message with id {} was found in the deduplication cache with another UUID. Message will be set to DUPLICATE to prevent event being stuck at PROCESSED.", msg.getUuid());
pushMetadata(msg, Status.DUPLICATE, null);
}
if (!context.isIgnoreDeduplication()) {
String msgUuidOrNull = deDuplicationService.get(msg);
boolean isDuplicate = Objects.nonNull(msgUuidOrNull);
if (isDuplicate) {
if(Objects.equals(msg.getUuid(), msgUuidOrNull)) {
log.debug("Message with id {} was found in the deduplication cache with the same UUID. Message will be ignored, because status will probably set to DELIVERED in the next minutes.", msg.getUuid());
} else {
log.debug("Message with id {} was found in the deduplication cache with another UUID. Message will be set to DUPLICATE to prevent event being stuck at PROCESSED.", msg.getUuid());
pushMetadata(msg, Status.DUPLICATE, null);
}

context.finishSpan();
return;
context.finishSpan();
return;
}
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ public SseTaskFactory(
* @param contentType The content type for the SSE task.
* @param sseTaskStateContainer The {@link SseTaskStateContainer} represents the state of the SSE task.
* @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the SSE task.
* @param offset Enables offset based streaming. Specifies the offset (message id) of the last received event message.
* @param streamLimit The {@link StreamLimit} represents any customer specific conditions for terminating the stream early.
* @return The newly created {@link SseTask}.
* @return The newly created {@link SseTask}.
*/
public SseTask createNew(String environment, String subscriptionId, String contentType, SseTaskStateContainer sseTaskStateContainer, boolean includeHttpHeaders, StreamLimit streamLimit) {
var eventMessageSupplier = new EventMessageSupplier(subscriptionId, this, includeHttpHeaders, streamLimit);
public SseTask createNew(String environment, String subscriptionId, String contentType, SseTaskStateContainer sseTaskStateContainer, boolean includeHttpHeaders, String offset, StreamLimit streamLimit) {
var eventMessageSupplier = new EventMessageSupplier(subscriptionId, this, includeHttpHeaders, offset, streamLimit);
var connection = connectionGaugeCache.getOrCreateGaugeForSubscription(environment, subscriptionId);

var task = new SseTask(sseTaskStateContainer, eventMessageSupplier, connection, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void getEventsViaSSESuccessfullyWithOk() throws Exception {
.andExpect(status().isOk());

verify(sseService, times(1)).validateSubscriberIdForSubscription(eq(env), eq(subscriptionId));
verify(sseService, times(1)).startEmittingEvents(eq(env), eq(subscriptionId), any(String.class), eq(true), any(StreamLimit.class));
verify(sseService, times(1)).startEmittingEvents(eq(env), eq(subscriptionId), any(String.class), eq(true), any(), any(StreamLimit.class));
}

private Jwt getJwt(String subscriberId) {
Expand Down
Loading

0 comments on commit bfbf073

Please sign in to comment.