diff --git a/README.md b/README.md index 32e9b89e7..876af993f 100644 --- a/README.md +++ b/README.md @@ -777,6 +777,7 @@ You can however set the deliver policy which will be used to start the subscript | JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. | | JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. | | JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. | +| JsSoNameOrDurableRequiredForBind | SO | 90113 | Name or Durable required for Bind. | | JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. | | JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. | | JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. | diff --git a/src/examples/java/io/nats/examples/jetstream/NatsJsPullSubMultipleWorkers.java b/src/examples/java/io/nats/examples/jetstream/NatsJsPullSubMultipleWorkers.java index 6062835f8..1ac64e2b1 100644 --- a/src/examples/java/io/nats/examples/jetstream/NatsJsPullSubMultipleWorkers.java +++ b/src/examples/java/io/nats/examples/jetstream/NatsJsPullSubMultipleWorkers.java @@ -88,7 +88,7 @@ public static void main(String[] args) { // - the PullSubscribeOptions can be re-used since all the subscribers are the same // - use a concurrent integer to track all the messages received // - have a list of subscribers and threads so I can track them - PullSubscribeOptions pso = PullSubscribeOptions.bind(exArgs.stream, exArgs.durable); + PullSubscribeOptions pso = PullSubscribeOptions.fastBind(exArgs.stream, exArgs.durable); AtomicInteger allReceived = new AtomicInteger(); List subscribers = new ArrayList<>(); List subThreads = new ArrayList<>(); diff --git a/src/main/java/io/nats/client/PullSubscribeOptions.java b/src/main/java/io/nats/client/PullSubscribeOptions.java index 41fc0ea07..d791b6982 100644 --- a/src/main/java/io/nats/client/PullSubscribeOptions.java +++ b/src/main/java/io/nats/client/PullSubscribeOptions.java @@ -33,14 +33,36 @@ public static Builder builder() { } /** - * Create PullSubscribeOptions where you are binding to - * a specific stream, specific durable and are using bind mode + * Create PullSubscribeOptions for binding to + * a specific stream and consumer by name. + * The client validates regular (non-fast) + * binds to ensure that provided consumer configuration + * is consistent with the server version and that + * consumer type (push versus pull) matches the subscription type. + * and that it matches the subscription type. * @param stream the stream name to bind to - * @param name the consumer name, commonly the durable name + * @param name the consumer name * @return push subscribe options */ public static PullSubscribeOptions bind(String stream, String name) { - return new PullSubscribeOptions.Builder().stream(stream).durable(name).bind(true).build(); + return new Builder().stream(stream).name(name).bind(true).build(); + } + + /** + * Create PullSubscribeOptions where you are fast-binding to + * a specific stream and consumer by name. + * The client does not validate that the provided consumer configuration + * is consistent with the server version or that + * consumer type (push versus pull) matches the subscription type. + * An inconsistent consumer configuration for instance can result in + * receiving messages from unexpected subjects. + * A consumer type mismatch will result in an error from the server. + * @param stream the stream name to bind to + * @param name the consumer name, commonly the durable name + * @return push subscribe options + */ + public static PullSubscribeOptions fastBind(String stream, String name) { + return new Builder().stream(stream).name(name).fastBind(true).build(); } /** @@ -63,5 +85,21 @@ protected Builder getThis() { public PullSubscribeOptions build() { return new PullSubscribeOptions(this); } + + /** + * Specify binding to an existing consumer via name. + * The client does not validate that the provided consumer configuration + * is consistent with the server version or that + * consumer type (push versus pull) matches the subscription type. + * An inconsistent consumer configuration for instance can result in + * receiving messages from unexpected subjects. + * A consumer type mismatch will result in an error from the server. + * @return the builder + * @param fastBind whether to fast bind or not + */ + public Builder fastBind(boolean fastBind) { + this.fastBind = fastBind; + return this; + } } } diff --git a/src/main/java/io/nats/client/PushSubscribeOptions.java b/src/main/java/io/nats/client/PushSubscribeOptions.java index ff03153d4..ad1363eba 100644 --- a/src/main/java/io/nats/client/PushSubscribeOptions.java +++ b/src/main/java/io/nats/client/PushSubscribeOptions.java @@ -48,8 +48,8 @@ public String getDeliverGroup() { * where you must specify the stream because * the subject could apply to both a stream and a mirror. * @deprecated - * This method is no longer used as bind has a different meaning. - * See {@link #stream(String)} instead. + * This method resolves to {@link #stream(String)} as bind has a different meaning + * and requires both stream and consumer name * @param stream the stream name * @return push subscribe options */ @@ -70,14 +70,19 @@ public static PushSubscribeOptions stream(String stream) { } /** - * Macro to create a PushSubscribeOptions where you are - * binding to an existing stream and durable consumer. + * Create PushSubscribeOptions for binding to + * a specific stream and consumer by name. + * The client validates regular (non-fast) + * binds to ensure that provided consumer configuration + * is consistent with the server version and that + * consumer type (push versus pull) matches the subscription type. + * and that it matches the subscription type. * @param stream the stream name - * @param durable the durable name + * @param name the consumer name * @return push subscribe options */ - public static PushSubscribeOptions bind(String stream, String durable) { - return new PushSubscribeOptions.Builder().stream(stream).durable(durable).bind(true).build(); + public static PushSubscribeOptions bind(String stream, String name) { + return new Builder().stream(stream).name(name).bind(true).build(); } /** diff --git a/src/main/java/io/nats/client/SubscribeOptions.java b/src/main/java/io/nats/client/SubscribeOptions.java index d08349a2d..4bfd419a6 100644 --- a/src/main/java/io/nats/client/SubscribeOptions.java +++ b/src/main/java/io/nats/client/SubscribeOptions.java @@ -30,11 +30,13 @@ public abstract class SubscribeOptions { protected final String stream; protected final boolean pull; protected final boolean bind; + protected final boolean fastBind; protected final boolean ordered; protected final long messageAlarmTime; protected final ConsumerConfiguration consumerConfig; protected final long pendingMessageLimit; // Only applicable for non dispatched (sync) push consumers. protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers. + protected final String name; @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars protected SubscribeOptions(Builder builder, boolean isPull, @@ -42,7 +44,8 @@ protected SubscribeOptions(Builder builder, boolean isPull, long pendingMessageLimit, long pendingByteLimit) { pull = isPull; - bind = builder.bind; + fastBind = builder.fastBind; + bind = fastBind || builder.bind; ordered = builder.ordered; messageAlarmTime = builder.messageAlarmTime; @@ -52,12 +55,18 @@ protected SubscribeOptions(Builder builder, boolean isPull, stream = validateStreamName(builder.stream, bind); // required when bind mode - String durable = validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch); - durable = validateDurable(durable, bind); // required when bind - - String name = validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch); - - validateMustMatchIfBothSupplied(name, durable, JsConsumerNameDurableMismatch); + // read the names and do basic validation + String temp = validateConsumerName( + validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch), + false); + String durable = validateDurable( + validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch), + false); + name = validateMustMatchIfBothSupplied(temp, durable, JsConsumerNameDurableMismatch); + + if (bind && name == null) { + throw JsSoNameOrDurableRequiredForBind.instance(); + } deliverGroup = validateMustMatchIfBothSupplied(deliverGroup, builder.cc == null ? null : builder.cc.getDeliverGroup(), JsSoDeliverGroupMismatch); @@ -96,7 +105,7 @@ protected SubscribeOptions(Builder builder, boolean isPull, .ackPolicy(AckPolicy.None) .maxDeliver(1) .ackWait(Duration.ofHours(22)) - .name(name) + .name(temp) .memStorage(true) .numReplicas(1); @@ -110,7 +119,7 @@ protected SubscribeOptions(Builder builder, boolean isPull, .durable(durable) .deliverSubject(deliverSubject) .deliverGroup(deliverGroup) - .name(name) + .name(temp) .build(); } } @@ -131,6 +140,14 @@ public String getDurable() { return consumerConfig.getDurable(); } + /** + * Gets the name of the consumer. Same as durable when the consumer is durable. + * @return the name + */ + public String getName() { + return name; + } + /** * Gets whether this is a pull subscription * @return the pull flag @@ -140,13 +157,22 @@ public boolean isPull() { } /** - * Gets whether this subscription is expected to bind to an existing stream and durable consumer + * Gets whether this subscription is expected to bind to an existing stream and consumer * @return the bind flag */ public boolean isBind() { return bind; } + /** + * Gets whether this subscription is expected to fast bind to an existing stream and consumer. + * Overrides bind + * @return the fast bind flag + */ + public boolean isFastBind() { + return fastBind; + } + /** * Gets whether this subscription is expected to ensure messages come in order * @return the ordered flag @@ -204,6 +230,7 @@ public String toString() { protected static abstract class Builder { protected String stream; protected boolean bind; + protected boolean fastBind; protected String durable; protected String name; protected ConsumerConfiguration cc; @@ -224,7 +251,11 @@ public B stream(String stream) { } /** - * Specify the to attach in direct mode + * Specify binding to an existing consumer via name. + * The client validates regular (non-fast) + * binds to ensure that provided consumer configuration + * is consistent with the server version and that + * consumer type (push versus pull) matches the subscription type. * @return the builder * @param bind whether to bind or not */ @@ -232,7 +263,6 @@ public B bind(boolean bind) { this.bind = bind; return getThis(); } - /** * Sets the durable name for the consumer. * Null or empty clears the field. diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index bea6eece4..2e9f2dc9c 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -51,7 +51,7 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript consumerName = ci.getName(); originalOrderedCc = null; subscribeSubject = null; - unorderedBindPso = PullSubscribeOptions.bind(sc.streamName, consumerName); + unorderedBindPso = PullSubscribeOptions.fastBind(sc.streamName, consumerName); cachedConsumerInfo = ci; } diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index 323b5abbb..eaccd8e37 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -315,7 +315,7 @@ JetStreamSubscription createSubscription(String subject, String inboxDeliver = userCC.getDeliverSubject(); // 3. Does this consumer already exist? - if (consumerName != null) { + if (!so.isFastBind() && consumerName != null) { ConsumerInfo serverInfo = lookupConsumerInfo(fnlStream, consumerName); if (serverInfo != null) { // the consumer for that durable already exists @@ -390,7 +390,11 @@ else if (inboxDeliver == null) { // If the consumer exists, I know what the settled info is final String settledConsumerName; final ConsumerConfiguration settledServerCC; - if (serverCC == null) { + if (so.isFastBind() || serverCC != null) { + settledServerCC = serverCC; + settledConsumerName = so.getName(); + } + else { ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC); // Pull mode doesn't maintain a deliver subject. It's actually an error if we send it. @@ -407,10 +411,6 @@ else if (inboxDeliver == null) { settledServerCC = ccBuilder.build(); settledConsumerName = null; } - else { - settledServerCC = serverCC; - settledConsumerName = consumerName; - } // 6. create the subscription. lambda needs final or effectively final vars final MessageManager mm; diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java index 87516e165..dfe9ae4aa 100644 --- a/src/main/java/io/nats/client/impl/NatsMessage.java +++ b/src/main/java/io/nats/client/impl/NatsMessage.java @@ -376,9 +376,9 @@ public long consumeByteCount() { @Override public String toString() { if (subject == null) { - return "NatsMessage | " + protocolBytesToString(); + return getClass().getSimpleName() + " | " + protocolBytesToString(); } - return "NatsMessage |" + subject + "|" + replyToString() + "|" + dataToString() + "|"; + return getClass().getSimpleName() + " |" + subject + "|" + replyToString() + "|" + dataToString() + "|"; } String toDetailString() { diff --git a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java index f563eff4c..80340d426 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java @@ -54,6 +54,7 @@ public class NatsJetStreamClientError { public static final NatsJetStreamClientError JsSoNameMismatch = new NatsJetStreamClientError(SO, 90110, "Builder name must match the consumer configuration name if both are provided."); public static final NatsJetStreamClientError JsSoOrderedMemStorageNotSuppliedOrTrue = new NatsJetStreamClientError(SO, 90111, "Mem Storage must be true if supplied."); public static final NatsJetStreamClientError JsSoOrderedReplicasNotSuppliedOrOne = new NatsJetStreamClientError(SO, 90112, "Replicas must be 1 if supplied."); + public static final NatsJetStreamClientError JsSoNameOrDurableRequiredForBind = new NatsJetStreamClientError(SO, 90113, "Name or Durable required for Bind."); public static final NatsJetStreamClientError OsObjectNotFound = new NatsJetStreamClientError(OS, 90201, "The object was not found."); public static final NatsJetStreamClientError OsObjectIsDeleted = new NatsJetStreamClientError(OS, 90202, "The object is deleted."); diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java index e59c2bb73..e0a55c4ad 100644 --- a/src/main/java/io/nats/client/support/Validator.java +++ b/src/main/java/io/nats/client/support/Validator.java @@ -131,6 +131,13 @@ public static String required(String s, String label) { return s; } + public static String required(String s1, String s2, String label) { + if (emptyAsNull(s1) == null && emptyAsNull(s2) == null) { + throw new IllegalArgumentException(label + " cannot be null or empty."); + } + return s1; + } + public static T required(T o, String label) { if (o == null) { throw new IllegalArgumentException(label + " cannot be null."); diff --git a/src/test/java/io/nats/client/SubscribeOptionsTests.java b/src/test/java/io/nats/client/SubscribeOptionsTests.java index 301494a2f..c1b27e9f2 100644 --- a/src/test/java/io/nats/client/SubscribeOptionsTests.java +++ b/src/test/java/io/nats/client/SubscribeOptionsTests.java @@ -33,6 +33,7 @@ public void testPushAffirmative() { // starts out all null which is fine assertNull(so.getStream()); assertNull(so.getDurable()); + assertNull(so.getName()); assertNull(so.getDeliverSubject()); so = PushSubscribeOptions.builder() @@ -40,6 +41,7 @@ public void testPushAffirmative() { assertEquals(STREAM, so.getStream()); assertEquals(DURABLE, so.getDurable()); + assertEquals(DURABLE, so.getName()); assertEquals(DELIVER, so.getDeliverSubject()); // demonstrate that you can clear the builder @@ -47,6 +49,7 @@ public void testPushAffirmative() { .stream(null).deliverSubject(null).durable(null).build(); assertNull(so.getStream()); assertNull(so.getDurable()); + assertNull(so.getName()); assertNull(so.getDeliverSubject()); assertFalse(so.isPull()); @@ -56,29 +59,32 @@ public void testPushAffirmative() { @Test public void testDurableValidation() { // push - assertNull(PushSubscribeOptions.builder() - .durable(null) - .configuration(ConsumerConfiguration.builder().durable(null).build()) - .build() - .getDurable()); - - assertEquals("y", PushSubscribeOptions.builder() - .durable(null) - .configuration(ConsumerConfiguration.builder().durable("y").build()) - .build() - .getDurable()); - - assertEquals("x", PushSubscribeOptions.builder() - .durable("x") - .configuration(ConsumerConfiguration.builder().durable(null).build()) - .build() - .getDurable()); - - assertEquals("x", PushSubscribeOptions.builder() - .durable("x") - .configuration(ConsumerConfiguration.builder().durable("x").build()) - .build() - .getDurable()); + PushSubscribeOptions uso = PushSubscribeOptions.builder() + .durable(null) + .configuration(ConsumerConfiguration.builder().durable(null).build()) + .build(); + assertNull(uso.getDurable()); + + uso = PushSubscribeOptions.builder() + .durable(null) + .configuration(ConsumerConfiguration.builder().durable("y").build()) + .build(); + assertEquals("y", uso.getDurable()); + assertEquals("y", uso.getName()); + + uso = PushSubscribeOptions.builder() + .durable("x") + .configuration(ConsumerConfiguration.builder().durable(null).build()) + .build(); + assertEquals("x", uso.getDurable()); + assertEquals("x", uso.getName()); + + uso = PushSubscribeOptions.builder() + .durable("x") + .configuration(ConsumerConfiguration.builder().durable("x").build()) + .build(); + assertEquals("x", uso.getDurable()); + assertEquals("x", uso.getName()); assertClientError(JsSoDurableMismatch, () -> PushSubscribeOptions.builder() .durable("x") @@ -88,23 +94,26 @@ public void testDurableValidation() { assertNull(PushSubscribeOptions.builder().build().getDurable()); // pull - assertEquals("y", PullSubscribeOptions.builder() - .durable(null) - .configuration(ConsumerConfiguration.builder().durable("y").build()) - .build() - .getDurable()); - - assertEquals("x", PullSubscribeOptions.builder() - .durable("x") - .configuration(ConsumerConfiguration.builder().durable(null).build()) - .build() - .getDurable()); - - assertEquals("x", PullSubscribeOptions.builder() - .durable("x") - .configuration(ConsumerConfiguration.builder().durable("x").build()) - .build() - .getDurable()); + PullSubscribeOptions lso = PullSubscribeOptions.builder() + .durable(null) + .configuration(ConsumerConfiguration.builder().durable("y").build()) + .build(); + assertEquals("y", lso.getDurable()); + assertEquals("y", lso.getName()); + + lso = PullSubscribeOptions.builder() + .durable("x") + .configuration(ConsumerConfiguration.builder().durable(null).build()) + .build(); + assertEquals("x", lso.getDurable()); + assertEquals("x", lso.getName()); + + lso = PullSubscribeOptions.builder() + .durable("x") + .configuration(ConsumerConfiguration.builder().durable("x").build()) + .build(); + assertEquals("x", lso.getDurable()); + assertEquals("x", lso.getName()); assertClientError(JsSoDurableMismatch, () -> PullSubscribeOptions.builder() .durable("x") @@ -185,6 +194,7 @@ public void testPullAffirmative() { PullSubscribeOptions so = builder.build(); assertEquals(STREAM, so.getStream()); assertEquals(DURABLE, so.getDurable()); + assertEquals(DURABLE, so.getName()); assertTrue(so.isPull()); assertNotNull(so.toString()); // COVERAGE @@ -204,14 +214,22 @@ public void testPushFieldValidation() { assertClientError(JsConsumerNameDurableMismatch, () -> PushSubscribeOptions.builder().name(NAME).durable(DURABLE).build()); // durable directly - PushSubscribeOptions.builder().durable(DURABLE).build(); - PushSubscribeOptions.builder().name(NAME).build(); + PushSubscribeOptions uso = PushSubscribeOptions.builder().durable(DURABLE).build(); + assertEquals(DURABLE, uso.getDurable()); + assertEquals(DURABLE, uso.getName()); + uso = PushSubscribeOptions.builder().name(NAME).build(); + assertNull(uso.getDurable()); + assertEquals(NAME, uso.getName()); // in configuration ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); - PushSubscribeOptions.builder().configuration(cc).build(); + uso = PushSubscribeOptions.builder().configuration(cc).build(); + assertEquals(DURABLE, uso.getDurable()); + assertEquals(DURABLE, uso.getName()); cc = ConsumerConfiguration.builder().name(NAME).build(); - PushSubscribeOptions.builder().configuration(cc).build(); + uso = PushSubscribeOptions.builder().configuration(cc).build(); + assertNull(uso.getDurable()); + assertEquals(NAME, uso.getName()); // new helper ConsumerConfiguration.builder().durable(DURABLE).buildPushSubscribeOptions(); @@ -232,14 +250,20 @@ public void testPullFieldValidation() { assertClientError(JsConsumerNameDurableMismatch, () -> PullSubscribeOptions.builder().name(NAME).durable(DURABLE).build()); // durable directly - PullSubscribeOptions.builder().durable(DURABLE).build(); + PullSubscribeOptions lso = PullSubscribeOptions.builder().durable(DURABLE).build(); + assertEquals(DURABLE, lso.getDurable()); + assertEquals(DURABLE, lso.getName()); // in configuration ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); - PullSubscribeOptions.builder().configuration(cc).build(); + lso = PullSubscribeOptions.builder().configuration(cc).build(); + assertEquals(DURABLE, lso.getDurable()); + assertEquals(DURABLE, lso.getName()); // new helper - ConsumerConfiguration.builder().durable(DURABLE).buildPullSubscribeOptions(); + lso = ConsumerConfiguration.builder().durable(DURABLE).buildPullSubscribeOptions(); + assertEquals(DURABLE, lso.getDurable()); + assertEquals(DURABLE, lso.getName()); } @Test @@ -259,23 +283,51 @@ public void testCreationErrors() { @Test public void testBindCreationErrors() { - assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(null, DURABLE)); - assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(EMPTY, DURABLE)); + String durOrName = name(); + + // bind + assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(null, durOrName)); + assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(EMPTY, durOrName)); assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(STREAM, null)); assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(STREAM, EMPTY)); assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(STREAM).bind(true).build()); - assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(EMPTY).durable(DURABLE).bind(true).build()); - assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().durable(DURABLE).bind(true).build()); + + assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(EMPTY).durable(durOrName).bind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().durable(durOrName).bind(true).build()); assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(STREAM).durable(EMPTY).bind(true).build()); - assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(null, DURABLE)); - assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(EMPTY, DURABLE)); + assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(EMPTY).name(durOrName).bind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().name(durOrName).bind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(STREAM).name(EMPTY).bind(true).build()); + + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(null, durOrName)); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(EMPTY, durOrName)); assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(STREAM, null)); assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(STREAM, EMPTY)); assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).bind(true).build()); - assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).durable(DURABLE).bind(true).build()); - assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().durable(DURABLE).bind(true).build()); + + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).durable(durOrName).bind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().durable(durOrName).bind(true).build()); assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).durable(EMPTY).bind(true).build()); + + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).name(durOrName).bind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().name(durOrName).bind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).name(EMPTY).bind(true).build()); + + // fast bind + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.fastBind(null, durOrName)); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.fastBind(EMPTY, durOrName)); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.fastBind(STREAM, null)); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.fastBind(STREAM, EMPTY)); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).fastBind(true).build()); + + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).durable(durOrName).fastBind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().durable(durOrName).fastBind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).durable(EMPTY).fastBind(true).build()); + + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).name(durOrName).fastBind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().name(durOrName).fastBind(true).build()); + assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).name(EMPTY).fastBind(true).build()); } @Test diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java index eb7ec2692..5fc7bc043 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java @@ -600,31 +600,84 @@ public void testAckWaitTimeout() throws Exception { public void testDurable() throws Exception { runInJsServer(nc -> { // create the stream. - createDefaultTestStream(nc); + CreateStreamResult csr = createMemoryStream(nc); + String durable = durable(); // Create our JetStream context. JetStream js = nc.jetStream(); // Build our subscription options normally - PullSubscribeOptions options1 = PullSubscribeOptions.builder().durable(DURABLE).build(); - _testDurable(js, () -> js.subscribe(SUBJECT, options1)); + PullSubscribeOptions options1 = PullSubscribeOptions.builder().durable(durable).build(); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(csr.subject, options1)); // bind long form PullSubscribeOptions options2 = PullSubscribeOptions.builder() - .stream(STREAM) - .durable(DURABLE) + .stream(csr.stream) + .durable(durable) + .bind(true) + .build(); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options2)); + + // fast bind long form + PullSubscribeOptions options3 = PullSubscribeOptions.builder() + .stream(csr.stream) + .durable(durable) + .fastBind(true) + .build(); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options3)); + + // bind short form + PullSubscribeOptions options4 = PullSubscribeOptions.bind(csr.stream, durable); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options4)); + + // fast bind short form + PullSubscribeOptions options5 = PullSubscribeOptions.fastBind(csr.stream, durable); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options5)); + }); + } + + @Test + public void testNamed() throws Exception { + runInJsServer(nc -> { + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + CreateStreamResult csr = createMemoryStream(jsm); + String name = name(); + + jsm.addOrUpdateConsumer(csr.stream, ConsumerConfiguration.builder() + .name(name) + .inactiveThreshold(10_000) + .build()); + + // bind long form + PullSubscribeOptions options2 = PullSubscribeOptions.builder() + .stream(csr.stream) + .name(name) .bind(true) .build(); - _testDurable(js, () -> js.subscribe(null, options2)); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options2)); + + // fast bind long form + PullSubscribeOptions options3 = PullSubscribeOptions.builder() + .stream(csr.stream) + .name(name) + .fastBind(true) + .build(); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options3)); // bind short form - PullSubscribeOptions options3 = PullSubscribeOptions.bind(STREAM, DURABLE); - _testDurable(js, () -> js.subscribe(null, options3)); + PullSubscribeOptions options4 = PullSubscribeOptions.bind(csr.stream, name); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options4)); + + // fast bind short form + PullSubscribeOptions options5 = PullSubscribeOptions.fastBind(csr.stream, name); + _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options5)); }); } - private void _testDurable(JetStream js, SubscriptionSupplier supplier) throws IOException, JetStreamApiException, InterruptedException { - jsPublish(js, SUBJECT, 2); + private void _testDurableOrNamed(JetStream js, String subject, SubscriptionSupplier supplier) throws IOException, JetStreamApiException, InterruptedException { + jsPublish(js, subject, 2); JetStreamSubscription sub = supplier.get();