Skip to content

Commit

Permalink
Fix simplified ordered consuming when a delivery policy was set.
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 14, 2024
1 parent f336bae commit 7c6c8fd
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,13 @@ ConsumerConfiguration consumerConfigurationForOrdered(
String consumerName,
Long inactiveThreshold)
{
ConsumerConfiguration.Builder builder =
ConsumerConfiguration.builder(originalCc)
.deliverSubject(newDeliverSubject)
.startTime(null); // clear start time in case it was originally set
ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(originalCc).deliverSubject(newDeliverSubject);

if (lastStreamSeq > 0) {
builder.deliverPolicy(DeliverPolicy.ByStartSequence)
.startSequence(Math.max(1, lastStreamSeq + 1));
builder
.deliverPolicy(DeliverPolicy.ByStartSequence)
.startSequence(Math.max(1, lastStreamSeq + 1))
.startTime(null); // clear start time in case it was originally set
}

if (consumerName != null && consumerCreate290Available) {
Expand All @@ -213,6 +212,7 @@ ConsumerConfiguration consumerConfigurationForOrdered(
if (inactiveThreshold != null) {
builder.inactiveThreshold(inactiveThreshold);
}

return builder.build();
}

Expand Down
47 changes: 47 additions & 0 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.*;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -278,6 +279,52 @@ public void testIterableConsumer() throws Exception {
});
}

@Test
public void testOrderedConsumerDeliverPolices() throws Exception {
jsServer.run(TestBase::atLeast2_9_1, nc -> {
// Setup
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();

TestingStreamContainer tsc = new TestingStreamContainer(jsm);

// setup mesage data
for (int x = 0; x < 3; x++) {
js.publish(tsc.subject(), null);
sleep(100); // to have a gap to test start time
}

JetStreamSubscription sub = js.subscribe(tsc.subject());
Message m = sub.nextMessage(1000);
ZonedDateTime startTime = m.metaData().timestamp().plus(30, ChronoUnit.MILLIS);
sub.unsubscribe();

StreamContext sctx = nc.getStreamContext(tsc.stream);

// test a start time
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration()
.filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartTime)
.startTime(startTime);
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer consumer = occtx.iterate()) {
m = consumer.nextMessage(1000);
assertEquals(2, m.metaData().streamSequence());
}

// test a start sequence
occ = new OrderedConsumerConfiguration()
.filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartSequence)
.startSequence(2);
occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer consumer = occtx.iterate()) {
m = consumer.nextMessage(1000);
assertEquals(2, m.metaData().streamSequence());
}
});
}

@Test
public void testOrderedIterableConsumerBasic() throws Exception {
jsServer.run(TestBase::atLeast2_9_1, nc -> {
Expand Down

0 comments on commit 7c6c8fd

Please sign in to comment.