Skip to content

Commit

Permalink
Merge pull request #33 from telekom/feature/offset-based-sse
Browse files Browse the repository at this point in the history
feat: added mongo db query required for offset-based sse
  • Loading branch information
mherwig authored Nov 28, 2024
2 parents 0a2c570 + a8d9273 commit 258619d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import de.telekom.eni.pandora.horizon.model.event.DeliveryType;
import de.telekom.eni.pandora.horizon.model.event.Status;
import de.telekom.eni.pandora.horizon.mongo.model.MessageStateMongoDocument;
import org.bson.types.ObjectId;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.mongodb.repository.MongoRepository;
Expand Down Expand Up @@ -42,6 +43,11 @@ public interface MessageStateMongoRepo extends MongoRepository<MessageStateMongo
@Query(value = "{status: {$in: ?0}, deliveryType: ?1, subscriptionId: ?2}", sort = "{timestamp: 1}")
Slice<MessageStateMongoDocument> findByStatusInAndDeliveryTypeAndSubscriptionIdAsc(List<Status> status, DeliveryType deliveryType, String subscriptionId, Pageable pageable);

@Query(value = "{deliveryType: ?0, subscriptionId: ?1, timestamp: { $gt: ?2 }}", sort = "{timestamp: 1}")
List<MessageStateMongoDocument> findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(DeliveryType deliveryType, String subscriptionId, Date timestamp);
@Query(value = "{deliveryType: ?0, subscriptionId: ?1, timestamp: { $gt: ?2 }}", sort = "{timestamp: 1}")
Slice<MessageStateMongoDocument> findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(DeliveryType deliveryType, String subscriptionId, Date timestamp, Pageable pageable);

@Query(value = "{status: {$in: ?0}, deliveryType: ?1, subscriptionId: {$in: ?2}}", sort = "{timestamp: 1}")
List<MessageStateMongoDocument> findByStatusInAndDeliveryTypeAndSubscriptionIdsAsc(List<Status> status, DeliveryType deliveryType, List<String> subscriptionIds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import de.telekom.eni.pandora.horizon.mongo.model.MessageStateMongoDocument;
import de.telekom.eni.pandora.horizon.mongo.repository.MessageStateMongoRepo;
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
Expand All @@ -32,6 +34,7 @@
import java.sql.Time;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -215,8 +218,62 @@ void testFindByPartitionAndStatusAndDeliveryType() {
}
}

static MessageStateMongoDocument createDummyStateWithStatus(Status status, String multiplexedFrom) {
@ParameterizedTest
@Order(11)
@ValueSource(booleans = {false, true})
@DisplayName("Search for any inserted StatusMessage by DeliveryType and SubscriptionId that has been created after a given timestamp")
void findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(boolean pagable) throws InterruptedException {
var subscriptionId = UUID.randomUUID().toString();

var docCount = 10;

// first, we will create a bunch of documents for the given subscriptionId
var documents = new ArrayList<MessageStateMongoDocument>();
for (var i = 0; i < docCount; i++) {
var index = ThreadLocalRandom.current().nextInt(0, Status.values().length);
// create document with random status
var document = createDummyStateWithStatus(subscriptionId, Status.values()[index], UUID.randomUUID().toString());
Thread.sleep(1); // sleep 1 ms otherwise timestamps would be all the same.
documents.add(document);
}
messageStateMongoRepo.saveAll(documents);

var doc = documents.get(3); // pick 4th document

var timestamp = doc.getTimestamp();

// find all messages for the given subscription that are newer than the 4th message
List<MessageStateMongoDocument> foundMessages = null;

if (pagable) {
Pageable pageable = PageRequest.of(0, docCount, Sort.by(Sort.Direction.ASC, "timestamp"));
foundMessages = messageStateMongoRepo.findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(testDeliveryType, subscriptionId, timestamp, pageable).getContent();
} else {
foundMessages = messageStateMongoRepo.findByDeliveryTypeAndSubscriptionIdAndTimestampGreaterThanAsc(testDeliveryType, subscriptionId, timestamp);
}

assertNotNull(foundMessages);

// we expect to get 6 messages that are newer (total of 10)
var expectedDocsCount = documents.size() - (documents.indexOf(doc) +1);
assertEquals(expectedDocsCount, foundMessages.size());

// let's check if all filter criteria apply to the entries found
var result = foundMessages.stream()
.filter(m -> testDeliveryType.equals(m.getDeliveryType()))
.filter(m -> subscriptionId.equals(m.getSubscriptionId()))
.filter(m -> timestamp.toInstant().isBefore(m.getTimestamp().toInstant()))
.toList();

assertEquals(expectedDocsCount, result.size());
}

static MessageStateMongoDocument createDummyStateWithStatus(Status status, String multiplexedFrom) {
return createDummyStateWithStatus(UUID.randomUUID().toString(), status, multiplexedFrom);
}


static MessageStateMongoDocument createDummyStateWithStatus(String subscriptionId, Status status, String multiplexedFrom) {
return new MessageStateMongoDocument( //
UUID.randomUUID().toString(), // String uuid,
new Coordinates(testPartition, 458), // Coordinates,
Expand Down

0 comments on commit 258619d

Please sign in to comment.