From 7c6c8fdc7d9523980acbb0c956161b1438b4b945 Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 14 Nov 2024 15:51:27 -0500 Subject: [PATCH 1/3] Fix simplified ordered consuming when a delivery policy was set. --- .../nats/client/impl/NatsJetStreamImpl.java | 12 ++--- .../nats/client/impl/SimplificationTests.java | 47 +++++++++++++++++++ 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index 16f9c35e0..995f4edf1 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -196,14 +196,13 @@ ConsumerConfiguration consumerConfigurationForOrdered( String consumerName, Long inactiveThreshold) { - ConsumerConfiguration.Builder builder = - ConsumerConfiguration.builder(originalCc) - .deliverSubject(newDeliverSubject) - .startTime(null); // clear start time in case it was originally set + ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(originalCc).deliverSubject(newDeliverSubject); if (lastStreamSeq > 0) { - builder.deliverPolicy(DeliverPolicy.ByStartSequence) - .startSequence(Math.max(1, lastStreamSeq + 1)); + builder + .deliverPolicy(DeliverPolicy.ByStartSequence) + .startSequence(Math.max(1, lastStreamSeq + 1)) + .startTime(null); // clear start time in case it was originally set } if (consumerName != null && consumerCreate290Available) { @@ -213,6 +212,7 @@ ConsumerConfiguration consumerConfigurationForOrdered( if (inactiveThreshold != null) { builder.inactiveThreshold(inactiveThreshold); } + return builder.build(); } diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index e8178e541..66200d9a9 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -25,6 +25,7 @@ import java.io.*; import java.time.Duration; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -278,6 +279,52 @@ public void testIterableConsumer() throws Exception { }); } + @Test + public void testOrderedConsumerDeliverPolices() throws Exception { + jsServer.run(TestBase::atLeast2_9_1, nc -> { + // Setup + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + + // setup mesage data + for (int x = 0; x < 3; x++) { + js.publish(tsc.subject(), null); + sleep(100); // to have a gap to test start time + } + + JetStreamSubscription sub = js.subscribe(tsc.subject()); + Message m = sub.nextMessage(1000); + ZonedDateTime startTime = m.metaData().timestamp().plus(30, ChronoUnit.MILLIS); + sub.unsubscribe(); + + StreamContext sctx = nc.getStreamContext(tsc.stream); + + // test a start time + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration() + .filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartTime) + .startTime(startTime); + OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + try (IterableConsumer consumer = occtx.iterate()) { + m = consumer.nextMessage(1000); + assertEquals(2, m.metaData().streamSequence()); + } + + // test a start sequence + occ = new OrderedConsumerConfiguration() + .filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartSequence) + .startSequence(2); + occtx = sctx.createOrderedConsumer(occ); + try (IterableConsumer consumer = occtx.iterate()) { + m = consumer.nextMessage(1000); + assertEquals(2, m.metaData().streamSequence()); + } + }); + } + @Test public void testOrderedIterableConsumerBasic() throws Exception { jsServer.run(TestBase::atLeast2_9_1, nc -> { From 968d3c57fd2aa5305ea2f6494c0963896932f806 Mon Sep 17 00:00:00 2001 From: scottf Date: Fri, 15 Nov 2024 14:08:14 -0500 Subject: [PATCH 2/3] more testing for ordered with start time or start sequence --- .../nats/client/impl/JetStreamTestBase.java | 7 + .../nats/client/impl/SimplificationTests.java | 193 +++++++++++------- 2 files changed, 126 insertions(+), 74 deletions(-) diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index 31e2b476d..c4a0ec333 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -205,6 +205,13 @@ public static void jsPublish(JetStream js, String subject, String prefix, int st } } + public static void jsPublish(JetStream js, String subject, int startId, int count, long sleep) throws IOException, JetStreamApiException { + for (int x = 0; x < count; x++) { + js.publish(NatsMessage.builder().subject(subject).data((dataBytes(startId++))).build()); + sleep(sleep); + } + } + public static void jsPublish(JetStream js, String subject, int startId, int count) throws IOException, JetStreamApiException { for (int x = 0; x < count; x++) { js.publish(NatsMessage.builder().subject(subject).data((dataBytes(startId++))).build()); diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index 23532409b..c4cf5fef2 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -285,16 +285,8 @@ public void testOrderedConsumerDeliverPolices() throws Exception { TestingStreamContainer tsc = new TestingStreamContainer(jsm); - // setup mesage data - for (int x = 0; x < 3; x++) { - js.publish(tsc.subject(), null); - sleep(100); // to have a gap to test start time - } - - JetStreamSubscription sub = js.subscribe(tsc.subject()); - Message m = sub.nextMessage(1000); - ZonedDateTime startTime = m.metaData().timestamp().plus(30, ChronoUnit.MILLIS); - sub.unsubscribe(); + jsPublish(js, tsc.subject(), 101, 3, 100); + ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc); StreamContext sctx = nc.getStreamContext(tsc.stream); @@ -305,7 +297,7 @@ public void testOrderedConsumerDeliverPolices() throws Exception { .startTime(startTime); OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); try (IterableConsumer consumer = occtx.iterate()) { - m = consumer.nextMessage(1000); + Message m = consumer.nextMessage(1000); assertEquals(2, m.metaData().streamSequence()); } @@ -316,7 +308,7 @@ public void testOrderedConsumerDeliverPolices() throws Exception { .startSequence(2); occtx = sctx.createOrderedConsumer(occ); try (IterableConsumer consumer = occtx.iterate()) { - m = consumer.nextMessage(1000); + Message m = consumer.nextMessage(1000); assertEquals(2, m.metaData().streamSequence()); } }); @@ -662,28 +654,49 @@ public void testOrderedBehaviorNext() throws Exception { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; - TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); - jsPublish(js, tsc.subject(), 101, 6); - OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject()); - OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); - // Loop through the messages to make sure I get stream sequence 1 to 6 - int expectedStreamSeq = 1; - while (expectedStreamSeq <= 6) { - Message m = occtx.next(1000); - if (m != null) { - assertEquals(expectedStreamSeq, m.metaData().streamSequence()); - assertEquals(1, m.metaData().consumerSequence()); - ++expectedStreamSeq; - } - } + jsPublish(js, tsc.subject(), 101, 6, 100); + ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc); + + // New pomm factory in place before each subscription is made + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; + orderedNext(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; + orderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); + + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; + orderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); }); } + private static ZonedDateTime getStartTimeFirstMessage(JetStream js, TestingStreamContainer tsc) throws IOException, JetStreamApiException, InterruptedException { + ZonedDateTime startTime; + JetStreamSubscription sub = js.subscribe(tsc.subject()); + Message mt = sub.nextMessage(1000); + startTime = mt.metaData().timestamp().plus(30, ChronoUnit.MILLIS); + sub.unsubscribe(); + return startTime; + } + + private static void orderedNext(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws IOException, JetStreamApiException, InterruptedException, JetStreamStatusCheckedException { + OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + // Loop through the messages to make sure I get stream sequence 1 to 6 + while (expectedStreamSeq <= 6) { + Message m = occtx.next(1000); + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(1, m.metaData().consumerSequence()); + ++expectedStreamSeq; + } + } + } + + public static long CS_FOR_SS_3 = 3; public static class PullOrderedTestDropSimulator extends PullOrderedMessageManager { @SuppressWarnings("ClassEscapesDefinedScope") public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { @@ -693,8 +706,8 @@ public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, Strin @Override protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { if (msg.isJetStream() - && msg.metaData().streamSequence() == 2 - && msg.metaData().consumerSequence() == 2) + && msg.metaData().streamSequence() == 3 + && msg.metaData().consumerSequence() == CS_FOR_SS_3) { return false; } @@ -710,39 +723,55 @@ public void testOrderedBehaviorFetch() throws Exception { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); - jsPublish(js, tsc.subject(), 101, 5); - OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject()); - OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); - int expectedStreamSeq = 1; - FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build(); - try (FetchConsumer fcon = occtx.fetch(fco)) { - Message m = fcon.nextMessage(); - while (m != null) { - assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); - m = fcon.nextMessage(); - } - // we know this because the simulator is designed to fail the first time at the second message - assertEquals(2, expectedStreamSeq); - // fetch failure will stop the consumer, but make sure it's done b/c with ordered - // I can't have more than one consuming at a time. - while (!fcon.isFinished()) { - sleep(1); - } + + jsPublish(js, tsc.subject(), 101, 6, 100); + ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc); + + // New pomm factory in place before each subscription is made + // Set the Consumer Sequence For Stream Sequence 3 statically for ease + CS_FOR_SS_3 = 3; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + orderedFetch(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + + CS_FOR_SS_3 = 2; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + orderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); + + CS_FOR_SS_3 = 2; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + orderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); + }); + } + + private static void orderedFetch(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception { + OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build(); + try (FetchConsumer fcon = occtx.fetch(fco)) { + Message m = fcon.nextMessage(); + while (m != null) { + assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); + m = fcon.nextMessage(); } - // this should finish without error - try (FetchConsumer fcon = occtx.fetch(fco)) { - Message m = fcon.nextMessage(); - while (expectedStreamSeq <= 5) { - assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); - m = fcon.nextMessage(); - } + // we know this because the simulator is designed to fail the first time at the third message + assertEquals(3, expectedStreamSeq); + // fetch failure will stop the consumer, but make sure it's done b/c with ordered + // I can't have more than one consuming at a time. + while (!fcon.isFinished()) { + sleep(1); } - }); + } + // this should finish without error + try (FetchConsumer fcon = occtx.fetch(fco)) { + Message m = fcon.nextMessage(); + while (expectedStreamSeq <= 6) { + assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); + m = fcon.nextMessage(); + } + } } @Test @@ -752,25 +781,41 @@ public void testOrderedBehaviorIterable() throws Exception { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); - jsPublish(js, tsc.subject(), 101, 5); - OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject()); - OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); - try (IterableConsumer icon = occtx.iterate()) { - // Loop through the messages to make sure I get stream sequence 1 to 5 - int expectedStreamSeq = 1; - while (expectedStreamSeq <= 5) { - Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage - if (m != null) { - assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); - } + + jsPublish(js, tsc.subject(), 101, 6, 100); + ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc); + + // New pomm factory in place before each subscription is made + // Set the Consumer Sequence For Stream Sequence 3 statically for ease + CS_FOR_SS_3 = 3; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + orderedIterate(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + + CS_FOR_SS_3 = 2; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + orderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); + + CS_FOR_SS_3 = 2; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + orderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); + }); + } + + private static void orderedIterate(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception { + OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + try (IterableConsumer icon = occtx.iterate()) { + // Loop through the messages to make sure I get stream sequence 1 to 5 + while (expectedStreamSeq <= 5) { + Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage + if (m != null) { + assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); } } - }); + } } @Test From ddf7e8d929c9593dcb6b4f044f4508cfe606e2aa Mon Sep 17 00:00:00 2001 From: scottf Date: Fri, 15 Nov 2024 14:11:06 -0500 Subject: [PATCH 3/3] more testing for ordered with start time or start sequence --- .../nats/client/impl/SimplificationTests.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index c4cf5fef2..3bd146542 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -266,7 +266,7 @@ public void testIterableConsumer() throws Exception { int stopCount = 500; // create the consumer then use it try (IterableConsumer consumer = consumerContext.iterate()) { - _testIterable(js, stopCount, consumer, tsc.subject()); + _testIterableBasic(js, stopCount, consumer, tsc.subject()); } // coverage @@ -327,12 +327,12 @@ public void testOrderedIterableConsumerBasic() throws Exception { OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject()); OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); try (IterableConsumer consumer = occtx.iterate()) { - _testIterable(js, stopCount, consumer, tsc.subject()); + _testIterableBasic(js, stopCount, consumer, tsc.subject()); } }); } - private static void _testIterable(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException { + private static void _testIterableBasic(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException { AtomicInteger count = new AtomicInteger(); Thread consumeThread = new Thread(() -> { try { @@ -662,14 +662,14 @@ public void testOrderedBehaviorNext() throws Exception { // New pomm factory in place before each subscription is made ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; - orderedNext(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + _testOrderedNext(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; - orderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + _testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; - orderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + _testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); }); } @@ -683,7 +683,7 @@ private static ZonedDateTime getStartTimeFirstMessage(JetStream js, TestingStrea return startTime; } - private static void orderedNext(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws IOException, JetStreamApiException, InterruptedException, JetStreamStatusCheckedException { + private static void _testOrderedNext(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws IOException, JetStreamApiException, InterruptedException, JetStreamStatusCheckedException { OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); // Loop through the messages to make sure I get stream sequence 1 to 6 while (expectedStreamSeq <= 6) { @@ -733,21 +733,21 @@ public void testOrderedBehaviorFetch() throws Exception { // Set the Consumer Sequence For Stream Sequence 3 statically for ease CS_FOR_SS_3 = 3; ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - orderedFetch(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + _testOrderedFetch(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); CS_FOR_SS_3 = 2; ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - orderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + _testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); CS_FOR_SS_3 = 2; ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - orderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + _testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); }); } - private static void orderedFetch(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception { + private static void _testOrderedFetch(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception { OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build(); try (FetchConsumer fcon = occtx.fetch(fco)) { @@ -791,21 +791,21 @@ public void testOrderedBehaviorIterable() throws Exception { // Set the Consumer Sequence For Stream Sequence 3 statically for ease CS_FOR_SS_3 = 3; ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - orderedIterate(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + _testOrderedIterate(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); CS_FOR_SS_3 = 2; ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - orderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + _testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); CS_FOR_SS_3 = 2; ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - orderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + _testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); }); } - private static void orderedIterate(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception { + private static void _testOrderedIterate(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception { OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); try (IterableConsumer icon = occtx.iterate()) { // Loop through the messages to make sure I get stream sequence 1 to 5