Skip to content

Commit

Permalink
Fast Bind (#959)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Aug 29, 2023
1 parent 06e73f2 commit 20c20bb
Show file tree
Hide file tree
Showing 12 changed files with 285 additions and 98 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ You can however set the deliver policy which will be used to start the subscript
| JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. |
| JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. |
| JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. |
| JsSoNameOrDurableRequiredForBind | SO | 90113 | Name or Durable required for Bind. |
| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. |
| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. |
| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void main(String[] args) {
// - the PullSubscribeOptions can be re-used since all the subscribers are the same
// - use a concurrent integer to track all the messages received
// - have a list of subscribers and threads so I can track them
PullSubscribeOptions pso = PullSubscribeOptions.bind(exArgs.stream, exArgs.durable);
PullSubscribeOptions pso = PullSubscribeOptions.fastBind(exArgs.stream, exArgs.durable);
AtomicInteger allReceived = new AtomicInteger();
List<JsPullSubWorker> subscribers = new ArrayList<>();
List<Thread> subThreads = new ArrayList<>();
Expand Down
46 changes: 42 additions & 4 deletions src/main/java/io/nats/client/PullSubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,36 @@ public static Builder builder() {
}

/**
* Create PullSubscribeOptions where you are binding to
* a specific stream, specific durable and are using bind mode
* Create PullSubscribeOptions for binding to
* a specific stream and consumer by name.
* The client validates regular (non-fast)
* binds to ensure that provided consumer configuration
* is consistent with the server version and that
* consumer type (push versus pull) matches the subscription type.
* and that it matches the subscription type.
* @param stream the stream name to bind to
* @param name the consumer name, commonly the durable name
* @param name the consumer name
* @return push subscribe options
*/
public static PullSubscribeOptions bind(String stream, String name) {
return new PullSubscribeOptions.Builder().stream(stream).durable(name).bind(true).build();
return new Builder().stream(stream).name(name).bind(true).build();
}

/**
* Create PullSubscribeOptions where you are fast-binding to
* a specific stream and consumer by name.
* The client does not validate that the provided consumer configuration
* is consistent with the server version or that
* consumer type (push versus pull) matches the subscription type.
* An inconsistent consumer configuration for instance can result in
* receiving messages from unexpected subjects.
* A consumer type mismatch will result in an error from the server.
* @param stream the stream name to bind to
* @param name the consumer name, commonly the durable name
* @return push subscribe options
*/
public static PullSubscribeOptions fastBind(String stream, String name) {
return new Builder().stream(stream).name(name).fastBind(true).build();
}

/**
Expand All @@ -63,5 +85,21 @@ protected Builder getThis() {
public PullSubscribeOptions build() {
return new PullSubscribeOptions(this);
}

/**
* Specify binding to an existing consumer via name.
* The client does not validate that the provided consumer configuration
* is consistent with the server version or that
* consumer type (push versus pull) matches the subscription type.
* An inconsistent consumer configuration for instance can result in
* receiving messages from unexpected subjects.
* A consumer type mismatch will result in an error from the server.
* @return the builder
* @param fastBind whether to fast bind or not
*/
public Builder fastBind(boolean fastBind) {
this.fastBind = fastBind;
return this;
}
}
}
19 changes: 12 additions & 7 deletions src/main/java/io/nats/client/PushSubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public String getDeliverGroup() {
* where you must specify the stream because
* the subject could apply to both a stream and a mirror.
* @deprecated
* This method is no longer used as bind has a different meaning.
* See {@link #stream(String)} instead.
* This method resolves to {@link #stream(String)} as bind has a different meaning
* and requires both stream and consumer name
* @param stream the stream name
* @return push subscribe options
*/
Expand All @@ -70,14 +70,19 @@ public static PushSubscribeOptions stream(String stream) {
}

/**
* Macro to create a PushSubscribeOptions where you are
* binding to an existing stream and durable consumer.
* Create PushSubscribeOptions for binding to
* a specific stream and consumer by name.
* The client validates regular (non-fast)
* binds to ensure that provided consumer configuration
* is consistent with the server version and that
* consumer type (push versus pull) matches the subscription type.
* and that it matches the subscription type.
* @param stream the stream name
* @param durable the durable name
* @param name the consumer name
* @return push subscribe options
*/
public static PushSubscribeOptions bind(String stream, String durable) {
return new PushSubscribeOptions.Builder().stream(stream).durable(durable).bind(true).build();
public static PushSubscribeOptions bind(String stream, String name) {
return new Builder().stream(stream).name(name).bind(true).build();
}

/**
Expand Down
54 changes: 42 additions & 12 deletions src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,22 @@ public abstract class SubscribeOptions {
protected final String stream;
protected final boolean pull;
protected final boolean bind;
protected final boolean fastBind;
protected final boolean ordered;
protected final long messageAlarmTime;
protected final ConsumerConfiguration consumerConfig;
protected final long pendingMessageLimit; // Only applicable for non dispatched (sync) push consumers.
protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers.
protected final String name;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected SubscribeOptions(Builder builder, boolean isPull,
String deliverSubject, String deliverGroup,
long pendingMessageLimit, long pendingByteLimit) {

pull = isPull;
bind = builder.bind;
fastBind = builder.fastBind;
bind = fastBind || builder.bind;
ordered = builder.ordered;
messageAlarmTime = builder.messageAlarmTime;

Expand All @@ -52,12 +55,18 @@ protected SubscribeOptions(Builder builder, boolean isPull,

stream = validateStreamName(builder.stream, bind); // required when bind mode

String durable = validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch);
durable = validateDurable(durable, bind); // required when bind

String name = validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch);

validateMustMatchIfBothSupplied(name, durable, JsConsumerNameDurableMismatch);
// read the names and do basic validation
String temp = validateConsumerName(
validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch),
false);
String durable = validateDurable(
validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch),
false);
name = validateMustMatchIfBothSupplied(temp, durable, JsConsumerNameDurableMismatch);

if (bind && name == null) {
throw JsSoNameOrDurableRequiredForBind.instance();
}

deliverGroup = validateMustMatchIfBothSupplied(deliverGroup, builder.cc == null ? null : builder.cc.getDeliverGroup(), JsSoDeliverGroupMismatch);

Expand Down Expand Up @@ -96,7 +105,7 @@ protected SubscribeOptions(Builder builder, boolean isPull,
.ackPolicy(AckPolicy.None)
.maxDeliver(1)
.ackWait(Duration.ofHours(22))
.name(name)
.name(temp)
.memStorage(true)
.numReplicas(1);

Expand All @@ -110,7 +119,7 @@ protected SubscribeOptions(Builder builder, boolean isPull,
.durable(durable)
.deliverSubject(deliverSubject)
.deliverGroup(deliverGroup)
.name(name)
.name(temp)
.build();
}
}
Expand All @@ -131,6 +140,14 @@ public String getDurable() {
return consumerConfig.getDurable();
}

/**
* Gets the name of the consumer. Same as durable when the consumer is durable.
* @return the name
*/
public String getName() {
return name;
}

/**
* Gets whether this is a pull subscription
* @return the pull flag
Expand All @@ -140,13 +157,22 @@ public boolean isPull() {
}

/**
* Gets whether this subscription is expected to bind to an existing stream and durable consumer
* Gets whether this subscription is expected to bind to an existing stream and consumer
* @return the bind flag
*/
public boolean isBind() {
return bind;
}

/**
* Gets whether this subscription is expected to fast bind to an existing stream and consumer.
* Overrides bind
* @return the fast bind flag
*/
public boolean isFastBind() {
return fastBind;
}

/**
* Gets whether this subscription is expected to ensure messages come in order
* @return the ordered flag
Expand Down Expand Up @@ -204,6 +230,7 @@ public String toString() {
protected static abstract class Builder<B, SO> {
protected String stream;
protected boolean bind;
protected boolean fastBind;
protected String durable;
protected String name;
protected ConsumerConfiguration cc;
Expand All @@ -224,15 +251,18 @@ public B stream(String stream) {
}

/**
* Specify the to attach in direct mode
* Specify binding to an existing consumer via name.
* The client validates regular (non-fast)
* binds to ensure that provided consumer configuration
* is consistent with the server version and that
* consumer type (push versus pull) matches the subscription type.
* @return the builder
* @param bind whether to bind or not
*/
public B bind(boolean bind) {
this.bind = bind;
return getThis();
}

/**
* Sets the durable name for the consumer.
* Null or empty clears the field.
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
consumerName = ci.getName();
originalOrderedCc = null;
subscribeSubject = null;
unorderedBindPso = PullSubscribeOptions.bind(sc.streamName, consumerName);
unorderedBindPso = PullSubscribeOptions.fastBind(sc.streamName, consumerName);
cachedConsumerInfo = ci;
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ JetStreamSubscription createSubscription(String subject,
String inboxDeliver = userCC.getDeliverSubject();

// 3. Does this consumer already exist?
if (consumerName != null) {
if (!so.isFastBind() && consumerName != null) {
ConsumerInfo serverInfo = lookupConsumerInfo(fnlStream, consumerName);

if (serverInfo != null) { // the consumer for that durable already exists
Expand Down Expand Up @@ -390,7 +390,11 @@ else if (inboxDeliver == null) {
// If the consumer exists, I know what the settled info is
final String settledConsumerName;
final ConsumerConfiguration settledServerCC;
if (serverCC == null) {
if (so.isFastBind() || serverCC != null) {
settledServerCC = serverCC;
settledConsumerName = so.getName();
}
else {
ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC);

// Pull mode doesn't maintain a deliver subject. It's actually an error if we send it.
Expand All @@ -407,10 +411,6 @@ else if (inboxDeliver == null) {
settledServerCC = ccBuilder.build();
settledConsumerName = null;
}
else {
settledServerCC = serverCC;
settledConsumerName = consumerName;
}

// 6. create the subscription. lambda needs final or effectively final vars
final MessageManager mm;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/impl/NatsMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,9 @@ public long consumeByteCount() {
@Override
public String toString() {
if (subject == null) {
return "NatsMessage | " + protocolBytesToString();
return getClass().getSimpleName() + " | " + protocolBytesToString();
}
return "NatsMessage |" + subject + "|" + replyToString() + "|" + dataToString() + "|";
return getClass().getSimpleName() + " |" + subject + "|" + replyToString() + "|" + dataToString() + "|";
}

String toDetailString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class NatsJetStreamClientError {
public static final NatsJetStreamClientError JsSoNameMismatch = new NatsJetStreamClientError(SO, 90110, "Builder name must match the consumer configuration name if both are provided.");
public static final NatsJetStreamClientError JsSoOrderedMemStorageNotSuppliedOrTrue = new NatsJetStreamClientError(SO, 90111, "Mem Storage must be true if supplied.");
public static final NatsJetStreamClientError JsSoOrderedReplicasNotSuppliedOrOne = new NatsJetStreamClientError(SO, 90112, "Replicas must be 1 if supplied.");
public static final NatsJetStreamClientError JsSoNameOrDurableRequiredForBind = new NatsJetStreamClientError(SO, 90113, "Name or Durable required for Bind.");

public static final NatsJetStreamClientError OsObjectNotFound = new NatsJetStreamClientError(OS, 90201, "The object was not found.");
public static final NatsJetStreamClientError OsObjectIsDeleted = new NatsJetStreamClientError(OS, 90202, "The object is deleted.");
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/nats/client/support/Validator.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ public static String required(String s, String label) {
return s;
}

public static String required(String s1, String s2, String label) {
if (emptyAsNull(s1) == null && emptyAsNull(s2) == null) {
throw new IllegalArgumentException(label + " cannot be null or empty.");
}
return s1;
}

public static <T> T required(T o, String label) {
if (o == null) {
throw new IllegalArgumentException(label + " cannot be null.");
Expand Down
Loading

0 comments on commit 20c20bb

Please sign in to comment.