From 20c20bbf335e1dc11dd82a4dbbc1c6349c49abba Mon Sep 17 00:00:00 2001
From: Scott Fauerbach <scottfauerbach@gmail.com>
Date: Tue, 29 Aug 2023 11:29:00 -0400
Subject: [PATCH] Fast Bind (#959)

---
 README.md                                     |   1 +
 .../NatsJsPullSubMultipleWorkers.java         |   2 +-
 .../io/nats/client/PullSubscribeOptions.java  |  46 ++++-
 .../io/nats/client/PushSubscribeOptions.java  |  19 +-
 .../java/io/nats/client/SubscribeOptions.java |  54 ++++--
 .../nats/client/impl/NatsConsumerContext.java |   2 +-
 .../io/nats/client/impl/NatsJetStream.java    |  12 +-
 .../java/io/nats/client/impl/NatsMessage.java |   4 +-
 .../support/NatsJetStreamClientError.java     |   1 +
 .../io/nats/client/support/Validator.java     |   7 +
 .../io/nats/client/SubscribeOptionsTests.java | 162 ++++++++++++------
 .../nats/client/impl/JetStreamPullTests.java  |  73 ++++++--
 12 files changed, 285 insertions(+), 98 deletions(-)

diff --git a/README.md b/README.md
index 32e9b89e7..876af993f 100644
--- a/README.md
+++ b/README.md
@@ -777,6 +777,7 @@ You can however set the deliver policy which will be used to start the subscript
 | JsSoNameMismatch                             | SO    | 90110 | Builder name must match the consumer configuration name if both are provided.                       |
 | JsSoOrderedMemStorageNotSuppliedOrTrue       | SO    | 90111 | Mem Storage must be true if supplied.                                                               |
 | JsSoOrderedReplicasNotSuppliedOrOne          | SO    | 90112 | Replicas must be 1 if supplied.                                                                     |
+| JsSoNameOrDurableRequiredForBind             | SO    | 90113 | Name or Durable required for Bind.                                                                  |
 | JsSubPullCantHaveDeliverGroup                | SUB   | 90001 | Pull subscriptions can't have a deliver group.                                                      |
 | JsSubPullCantHaveDeliverSubject              | SUB   | 90002 | Pull subscriptions can't have a deliver subject.                                                    |
 | JsSubPushCantHaveMaxPullWaiting              | SUB   | 90003 | Push subscriptions cannot supply max pull waiting.                                                  |
diff --git a/src/examples/java/io/nats/examples/jetstream/NatsJsPullSubMultipleWorkers.java b/src/examples/java/io/nats/examples/jetstream/NatsJsPullSubMultipleWorkers.java
index 6062835f8..1ac64e2b1 100644
--- a/src/examples/java/io/nats/examples/jetstream/NatsJsPullSubMultipleWorkers.java
+++ b/src/examples/java/io/nats/examples/jetstream/NatsJsPullSubMultipleWorkers.java
@@ -88,7 +88,7 @@ public static void main(String[] args) {
             // - the PullSubscribeOptions can be re-used since all the subscribers are the same
             // - use a concurrent integer to track all the messages received
             // - have a list of subscribers and threads so I can track them
-            PullSubscribeOptions pso = PullSubscribeOptions.bind(exArgs.stream, exArgs.durable);
+            PullSubscribeOptions pso = PullSubscribeOptions.fastBind(exArgs.stream, exArgs.durable);
             AtomicInteger allReceived = new AtomicInteger();
             List<JsPullSubWorker> subscribers = new ArrayList<>();
             List<Thread> subThreads = new ArrayList<>();
diff --git a/src/main/java/io/nats/client/PullSubscribeOptions.java b/src/main/java/io/nats/client/PullSubscribeOptions.java
index 41fc0ea07..d791b6982 100644
--- a/src/main/java/io/nats/client/PullSubscribeOptions.java
+++ b/src/main/java/io/nats/client/PullSubscribeOptions.java
@@ -33,14 +33,36 @@ public static Builder builder() {
     }
 
     /**
-     * Create PullSubscribeOptions where you are binding to
-     * a specific stream, specific durable and are using bind mode
+     * Create PullSubscribeOptions for binding to
+     * a specific stream and consumer by name.
+     * The client validates regular (non-fast)
+     * binds to ensure that provided consumer configuration
+     * is consistent with the server version and that
+     * consumer type (push versus pull) matches the subscription type.
+     * and that it matches the subscription type.
      * @param stream the stream name to bind to
-     * @param name the consumer name, commonly the durable name
+     * @param name the consumer name
      * @return push subscribe options
      */
     public static PullSubscribeOptions bind(String stream, String name) {
-        return new PullSubscribeOptions.Builder().stream(stream).durable(name).bind(true).build();
+        return new Builder().stream(stream).name(name).bind(true).build();
+    }
+
+    /**
+     * Create PullSubscribeOptions where you are fast-binding to
+     * a specific stream and consumer by name.
+     * The client does not validate that the provided consumer configuration
+     * is consistent with the server version or that
+     * consumer type (push versus pull) matches the subscription type.
+     * An inconsistent consumer configuration for instance can result in
+     * receiving messages from unexpected subjects.
+     * A consumer type mismatch will result in an error from the server.
+     * @param stream the stream name to bind to
+     * @param name the consumer name, commonly the durable name
+     * @return push subscribe options
+     */
+    public static PullSubscribeOptions fastBind(String stream, String name) {
+        return new Builder().stream(stream).name(name).fastBind(true).build();
     }
 
     /**
@@ -63,5 +85,21 @@ protected Builder getThis() {
         public PullSubscribeOptions build() {
             return new PullSubscribeOptions(this);
         }
+
+        /**
+         * Specify binding to an existing consumer via name.
+         * The client does not validate that the provided consumer configuration
+         * is consistent with the server version or that
+         * consumer type (push versus pull) matches the subscription type.
+         * An inconsistent consumer configuration for instance can result in
+         * receiving messages from unexpected subjects.
+         * A consumer type mismatch will result in an error from the server.
+         * @return the builder
+         * @param fastBind whether to fast bind or not
+         */
+        public Builder fastBind(boolean fastBind) {
+            this.fastBind = fastBind;
+            return this;
+        }
     }
 }
diff --git a/src/main/java/io/nats/client/PushSubscribeOptions.java b/src/main/java/io/nats/client/PushSubscribeOptions.java
index ff03153d4..ad1363eba 100644
--- a/src/main/java/io/nats/client/PushSubscribeOptions.java
+++ b/src/main/java/io/nats/client/PushSubscribeOptions.java
@@ -48,8 +48,8 @@ public String getDeliverGroup() {
      * where you must specify the stream because
      * the subject could apply to both a stream and a mirror.
      * @deprecated
-     * This method is no longer used as bind has a different meaning.
-     * See {@link #stream(String)} instead.
+     * This method resolves to {@link #stream(String)} as bind has a different meaning
+     * and requires both stream and consumer name
      * @param stream the stream name
      * @return push subscribe options
      */
@@ -70,14 +70,19 @@ public static PushSubscribeOptions stream(String stream) {
     }
 
     /**
-     * Macro to create a PushSubscribeOptions where you are
-     * binding to an existing stream and durable consumer.
+     * Create PushSubscribeOptions for binding to
+     * a specific stream and consumer by name.
+     * The client validates regular (non-fast)
+     * binds to ensure that provided consumer configuration
+     * is consistent with the server version and that
+     * consumer type (push versus pull) matches the subscription type.
+     * and that it matches the subscription type.
      * @param stream the stream name
-     * @param durable the durable name
+     * @param name the consumer name
      * @return push subscribe options
      */
-    public static PushSubscribeOptions bind(String stream, String durable) {
-        return new PushSubscribeOptions.Builder().stream(stream).durable(durable).bind(true).build();
+    public static PushSubscribeOptions bind(String stream, String name) {
+        return new Builder().stream(stream).name(name).bind(true).build();
     }
 
     /**
diff --git a/src/main/java/io/nats/client/SubscribeOptions.java b/src/main/java/io/nats/client/SubscribeOptions.java
index d08349a2d..4bfd419a6 100644
--- a/src/main/java/io/nats/client/SubscribeOptions.java
+++ b/src/main/java/io/nats/client/SubscribeOptions.java
@@ -30,11 +30,13 @@ public abstract class SubscribeOptions {
     protected final String stream;
     protected final boolean pull;
     protected final boolean bind;
+    protected final boolean fastBind;
     protected final boolean ordered;
     protected final long messageAlarmTime;
     protected final ConsumerConfiguration consumerConfig;
     protected final long pendingMessageLimit; // Only applicable for non dispatched (sync) push consumers.
     protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers.
+    protected final String name;
 
     @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
     protected SubscribeOptions(Builder builder, boolean isPull,
@@ -42,7 +44,8 @@ protected SubscribeOptions(Builder builder, boolean isPull,
                                long pendingMessageLimit, long pendingByteLimit) {
 
         pull = isPull;
-        bind = builder.bind;
+        fastBind = builder.fastBind;
+        bind = fastBind || builder.bind;
         ordered = builder.ordered;
         messageAlarmTime = builder.messageAlarmTime;
 
@@ -52,12 +55,18 @@ protected SubscribeOptions(Builder builder, boolean isPull,
 
         stream = validateStreamName(builder.stream, bind); // required when bind mode
 
-        String durable = validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch);
-        durable = validateDurable(durable, bind); // required when bind
-
-        String name = validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch);
-
-        validateMustMatchIfBothSupplied(name, durable, JsConsumerNameDurableMismatch);
+        // read the names and do basic validation
+        String temp = validateConsumerName(
+            validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch),
+            false);
+        String durable = validateDurable(
+            validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch),
+            false);
+        name = validateMustMatchIfBothSupplied(temp, durable, JsConsumerNameDurableMismatch);
+
+        if (bind && name == null) {
+            throw JsSoNameOrDurableRequiredForBind.instance();
+        }
 
         deliverGroup = validateMustMatchIfBothSupplied(deliverGroup, builder.cc == null ? null : builder.cc.getDeliverGroup(), JsSoDeliverGroupMismatch);
 
@@ -96,7 +105,7 @@ protected SubscribeOptions(Builder builder, boolean isPull,
                 .ackPolicy(AckPolicy.None)
                 .maxDeliver(1)
                 .ackWait(Duration.ofHours(22))
-                .name(name)
+                .name(temp)
                 .memStorage(true)
                 .numReplicas(1);
 
@@ -110,7 +119,7 @@ protected SubscribeOptions(Builder builder, boolean isPull,
                 .durable(durable)
                 .deliverSubject(deliverSubject)
                 .deliverGroup(deliverGroup)
-                .name(name)
+                .name(temp)
                 .build();
         }
     }
@@ -131,6 +140,14 @@ public String getDurable() {
         return consumerConfig.getDurable();
     }
 
+    /**
+     * Gets the name of the consumer. Same as durable when the consumer is durable.
+     * @return the name
+     */
+    public String getName() {
+        return name;
+    }
+
     /**
      * Gets whether this is a pull subscription
      * @return the pull flag
@@ -140,13 +157,22 @@ public boolean isPull() {
     }
 
     /**
-     * Gets whether this subscription is expected to bind to an existing stream and durable consumer
+     * Gets whether this subscription is expected to bind to an existing stream and consumer
      * @return the bind flag
      */
     public boolean isBind() {
         return bind;
     }
 
+    /**
+     * Gets whether this subscription is expected to fast bind to an existing stream and consumer.
+     * Overrides bind
+     * @return the fast bind flag
+     */
+    public boolean isFastBind() {
+        return fastBind;
+    }
+
     /**
      * Gets whether this subscription is expected to ensure messages come in order
      * @return the ordered flag
@@ -204,6 +230,7 @@ public String toString() {
     protected static abstract class Builder<B, SO> {
         protected String stream;
         protected boolean bind;
+        protected boolean fastBind;
         protected String durable;
         protected String name;
         protected ConsumerConfiguration cc;
@@ -224,7 +251,11 @@ public B stream(String stream) {
         }
 
         /**
-         * Specify the to attach in direct mode
+         * Specify binding to an existing consumer via name.
+         * The client validates regular (non-fast)
+         * binds to ensure that provided consumer configuration
+         * is consistent with the server version and that
+         * consumer type (push versus pull) matches the subscription type.
          * @return the builder
          * @param bind whether to bind or not
          */
@@ -232,7 +263,6 @@ public B bind(boolean bind) {
             this.bind = bind;
             return getThis();
         }
-
         /**
          * Sets the durable name for the consumer.
          * Null or empty clears the field.
diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java
index bea6eece4..2e9f2dc9c 100644
--- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java
+++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java
@@ -51,7 +51,7 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
         consumerName = ci.getName();
         originalOrderedCc = null;
         subscribeSubject = null;
-        unorderedBindPso = PullSubscribeOptions.bind(sc.streamName, consumerName);
+        unorderedBindPso = PullSubscribeOptions.fastBind(sc.streamName, consumerName);
         cachedConsumerInfo = ci;
     }
 
diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java
index 323b5abbb..eaccd8e37 100644
--- a/src/main/java/io/nats/client/impl/NatsJetStream.java
+++ b/src/main/java/io/nats/client/impl/NatsJetStream.java
@@ -315,7 +315,7 @@ JetStreamSubscription createSubscription(String subject,
         String inboxDeliver = userCC.getDeliverSubject();
 
         // 3. Does this consumer already exist?
-        if (consumerName != null) {
+        if (!so.isFastBind() && consumerName != null) {
             ConsumerInfo serverInfo = lookupConsumerInfo(fnlStream, consumerName);
 
             if (serverInfo != null) { // the consumer for that durable already exists
@@ -390,7 +390,11 @@ else if (inboxDeliver == null) {
         //    If the consumer exists, I know what the settled info is
         final String settledConsumerName;
         final ConsumerConfiguration settledServerCC;
-        if (serverCC == null) {
+        if (so.isFastBind() || serverCC != null) {
+            settledServerCC = serverCC;
+            settledConsumerName = so.getName();
+        }
+        else {
             ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC);
 
             // Pull mode doesn't maintain a deliver subject. It's actually an error if we send it.
@@ -407,10 +411,6 @@ else if (inboxDeliver == null) {
             settledServerCC = ccBuilder.build();
             settledConsumerName = null;
         }
-        else {
-            settledServerCC = serverCC;
-            settledConsumerName = consumerName;
-        }
 
         // 6. create the subscription. lambda needs final or effectively final vars
         final MessageManager mm;
diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java
index 87516e165..dfe9ae4aa 100644
--- a/src/main/java/io/nats/client/impl/NatsMessage.java
+++ b/src/main/java/io/nats/client/impl/NatsMessage.java
@@ -376,9 +376,9 @@ public long consumeByteCount() {
     @Override
     public String toString() {
         if (subject == null) {
-            return "NatsMessage | " + protocolBytesToString();
+            return getClass().getSimpleName() + " | " + protocolBytesToString();
         }
-        return "NatsMessage |" + subject + "|" + replyToString() + "|" + dataToString() + "|";
+        return getClass().getSimpleName() + " |" + subject + "|" + replyToString() + "|" + dataToString() + "|";
     }
 
     String toDetailString() {
diff --git a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
index f563eff4c..80340d426 100644
--- a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
+++ b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
@@ -54,6 +54,7 @@ public class NatsJetStreamClientError {
     public static final NatsJetStreamClientError JsSoNameMismatch = new NatsJetStreamClientError(SO, 90110, "Builder name must match the consumer configuration name if both are provided.");
     public static final NatsJetStreamClientError JsSoOrderedMemStorageNotSuppliedOrTrue = new NatsJetStreamClientError(SO, 90111, "Mem Storage must be true if supplied.");
     public static final NatsJetStreamClientError JsSoOrderedReplicasNotSuppliedOrOne = new NatsJetStreamClientError(SO, 90112, "Replicas must be 1 if supplied.");
+    public static final NatsJetStreamClientError JsSoNameOrDurableRequiredForBind = new NatsJetStreamClientError(SO, 90113, "Name or Durable required for Bind.");
 
     public static final NatsJetStreamClientError OsObjectNotFound = new NatsJetStreamClientError(OS, 90201, "The object was not found.");
     public static final NatsJetStreamClientError OsObjectIsDeleted = new NatsJetStreamClientError(OS, 90202, "The object is deleted.");
diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java
index e59c2bb73..e0a55c4ad 100644
--- a/src/main/java/io/nats/client/support/Validator.java
+++ b/src/main/java/io/nats/client/support/Validator.java
@@ -131,6 +131,13 @@ public static String required(String s, String label) {
         return s;
     }
 
+    public static String required(String s1, String s2, String label) {
+        if (emptyAsNull(s1) == null && emptyAsNull(s2) == null) {
+            throw new IllegalArgumentException(label + " cannot be null or empty.");
+        }
+        return s1;
+    }
+
     public static <T> T required(T o, String label) {
         if (o == null) {
             throw new IllegalArgumentException(label + " cannot be null.");
diff --git a/src/test/java/io/nats/client/SubscribeOptionsTests.java b/src/test/java/io/nats/client/SubscribeOptionsTests.java
index 301494a2f..c1b27e9f2 100644
--- a/src/test/java/io/nats/client/SubscribeOptionsTests.java
+++ b/src/test/java/io/nats/client/SubscribeOptionsTests.java
@@ -33,6 +33,7 @@ public void testPushAffirmative() {
         // starts out all null which is fine
         assertNull(so.getStream());
         assertNull(so.getDurable());
+        assertNull(so.getName());
         assertNull(so.getDeliverSubject());
 
         so = PushSubscribeOptions.builder()
@@ -40,6 +41,7 @@ public void testPushAffirmative() {
 
         assertEquals(STREAM, so.getStream());
         assertEquals(DURABLE, so.getDurable());
+        assertEquals(DURABLE, so.getName());
         assertEquals(DELIVER, so.getDeliverSubject());
 
         // demonstrate that you can clear the builder
@@ -47,6 +49,7 @@ public void testPushAffirmative() {
                 .stream(null).deliverSubject(null).durable(null).build();
         assertNull(so.getStream());
         assertNull(so.getDurable());
+        assertNull(so.getName());
         assertNull(so.getDeliverSubject());
         assertFalse(so.isPull());
 
@@ -56,29 +59,32 @@ public void testPushAffirmative() {
     @Test
     public void testDurableValidation() {
         // push
-        assertNull(PushSubscribeOptions.builder()
-                .durable(null)
-                .configuration(ConsumerConfiguration.builder().durable(null).build())
-                .build()
-                .getDurable());
-
-        assertEquals("y", PushSubscribeOptions.builder()
-                .durable(null)
-                .configuration(ConsumerConfiguration.builder().durable("y").build())
-                .build()
-                .getDurable());
-
-        assertEquals("x", PushSubscribeOptions.builder()
-                .durable("x")
-                .configuration(ConsumerConfiguration.builder().durable(null).build())
-                .build()
-                .getDurable());
-
-        assertEquals("x", PushSubscribeOptions.builder()
-                .durable("x")
-                .configuration(ConsumerConfiguration.builder().durable("x").build())
-                .build()
-                .getDurable());
+        PushSubscribeOptions uso = PushSubscribeOptions.builder()
+            .durable(null)
+            .configuration(ConsumerConfiguration.builder().durable(null).build())
+            .build();
+        assertNull(uso.getDurable());
+
+        uso = PushSubscribeOptions.builder()
+            .durable(null)
+            .configuration(ConsumerConfiguration.builder().durable("y").build())
+            .build();
+        assertEquals("y", uso.getDurable());
+        assertEquals("y", uso.getName());
+
+        uso = PushSubscribeOptions.builder()
+            .durable("x")
+            .configuration(ConsumerConfiguration.builder().durable(null).build())
+            .build();
+        assertEquals("x", uso.getDurable());
+        assertEquals("x", uso.getName());
+
+        uso = PushSubscribeOptions.builder()
+            .durable("x")
+            .configuration(ConsumerConfiguration.builder().durable("x").build())
+            .build();
+        assertEquals("x", uso.getDurable());
+        assertEquals("x", uso.getName());
 
         assertClientError(JsSoDurableMismatch, () -> PushSubscribeOptions.builder()
                 .durable("x")
@@ -88,23 +94,26 @@ public void testDurableValidation() {
         assertNull(PushSubscribeOptions.builder().build().getDurable());
 
         // pull
-        assertEquals("y", PullSubscribeOptions.builder()
-                .durable(null)
-                .configuration(ConsumerConfiguration.builder().durable("y").build())
-                .build()
-                .getDurable());
-
-        assertEquals("x", PullSubscribeOptions.builder()
-                .durable("x")
-                .configuration(ConsumerConfiguration.builder().durable(null).build())
-                .build()
-                .getDurable());
-
-        assertEquals("x", PullSubscribeOptions.builder()
-                .durable("x")
-                .configuration(ConsumerConfiguration.builder().durable("x").build())
-                .build()
-                .getDurable());
+        PullSubscribeOptions lso = PullSubscribeOptions.builder()
+            .durable(null)
+            .configuration(ConsumerConfiguration.builder().durable("y").build())
+            .build();
+        assertEquals("y", lso.getDurable());
+        assertEquals("y", lso.getName());
+
+        lso = PullSubscribeOptions.builder()
+            .durable("x")
+            .configuration(ConsumerConfiguration.builder().durable(null).build())
+            .build();
+        assertEquals("x", lso.getDurable());
+        assertEquals("x", lso.getName());
+
+        lso = PullSubscribeOptions.builder()
+            .durable("x")
+            .configuration(ConsumerConfiguration.builder().durable("x").build())
+            .build();
+        assertEquals("x", lso.getDurable());
+        assertEquals("x", lso.getName());
 
         assertClientError(JsSoDurableMismatch, () -> PullSubscribeOptions.builder()
                 .durable("x")
@@ -185,6 +194,7 @@ public void testPullAffirmative() {
         PullSubscribeOptions so = builder.build();
         assertEquals(STREAM, so.getStream());
         assertEquals(DURABLE, so.getDurable());
+        assertEquals(DURABLE, so.getName());
         assertTrue(so.isPull());
 
         assertNotNull(so.toString()); // COVERAGE
@@ -204,14 +214,22 @@ public void testPushFieldValidation() {
         assertClientError(JsConsumerNameDurableMismatch, () -> PushSubscribeOptions.builder().name(NAME).durable(DURABLE).build());
 
         // durable directly
-        PushSubscribeOptions.builder().durable(DURABLE).build();
-        PushSubscribeOptions.builder().name(NAME).build();
+        PushSubscribeOptions uso = PushSubscribeOptions.builder().durable(DURABLE).build();
+        assertEquals(DURABLE, uso.getDurable());
+        assertEquals(DURABLE, uso.getName());
+        uso = PushSubscribeOptions.builder().name(NAME).build();
+        assertNull(uso.getDurable());
+        assertEquals(NAME, uso.getName());
 
         // in configuration
         ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build();
-        PushSubscribeOptions.builder().configuration(cc).build();
+        uso = PushSubscribeOptions.builder().configuration(cc).build();
+        assertEquals(DURABLE, uso.getDurable());
+        assertEquals(DURABLE, uso.getName());
         cc = ConsumerConfiguration.builder().name(NAME).build();
-        PushSubscribeOptions.builder().configuration(cc).build();
+        uso = PushSubscribeOptions.builder().configuration(cc).build();
+        assertNull(uso.getDurable());
+        assertEquals(NAME, uso.getName());
 
         // new helper
         ConsumerConfiguration.builder().durable(DURABLE).buildPushSubscribeOptions();
@@ -232,14 +250,20 @@ public void testPullFieldValidation() {
         assertClientError(JsConsumerNameDurableMismatch, () -> PullSubscribeOptions.builder().name(NAME).durable(DURABLE).build());
 
         // durable directly
-        PullSubscribeOptions.builder().durable(DURABLE).build();
+        PullSubscribeOptions lso = PullSubscribeOptions.builder().durable(DURABLE).build();
+        assertEquals(DURABLE, lso.getDurable());
+        assertEquals(DURABLE, lso.getName());
 
         // in configuration
         ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build();
-        PullSubscribeOptions.builder().configuration(cc).build();
+        lso = PullSubscribeOptions.builder().configuration(cc).build();
+        assertEquals(DURABLE, lso.getDurable());
+        assertEquals(DURABLE, lso.getName());
 
         // new helper
-        ConsumerConfiguration.builder().durable(DURABLE).buildPullSubscribeOptions();
+        lso = ConsumerConfiguration.builder().durable(DURABLE).buildPullSubscribeOptions();
+        assertEquals(DURABLE, lso.getDurable());
+        assertEquals(DURABLE, lso.getName());
     }
 
     @Test
@@ -259,23 +283,51 @@ public void testCreationErrors() {
 
     @Test
     public void testBindCreationErrors() {
-        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(null, DURABLE));
-        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(EMPTY, DURABLE));
+        String durOrName = name();
+
+        // bind
+        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(null, durOrName));
+        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(EMPTY, durOrName));
         assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(STREAM, null));
         assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.bind(STREAM, EMPTY));
         assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(STREAM).bind(true).build());
-        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(EMPTY).durable(DURABLE).bind(true).build());
-        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().durable(DURABLE).bind(true).build());
+
+        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(EMPTY).durable(durOrName).bind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().durable(durOrName).bind(true).build());
         assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(STREAM).durable(EMPTY).bind(true).build());
 
-        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(null, DURABLE));
-        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(EMPTY, DURABLE));
+        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(EMPTY).name(durOrName).bind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().name(durOrName).bind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PushSubscribeOptions.builder().stream(STREAM).name(EMPTY).bind(true).build());
+
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(null, durOrName));
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(EMPTY, durOrName));
         assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(STREAM, null));
         assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.bind(STREAM, EMPTY));
         assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).bind(true).build());
-        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).durable(DURABLE).bind(true).build());
-        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().durable(DURABLE).bind(true).build());
+
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).durable(durOrName).bind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().durable(durOrName).bind(true).build());
         assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).durable(EMPTY).bind(true).build());
+
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).name(durOrName).bind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().name(durOrName).bind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).name(EMPTY).bind(true).build());
+
+        // fast bind
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.fastBind(null, durOrName));
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.fastBind(EMPTY, durOrName));
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.fastBind(STREAM, null));
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.fastBind(STREAM, EMPTY));
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).fastBind(true).build());
+
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).durable(durOrName).fastBind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().durable(durOrName).fastBind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).durable(EMPTY).fastBind(true).build());
+
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(EMPTY).name(durOrName).fastBind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().name(durOrName).fastBind(true).build());
+        assertThrows(IllegalArgumentException.class, () -> PullSubscribeOptions.builder().stream(STREAM).name(EMPTY).fastBind(true).build());
     }
 
     @Test
diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java
index eb7ec2692..5fc7bc043 100644
--- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java
+++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java
@@ -600,31 +600,84 @@ public void testAckWaitTimeout() throws Exception {
     public void testDurable() throws Exception {
         runInJsServer(nc -> {
             // create the stream.
-            createDefaultTestStream(nc);
+            CreateStreamResult csr = createMemoryStream(nc);
+            String durable = durable();
 
             // Create our JetStream context.
             JetStream js = nc.jetStream();
 
             // Build our subscription options normally
-            PullSubscribeOptions options1 = PullSubscribeOptions.builder().durable(DURABLE).build();
-            _testDurable(js, () -> js.subscribe(SUBJECT, options1));
+            PullSubscribeOptions options1 = PullSubscribeOptions.builder().durable(durable).build();
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(csr.subject, options1));
 
             // bind long form
             PullSubscribeOptions options2 = PullSubscribeOptions.builder()
-                .stream(STREAM)
-                .durable(DURABLE)
+                .stream(csr.stream)
+                .durable(durable)
+                .bind(true)
+                .build();
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options2));
+
+            // fast bind long form
+            PullSubscribeOptions options3 = PullSubscribeOptions.builder()
+                .stream(csr.stream)
+                .durable(durable)
+                .fastBind(true)
+                .build();
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options3));
+
+            // bind short form
+            PullSubscribeOptions options4 = PullSubscribeOptions.bind(csr.stream, durable);
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options4));
+
+            // fast bind short form
+            PullSubscribeOptions options5 = PullSubscribeOptions.fastBind(csr.stream, durable);
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options5));
+        });
+    }
+
+    @Test
+    public void testNamed() throws Exception {
+        runInJsServer(nc -> {
+            JetStream js = nc.jetStream();
+            JetStreamManagement jsm = nc.jetStreamManagement();
+
+            CreateStreamResult csr = createMemoryStream(jsm);
+            String name = name();
+
+            jsm.addOrUpdateConsumer(csr.stream, ConsumerConfiguration.builder()
+                .name(name)
+                .inactiveThreshold(10_000)
+                .build());
+
+            // bind long form
+            PullSubscribeOptions options2 = PullSubscribeOptions.builder()
+                .stream(csr.stream)
+                .name(name)
                 .bind(true)
                 .build();
-            _testDurable(js, () -> js.subscribe(null, options2));
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options2));
+
+            // fast bind long form
+            PullSubscribeOptions options3 = PullSubscribeOptions.builder()
+                .stream(csr.stream)
+                .name(name)
+                .fastBind(true)
+                .build();
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options3));
 
             // bind short form
-            PullSubscribeOptions options3 = PullSubscribeOptions.bind(STREAM, DURABLE);
-            _testDurable(js, () -> js.subscribe(null, options3));
+            PullSubscribeOptions options4 = PullSubscribeOptions.bind(csr.stream, name);
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options4));
+
+            // fast bind short form
+            PullSubscribeOptions options5 = PullSubscribeOptions.fastBind(csr.stream, name);
+            _testDurableOrNamed(js, csr.subject, () -> js.subscribe(null, options5));
         });
     }
 
-    private void _testDurable(JetStream js, SubscriptionSupplier supplier) throws IOException, JetStreamApiException, InterruptedException {
-        jsPublish(js, SUBJECT, 2);
+    private void _testDurableOrNamed(JetStream js, String subject, SubscriptionSupplier supplier) throws IOException, JetStreamApiException, InterruptedException {
+        jsPublish(js, subject, 2);
 
         JetStreamSubscription sub = supplier.get();