Skip to content

Commit

Permalink
Merge pull request #13 from telekom/feature/control-stream-termination
Browse files Browse the repository at this point in the history
Feature/control stream termination
  • Loading branch information
mherwig authored Sep 12, 2024
2 parents 2c415cc + c4e9852 commit 1029412
Show file tree
Hide file tree
Showing 14 changed files with 360 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
package de.telekom.horizon.pulsar.api;

import de.telekom.eni.pandora.horizon.model.common.ProblemMessage;
import de.telekom.horizon.pulsar.exception.ConnectionCutOutException;
import de.telekom.horizon.pulsar.exception.ConnectionTimeoutException;
import de.telekom.horizon.pulsar.exception.QueueWaitTimeoutException;
import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException;
import de.telekom.horizon.pulsar.exception.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.connector.ClientAbortException;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -64,6 +61,7 @@ public RestResponseEntityExceptionHandler(ApplicationContext applicationContext)
*/
@ExceptionHandler(value = {
ConnectionCutOutException.class,
StreamLimitExceededException.class
})
@ResponseStatus(HttpStatus.OK)
protected ResponseEntity<Object> handleCutOut(Exception e, WebRequest request) {
Expand Down
31 changes: 27 additions & 4 deletions src/main/java/de/telekom/horizon/pulsar/api/SseController.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package de.telekom.horizon.pulsar.api;

import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
Expand Down Expand Up @@ -56,17 +57,23 @@ public ResponseEntity<Void> headRequest(@PathVariable String environment) {
/**
* Retrieves SSE stream for the specified subscriptionId.
*
* @param environment The environment path variable.
* @param subscriptionId The subscriptionId path variable.
* @param environment The environment path variable.
* @param subscriptionId The subscriptionId path variable.
* @param includeHttpHeaders Whether to include HTTP headers in the response.
* @param accept The value of the "Accept" header in the request.
* @param maxNumber Whether to terminate after a certain number of consumed events.
* @param maxMinutes Whether to terminate after a certain time (in minutes).
* @param maxBytes Whether to terminate after a certain number of bytes consumed.
* @param accept The value of the "Accept" header in the request.
* @return A response containing a {@code ResponseBodyEmitter} for SSE streaming.
* @throws SubscriberDoesNotMatchSubscriptionException If the subscriber does not match the specified subscription.
*/
@GetMapping(value = "/sse/{subscriptionId}", produces = {MediaType.ALL_VALUE, APPLICATION_STREAM_JSON_VALUE, MediaType.TEXT_EVENT_STREAM_VALUE})
public ResponseEntity<ResponseBodyEmitter> getSseStream(@PathVariable String environment,
@PathVariable String subscriptionId,
@RequestParam(defaultValue = "false") boolean includeHttpHeaders,
@RequestParam(defaultValue = "0") int maxNumber,
@RequestParam(defaultValue = "0") int maxMinutes,
@RequestParam(defaultValue = "0") int maxBytes,
@RequestHeader(value = HttpHeaders.ACCEPT, required = false) String accept) throws SubscriberDoesNotMatchSubscriptionException {

sseService.validateSubscriberIdForSubscription(environment, subscriptionId);
Expand All @@ -76,7 +83,7 @@ public ResponseEntity<ResponseBodyEmitter> getSseStream(@PathVariable String env
accept = APPLICATION_STREAM_JSON_VALUE;
}

var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, includeHttpHeaders);
var responseContainer = sseService.startEmittingEvents(environment, subscriptionId, accept, includeHttpHeaders, StreamLimit.of(maxNumber, maxMinutes, maxBytes));

var responseHeaders = new HttpHeaders();
responseHeaders.add(HttpHeaders.CONTENT_TYPE, accept);
Expand All @@ -86,4 +93,20 @@ public ResponseEntity<ResponseBodyEmitter> getSseStream(@PathVariable String env

return new ResponseEntity<>(responseContainer.getEmitter(), responseHeaders, HttpStatus.OK);
}

/**
* Stops an active SSE stream for the specified subscriptionId.
*
* @param environment The environment path variable.
* @param subscriptionId The subscriptionId path variable.
* @throws SubscriberDoesNotMatchSubscriptionException If the subscriber does not match the specified subscription.
*/
@PostMapping(value = "/sse/{subscriptionId}/terminate", produces = {MediaType.ALL_VALUE, APPLICATION_STREAM_JSON_VALUE, MediaType.TEXT_EVENT_STREAM_VALUE})
public ResponseEntity<Void> terminateSseStream(@PathVariable String environment, @PathVariable String subscriptionId) throws SubscriberDoesNotMatchSubscriptionException {

sseService.validateSubscriberIdForSubscription(environment, subscriptionId);
sseService.stopEmittingEvents(subscriptionId);

return ResponseEntity.status(HttpStatus.OK).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package de.telekom.horizon.pulsar.exception;

public class StreamLimitExceededException extends HorizonPulsarException {
public StreamLimitExceededException() {
super();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* Context class representing the context of an event message in a subscription.
*
* This class encapsulates information related to a subscription event message, including
* whether to include HTTP headers, and optional tracing components such as a span and span in scope.
* whether to include HTTP headers or stream limits, and optional tracing components such as a span and span in scope.
* It provides a method to finish the span and span in scope if they are present.
*/

Expand All @@ -28,6 +28,9 @@ public class EventMessageContext {
private SubscriptionEventMessage subscriptionEventMessage;
@Getter
private Boolean includeHttpHeaders;
@Getter
private StreamLimit streamLimit;

private Span span;
private Tracer.SpanInScope spanInScope;
public void finishSpan() {
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/de/telekom/horizon/pulsar/helper/StreamLimit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package de.telekom.horizon.pulsar.helper;


import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* Class that contains any streaming limits provided by the customer.
*
* This class encapsulates all possible streaming limits that have been provided by the customer when
* requesting a new stream. The streaming limits will ensure that a active stream terminates early on when exceeded.
* Currently, a customer can specify that the stream should terminate when a specific number of events have been consumed
* or after a certain time (in minutes) or after exceeding a certain number of bytes which have been consumed.
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class StreamLimit {
private int maxNumber;
private int maxMinutes;
private long maxBytes;

public static StreamLimit of(int maxNumber, int maxMinutes, int maxBytes) {
return new StreamLimit(maxNumber, maxMinutes, maxBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import de.telekom.horizon.pulsar.exception.CouldNotPickMessageException;
import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException;
import de.telekom.horizon.pulsar.helper.EventMessageContext;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.utils.KafkaPicker;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class EventMessageSupplier implements Supplier<EventMessageContext> {
@Getter
private final String subscriptionId;
private final Boolean includeHttpHeaders;
private final StreamLimit streamLimit;
private final KafkaPicker kafkaPicker;
private final EventWriter eventWriter;
private final MessageStateMongoRepo messageStateMongoRepo;
Expand All @@ -69,7 +71,7 @@ public class EventMessageSupplier implements Supplier<EventMessageContext> {
* @param factory The {@link SseTaskFactory} used for obtaining related components.
* @param includeHttpHeaders Boolean flag indicating whether to include HTTP headers in the generated {@code EventMessageContext}.
*/
public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders) {
public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boolean includeHttpHeaders, StreamLimit streamLimit) {
this.subscriptionId = subscriptionId;

this.pulsarConfig = factory.getPulsarConfig();
Expand All @@ -78,6 +80,7 @@ public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boole
this.eventWriter = factory.getEventWriter();
this.tracingHelper = factory.getTracingHelper();
this.includeHttpHeaders = includeHttpHeaders;
this.streamLimit = streamLimit;
}

/**
Expand Down Expand Up @@ -119,10 +122,10 @@ public EventMessageContext get() {
}
}

return new EventMessageContext(message, includeHttpHeaders, span, spanInScope);
return new EventMessageContext(message, includeHttpHeaders, streamLimit, span, spanInScope);
} catch (CouldNotPickMessageException | SubscriberDoesNotMatchSubscriptionException e) {
handleException(state, e);
return new EventMessageContext(null, includeHttpHeaders, span, spanInScope);
return new EventMessageContext(null, includeHttpHeaders, streamLimit, span, spanInScope);
} finally {
pickSpan.finish();
}
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/de/telekom/horizon/pulsar/service/SseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package de.telekom.horizon.pulsar.service;

import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService;
import de.telekom.horizon.pulsar.cache.SubscriberCache;
import de.telekom.horizon.pulsar.config.PulsarConfig;
import de.telekom.horizon.pulsar.exception.SubscriberDoesNotMatchSubscriptionException;
import de.telekom.horizon.pulsar.helper.SseTaskStateContainer;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -42,14 +42,12 @@ public class SseService {
* @param sseTaskFactory The {@link SseTaskFactory} for creating Server-Sent Event tasks.
* @param subscriberCache The {@link SubscriberCache} for caching subscriber information.
* @param pulsarConfig The {@link PulsarConfig} for Pulsar-related configuration.
* @param deDuplicationService The {@link DeDuplicationService} for handling message deduplication.
*/
@Autowired
public SseService(TokenService tokenService,
SseTaskFactory sseTaskFactory,
SubscriberCache subscriberCache,
PulsarConfig pulsarConfig,
DeDuplicationService deDuplicationService) {
PulsarConfig pulsarConfig) {

this.tokenService = tokenService;
this.sseTaskFactory = sseTaskFactory;
Expand Down Expand Up @@ -98,14 +96,22 @@ public void validateSubscriberIdForSubscription(String environment, String subsc
* @param includeHttpHeaders A boolean flag indicating whether to include HTTP headers in the emitted events.
* @return The {@link SseTaskStateContainer} representing the state of the emitted events.
*/
public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders) {
public SseTaskStateContainer startEmittingEvents(String environment, String subscriptionId, String contentType, boolean includeHttpHeaders, StreamLimit streamLimit) {
var responseContainer = new SseTaskStateContainer();

taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders));
taskExecutor.submit(sseTaskFactory.createNew(environment, subscriptionId, contentType, responseContainer, includeHttpHeaders, streamLimit));

responseContainer.setReady(pulsarConfig.getSseTimeout());

return responseContainer;
}

/**
* Stops emitting events for an existing active stream.
*
* @param subscriptionId The ID of the subscription for which events are being emitted.
*/
public void stopEmittingEvents(String subscriptionId) {
sseTaskFactory.getConnectionCache().removeConnectionForSubscription(subscriptionId);
}
}
42 changes: 39 additions & 3 deletions src/main/java/de/telekom/horizon/pulsar/service/SseTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import de.telekom.horizon.pulsar.config.PulsarConfig;
import de.telekom.horizon.pulsar.exception.ConnectionCutOutException;
import de.telekom.horizon.pulsar.exception.ConnectionTimeoutException;
import de.telekom.horizon.pulsar.exception.StreamLimitExceededException;
import de.telekom.horizon.pulsar.helper.EventMessageContext;
import de.telekom.horizon.pulsar.helper.SseResponseWrapper;
import de.telekom.horizon.pulsar.helper.SseTaskStateContainer;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,11 +30,14 @@
import org.springframework.lang.Nullable;
import org.springframework.web.server.UnsupportedMediaTypeStatusException;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

/**
Expand All @@ -47,6 +52,9 @@ public class SseTask implements Runnable {
private static final String TIMEOUT_MESSAGE = "No events received for more than %s ms";

private final SseTaskStateContainer sseTaskStateContainer;

private final EventMessageSupplier eventMessageSupplier;

@Getter
private final AtomicInteger openConnectionGaugeValue;
private final PulsarConfig pulsarConfig;
Expand All @@ -56,8 +64,14 @@ public class SseTask implements Runnable {

@Setter
private String contentType = APPLICATION_STREAM_JSON_VALUE;
private final Stream<EventMessageContext> stream;

@Getter
private Instant startTime;
@Getter
private Instant stopTime;
private Instant lastEventMessage = Instant.now();
private AtomicLong bytesConsumed = new AtomicLong(0);
private AtomicLong numberConsumed = new AtomicLong(0);

private final AtomicBoolean isCutOut = new AtomicBoolean(false);
private final AtomicBoolean isEmitterCompleted = new AtomicBoolean(false);
Expand All @@ -78,14 +92,13 @@ public SseTask(SseTaskStateContainer sseTaskStateContainer,
SseTaskFactory factory) {

this.sseTaskStateContainer = sseTaskStateContainer;
this.eventMessageSupplier = eventMessageSupplier;
this.openConnectionGaugeValue = openConnectionGaugeValue;

this.pulsarConfig = factory.getPulsarConfig();
this.eventWriter = factory.getEventWriter();
this.deDuplicationService = factory.getDeDuplicationService();
this.tracingHelper = factory.getTracingHelper();

this.stream = createStreamTopology(eventMessageSupplier);
}

/**
Expand Down Expand Up @@ -116,6 +129,8 @@ private Stream<EventMessageContext> createStreamTopology(EventMessageSupplier ev
*/
@Override
public void run() {
var stream = createStreamTopology(eventMessageSupplier);

if (sseTaskStateContainer.getCanceled().get()) {
return;
}
Expand All @@ -124,6 +139,8 @@ public void run() {
sseTaskStateContainer.getEmitter().onCompletion(() -> isEmitterCompleted.compareAndExchange(false, true));
sseTaskStateContainer.getEmitter().onError(e -> isEmitterCompleted.compareAndExchange(false, true));

startTime = Instant.now();

// Mark the task as running and increment the open connection gauge value.
sseTaskStateContainer.getRunning().compareAndExchange(false, true);
openConnectionGaugeValue.getAndSet(1);
Expand All @@ -136,6 +153,7 @@ public void run() {
sseTaskStateContainer.getEmitter().completeWithError(e);
} finally {
openConnectionGaugeValue.getAndSet(0);
stopTime = Instant.now();
}
}

Expand Down Expand Up @@ -172,6 +190,19 @@ private boolean applyStreamEndFilter(EventMessageContext context) {
return false;
}

final StreamLimit streamLimit = context.getStreamLimit();
if (streamLimit != null) {
final boolean maxNumberExceeded = streamLimit.getMaxNumber() > 0 && numberConsumed.get() >= streamLimit.getMaxNumber();
final boolean maxMinutesExceeded = streamLimit.getMaxMinutes() > 0 && ChronoUnit.MINUTES.between(startTime, Instant.now()) >= streamLimit.getMaxMinutes();
final boolean maxBytesExceeded = streamLimit.getMaxBytes() > 0 && bytesConsumed.get() >= streamLimit.getMaxBytes();

if (maxNumberExceeded || maxMinutesExceeded || maxBytesExceeded) {
sseTaskStateContainer.getEmitter().completeWithError(new StreamLimitExceededException());
context.finishSpan();
return false;
}
}

return true;
}

Expand Down Expand Up @@ -258,6 +289,7 @@ private void emitEventMessage(EventMessageContext context) {
try {
sendEvent(msg, context.getIncludeHttpHeaders());
} finally {

context.finishSpan();
}
}
Expand All @@ -277,9 +309,13 @@ private void sendEvent(SubscriptionEventMessage msg, boolean includeHttpHeaders)

try {
var eventJson = serializeEvent(new SseResponseWrapper(msg, includeHttpHeaders));

sseTaskStateContainer.getEmitter().send(eventJson);

pushMetadata(msg, Status.DELIVERED, null);

bytesConsumed.addAndGet(eventJson.getBytes(StandardCharsets.UTF_8).length);
numberConsumed.incrementAndGet();
} catch (JsonProcessingException e) {
var err = String.format("Error occurred while emitting the event: %s", e.getMessage());
log.info(err, e);
Expand Down
Loading

0 comments on commit 1029412

Please sign in to comment.