From 1bf06d56423c128cfbfd775c5ba3d33e570057d7 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Fri, 15 Nov 2024 14:20:23 -0500 Subject: [PATCH] Fix simplified ordered consuming when a delivery policy was set. (#1251) * Fix simplified ordered consuming when a delivery policy was set. * more testing for ordered with start time or start sequence * more testing for ordered with start time or start sequence --- .../nats/client/impl/NatsJetStreamImpl.java | 12 +- .../nats/client/impl/JetStreamTestBase.java | 7 + .../nats/client/impl/SimplificationTests.java | 222 +++++++++++++----- 3 files changed, 170 insertions(+), 71 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index 16f9c35e0..995f4edf1 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -196,14 +196,13 @@ ConsumerConfiguration consumerConfigurationForOrdered( String consumerName, Long inactiveThreshold) { - ConsumerConfiguration.Builder builder = - ConsumerConfiguration.builder(originalCc) - .deliverSubject(newDeliverSubject) - .startTime(null); // clear start time in case it was originally set + ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(originalCc).deliverSubject(newDeliverSubject); if (lastStreamSeq > 0) { - builder.deliverPolicy(DeliverPolicy.ByStartSequence) - .startSequence(Math.max(1, lastStreamSeq + 1)); + builder + .deliverPolicy(DeliverPolicy.ByStartSequence) + .startSequence(Math.max(1, lastStreamSeq + 1)) + .startTime(null); // clear start time in case it was originally set } if (consumerName != null && consumerCreate290Available) { @@ -213,6 +212,7 @@ ConsumerConfiguration consumerConfigurationForOrdered( if (inactiveThreshold != null) { builder.inactiveThreshold(inactiveThreshold); } + return builder.build(); } diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index 31e2b476d..c4a0ec333 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -205,6 +205,13 @@ public static void jsPublish(JetStream js, String subject, String prefix, int st } } + public static void jsPublish(JetStream js, String subject, int startId, int count, long sleep) throws IOException, JetStreamApiException { + for (int x = 0; x < count; x++) { + js.publish(NatsMessage.builder().subject(subject).data((dataBytes(startId++))).build()); + sleep(sleep); + } + } + public static void jsPublish(JetStream js, String subject, int startId, int count) throws IOException, JetStreamApiException { for (int x = 0; x < count; x++) { js.publish(NatsMessage.builder().subject(subject).data((dataBytes(startId++))).build()); diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index 90d59ea7c..3bd146542 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -22,6 +22,7 @@ import java.io.*; import java.time.Duration; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -265,7 +266,7 @@ public void testIterableConsumer() throws Exception { int stopCount = 500; // create the consumer then use it try (IterableConsumer consumer = consumerContext.iterate()) { - _testIterable(js, stopCount, consumer, tsc.subject()); + _testIterableBasic(js, stopCount, consumer, tsc.subject()); } // coverage @@ -275,6 +276,44 @@ public void testIterableConsumer() throws Exception { }); } + @Test + public void testOrderedConsumerDeliverPolices() throws Exception { + jsServer.run(TestBase::atLeast2_9_1, nc -> { + // Setup + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + + jsPublish(js, tsc.subject(), 101, 3, 100); + ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc); + + StreamContext sctx = nc.getStreamContext(tsc.stream); + + // test a start time + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration() + .filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartTime) + .startTime(startTime); + OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + try (IterableConsumer consumer = occtx.iterate()) { + Message m = consumer.nextMessage(1000); + assertEquals(2, m.metaData().streamSequence()); + } + + // test a start sequence + occ = new OrderedConsumerConfiguration() + .filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartSequence) + .startSequence(2); + occtx = sctx.createOrderedConsumer(occ); + try (IterableConsumer consumer = occtx.iterate()) { + Message m = consumer.nextMessage(1000); + assertEquals(2, m.metaData().streamSequence()); + } + }); + } + @Test public void testOrderedIterableConsumerBasic() throws Exception { jsServer.run(TestBase::atLeast2_9_1, nc -> { @@ -288,12 +327,12 @@ public void testOrderedIterableConsumerBasic() throws Exception { OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject()); OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); try (IterableConsumer consumer = occtx.iterate()) { - _testIterable(js, stopCount, consumer, tsc.subject()); + _testIterableBasic(js, stopCount, consumer, tsc.subject()); } }); } - private static void _testIterable(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException { + private static void _testIterableBasic(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException { AtomicInteger count = new AtomicInteger(); Thread consumeThread = new Thread(() -> { try { @@ -615,28 +654,49 @@ public void testOrderedBehaviorNext() throws Exception { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; - TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); - jsPublish(js, tsc.subject(), 101, 6); - OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject()); - OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); - // Loop through the messages to make sure I get stream sequence 1 to 6 - int expectedStreamSeq = 1; - while (expectedStreamSeq <= 6) { - Message m = occtx.next(1000); - if (m != null) { - assertEquals(expectedStreamSeq, m.metaData().streamSequence()); - assertEquals(1, m.metaData().consumerSequence()); - ++expectedStreamSeq; - } - } + jsPublish(js, tsc.subject(), 101, 6, 100); + ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc); + + // New pomm factory in place before each subscription is made + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; + _testOrderedNext(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; + _testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); + + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new; + _testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); }); } + private static ZonedDateTime getStartTimeFirstMessage(JetStream js, TestingStreamContainer tsc) throws IOException, JetStreamApiException, InterruptedException { + ZonedDateTime startTime; + JetStreamSubscription sub = js.subscribe(tsc.subject()); + Message mt = sub.nextMessage(1000); + startTime = mt.metaData().timestamp().plus(30, ChronoUnit.MILLIS); + sub.unsubscribe(); + return startTime; + } + + private static void _testOrderedNext(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws IOException, JetStreamApiException, InterruptedException, JetStreamStatusCheckedException { + OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + // Loop through the messages to make sure I get stream sequence 1 to 6 + while (expectedStreamSeq <= 6) { + Message m = occtx.next(1000); + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(1, m.metaData().consumerSequence()); + ++expectedStreamSeq; + } + } + } + + public static long CS_FOR_SS_3 = 3; public static class PullOrderedTestDropSimulator extends PullOrderedMessageManager { @SuppressWarnings("ClassEscapesDefinedScope") public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) { @@ -646,8 +706,8 @@ public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, Strin @Override protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { if (msg.isJetStream() - && msg.metaData().streamSequence() == 2 - && msg.metaData().consumerSequence() == 2) + && msg.metaData().streamSequence() == 3 + && msg.metaData().consumerSequence() == CS_FOR_SS_3) { return false; } @@ -663,39 +723,55 @@ public void testOrderedBehaviorFetch() throws Exception { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); - jsPublish(js, tsc.subject(), 101, 5); - OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject()); - OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); - int expectedStreamSeq = 1; - FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build(); - try (FetchConsumer fcon = occtx.fetch(fco)) { - Message m = fcon.nextMessage(); - while (m != null) { - assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); - m = fcon.nextMessage(); - } - // we know this because the simulator is designed to fail the first time at the second message - assertEquals(2, expectedStreamSeq); - // fetch failure will stop the consumer, but make sure it's done b/c with ordered - // I can't have more than one consuming at a time. - while (!fcon.isFinished()) { - sleep(1); - } + + jsPublish(js, tsc.subject(), 101, 6, 100); + ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc); + + // New pomm factory in place before each subscription is made + // Set the Consumer Sequence For Stream Sequence 3 statically for ease + CS_FOR_SS_3 = 3; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + _testOrderedFetch(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + + CS_FOR_SS_3 = 2; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + _testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); + + CS_FOR_SS_3 = 2; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + _testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); + }); + } + + private static void _testOrderedFetch(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception { + OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build(); + try (FetchConsumer fcon = occtx.fetch(fco)) { + Message m = fcon.nextMessage(); + while (m != null) { + assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); + m = fcon.nextMessage(); } - // this should finish without error - try (FetchConsumer fcon = occtx.fetch(fco)) { - Message m = fcon.nextMessage(); - while (expectedStreamSeq <= 5) { - assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); - m = fcon.nextMessage(); - } + // we know this because the simulator is designed to fail the first time at the third message + assertEquals(3, expectedStreamSeq); + // fetch failure will stop the consumer, but make sure it's done b/c with ordered + // I can't have more than one consuming at a time. + while (!fcon.isFinished()) { + sleep(1); } - }); + } + // this should finish without error + try (FetchConsumer fcon = occtx.fetch(fco)) { + Message m = fcon.nextMessage(); + while (expectedStreamSeq <= 6) { + assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); + m = fcon.nextMessage(); + } + } } @Test @@ -705,25 +781,41 @@ public void testOrderedBehaviorIterable() throws Exception { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; - TestingStreamContainer tsc = new TestingStreamContainer(jsm); StreamContext sctx = js.getStreamContext(tsc.stream); - jsPublish(js, tsc.subject(), 101, 5); - OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject()); - OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); - try (IterableConsumer icon = occtx.iterate()) { - // Loop through the messages to make sure I get stream sequence 1 to 5 - int expectedStreamSeq = 1; - while (expectedStreamSeq <= 5) { - Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage - if (m != null) { - assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); - } + + jsPublish(js, tsc.subject(), 101, 6, 100); + ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc); + + // New pomm factory in place before each subscription is made + // Set the Consumer Sequence For Stream Sequence 3 statically for ease + CS_FOR_SS_3 = 3; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + _testOrderedIterate(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject())); + + CS_FOR_SS_3 = 2; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + _testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime)); + + CS_FOR_SS_3 = 2; + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new; + _testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject()) + .deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2)); + }); + } + + private static void _testOrderedIterate(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception { + OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ); + try (IterableConsumer icon = occtx.iterate()) { + // Loop through the messages to make sure I get stream sequence 1 to 5 + while (expectedStreamSeq <= 5) { + Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage + if (m != null) { + assertEquals(expectedStreamSeq++, m.metaData().streamSequence()); } } - }); + } } @Test