From 1fbee9ba12f3fe95fc3d162c9d5fe12d69fdbf6f Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Tue, 15 Oct 2024 12:47:18 -0400 Subject: [PATCH] Extract non 211 Part 2 (#1242) --- .../java/io/nats/client/FeatureOptions.java | 3 +- .../java/io/nats/client/SubscribeOptions.java | 3 +- .../client/impl/JetStreamGeneralTests.java | 18 ++++---- .../client/impl/JetStreamManagementTests.java | 24 +++++------ .../nats/client/impl/JetStreamPullTests.java | 26 ++++++------ .../client/impl/JetStreamPushQueueTests.java | 2 +- .../nats/client/impl/JetStreamTestBase.java | 6 +-- .../nats/client/impl/SimplificationTests.java | 42 +++++++++---------- .../nats/client/support/ValidatorTests.java | 2 +- 9 files changed, 62 insertions(+), 64 deletions(-) diff --git a/src/main/java/io/nats/client/FeatureOptions.java b/src/main/java/io/nats/client/FeatureOptions.java index 1ca3cb8be..38b085e1b 100644 --- a/src/main/java/io/nats/client/FeatureOptions.java +++ b/src/main/java/io/nats/client/FeatureOptions.java @@ -22,8 +22,7 @@ public abstract class FeatureOptions { private final JetStreamOptions jso; - @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars - protected FeatureOptions(Builder b) { + protected FeatureOptions(Builder b) { jso = b.jsoBuilder.build(); } diff --git a/src/main/java/io/nats/client/SubscribeOptions.java b/src/main/java/io/nats/client/SubscribeOptions.java index 74a75907e..acfd3e03b 100644 --- a/src/main/java/io/nats/client/SubscribeOptions.java +++ b/src/main/java/io/nats/client/SubscribeOptions.java @@ -38,8 +38,7 @@ public abstract class SubscribeOptions { protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers. protected final String name; - @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars - protected SubscribeOptions(Builder builder, boolean isPull, + protected SubscribeOptions(Builder builder, boolean isPull, String deliverSubject, String deliverGroup, long pendingMessageLimit, long pendingByteLimit) { diff --git a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java index 138cf238c..9a47ad375 100644 --- a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java @@ -462,7 +462,7 @@ public void testBindPush() throws Exception { jsPublish(js, tsc.subject(), 1, 1); PushSubscribeOptions pso = PushSubscribeOptions.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .build(); JetStreamSubscription s = js.subscribe(tsc.subject(), pso); Message m = s.nextMessage(DEFAULT_TIMEOUT); @@ -474,7 +474,7 @@ public void testBindPush() throws Exception { jsPublish(js, tsc.subject(), 2, 1); pso = PushSubscribeOptions.builder() .stream(tsc.stream) - .durable(tsc.name()) + .durable(tsc.consumerName()) .bind(true) .build(); s = js.subscribe(tsc.subject(), pso); @@ -485,7 +485,7 @@ public void testBindPush() throws Exception { unsubscribeEnsureNotBound(s); jsPublish(js, tsc.subject(), 3, 1); - pso = PushSubscribeOptions.bind(tsc.stream, tsc.name()); + pso = PushSubscribeOptions.bind(tsc.stream, tsc.consumerName()); s = js.subscribe(tsc.subject(), pso); m = s.nextMessage(DEFAULT_TIMEOUT); assertNotNull(m); @@ -495,7 +495,7 @@ public void testBindPush() throws Exception { () -> PushSubscribeOptions.builder().stream(tsc.stream).bind(true).build()); assertThrows(IllegalArgumentException.class, - () -> PushSubscribeOptions.builder().durable(tsc.name()).bind(true).build()); + () -> PushSubscribeOptions.builder().durable(tsc.consumerName()).bind(true).build()); assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(EMPTY).bind(true).build()); @@ -514,7 +514,7 @@ public void testBindPull() throws Exception { jsPublish(js, tsc.subject(), 1, 1); PullSubscribeOptions pso = PullSubscribeOptions.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .build(); JetStreamSubscription s = js.subscribe(tsc.subject(), pso); s.pull(1); @@ -527,7 +527,7 @@ public void testBindPull() throws Exception { jsPublish(js, tsc.subject(), 2, 1); pso = PullSubscribeOptions.builder() .stream(tsc.stream) - .durable(tsc.name()) + .durable(tsc.consumerName()) .bind(true) .build(); s = js.subscribe(tsc.subject(), pso); @@ -539,7 +539,7 @@ public void testBindPull() throws Exception { unsubscribeEnsureNotBound(s); jsPublish(js, tsc.subject(), 3, 1); - pso = PullSubscribeOptions.bind(tsc.stream, tsc.name()); + pso = PullSubscribeOptions.bind(tsc.stream, tsc.consumerName()); s = js.subscribe(tsc.subject(), pso); s.pull(1); m = s.nextMessage(DEFAULT_TIMEOUT); @@ -960,9 +960,9 @@ public void testInternalLookupConsumerInfoCoverage() throws Exception { // - consumer not found // - stream does not exist JetStreamSubscription sub = js.subscribe(tsc.subject()); - assertNull(((NatsJetStream)js).lookupConsumerInfo(tsc.stream, tsc.name())); + assertNull(((NatsJetStream)js).lookupConsumerInfo(tsc.stream, tsc.consumerName())); assertThrows(JetStreamApiException.class, - () -> ((NatsJetStream)js).lookupConsumerInfo(stream(999), tsc.name())); + () -> ((NatsJetStream)js).lookupConsumerInfo(stream(999), tsc.consumerName())); }); } diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 757b79f2d..2012b6ee2 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -827,7 +827,7 @@ public void testAddPausedConsumer() throws Exception { ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusMinutes(2); ConsumerConfiguration cc = ConsumerConfiguration.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .pauseUntil(pauseUntil) .build(); @@ -849,7 +849,7 @@ public void testPauseResumeConsumer() throws Exception { assertEquals(0, list.size()); ConsumerConfiguration cc = ConsumerConfiguration.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .build(); // durable and name can both be null @@ -886,9 +886,9 @@ public void testPauseResumeConsumer() throws Exception { ci = jsm.getConsumerInfo(tsc.stream, ci.getName()); assertFalse(ci.getPaused()); - assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.name(), pauseUntil)); + assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.consumerName(), pauseUntil)); assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(tsc.stream, name(), pauseUntil)); - assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.name())); + assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.consumerName())); assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(tsc.stream, name())); }); } @@ -1009,7 +1009,7 @@ public void testConsumerMetadata() throws Exception { TestingStreamContainer tsc = new TestingStreamContainer(jsm); ConsumerConfiguration cc = ConsumerConfiguration.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .metadata(metaData) .build(); @@ -1065,14 +1065,14 @@ public void testGetConsumerInfo() throws Exception { jsServer.run(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); TestingStreamContainer tsc = new TestingStreamContainer(jsm); - assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.name())); - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build(); + assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.consumerName())); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build(); ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc); assertEquals(tsc.stream, ci.getStreamName()); - assertEquals(tsc.name(), ci.getName()); - ci = jsm.getConsumerInfo(tsc.stream, tsc.name()); + assertEquals(tsc.consumerName(), ci.getName()); + ci = jsm.getConsumerInfo(tsc.stream, tsc.consumerName()); assertEquals(tsc.stream, ci.getStreamName()); - assertEquals(tsc.name(), ci.getName()); + assertEquals(tsc.consumerName(), ci.getName()); assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, durable(999))); if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { assertNotNull(ci.getTimestamp()); @@ -1228,14 +1228,14 @@ public void testConsumerReplica() throws Exception { TestingStreamContainer tsc = new TestingStreamContainer(nc); final ConsumerConfiguration cc0 = ConsumerConfiguration.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .build(); ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc0); // server returns 0 when value is not set assertEquals(0, ci.getConsumerConfiguration().getNumReplicas()); final ConsumerConfiguration cc1 = ConsumerConfiguration.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .numReplicas(1) .build(); ci = jsm.addOrUpdateConsumer(tsc.stream, cc1); diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java index 54f56ef88..3a29ca946 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java @@ -64,12 +64,12 @@ public void testFetch() throws Exception { .build(); PullSubscribeOptions options = PullSubscribeOptions.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .configuration(cc) .build(); JetStreamSubscription sub = js.subscribe(tsc.subject(), options); - assertSubscription(sub, tsc.stream, tsc.name(), null, true); + assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true); nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server List messages = sub.fetch(10, fetchDur); @@ -139,12 +139,12 @@ public void testIterate() throws Exception { .build(); PullSubscribeOptions options = PullSubscribeOptions.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .configuration(cc) .build(); JetStreamSubscription sub = js.subscribe(tsc.subject(), options); - assertSubscription(sub, tsc.stream, tsc.name(), null, true); + assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true); nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server Iterator iterator = sub.iterate(10, fetchDur); @@ -218,11 +218,11 @@ public void testBasic() throws Exception { TestingStreamContainer tsc = new TestingStreamContainer(nc); // Build our subscription options. - PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build(); + PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build(); // Subscribe synchronously. JetStreamSubscription sub = js.subscribe(tsc.subject(), options); - assertSubscription(sub, tsc.stream, tsc.name(), null, true); + assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true); nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server // publish some amount of messages, but not entire pull size @@ -317,11 +317,11 @@ public void testNoWait() throws Exception { TestingStreamContainer tsc = new TestingStreamContainer(nc); // Build our subscription options. - PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build(); + PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build(); // Subscribe synchronously. JetStreamSubscription sub = js.subscribe(tsc.subject(), options); - assertSubscription(sub, tsc.stream, tsc.name(), null, true); + assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true); nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server // publish 10 messages @@ -391,11 +391,11 @@ public void testPullExpires() throws Exception { TestingStreamContainer tsc = new TestingStreamContainer(nc); // Build our subscription options. - PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build(); + PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build(); // Subscribe synchronously. JetStreamSubscription sub = js.subscribe(tsc.subject(), options); - assertSubscription(sub, tsc.stream, tsc.name(), null, true); + assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true); nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server long expires = 500; // millis @@ -574,7 +574,7 @@ public void testAckWaitTimeout() throws Exception { .ackWait(1500) .build(); PullSubscribeOptions pso = PullSubscribeOptions.builder() - .durable(tsc.name()) + .durable(tsc.consumerName()) .configuration(cc) .build(); @@ -1108,10 +1108,10 @@ public void testReader() throws Exception { JetStream js = nc.jetStream(); // Pre define a consumer - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).filterSubjects(tsc.subject()).build(); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).filterSubjects(tsc.subject()).build(); jsm.addOrUpdateConsumer(tsc.stream, cc); - PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, tsc.name()); + PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, tsc.consumerName()); JetStreamSubscription sub = js.subscribe(tsc.subject(), so); JetStreamReader reader = sub.reader(500, 125); diff --git a/src/test/java/io/nats/client/impl/JetStreamPushQueueTests.java b/src/test/java/io/nats/client/impl/JetStreamPushQueueTests.java index e7c32397f..47064559b 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPushQueueTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPushQueueTests.java @@ -43,7 +43,7 @@ public void testQueueSubWorkflow() throws Exception { // - the PushSubscribeOptions can be re-used since all the subscribers are the same // - use a concurrent integer to track all the messages received // - have a list of subscribers and threads so I can track them - PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(tsc.name()).build(); + PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(tsc.consumerName()).build(); AtomicInteger allReceived = new AtomicInteger(); List subscribers = new ArrayList<>(); List subThreads = new ArrayList<>(); diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index a52f80849..31e2b476d 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -141,11 +141,11 @@ public String subject(Object variant) { return subjects.computeIfAbsent(variant, TestBase::subject); } - public String name() { - return name(defaultNameVariant); + public String consumerName() { + return consumerName(defaultNameVariant); } - public String name(Object variant) { + public String consumerName(Object variant) { return names.computeIfAbsent(variant, TestBase::name); } } diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index de77a3b12..0c2f9b07c 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -268,11 +268,11 @@ public void testIterableConsumer() throws Exception { JetStream js = nc.jetStream(); // Pre define a consumer - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build(); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build(); jsm.addOrUpdateConsumer(tsc.stream, cc); // Consumer[Context] - ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.name()); + ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.consumerName()); int stopCount = 500; // create the consumer then use it @@ -355,11 +355,11 @@ public void testConsumeWithHandler() throws Exception { jsPublish(js, tsc.subject(), 2500); // Pre define a consumer - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build(); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build(); jsm.addOrUpdateConsumer(tsc.stream, cc); // Consumer[Context] - ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.name()); + ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.consumerName()); int stopCount = 500; @@ -428,10 +428,10 @@ public void testCoverage() throws Exception { JetStream js = nc.jetStream(); // Pre define a consumer - jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(1)).build()); - jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(2)).build()); - jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(3)).build()); - jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(4)).build()); + jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(1)).build()); + jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(2)).build()); + jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(3)).build()); + jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(4)).build()); // Stream[Context] StreamContext sctx1 = nc.getStreamContext(tsc.stream); @@ -439,19 +439,19 @@ public void testCoverage() throws Exception { js.getStreamContext(tsc.stream); // Consumer[Context] - ConsumerContext cctx1 = nc.getConsumerContext(tsc.stream, tsc.name(1)); - ConsumerContext cctx2 = nc.getConsumerContext(tsc.stream, tsc.name(2), JetStreamOptions.DEFAULT_JS_OPTIONS); - ConsumerContext cctx3 = js.getConsumerContext(tsc.stream, tsc.name(3)); - ConsumerContext cctx4 = sctx1.getConsumerContext(tsc.name(4)); - ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.name(5)).build()); - ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.name(6)).build()); - - after(cctx1.iterate(), tsc.name(1), true); - after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), tsc.name(2), true); - after(cctx3.consume(m -> {}), tsc.name(3), true); - after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), tsc.name(4), true); - after(cctx5.fetchMessages(1), tsc.name(5), false); - after(cctx6.fetchBytes(1000), tsc.name(6), false); + ConsumerContext cctx1 = nc.getConsumerContext(tsc.stream, tsc.consumerName(1)); + ConsumerContext cctx2 = nc.getConsumerContext(tsc.stream, tsc.consumerName(2), JetStreamOptions.DEFAULT_JS_OPTIONS); + ConsumerContext cctx3 = js.getConsumerContext(tsc.stream, tsc.consumerName(3)); + ConsumerContext cctx4 = sctx1.getConsumerContext(tsc.consumerName(4)); + ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.consumerName(5)).build()); + ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.consumerName(6)).build()); + + after(cctx1.iterate(), tsc.consumerName(1), true); + after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), tsc.consumerName(2), true); + after(cctx3.consume(m -> {}), tsc.consumerName(3), true); + after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), tsc.consumerName(4), true); + after(cctx5.fetchMessages(1), tsc.consumerName(5), false); + after(cctx6.fetchBytes(1000), tsc.consumerName(6), false); }); } diff --git a/src/test/java/io/nats/client/support/ValidatorTests.java b/src/test/java/io/nats/client/support/ValidatorTests.java index 2563db948..594c902dc 100644 --- a/src/test/java/io/nats/client/support/ValidatorTests.java +++ b/src/test/java/io/nats/client/support/ValidatorTests.java @@ -552,7 +552,7 @@ public void testSemver() { } @Test - public void testConsumerFilterSubjectsAreEquivalent() { + public void testListsAreEquivalent() { List l1 = Arrays.asList("one", "two"); List l2 = Arrays.asList("two", "one"); List l3 = Arrays.asList("one", "not");