diff --git a/src/main/java/io/nats/client/BaseConsumeOptions.java b/src/main/java/io/nats/client/BaseConsumeOptions.java
index c6cc5b6ae..ab746458e 100644
--- a/src/main/java/io/nats/client/BaseConsumeOptions.java
+++ b/src/main/java/io/nats/client/BaseConsumeOptions.java
@@ -24,6 +24,7 @@
import static io.nats.client.support.JsonValueUtils.readBoolean;
import static io.nats.client.support.JsonValueUtils.readInteger;
import static io.nats.client.support.JsonValueUtils.readLong;
+import static io.nats.client.support.JsonValueUtils.*;
/**
* Base Consume Options are provided to customize the way the consume and
@@ -44,9 +45,11 @@ public class BaseConsumeOptions implements JsonSerializable {
protected final long idleHeartbeat;
protected final int thresholdPercent;
protected final boolean noWait;
+ protected final String group;
+ protected final long minPending;
+ protected final long minAckPending;
- @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
- protected BaseConsumeOptions(Builder b) {
+ protected BaseConsumeOptions(Builder, ?> b) {
bytes = b.bytes;
if (bytes > 0) {
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
@@ -55,6 +58,10 @@ protected BaseConsumeOptions(Builder b) {
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT : b.messages;
}
+ this.group = b.group;
+ this.minPending = b.minPending;
+ this.minAckPending = b.minAckPending;
+
// validation handled in builder
thresholdPercent = b.thresholdPercent;
noWait = b.noWait;
@@ -82,6 +89,9 @@ public String toJson() {
addField(sb, IDLE_HEARTBEAT, idleHeartbeat);
addField(sb, THRESHOLD_PERCENT, thresholdPercent);
addFldWhenTrue(sb, NO_WAIT, noWait);
+ addField(sb, GROUP, group);
+ addField(sb, MIN_PENDING, minPending);
+ addField(sb, MIN_ACK_PENDING, minAckPending);
return endJson(sb).toString();
}
@@ -101,12 +111,27 @@ public boolean isNoWait() {
return noWait;
}
+ public String getGroup() {
+ return group;
+ }
+
+ public long getMinPending() {
+ return minPending;
+ }
+
+ public long getMinAckPending() {
+ return minAckPending;
+ }
+
protected static abstract class Builder {
protected int messages = -1;
protected long bytes = 0;
protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT;
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
protected boolean noWait = false;
+ protected String group;
+ protected long minPending = -1;
+ protected long minAckPending = -1;
protected abstract B getThis();
@@ -137,6 +162,9 @@ public B jsonValue(JsonValue jsonValue) {
if (readBoolean(jsonValue, NO_WAIT, false)) {
noWait();
}
+ group(readStringEmptyAsNull(jsonValue, GROUP));
+ minPending(readLong(jsonValue, MIN_PENDING, -1));
+ minAckPending(readLong(jsonValue, MIN_ACK_PENDING, -1));
return getThis();
}
@@ -190,6 +218,36 @@ public B thresholdPercent(int thresholdPercent) {
return getThis();
}
+ /**
+ * Sets the group
+ * @param group the priority group for this pull
+ * @return Builder
+ */
+ public B group(String group) {
+ this.group = group;
+ return getThis();
+ }
+
+ /**
+ * When specified, the consumer will only receive messages when the consumer has at least this many pending messages.
+ * @param minPending the min pending
+ * @return the builder
+ */
+ public B minPending(long minPending) {
+ this.minPending = minPending < 1 ? -1 : minPending;
+ return getThis();
+ }
+
+ /**
+ * When specified, the consumer will only receive messages when the consumer has at least this many ack pending messages.
+ * @param minAckPending the min ack pending
+ * @return the builder
+ */
+ public B minAckPending(long minAckPending) {
+ this.minAckPending = minAckPending < 1 ? -1 : minAckPending;
+ return getThis();
+ }
+
/**
* Build the options.
* @return the built options
diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java
index c1058da4d..098fe67cc 100644
--- a/src/main/java/io/nats/client/JetStreamManagement.java
+++ b/src/main/java/io/nats/client/JetStreamManagement.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* JetStream Management context for creation and access to streams and consumers in NATS.
@@ -260,6 +261,17 @@ public interface JetStreamManagement {
*/
MessageInfo getMessage(String streamName, long seq) throws IOException, JetStreamApiException;
+ /**
+ * Get MessageInfo for the message matching the {@link MessageGetRequest}.
+ * @param streamName the name of the stream.
+ * @param messageGetRequest the {@link MessageGetRequest} to get a message
+ * @return The MessageInfo
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ MessageInfo getMessage(String streamName, MessageGetRequest messageGetRequest) throws IOException, JetStreamApiException;
+
/**
* Get MessageInfo for the last message of the subject.
* @param streamName the name of the stream.
@@ -282,6 +294,33 @@ public interface JetStreamManagement {
*/
MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException;
+ /**
+ * Get MessageInfo for the first message created at or after the start time.
+ *
+ * This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
+ * @param streamName the name of the stream.
+ * @param startTime the start time to get the first message for.
+ * @return The MessageInfo
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException;
+
+ /**
+ * Get MessageInfo for the first message created at or after the start time matching the subject.
+ *
+ * This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
+ * @param streamName the name of the stream.
+ * @param startTime the start time to get the first message for.
+ * @param subject the subject to get the first message for.
+ * @return The MessageInfo
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException;
+
/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
@@ -295,6 +334,46 @@ public interface JetStreamManagement {
*/
MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException;
+ /**
+ * Request a batch of messages using a {@link MessageBatchGetRequest}.
+ *
+ * This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
+ * @param streamName the name of the stream
+ * @param messageBatchGetRequest the request details
+ * @return a list containing {@link MessageInfo}
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;
+
+ /**
+ * Request a batch of messages using a {@link MessageBatchGetRequest}.
+ *
+ * This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
+ * @param streamName the name of the stream
+ * @param messageBatchGetRequest the request details
+ * @return a queue used to asynchronously receive {@link MessageInfo}
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;
+
+ /**
+ * Request a batch of messages using a {@link MessageBatchGetRequest}.
+ *
+ * This API 1) is currently EXPERIMENTAL and is subject to change. 2) Works on Server 2.11 or later
+ * @param streamName the name of the stream
+ * @param messageBatchGetRequest the request details
+ * @param handler the handler used for receiving {@link MessageInfo}
+ * @return true if all messages were received and properly terminated with a server EOB
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ boolean requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException;
+
/**
* Deletes a message, overwriting the message data with garbage
* This can be considered an expensive (time-consuming) operation, but is more secure.
diff --git a/src/main/java/io/nats/client/MessageInfoHandler.java b/src/main/java/io/nats/client/MessageInfoHandler.java
index 6a9697a31..7f6c02d34 100644
--- a/src/main/java/io/nats/client/MessageInfoHandler.java
+++ b/src/main/java/io/nats/client/MessageInfoHandler.java
@@ -23,7 +23,6 @@ public interface MessageInfoHandler {
* Called to deliver a {@link MessageInfo} to the handler.
*
* @param messageInfo the received {@link MessageInfo}
- * @throws InterruptedException if the thread for this handler is interrupted
*/
- void onMessageInfo(MessageInfo messageInfo) throws InterruptedException;
+ void onMessageInfo(MessageInfo messageInfo);
}
diff --git a/src/main/java/io/nats/client/PullRequestOptions.java b/src/main/java/io/nats/client/PullRequestOptions.java
index e1feb91f0..68b1634af 100644
--- a/src/main/java/io/nats/client/PullRequestOptions.java
+++ b/src/main/java/io/nats/client/PullRequestOptions.java
@@ -31,6 +31,9 @@ public class PullRequestOptions implements JsonSerializable {
private final boolean noWait;
private final Duration expiresIn;
private final Duration idleHeartbeat;
+ private final String group;
+ private final long minPending;
+ private final long minAckPending;
public PullRequestOptions(Builder b) {
this.batchSize = b.batchSize;
@@ -38,6 +41,9 @@ public PullRequestOptions(Builder b) {
this.noWait = b.noWait;
this.expiresIn = b.expiresIn;
this.idleHeartbeat = b.idleHeartbeat;
+ this.group = b.group;
+ this.minPending = b.minPending < 0 ? -1 : b.minPending;
+ this.minAckPending = b.minAckPending < 0 ? -1 : b.minAckPending;
}
@Override
@@ -48,6 +54,10 @@ public String toJson() {
JsonUtils.addFldWhenTrue(sb, NO_WAIT, noWait);
JsonUtils.addFieldAsNanos(sb, EXPIRES, expiresIn);
JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
+
+ JsonUtils.addField(sb, GROUP, group);
+ JsonUtils.addField(sb, MIN_PENDING, minPending);
+ JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
return JsonUtils.endJson(sb).toString();
}
@@ -91,6 +101,18 @@ public Duration getIdleHeartbeat() {
return idleHeartbeat;
}
+ public String getGroup() {
+ return group;
+ }
+
+ public long getMinPending() {
+ return minPending;
+ }
+
+ public long getMinAckPending() {
+ return minAckPending;
+ }
+
/**
* Creates a builder for the pull options, with batch size since it's always required
* @param batchSize the size of the batch. Must be greater than 0
@@ -115,6 +137,9 @@ public static class Builder {
private boolean noWait;
private Duration expiresIn;
private Duration idleHeartbeat;
+ private String group;
+ private long minPending = -1;
+ private long minAckPending = -1;
/**
* Set the batch size for the pull
@@ -195,6 +220,37 @@ public Builder idleHeartbeat(Duration idleHeartbeat) {
return this;
}
+ /**
+ * Sets the group
+ * Replaces any other groups set in the builder
+ * @param group the priority group for this pull
+ * @return Builder
+ */
+ public Builder group(String group) {
+ this.group = group;
+ return this;
+ }
+
+ /**
+ * When specified, the pull request will only receive messages when the consumer has at least this many pending messages.
+ * @param minPending the min pending
+ * @return the builder
+ */
+ public Builder minPending(long minPending) {
+ this.minPending = minPending < 1 ? -1 : minPending;
+ return this;
+ }
+
+ /**
+ * When specified, this Pull request will only receive messages when the consumer has at least this many ack pending messages.
+ * @param minAckPending the min ack pending
+ * @return the builder
+ */
+ public Builder minAckPending(long minAckPending) {
+ this.minAckPending = minAckPending < 1 ? -1 : minAckPending;
+ return this;
+ }
+
/**
* Build the PullRequestOptions.
*
Validates that the batch size is greater than 0
diff --git a/src/main/java/io/nats/client/api/ConsumerConfiguration.java b/src/main/java/io/nats/client/api/ConsumerConfiguration.java
index b6b651043..035c56137 100644
--- a/src/main/java/io/nats/client/api/ConsumerConfiguration.java
+++ b/src/main/java/io/nats/client/api/ConsumerConfiguration.java
@@ -13,8 +13,6 @@
package io.nats.client.api;
-import io.nats.client.Connection;
-import io.nats.client.JetStream;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.support.*;
@@ -44,6 +42,7 @@ public class ConsumerConfiguration implements JsonSerializable {
public static final DeliverPolicy DEFAULT_DELIVER_POLICY = DeliverPolicy.All;
public static final AckPolicy DEFAULT_ACK_POLICY = AckPolicy.Explicit;
public static final ReplayPolicy DEFAULT_REPLAY_POLICY = ReplayPolicy.Instant;
+ public static final PriorityPolicy DEFAULT_PRIORITY_POLICY = PriorityPolicy.None;
public static final Duration DURATION_UNSET = Duration.ZERO;
public static final Duration MIN_IDLE_HEARTBEAT = Duration.ofMillis(100);
@@ -88,6 +87,8 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final List backoff;
protected final Map metadata;
protected final List filterSubjects;
+ protected final List priorityGroups;
+ protected final PriorityPolicy priorityPolicy;
protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.deliverPolicy = cc.deliverPolicy;
@@ -119,6 +120,8 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.backoff = cc.backoff == null ? null : new ArrayList<>(cc.backoff);
this.metadata = cc.metadata == null ? null : new HashMap<>(cc.metadata);
this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects);
+ this.priorityGroups = cc.priorityGroups == null ? null : new ArrayList<>(cc.priorityGroups);
+ this.priorityPolicy = cc.priorityPolicy;
}
// For the builder
@@ -157,6 +160,9 @@ protected ConsumerConfiguration(Builder b)
this.backoff = b.backoff;
this.metadata = b.metadata;
this.filterSubjects = b.filterSubjects;
+
+ this.priorityGroups = b.priorityGroups;
+ this.priorityPolicy = b.priorityPolicy;
}
/**
@@ -202,6 +208,10 @@ else if (filterSubjects.size() == 1) {
JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0));
}
}
+ JsonUtils.addStrings(sb, PRIORITY_GROUPS, priorityGroups);
+ if (priorityPolicy != null && priorityPolicy != DEFAULT_PRIORITY_POLICY) {
+ JsonUtils.addField(sb, PRIORITY_POLICY, priorityPolicy.toString());
+ }
return endJson(sb).toString();
}
@@ -311,6 +321,14 @@ public List getFilterSubjects() {
return filterSubjects;
}
+ /**
+ * Gets the priority groups as a list. May be null, otherwise won't be empty
+ * @return the list
+ */
+ public List getPriorityGroups() {
+ return priorityGroups;
+ }
+
/**
* Whether there are multiple filter subjects for this consumer configuration.
* @return true if there are multiple filter subjects
@@ -456,6 +474,14 @@ public ZonedDateTime getPauseUntil() {
return pauseUntil;
}
+ /**
+ * Gets the priority policy of this consumer configuration.
+ * @return the priority policy.
+ */
+ public PriorityPolicy getPriorityPolicy() {
+ return GetOrDefault(priorityPolicy);
+ }
+
/**
* Gets whether deliver policy of this consumer configuration was set or left unset
* @return true if the policy was set, false if the policy was not set
@@ -584,6 +610,14 @@ public boolean metadataWasSet() {
return metadata != null;
}
+ /**
+ * Gets whether priority policy for this consumer configuration was set or left unset
+ * @return true if the policy was set, false if the policy was not set
+ */
+ public boolean priorityPolicyWasSet() {
+ return priorityPolicy != null;
+ }
+
/**
* Creates a builder for the options.
* @return a publish options builder
@@ -644,6 +678,9 @@ public static class Builder {
private Map metadata;
private List filterSubjects;
+ private List priorityGroups;
+ private PriorityPolicy priorityPolicy;
+
/**
* Construct the builder
*/
@@ -680,6 +717,7 @@ public Builder(ConsumerConfiguration cc) {
this.maxBatch = cc.maxBatch;
this.maxBytes = cc.maxBytes;
this.numReplicas = cc.numReplicas;
+ this.pauseUntil = cc.pauseUntil;
this.flowControl = cc.flowControl;
this.headersOnly = cc.headersOnly;
@@ -694,6 +732,11 @@ public Builder(ConsumerConfiguration cc) {
if (cc.filterSubjects != null) {
this.filterSubjects = new ArrayList<>(cc.filterSubjects);
}
+
+ if (cc.priorityGroups != null) {
+ this.priorityGroups = new ArrayList<>(cc.priorityGroups);
+ }
+ this.priorityPolicy = cc.priorityPolicy;
}
}
@@ -715,6 +758,7 @@ public Builder json(String json) throws JsonParseException {
public Builder jsonValue(JsonValue jsonValue) {
deliverPolicy(DeliverPolicy.get(readString(jsonValue, DELIVER_POLICY)));
ackPolicy(AckPolicy.get(readString(jsonValue, ACK_POLICY)));
+
replayPolicy(ReplayPolicy.get(readString(jsonValue, REPLAY_POLICY)));
description(readString(jsonValue, DESCRIPTION));
@@ -774,6 +818,9 @@ public Builder jsonValue(JsonValue jsonValue) {
filterSubject(fs);
}
+ priorityGroups(readOptionalStringList(jsonValue, PRIORITY_GROUPS));
+ priorityPolicy(PriorityPolicy.get(readString(jsonValue, PRIORITY_POLICY)));
+
return this;
}
@@ -936,7 +983,6 @@ public Builder filterSubject(String filterSubject) {
return this;
}
-
/**
* Sets the filter subjects of the ConsumerConfiguration.
* Replaces any other filter subjects set in the builder
@@ -1295,6 +1341,47 @@ public Builder metadata(Map metadata) {
return this;
}
+ /**
+ * Sets the priority groups of the ConsumerConfiguration.
+ * Replaces any other priority groups set in the builder
+ * @param priorityGroups one or more priority groups
+ * @return Builder
+ */
+ public Builder priorityGroups(String... priorityGroups) {
+ return priorityGroups(Arrays.asList(priorityGroups));
+ }
+
+ /**
+ * Sets the priority groups of the ConsumerConfiguration.
+ * Replaces any other priority groups set in the builder
+ * @param priorityGroups the list of priority groups
+ * @return Builder
+ */
+ public Builder priorityGroups(List priorityGroups) {
+ this.priorityGroups = new ArrayList<>();
+ if (priorityGroups != null) {
+ for (String pg : priorityGroups) {
+ if (!nullOrEmpty(pg)) {
+ this.priorityGroups.add(pg);
+ }
+ }
+ }
+ if (this.priorityGroups.isEmpty()) {
+ this.priorityGroups = null;
+ }
+ return this;
+ }
+
+ /**
+ * Sets the priority policy of the ConsumerConfiguration.
+ * @param policy the priority policy.
+ * @return Builder
+ */
+ public Builder priorityPolicy(PriorityPolicy policy) {
+ this.priorityPolicy = policy;
+ return this;
+ }
+
/**
* Builds the ConsumerConfiguration
* @return The consumer configuration.
@@ -1401,4 +1488,5 @@ protected static Duration normalizeDuration(long millis)
protected static DeliverPolicy GetOrDefault(DeliverPolicy p) { return p == null ? DEFAULT_DELIVER_POLICY : p; }
protected static AckPolicy GetOrDefault(AckPolicy p) { return p == null ? DEFAULT_ACK_POLICY : p; }
protected static ReplayPolicy GetOrDefault(ReplayPolicy p) { return p == null ? DEFAULT_REPLAY_POLICY : p; }
+ protected static PriorityPolicy GetOrDefault(PriorityPolicy p) { return p == null ? DEFAULT_PRIORITY_POLICY : p; }
}
diff --git a/src/main/java/io/nats/client/api/MessageBatchGetRequest.java b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java
new file mode 100644
index 000000000..58e41bdb7
--- /dev/null
+++ b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java
@@ -0,0 +1,207 @@
+// Copyright 2024 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io.nats.client.api;
+
+import io.nats.client.support.JsonSerializable;
+import io.nats.client.support.Validator;
+
+import java.time.ZonedDateTime;
+import java.util.List;
+
+import static io.nats.client.support.ApiConstants.*;
+import static io.nats.client.support.JsonUtils.*;
+
+/**
+ * Object used to make a request for message batch get requests.
+ */
+public class MessageBatchGetRequest implements JsonSerializable {
+
+ private final int batch;
+ private final String nextBySubject;
+ private final int maxBytes;
+ private final long minSequence;
+ private final ZonedDateTime startTime;
+ private final List multiLastBySubjects;
+ private final long upToSequence;
+ private final ZonedDateTime upToTime;
+
+ // batch constructor
+ private MessageBatchGetRequest(String subject,
+ int batch,
+ int maxBytes,
+ long minSequence,
+ ZonedDateTime startTime)
+ {
+ Validator.required(subject, "Subject");
+ Validator.validateGtZero(batch, "Batch");
+
+ this.nextBySubject = subject;
+ this.batch = batch;
+ this.maxBytes = maxBytes;
+ this.startTime = startTime;
+ this.multiLastBySubjects = null;
+ this.upToSequence = -1;
+ this.upToTime = null;
+
+ this.minSequence = startTime == null && minSequence < 1 ? 1 : minSequence;
+ }
+
+ public static MessageBatchGetRequest batch(String subject, int batch) {
+ return new MessageBatchGetRequest(subject, batch, -1, -1, null);
+ }
+
+ public static MessageBatchGetRequest batch(String subject, int batch, long minSequence) {
+ return new MessageBatchGetRequest(subject, batch, -1, minSequence, null);
+ }
+
+ public static MessageBatchGetRequest batch(String subject, int batch, ZonedDateTime startTime) {
+ return new MessageBatchGetRequest(subject, batch, -1, -1, startTime);
+ }
+
+ public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes) {
+ return new MessageBatchGetRequest(subject, batch, maxBytes, -1, null);
+ }
+
+ public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes, long minSequence) {
+ return new MessageBatchGetRequest(subject, batch, maxBytes, minSequence, null);
+ }
+
+ public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes, ZonedDateTime startTime) {
+ return new MessageBatchGetRequest(subject, batch, maxBytes, -1, startTime);
+ }
+
+ // multi for constructor
+ private MessageBatchGetRequest(List subjects, long upToSequence, ZonedDateTime upToTime, int batch) {
+ Validator.required(subjects, "Subjects");
+ this.batch = batch;
+ nextBySubject = null;
+ this.maxBytes = -1;
+ this.minSequence = -1;
+ this.startTime = null;
+ this.multiLastBySubjects = subjects;
+ this.upToSequence = upToSequence;
+ this.upToTime = upToTime;
+ }
+
+ public static MessageBatchGetRequest multiLastForSubjects(List subjects) {
+ return new MessageBatchGetRequest(subjects, -1, null, -1);
+ }
+
+ public static MessageBatchGetRequest multiLastForSubjects(List subjects, long upToSequence) {
+ return new MessageBatchGetRequest(subjects, upToSequence, null, -1);
+ }
+
+ public static MessageBatchGetRequest multiLastForSubjects(List subjects, ZonedDateTime upToTime) {
+ return new MessageBatchGetRequest(subjects, -1, upToTime, -1);
+ }
+
+ public static MessageBatchGetRequest multiLastForSubjectsBatch(List subjects, int batch) {
+ return new MessageBatchGetRequest(subjects, -1, null, batch);
+ }
+
+ public static MessageBatchGetRequest multiLastForSubjectsBatch(List subjects, long upToSequence, int batch) {
+ return new MessageBatchGetRequest(subjects, upToSequence, null, batch);
+ }
+
+ public static MessageBatchGetRequest multiLastForSubjectsBatch(List subjects, ZonedDateTime upToTime, int batch) {
+ return new MessageBatchGetRequest(subjects, -1, upToTime, batch);
+ }
+
+ /**
+ * Maximum amount of messages to be returned for this request.
+ * @return batch size
+ */
+ public int getBatch() {
+ return batch;
+ }
+
+ /**
+ * Maximum amount of returned bytes for this request.
+ * Limits the amount of returned messages to not exceed this.
+ * @return maximum bytes
+ */
+ public int getMaxBytes() {
+ return maxBytes;
+ }
+
+ /**
+ * Minimum sequence for returned messages.
+ * All returned messages will have a sequence equal to or higher than this.
+ * @return minimum message sequence
+ */
+ public long getMinSequence() {
+ return minSequence;
+ }
+
+ /**
+ * Minimum start time for returned messages.
+ * All returned messages will have a start time equal to or higher than this.
+ * @return minimum message start time
+ */
+ public ZonedDateTime getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Subject used to filter messages that should be returned.
+ * @return the subject to filter
+ */
+ public String getNextBySubject() {
+ return nextBySubject;
+ }
+
+ /**
+ * Subjects filter used, these can include wildcards.
+ * Will get the last messages matching the subjects.
+ * @return the subjects to get the last messages for
+ */
+ public List getMultiLastBySubjects() {
+ return multiLastBySubjects;
+ }
+
+ /**
+ * Only return messages up to this sequence.
+ * @return the maximum message sequence to return results for
+ */
+ public long getUpToSequence() {
+ return upToSequence;
+ }
+
+ /**
+ * Only return messages up to this time.
+ * @return the maximum message time to return results for
+ */
+ public ZonedDateTime getUpToTime() {
+ return upToTime;
+ }
+
+ @Override
+ public String toJson() {
+ StringBuilder sb = beginJson();
+ addField(sb, BATCH, batch);
+ addField(sb, MAX_BYTES, maxBytes);
+ addField(sb, START_TIME, startTime);
+ addField(sb, SEQ, minSequence);
+ addField(sb, NEXT_BY_SUBJECT, nextBySubject);
+ addStrings(sb, MULTI_LAST, multiLastBySubjects);
+ addField(sb, UP_TO_SEQ, upToSequence);
+ addField(sb, UP_TO_TIME, upToTime);
+ return endJson(sb).toString();
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
+}
diff --git a/src/main/java/io/nats/client/api/MessageGetRequest.java b/src/main/java/io/nats/client/api/MessageGetRequest.java
index da608587e..fce4e87e6 100644
--- a/src/main/java/io/nats/client/api/MessageGetRequest.java
+++ b/src/main/java/io/nats/client/api/MessageGetRequest.java
@@ -21,7 +21,7 @@
import static io.nats.client.support.JsonUtils.*;
/**
- * Object used to make a request for special message get requests.
+ * Object used to make a request for message get requests.
*/
public class MessageGetRequest implements JsonSerializable {
private final long sequence;
@@ -53,46 +53,6 @@ public static MessageGetRequest nextForSubject(long sequence, String subject) {
return new MessageGetRequest(sequence, null, subject, null);
}
- /**
- * @deprecated use static method forSequence with .serialize instead
- * @param sequence start sequence
- * @return rendered output
- */
- @Deprecated
- public static byte[] seqBytes(long sequence) {
- return forSequence(sequence).serialize();
- }
-
- /**
- * @deprecated use static method lastForSubject with .serialize instead
- * @param subject filter subject
- * @return rendered output
- */
- @Deprecated
- public static byte[] lastBySubjectBytes(String subject) {
- return lastForSubject(subject).serialize();
- }
-
- /**
- * @deprecated use static method forSequence instead
- *
- * @param sequence start sequence number
- */
- @Deprecated
- public MessageGetRequest(long sequence) {
- this(sequence, null, null, null);
- }
-
- /**
- * @deprecated use static method lastForSubject instead
- *
- * @param lastBySubject filter subject
- */
- @Deprecated
- public MessageGetRequest(String lastBySubject) {
- this(-1, lastBySubject, null, null);
- }
-
private MessageGetRequest(long sequence, String lastBySubject, String nextBySubject, ZonedDateTime startTime) {
this.sequence = sequence;
this.lastBySubject = lastBySubject;
@@ -133,4 +93,44 @@ public String toJson() {
addField(sb, START_TIME, startTime);
return endJson(sb).toString();
}
+
+ /**
+ * @deprecated use static method forSequence with .serialize instead
+ * @param sequence start sequence
+ * @return rendered output
+ */
+ @Deprecated
+ public static byte[] seqBytes(long sequence) {
+ return forSequence(sequence).serialize();
+ }
+
+ /**
+ * @deprecated use static method lastForSubject with .serialize instead
+ * @param subject filter subject
+ * @return rendered output
+ */
+ @Deprecated
+ public static byte[] lastBySubjectBytes(String subject) {
+ return lastForSubject(subject).serialize();
+ }
+
+ /**
+ * @deprecated use static method forSequence instead
+ *
+ * @param sequence start sequence number
+ */
+ @Deprecated
+ public MessageGetRequest(long sequence) {
+ this(sequence, null, null, null);
+ }
+
+ /**
+ * @deprecated use static method lastForSubject instead
+ *
+ * @param lastBySubject filter subject
+ */
+ @Deprecated
+ public MessageGetRequest(String lastBySubject) {
+ this(-1, lastBySubject, null, null);
+ }
}
diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java
index dbdf0d7b6..123983b85 100644
--- a/src/main/java/io/nats/client/api/MessageInfo.java
+++ b/src/main/java/io/nats/client/api/MessageInfo.java
@@ -15,10 +15,7 @@
import io.nats.client.Message;
import io.nats.client.impl.Headers;
-import io.nats.client.support.DateTimeUtils;
-import io.nats.client.support.IncomingHeadersProcessor;
-import io.nats.client.support.JsonUtils;
-import io.nats.client.support.JsonValue;
+import io.nats.client.support.*;
import java.time.ZonedDateTime;
@@ -32,7 +29,6 @@
*/
public class MessageInfo extends ApiResponse {
- private final boolean direct;
private final String subject;
private final long seq;
private final byte[] data;
@@ -40,6 +36,8 @@ public class MessageInfo extends ApiResponse {
private final Headers headers;
private final String stream;
private final long lastSeq;
+ private final long numPending;
+ private final Status status;
/**
* Create a Message Info
@@ -48,58 +46,85 @@ public class MessageInfo extends ApiResponse {
*/
@Deprecated
public MessageInfo(Message msg) {
- this(msg, null, false);
+ this(msg, null, null, false);
}
/**
* Create a Message Info
- * This signature is public for testing purposes and is not intended to be used externally.
* @param msg the message
* @param streamName the stream name if known
- * @param direct true if the object is being created from a get direct api call instead of the standard get message
+ * @param parseDirect true if the object is being created from a direct api call instead of get message
*/
- public MessageInfo(Message msg, String streamName, boolean direct) {
- super(direct ? null : msg);
+ public MessageInfo(Message msg, String streamName, boolean parseDirect) {
+ this(msg, null, streamName, parseDirect);
+ }
- this.direct = direct;
+ /**
+ * Create a Message Info
+ * @param status the status
+ * @param streamName the stream name if known
+ */
+ public MessageInfo(Status status, String streamName) {
+ this(null, status, streamName, false);
+ }
- if (direct) {
+ private MessageInfo(Message msg, Status status, String streamName, boolean parseDirect) {
+ super(parseDirect ? null : msg);
+
+ // working vars because the object vars are final
+ String _subject = null;
+ long _seq = -1;
+ byte[] _data = null;
+ ZonedDateTime _time = null;
+ Headers _headers = null;
+ String _stream = null;
+ long _lastSeq = -1;
+ long _numPending = -1;
+ Status _status = null;
+
+ if (status != null) {
+ _status = status;
+ _stream = streamName;
+ }
+ else if (parseDirect) {
Headers msgHeaders = msg.getHeaders();
- this.subject = msgHeaders.getLast(NATS_SUBJECT);
- this.data = msg.getData();
- seq = Long.parseLong(msgHeaders.getLast(NATS_SEQUENCE));
- time = DateTimeUtils.parseDateTime(msgHeaders.getLast(NATS_TIMESTAMP));
- stream = msgHeaders.getLast(NATS_STREAM);
+ _subject = msgHeaders.getLast(NATS_SUBJECT);
+ _data = msg.getData();
+ _seq = Long.parseLong(msgHeaders.getLast(NATS_SEQUENCE));
+ _time = DateTimeUtils.parseDateTime(msgHeaders.getLast(NATS_TIMESTAMP));
+ _stream = msgHeaders.getLast(NATS_STREAM);
String tempLastSeq = msgHeaders.getLast(NATS_LAST_SEQUENCE);
- if (tempLastSeq == null) {
- lastSeq = -1;
+ if (tempLastSeq != null) {
+ _lastSeq = JsonUtils.safeParseLong(tempLastSeq, -1);
}
- else {
- lastSeq = JsonUtils.safeParseLong(tempLastSeq, -1);
+ String tempNumPending = msgHeaders.getLast(NATS_NUM_PENDING);
+ if (tempNumPending != null) {
+ _numPending = Long.parseLong(tempNumPending) - 1;
}
- // these are control headers, not real headers so don't give them to the user.
- headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS);
- }
- else if (hasError()) {
- subject = null;
- data = null;
- seq = -1;
- time = null;
- headers = null;
- stream = null;
- lastSeq = -1;
+
+ // these are control headers, not real headers so don't give them to the user. Must be done last
+ _headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS);
}
- else {
+ else if (!hasError()){
JsonValue mjv = readValue(jv, MESSAGE);
- subject = readString(mjv, SUBJECT);
- data = readBase64(mjv, DATA);
- seq = readLong(mjv, SEQ, 0);
- time = readDate(mjv, TIME);
+ _subject = readString(mjv, SUBJECT);
+ _data = readBase64(mjv, DATA);
+ _seq = readLong(mjv, SEQ, 0);
+ _time = readDate(mjv, TIME);
byte[] hdrBytes = readBase64(mjv, HDRS);
- headers = hdrBytes == null ? null : new IncomingHeadersProcessor(hdrBytes).getHeaders();
- stream = streamName;
- lastSeq = -1;
+ _headers = hdrBytes == null ? null : new IncomingHeadersProcessor(hdrBytes).getHeaders();
+ _stream = streamName;
}
+
+ this.subject = _subject;
+ this.data = _data;
+ this.seq = _seq;
+ this.time = _time;
+ this.headers = _headers;
+ this.stream = _stream;
+ this.lastSeq = _lastSeq;
+ this.numPending = _numPending;
+ this.status = _status;
}
/**
@@ -158,24 +183,79 @@ public long getLastSeq() {
return lastSeq;
}
+ /**
+ * Amount of pending messages that can be requested with a subsequent batch request.
+ * @return number of pending messages
+ */
+ public long getNumPending() {
+ return numPending;
+ }
+
+ /**
+ * Get the Status object. Null if this MessageInfo is not a Status.
+ * @return the status object
+ */
+ public Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Whether this MessageInfo is a regular message
+ * @return true if the MessageInfo is a regular message
+ */
+ public boolean isMessage() {
+ return status == null && !hasError();
+ }
+
+ /**
+ * Whether this MessageInfo is a status message
+ * @return true if this MessageInfo is a status message
+ */
+ public boolean isStatus() {
+ return status != null;
+ }
+
+ /**
+ * Whether this MessageInfo is a status message and is a direct EOB status
+ * @return true if this MessageInfo is a status message and is a direct EOB status
+ */
+ public boolean isEobStatus() {
+ return status != null && status.isEob();
+ }
+
+ /**
+ * Whether this MessageInfo is a status message and is an error status
+ * @return true if this MessageInfo is a status message and is an error status
+ */
+ public boolean isErrorStatus() {
+ return status != null && !status.isEob();
+ }
+
@Override
public String toString() {
StringBuilder sb = JsonUtils.beginJsonPrefixed("\"MessageInfo\":");
- JsonUtils.addField(sb, "direct", direct);
- JsonUtils.addField(sb, ERROR, getError());
- JsonUtils.addField(sb, SUBJECT, subject);
- JsonUtils.addField(sb, SEQ, seq);
- if (data == null) {
- addRawJson(sb, DATA, "null");
+ if (status != null) {
+ JsonUtils.addField(sb, "status_code", status.getCode());
+ JsonUtils.addField(sb, "status_message", status.getMessage());
+ }
+ else if (hasError()) {
+ JsonUtils.addField(sb, ERROR, getError());
}
else {
- JsonUtils.addField(sb, "data_length", data.length);
+ JsonUtils.addField(sb, SEQ, seq);
+ JsonUtils.addField(sb, LAST_SEQ, lastSeq);
+ JsonUtils.addFieldWhenGteMinusOne(sb, NUM_PENDING, numPending);
+ JsonUtils.addField(sb, STREAM, stream);
+ JsonUtils.addField(sb, SUBJECT, subject);
+ JsonUtils.addField(sb, TIME, time);
+ if (data == null) {
+ addRawJson(sb, DATA, "null");
+ }
+ else {
+ JsonUtils.addField(sb, "data_length", data.length);
+ }
+ JsonUtils.addField(sb, HDRS, headers);
}
- JsonUtils.addField(sb, TIME, time);
- JsonUtils.addField(sb, STREAM, stream);
- JsonUtils.addField(sb, LAST_SEQ, lastSeq);
- JsonUtils.addField(sb, SUBJECT, subject);
- JsonUtils.addField(sb, HDRS, headers);
return JsonUtils.endJson(sb).toString();
}
}
diff --git a/src/main/java/io/nats/client/api/PriorityPolicy.java b/src/main/java/io/nats/client/api/PriorityPolicy.java
new file mode 100644
index 000000000..0e0d93f16
--- /dev/null
+++ b/src/main/java/io/nats/client/api/PriorityPolicy.java
@@ -0,0 +1,49 @@
+// Copyright 2024 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io.nats.client.api;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents the Priority Policy of a consumer
+ */
+public enum PriorityPolicy {
+ None("none"),
+ Overflow("overflow"),
+ PinnedClient("pinned_client");
+
+ private final String policy;
+
+ PriorityPolicy(String p) {
+ policy = p;
+ }
+
+ @Override
+ public String toString() {
+ return policy;
+ }
+
+ private static final Map strEnumHash = new HashMap<>();
+
+ static {
+ for (PriorityPolicy env : PriorityPolicy.values()) {
+ strEnumHash.put(env.toString(), env);
+ }
+ }
+
+ public static PriorityPolicy get(String value) {
+ return strEnumHash.get(value);
+ }
+}
diff --git a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java
index ad8df7870..6cafaaf74 100644
--- a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java
+++ b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java
@@ -22,7 +22,6 @@
import static io.nats.client.BaseConsumeOptions.MIN_EXPIRES_MILLS;
class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer, PullManagerObserver {
- private final boolean isNoWait;
private final boolean isNoWaitNoExpires;
private final long maxWaitNanos;
private final String pullSubject;
@@ -34,7 +33,7 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
{
super(cachedConsumerInfo);
- isNoWait = fetchConsumeOptions.isNoWait();
+ boolean isNoWait = fetchConsumeOptions.isNoWait();
long expiresInMillis = fetchConsumeOptions.getExpiresInMillis();
isNoWaitNoExpires = isNoWait && expiresInMillis == ConsumerConfiguration.LONG_UNSET;
@@ -53,6 +52,9 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
.expiresIn(expiresInMillis)
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
.noWait(isNoWait)
+ .group(fetchConsumeOptions.getGroup())
+ .minPending(fetchConsumeOptions.getMinPending())
+ .minAckPending(fetchConsumeOptions.getMinAckPending())
.build();
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold));
pullSubject = sub._pull(pro, false, this);
diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java
index 8363e2ac7..fc88da33d 100644
--- a/src/main/java/io/nats/client/impl/NatsJetStream.java
+++ b/src/main/java/io/nats/client/impl/NatsJetStream.java
@@ -531,6 +531,9 @@ public List getChanges(ConsumerConfiguration serverCc) {
if (metadata != null && !mapsAreEquivalent(metadata, serverCcc.metadata)) { changes.add("metadata"); }
if (filterSubjects != null && !listsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); }
+ if (priorityGroups != null && !listsAreEquivalent(priorityGroups, serverCcc.priorityGroups)) { changes.add("priorityGroups"); }
+ if (priorityPolicy != null && priorityPolicy != serverCcc.getPriorityPolicy()) { changes.add("priorityPolicy"); }
+
// do not need to check Durable because the original is retrieved by the durable name
return changes;
diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
index 4ad522ee9..54355c949 100644
--- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
+++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
@@ -16,12 +16,17 @@
import io.nats.client.*;
import io.nats.client.api.Error;
import io.nats.client.api.*;
+import io.nats.client.support.Status;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired;
+import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable;
import static io.nats.client.support.Validator.*;
public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement {
@@ -273,6 +278,11 @@ public MessageInfo getMessage(String streamName, long seq) throws IOException, J
return _getMessage(streamName, MessageGetRequest.forSequence(seq));
}
+ @Override
+ public MessageInfo getMessage(String streamName, MessageGetRequest messageGetRequest) throws IOException, JetStreamApiException {
+ return _getMessage(streamName, messageGetRequest);
+ }
+
/**
* {@inheritDoc}
*/
@@ -289,6 +299,22 @@ public MessageInfo getFirstMessage(String streamName, String subject) throws IOE
return _getMessage(streamName, MessageGetRequest.firstForSubject(subject));
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException {
+ return _getMessage(streamName, MessageGetRequest.firstForStartTime(startTime));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException {
+ return _getMessage(streamName, MessageGetRequest.firstForStartTimeAndSubject(startTime, subject));
+ }
+
/**
* {@inheritDoc}
*/
@@ -324,6 +350,111 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
+ validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
+ final List results = new ArrayList<>();
+ _requestMessageBatch(streamName, messageBatchGetRequest, false, mi -> {
+ if (mi.isErrorStatus()) {
+ results.clear();
+ }
+ results.add(mi);
+ });
+ return results;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
+ validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
+ final LinkedBlockingQueue q = new LinkedBlockingQueue<>();
+ conn.getOptions().getExecutor().submit(
+ () -> _requestMessageBatch(streamName, messageBatchGetRequest, true, q::add));
+ return q;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException {
+ validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
+ return _requestMessageBatch(streamName, messageBatchGetRequest, true, handler);
+ }
+
+ private boolean _requestMessageBatch(String streamName, MessageBatchGetRequest mbgr, boolean sendEob, MessageInfoHandler handler) {
+ Subscription sub = null;
+
+ try {
+ String replyTo = conn.createInbox();
+ sub = conn.subscribe(replyTo);
+
+ String subject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName));
+ conn.publish(subject, replyTo, mbgr.serialize());
+
+ while (true) {
+ Message msg = sub.nextMessage(getTimeout());
+ Status errorOrNonEob = null;
+ if (msg == null) {
+ errorOrNonEob = Status.TIMEOUT_OR_NO_MESSAGES;
+ }
+ else if (msg.isStatusMessage()) {
+ if (msg.getStatus().isEob()) {
+ return true; // will send eob in finally if caller asked
+ }
+ errorOrNonEob = msg.getStatus();
+ }
+
+ if (errorOrNonEob != null) {
+ // All error or non eob statuses, always send, but it is the last message to the caller
+ sendEob = false;
+ handler.onMessageInfo(new MessageInfo(errorOrNonEob, streamName));
+ return false; // should not time out before eob
+ }
+
+ MessageInfo messageInfo = new MessageInfo(msg, streamName, true);
+ handler.onMessageInfo(messageInfo);
+ }
+ }
+ catch (InterruptedException e) {
+ // sub.nextMessage was fetching one message
+ // and data is not completely read
+ // so it seems like this is an error condition
+ Thread.currentThread().interrupt();
+ sendEob = false;
+ return false;
+ } finally {
+ if (sendEob) {
+ try {
+ handler.onMessageInfo(new MessageInfo(Status.EOB, streamName));
+ }
+ catch (RuntimeException ignore) { /* user handler runtime error */ }
+ }
+ try {
+ //noinspection DataFlowIssue
+ sub.unsubscribe();
+ } catch (RuntimeException ignore) { /* don't want this to fail here */ }
+ }
+ }
+
+ private void validateMessageBatchGetRequest(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
+ validateNotNull(messageBatchGetRequest, "Message Batch Get Request");
+
+ if (!directBatchGet211Available) {
+ throw JsDirectBatchGet211NotAvailable.instance();
+ }
+
+ CachedStreamInfo csi = getCachedStreamInfo(streamName);
+ if (!csi.allowDirect) {
+ throw JsAllowDirectRequired.instance();
+ }
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java
index 8e4f7a125..0b0a322ca 100644
--- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java
+++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java
@@ -19,7 +19,7 @@
import java.io.IOException;
class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManagerObserver {
- protected final ConsumeOptions opts;
+ protected final ConsumeOptions consumeOpts;
protected final int thresholdMessages;
protected final long thresholdBytes;
protected final SimplifiedSubscriptionMaker subscriptionMaker;
@@ -28,21 +28,21 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager
NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker,
ConsumerInfo cachedConsumerInfo,
- ConsumeOptions opts,
+ ConsumeOptions consumeOpts,
Dispatcher userDispatcher,
final MessageHandler userMessageHandler) throws IOException, JetStreamApiException
{
super(cachedConsumerInfo);
this.subscriptionMaker = subscriptionMaker;
- this.opts = opts;
+ this.consumeOpts = consumeOpts;
this.userDispatcher = userDispatcher;
this.userMessageHandler = userMessageHandler;
- int bm = opts.getBatchSize();
- long bb = opts.getBatchBytes();
- int rePullMessages = Math.max(1, bm * opts.getThresholdPercent() / 100);
- long rePullBytes = bb == 0 ? 0 : Math.max(1, bb * opts.getThresholdPercent() / 100);
+ int bm = consumeOpts.getBatchSize();
+ long bb = consumeOpts.getBatchBytes();
+ int rePullMessages = Math.max(1, bm * consumeOpts.getThresholdPercent() / 100);
+ long rePullBytes = bb == 0 ? 0 : Math.max(1, bb * consumeOpts.getThresholdPercent() / 100);
thresholdMessages = bm - rePullMessages;
thresholdBytes = bb == 0 ? Integer.MIN_VALUE : bb - rePullBytes;
@@ -85,12 +85,15 @@ public void pendingUpdated() {
}
private void repull() {
- int rePullMessages = Math.max(1, opts.getBatchSize() - pmm.pendingMessages);
- long rePullBytes = opts.getBatchBytes() == 0 ? 0 : opts.getBatchBytes() - pmm.pendingBytes;
+ int rePullMessages = Math.max(1, consumeOpts.getBatchSize() - pmm.pendingMessages);
+ long rePullBytes = consumeOpts.getBatchBytes() == 0 ? 0 : consumeOpts.getBatchBytes() - pmm.pendingBytes;
PullRequestOptions pro = PullRequestOptions.builder(rePullMessages)
.maxBytes(rePullBytes)
- .expiresIn(opts.getExpiresInMillis())
- .idleHeartbeat(opts.getIdleHeartbeat())
+ .expiresIn(consumeOpts.getExpiresInMillis())
+ .idleHeartbeat(consumeOpts.getIdleHeartbeat())
+ .group(consumeOpts.getGroup())
+ .minPending(consumeOpts.getMinPending())
+ .minAckPending(consumeOpts.getMinAckPending())
.build();
sub._pull(pro, false, this);
}
diff --git a/src/main/java/io/nats/client/support/Status.java b/src/main/java/io/nats/client/support/Status.java
index dd2de4705..ed985bc9b 100644
--- a/src/main/java/io/nats/client/support/Status.java
+++ b/src/main/java/io/nats/client/support/Status.java
@@ -21,13 +21,15 @@ public class Status {
public static final String FLOW_CONTROL_TEXT = "FlowControl Request";
public static final String HEARTBEAT_TEXT = "Idle Heartbeat";
public static final String NO_RESPONDERS_TEXT = "No Responders Available For Request";
+ public static final String EOB_TEXT = "EOB";
public static final int FLOW_OR_HEARTBEAT_STATUS_CODE = 100;
public static final int NO_RESPONDERS_CODE = 503;
public static final int BAD_REQUEST_CODE = 400;
public static final int NOT_FOUND_CODE = 404;
- public static final int REQUEST_TIMEOUT_CODE = 408;
+ public static final int BAD_JS_REQUEST_CODE = 408;
+ public static final int REQUEST_TIMEOUT_CODE = BAD_JS_REQUEST_CODE; // only left in for b/w compat
public static final int CONFLICT_CODE = 409;
- public static final int EOB = 204;
+ public static final int EOB_CODE = 204;
public static String BAD_REQUEST = "Bad Request"; // 400
public static String NO_MESSAGES = "No Messages"; // 404
@@ -44,6 +46,9 @@ public class Status {
public static String SERVER_SHUTDOWN = "Server Shutdown"; // 409 informational with headers
public static String LEADERSHIP_CHANGE = "Leadership Change"; // 409
+ public static final Status EOB = new Status(EOB_CODE, EOB_TEXT);
+ public static final Status TIMEOUT_OR_NO_MESSAGES = new Status(NOT_FOUND_CODE, "Timeout or No Messages");
+
private final int code;
private final String message;
@@ -112,4 +117,8 @@ public boolean isHeartbeat() {
public boolean isNoResponders() {
return code == NO_RESPONDERS_CODE && message.equals(NO_RESPONDERS_TEXT);
}
+
+ public boolean isEob() {
+ return code == EOB_CODE && message.equals(EOB_TEXT);
+ }
}
diff --git a/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java b/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java
index 7d843661d..ada96a042 100644
--- a/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java
+++ b/src/test/java/io/nats/client/api/ConsumerConfigurationTests.java
@@ -67,13 +67,18 @@ public void testBuilder() throws Exception {
.headersOnly(true)
.memStorage(true)
.backoff(1000, 2000, 3000)
- .metadata(metadata);
-
- ConsumerConfiguration c = builder.build();
- assertNotNull(c.toString()); // COVERAGE
- assertAsBuilt(c, zdt);
-
- ConsumerCreateRequest ccr = new ConsumerCreateRequest(STREAM, c);
+ .metadata(metadata)
+ .priorityGroups("pgroup1", "pgroup2")
+ .priorityPolicy(PriorityPolicy.Overflow)
+ ;
+
+ ConsumerConfiguration cc = builder.build();
+ assertNotNull(cc.toString()); // COVERAGE
+ assertAsBuilt(cc, zdt);
+ assertAsBuilt(ConsumerConfiguration.builder(cc).build(), zdt);
+ assertAsBuilt(ConsumerConfiguration.builder().json(cc.toJson()).build(), zdt);
+
+ ConsumerCreateRequest ccr = new ConsumerCreateRequest(STREAM, cc);
assertNotNull(ccr.toString()); // COVERAGE
assertEquals(STREAM, ccr.getStreamName());
assertNotNull(ccr.getConfig());
@@ -83,22 +88,22 @@ public void testBuilder() throws Exception {
assertDefaultCc(new SerializableConsumerConfiguration().getConsumerConfiguration());
_testSerializing(new SerializableConsumerConfiguration(builder), zdt);
- _testSerializing(new SerializableConsumerConfiguration(c), zdt);
+ _testSerializing(new SerializableConsumerConfiguration(cc), zdt);
// flow control idle heartbeat combo
- c = ConsumerConfiguration.builder()
+ cc = ConsumerConfiguration.builder()
.flowControl(Duration.ofMillis(501)).build();
- assertTrue(c.isFlowControl());
- assertEquals(501, c.getIdleHeartbeat().toMillis());
+ assertTrue(cc.isFlowControl());
+ assertEquals(501, cc.getIdleHeartbeat().toMillis());
- c = ConsumerConfiguration.builder()
+ cc = ConsumerConfiguration.builder()
.flowControl(502).build();
- assertTrue(c.isFlowControl());
- assertEquals(502, c.getIdleHeartbeat().toMillis());
+ assertTrue(cc.isFlowControl());
+ assertEquals(502, cc.getIdleHeartbeat().toMillis());
// millis instead of duration coverage
// supply null as deliverPolicy, ackPolicy , replayPolicy,
- c = ConsumerConfiguration.builder()
+ cc = ConsumerConfiguration.builder()
.deliverPolicy(null)
.ackPolicy(null)
.replayPolicy(null)
@@ -106,11 +111,11 @@ public void testBuilder() throws Exception {
.idleHeartbeat(6000) // millis
.build();
- assertEquals(DEFAULT_ACK_POLICY, c.getAckPolicy());
- assertEquals(DEFAULT_DELIVER_POLICY, c.getDeliverPolicy());
- assertEquals(DEFAULT_REPLAY_POLICY, c.getReplayPolicy());
- assertEquals(Duration.ofSeconds(9), c.getAckWait());
- assertEquals(Duration.ofSeconds(6), c.getIdleHeartbeat());
+ assertEquals(DEFAULT_ACK_POLICY, cc.getAckPolicy());
+ assertEquals(DEFAULT_DELIVER_POLICY, cc.getDeliverPolicy());
+ assertEquals(DEFAULT_REPLAY_POLICY, cc.getReplayPolicy());
+ assertEquals(Duration.ofSeconds(9), cc.getAckWait());
+ assertEquals(Duration.ofSeconds(6), cc.getIdleHeartbeat());
ConsumerConfiguration original = ConsumerConfiguration.builder().build();
validateDefault(original);
@@ -125,47 +130,47 @@ public void testBuilder() throws Exception {
validateDefault(ccTest);
// flow control coverage
- c = ConsumerConfiguration.builder().build();
- assertFalse(c.isFlowControl());
+ cc = ConsumerConfiguration.builder().build();
+ assertFalse(cc.isFlowControl());
- c = ConsumerConfiguration.builder().flowControl(1000).build();
- assertTrue(c.isFlowControl());
+ cc = ConsumerConfiguration.builder().flowControl(1000).build();
+ assertTrue(cc.isFlowControl());
// headers only coverage
- c = ConsumerConfiguration.builder().build();
- assertFalse(c.isHeadersOnly());
+ cc = ConsumerConfiguration.builder().build();
+ assertFalse(cc.isHeadersOnly());
- c = ConsumerConfiguration.builder().headersOnly(false).build();
- assertFalse(c.isHeadersOnly());
+ cc = ConsumerConfiguration.builder().headersOnly(false).build();
+ assertFalse(cc.isHeadersOnly());
- c = ConsumerConfiguration.builder().headersOnly(true).build();
- assertTrue(c.isHeadersOnly());
+ cc = ConsumerConfiguration.builder().headersOnly(true).build();
+ assertTrue(cc.isHeadersOnly());
// mem storage coverage
- c = ConsumerConfiguration.builder().build();
- assertFalse(c.isMemStorage());
+ cc = ConsumerConfiguration.builder().build();
+ assertFalse(cc.isMemStorage());
- c = ConsumerConfiguration.builder().memStorage(false).build();
- assertFalse(c.isMemStorage());
+ cc = ConsumerConfiguration.builder().memStorage(false).build();
+ assertFalse(cc.isMemStorage());
- c = ConsumerConfiguration.builder().memStorage(true).build();
- assertTrue(c.isMemStorage());
+ cc = ConsumerConfiguration.builder().memStorage(true).build();
+ assertTrue(cc.isMemStorage());
// idleHeartbeat coverage
- c = ConsumerConfiguration.builder().idleHeartbeat(null).build();
- assertNull(c.getIdleHeartbeat());
+ cc = ConsumerConfiguration.builder().idleHeartbeat(null).build();
+ assertNull(cc.getIdleHeartbeat());
- c = ConsumerConfiguration.builder().idleHeartbeat(Duration.ZERO).build();
- assertEquals(DURATION_UNSET, c.getIdleHeartbeat());
+ cc = ConsumerConfiguration.builder().idleHeartbeat(Duration.ZERO).build();
+ assertEquals(DURATION_UNSET, cc.getIdleHeartbeat());
- c = ConsumerConfiguration.builder().idleHeartbeat(0).build();
- assertEquals(DURATION_UNSET, c.getIdleHeartbeat());
+ cc = ConsumerConfiguration.builder().idleHeartbeat(0).build();
+ assertEquals(DURATION_UNSET, cc.getIdleHeartbeat());
- c = ConsumerConfiguration.builder().idleHeartbeat(Duration.ofMillis(MIN_IDLE_HEARTBEAT_MILLIS + 1)).build();
- assertEquals(Duration.ofMillis(MIN_IDLE_HEARTBEAT_MILLIS + 1), c.getIdleHeartbeat());
+ cc = ConsumerConfiguration.builder().idleHeartbeat(Duration.ofMillis(MIN_IDLE_HEARTBEAT_MILLIS + 1)).build();
+ assertEquals(Duration.ofMillis(MIN_IDLE_HEARTBEAT_MILLIS + 1), cc.getIdleHeartbeat());
- c = ConsumerConfiguration.builder().idleHeartbeat(MIN_IDLE_HEARTBEAT_MILLIS + 1).build();
- assertEquals(Duration.ofMillis(MIN_IDLE_HEARTBEAT_MILLIS + 1), c.getIdleHeartbeat());
+ cc = ConsumerConfiguration.builder().idleHeartbeat(MIN_IDLE_HEARTBEAT_MILLIS + 1).build();
+ assertEquals(Duration.ofMillis(MIN_IDLE_HEARTBEAT_MILLIS + 1), cc.getIdleHeartbeat());
assertThrows(IllegalArgumentException.class,
() -> ConsumerConfiguration.builder().idleHeartbeat(Duration.ofMillis(MIN_IDLE_HEARTBEAT_MILLIS - 1)).build());
@@ -174,20 +179,20 @@ public void testBuilder() throws Exception {
() -> ConsumerConfiguration.builder().idleHeartbeat(MIN_IDLE_HEARTBEAT_MILLIS - 1).build());
// backoff coverage
- c = ConsumerConfiguration.builder().backoff(Duration.ofSeconds(1), null, Duration.ofSeconds(2)).build();
- assertEquals(2, c.getBackoff().size());
- assertEquals(Duration.ofSeconds(1), c.getBackoff().get(0));
- assertEquals(Duration.ofSeconds(2), c.getBackoff().get(1));
+ cc = ConsumerConfiguration.builder().backoff(Duration.ofSeconds(1), null, Duration.ofSeconds(2)).build();
+ assertEquals(2, cc.getBackoff().size());
+ assertEquals(Duration.ofSeconds(1), cc.getBackoff().get(0));
+ assertEquals(Duration.ofSeconds(2), cc.getBackoff().get(1));
assertThrows(IllegalArgumentException.class,
() -> ConsumerConfiguration.builder().backoff(Duration.ZERO).build());
assertThrows(IllegalArgumentException.class,
() -> ConsumerConfiguration.builder().backoff(Duration.ofNanos(DURATION_MIN_LONG - 1)).build());
- c = ConsumerConfiguration.builder().backoff(1000, 2000).build();
- assertEquals(2, c.getBackoff().size());
- assertEquals(Duration.ofSeconds(1), c.getBackoff().get(0));
- assertEquals(Duration.ofSeconds(2), c.getBackoff().get(1));
+ cc = ConsumerConfiguration.builder().backoff(1000, 2000).build();
+ assertEquals(2, cc.getBackoff().size());
+ assertEquals(Duration.ofSeconds(1), cc.getBackoff().get(0));
+ assertEquals(Duration.ofSeconds(2), cc.getBackoff().get(1));
assertThrows(IllegalArgumentException.class,
() -> ConsumerConfiguration.builder().backoff(0).build());
@@ -224,6 +229,7 @@ private void validateDefault(ConsumerConfiguration cc) {
assertFalse(cc.maxBytesWasSet());
assertFalse(cc.numReplicasWasSet());
assertFalse(cc.memStorageWasSet());
+ assertFalse(cc.priorityPolicyWasSet());
}
private void assertAsBuilt(ConsumerConfiguration c, ZonedDateTime zdt) {
@@ -273,45 +279,62 @@ private void assertAsBuilt(ConsumerConfiguration c, ZonedDateTime zdt) {
assertEquals(Duration.ofSeconds(3), c.getBackoff().get(2));
assertEquals(1, c.getMetadata().size());
assertEquals(META_VALUE, c.getMetadata().get(META_KEY));
+ assertEquals(PriorityPolicy.Overflow, c.getPriorityPolicy());
+ assertNotNull(c.getPriorityGroups());
+ assertEquals(2, c.getPriorityGroups().size());
+ assertTrue(c.getPriorityGroups().contains("pgroup1"));
+ assertTrue(c.getPriorityGroups().contains("pgroup2"));
}
@Test
- public void testParsingAndSetters() {
+ public void testParsingAndSetters() throws JsonParseException {
String json = dataAsString("ConsumerConfiguration.json");
- ConsumerConfiguration c = ConsumerConfiguration.builder().jsonValue(JsonParser.parseUnchecked(json)).build();
-
- assertEquals("foo-desc", c.getDescription());
- assertEquals(DeliverPolicy.All, c.getDeliverPolicy());
- assertEquals(AckPolicy.All, c.getAckPolicy());
- assertEquals(Duration.ofSeconds(30), c.getAckWait());
- assertEquals(Duration.ofSeconds(20), c.getIdleHeartbeat());
- assertEquals(10, c.getMaxDeliver());
- assertEquals(73, c.getRateLimit());
- assertEquals(ReplayPolicy.Original, c.getReplayPolicy());
- assertEquals(2020, c.getStartTime().getYear(), 2020);
- assertEquals(21, c.getStartTime().getSecond(), 21);
- assertEquals("foo-name", c.getName());
- assertEquals("foo-name", c.getDurable());
- assertEquals("bar", c.getDeliverSubject());
- assertEquals("foo-filter", c.getFilterSubject());
- assertEquals(42, c.getMaxAckPending());
- assertEquals("sample_freq-value", c.getSampleFrequency());
- assertTrue(c.isFlowControl());
- assertEquals(128, c.getMaxPullWaiting());
- assertTrue(c.isHeadersOnly());
- assertTrue(c.isMemStorage());
- assertEquals(99, c.getStartSequence());
- assertEquals(55, c.getMaxBatch());
- assertEquals(56, c.getMaxBytes());
- assertEquals(5, c.getNumReplicas());
- assertEquals(Duration.ofSeconds(40), c.getMaxExpires());
- assertEquals(Duration.ofSeconds(50), c.getInactiveThreshold());
- assertEquals(3, c.getBackoff().size());
- assertEquals(Duration.ofSeconds(1), c.getBackoff().get(0));
- assertEquals(Duration.ofSeconds(2), c.getBackoff().get(1));
- assertEquals(Duration.ofSeconds(3), c.getBackoff().get(2));
- assertEquals(1, c.getMetadata().size());
- assertEquals(META_VALUE, c.getMetadata().get(META_KEY));
+ ConsumerConfiguration cc = ConsumerConfiguration.builder().jsonValue(JsonParser.parseUnchecked(json)).build();
+
+ assertEquals("foo-desc", cc.getDescription());
+ assertEquals(DeliverPolicy.All, cc.getDeliverPolicy());
+ assertEquals(AckPolicy.All, cc.getAckPolicy());
+ assertEquals(Duration.ofSeconds(30), cc.getAckWait());
+ assertEquals(Duration.ofSeconds(20), cc.getIdleHeartbeat());
+ assertEquals(10, cc.getMaxDeliver());
+ assertEquals(73, cc.getRateLimit());
+ assertEquals(ReplayPolicy.Original, cc.getReplayPolicy());
+ assertEquals(2020, cc.getStartTime().getYear(), 2020);
+ assertEquals(21, cc.getStartTime().getSecond(), 21);
+ assertEquals("foo-name", cc.getName());
+ assertEquals("foo-name", cc.getDurable());
+ assertEquals("bar", cc.getDeliverSubject());
+ assertEquals("foo-filter", cc.getFilterSubject());
+ assertEquals(42, cc.getMaxAckPending());
+ assertEquals("sample_freq-value", cc.getSampleFrequency());
+ assertTrue(cc.isFlowControl());
+ assertEquals(128, cc.getMaxPullWaiting());
+ assertTrue(cc.isHeadersOnly());
+ assertTrue(cc.isMemStorage());
+ assertEquals(99, cc.getStartSequence());
+ assertEquals(55, cc.getMaxBatch());
+ assertEquals(56, cc.getMaxBytes());
+ assertEquals(5, cc.getNumReplicas());
+ assertEquals(Duration.ofSeconds(40), cc.getMaxExpires());
+ assertEquals(Duration.ofSeconds(50), cc.getInactiveThreshold());
+ assertEquals(3, cc.getBackoff().size());
+ assertEquals(Duration.ofSeconds(1), cc.getBackoff().get(0));
+ assertEquals(Duration.ofSeconds(2), cc.getBackoff().get(1));
+ assertEquals(Duration.ofSeconds(3), cc.getBackoff().get(2));
+ assertEquals(1, cc.getMetadata().size());
+ assertEquals(META_VALUE, cc.getMetadata().get(META_KEY));
+ assertEquals(PriorityPolicy.Overflow, cc.getPriorityPolicy());
+ assertNotNull(cc.getPriorityGroups());
+ assertEquals(2, cc.getPriorityGroups().size());
+ assertTrue(cc.getPriorityGroups().contains("pgroup1"));
+ assertTrue(cc.getPriorityGroups().contains("pgroup2"));
+
+ String edit = cc.toJson().replace("filter_subject", "filter_subjects").replace("\"foo-filter\"","[\"fs1\",\"fs2\"]");
+ cc = ConsumerConfiguration.builder().json(edit).build();
+ assertNotNull(cc.getFilterSubjects());
+ assertEquals(2, cc.getFilterSubjects().size());
+ assertTrue(cc.getFilterSubjects().contains("fs1"));
+ assertTrue(cc.getFilterSubjects().contains("fs2"));
assertDefaultCc(new ConsumerConfiguration(ConsumerConfiguration.builder().jsonValue(JsonValue.EMPTY_MAP).build()));
}
diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java
index 2db7c56a2..7b853a300 100644
--- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java
+++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java
@@ -16,6 +16,7 @@
import io.nats.client.*;
import io.nats.client.api.*;
import io.nats.client.support.DateTimeUtils;
+import io.nats.client.support.JsonUtils;
import io.nats.client.utils.TestBase;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
@@ -24,14 +25,15 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME;
import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT;
+import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired;
import static io.nats.client.support.NatsJetStreamConstants.*;
+import static io.nats.client.support.Status.NOT_FOUND_CODE;
import static io.nats.client.utils.ResourceUtils.dataAsString;
import static org.junit.jupiter.api.Assertions.*;
@@ -1545,4 +1547,359 @@ public void testCreateConsumerUpdateConsumer() throws Exception {
assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject());
});
}
+
+ @Test
+ public void testBatchDirectGetErrorsAndStatuses() throws Exception {
+ assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch(null, 1));
+ assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch("", 1));
+ assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch(">", 0));
+ assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.multiLastForSubjects(null));
+
+ jsServer.run(TestBase::atLeast2_11, nc -> {
+ JetStreamManagement jsm = nc.jetStreamManagement();
+
+ String streamNoDirect = variant();
+ String subject = variant();
+ StreamConfiguration sc = StreamConfiguration.builder()
+ .name(streamNoDirect)
+ .storageType(StorageType.Memory)
+ .subjects(subject)
+ .build();
+ StreamInfo si = jsm.addStream(sc);
+ assertFalse(si.getConfiguration().getAllowDirect());
+
+ // Stream doesn't have AllowDirect enabled, will error.
+ IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> {
+ MessageBatchGetRequest request = MessageBatchGetRequest.batch("subject", 1);
+ jsm.requestMessageBatch(streamNoDirect, request, mi -> {});
+ });
+ assertTrue(iae.getMessage().contains(JsAllowDirectRequired.id()));
+
+ String stream = variant();
+ subject = variant();
+ sc = StreamConfiguration.builder()
+ .name(stream)
+ .storageType(StorageType.Memory)
+ .subjects(subject)
+ .allowDirect(true)
+ .build();
+ jsm.addStream(sc);
+
+ MessageBatchGetRequest request = MessageBatchGetRequest.batch(subject, 3);
+
+ // no messages yet - handler
+ List list = new ArrayList<>();
+ jsm.requestMessageBatch(stream, request, list::add);
+ verifyError(list, NOT_FOUND_CODE);
+
+ // no messages yet - fetch
+ verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);
+
+ // no messages yet - queue
+ LinkedBlockingQueue queue = jsm.queueMessageBatch(stream, request);
+ verifyError(queueToList(queue), NOT_FOUND_CODE);
+
+ jsm.jetStream().publish(subject, dataBytes());
+
+ // subject not found
+ request = MessageBatchGetRequest.batch("invalid", 3);
+ verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);
+
+ request = MessageBatchGetRequest.multiLastForSubjects(Collections.singletonList("invalid"), 3);
+ verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);
+
+ // sequence larger
+ request = MessageBatchGetRequest.batch(subject, 3, 2);
+ verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);
+
+ List subjects = Collections.singletonList(subject);
+
+ // batch, time after
+ // awaiting https://github.com/nats-io/nats-server/issues/6032
+// ZonedDateTime time = ZonedDateTime.now().plusSeconds(10);
+// request = MessageBatchGetRequest.batch(subject, 3, time);
+// verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);
+
+ // last for, time before
+ // awaiting https://github.com/nats-io/nats-server/issues/6077
+// time = ZonedDateTime.now().minusSeconds(10);
+// request = MessageBatchGetRequest.multiLastForSubjects(subjects, time);
+// verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);
+ });
+ }
+
+ private static void verifyError(List list, int code) {
+ assertEquals(1, list.size());
+ MessageInfo mi = list.get(0);
+ assertFalse(mi.isMessage());
+ assertTrue(mi.isStatus());
+ assertFalse(mi.isEobStatus());
+ assertTrue(mi.isErrorStatus());
+ assertEquals(code, mi.getStatus().getCode());
+ }
+
+ @Test
+ public void testBatchDirectGet() throws Exception {
+ jsServer.run(TestBase::atLeast2_11, nc -> {
+ JetStream js = nc.jetStream();
+ JetStreamManagement jsm = nc.jetStreamManagement();
+
+ String stream = variant();
+ String subject = variant();
+ StreamConfiguration sc = StreamConfiguration.builder()
+ .name(stream)
+ .storageType(StorageType.Memory)
+ .subjects(subject + ".>")
+ .allowDirect(true)
+ .build();
+ StreamInfo si = jsm.addStream(sc);
+ assertTrue(si.getConfiguration().getAllowDirect());
+
+ byte[] payload = new byte[1000];
+ for (int per = 1; per <= 5; per++) {
+ for (char c = 'A'; c <= 'E'; c++) {
+ String s = subject + "." + c;
+ js.publish(s, payload);
+ Thread.sleep(10); // make sure there are no duplicate times
+ }
+ Thread.sleep(2500);
+ }
+ ZonedDateTime time = jsm.getMessage(stream, 6).getTime().minusSeconds(1);
+
+ String subjectAll = subject + ".>";
+ String subjectA = subject + ".A";
+ String subjectC = subject + ".C";
+ String subjectE = subject + ".E";
+
+ // 1/1A batch only
+ // 2/2a batch with starting sequence
+ // 3/3a batch with start time
+ // 4/4a batch with max bytes
+ // 5/5a batch with max bytes and starting sequence
+ // 6/6a batch with max bytes and start time
+ MessageBatchGetRequest requestBatch1 = MessageBatchGetRequest.batch(subjectAll, 3);
+ MessageBatchGetRequest requestBatch1A = MessageBatchGetRequest.batch(subjectA, 3);
+ MessageBatchGetRequest requestBatch2 = MessageBatchGetRequest.batch(subjectAll, 3, 4);
+ MessageBatchGetRequest requestBatch2A = MessageBatchGetRequest.batch(subjectA, 3, 4);
+ MessageBatchGetRequest requestBatch3 = MessageBatchGetRequest.batch(subjectAll, 3, time);
+ MessageBatchGetRequest requestBatch3A = MessageBatchGetRequest.batch(subjectA, 3, time);
+ MessageBatchGetRequest requestBatch4 = MessageBatchGetRequest.batchBytes(subjectAll, 3, 2002);
+ MessageBatchGetRequest requestBatch4A = MessageBatchGetRequest.batchBytes(subjectA, 3, 2002);
+ MessageBatchGetRequest requestBatch5 = MessageBatchGetRequest.batchBytes(subjectAll, 3, 2002, 4);
+ MessageBatchGetRequest requestBatch5A = MessageBatchGetRequest.batchBytes(subjectA, 3, 2002, 4);
+ MessageBatchGetRequest requestBatch6 = MessageBatchGetRequest.batchBytes(subjectAll, 3, 2002, time);
+ MessageBatchGetRequest requestBatch6A = MessageBatchGetRequest.batchBytes(subjectA, 3, 2002, time);
+
+ // 1/1A just subjects
+ // 2/2A subjects with up_to_seq
+ // 3/3A subjects with up_to_time
+ List subjectAllList = Collections.singletonList(subjectAll);
+ List subjectsList = Arrays.asList(subjectA, subjectC, subjectE);
+ MessageBatchGetRequest requestMulti1 = MessageBatchGetRequest.multiLastForSubjects(subjectsList);
+ MessageBatchGetRequest requestMulti1A = MessageBatchGetRequest.multiLastForSubjects(subjectAllList);
+ MessageBatchGetRequest requestMulti2 = MessageBatchGetRequest.multiLastForSubjects(subjectsList, 22);
+ MessageBatchGetRequest requestMulti2A = MessageBatchGetRequest.multiLastForSubjects(subjectAllList, 22);
+ MessageBatchGetRequest requestMulti3 = MessageBatchGetRequest.multiLastForSubjects(subjectsList, time);
+ MessageBatchGetRequest requestMulti3A = MessageBatchGetRequest.multiLastForSubjects(subjectAllList, time);
+ MessageBatchGetRequest requestMulti4 = MessageBatchGetRequest.multiLastForSubjectsBatch(subjectsList, 2);
+ MessageBatchGetRequest requestMulti4A = MessageBatchGetRequest.multiLastForSubjectsBatch(subjectAllList, 2);
+ MessageBatchGetRequest requestMulti5 = MessageBatchGetRequest.multiLastForSubjectsBatch(subjectsList, 22, 2);
+ MessageBatchGetRequest requestMulti5A = MessageBatchGetRequest.multiLastForSubjectsBatch(subjectAllList, 22, 2);
+ MessageBatchGetRequest requestMulti6 = MessageBatchGetRequest.multiLastForSubjectsBatch(subjectsList, time, 2);
+ MessageBatchGetRequest requestMulti6A = MessageBatchGetRequest.multiLastForSubjectsBatch(subjectAllList, time, 2);
+
+ // Get using handler.
+ doHandler(jsm, stream, requestBatch1, "1");
+ doHandler(jsm, stream, requestBatch1A, "1A");
+ doHandler(jsm, stream, requestBatch2, "2");
+ doHandler(jsm, stream, requestBatch2A, "2A");
+ doHandler(jsm, stream, requestBatch3, "3");
+ doHandler(jsm, stream, requestBatch3A, "3A");
+ doHandler(jsm, stream, requestBatch4, "4");
+ doHandler(jsm, stream, requestBatch4A, "4A");
+ doHandler(jsm, stream, requestBatch5, "5");
+ doHandler(jsm, stream, requestBatch5A, "5A");
+ doHandler(jsm, stream, requestBatch6, "6");
+ doHandler(jsm, stream, requestBatch6A, "6A");
+ doHandler(jsm, stream, requestMulti1, "M1");
+ doHandler(jsm, stream, requestMulti1A, "M1A");
+ doHandler(jsm, stream, requestMulti2, "M2");
+ doHandler(jsm, stream, requestMulti2A, "M2A");
+ doHandler(jsm, stream, requestMulti3, "M3");
+ doHandler(jsm, stream, requestMulti3A, "M3A");
+ doHandler(jsm, stream, requestMulti4, "M4");
+ doHandler(jsm, stream, requestMulti4A, "M4A");
+ doHandler(jsm, stream, requestMulti5, "M5");
+ doHandler(jsm, stream, requestMulti5A, "M5A");
+ doHandler(jsm, stream, requestMulti6, "M6");
+ doHandler(jsm, stream, requestMulti6A, "M6A");
+
+ doFetch(jsm, stream, requestBatch1, "1");
+ doFetch(jsm, stream, requestBatch1A, "1A");
+ doFetch(jsm, stream, requestBatch2, "2");
+ doFetch(jsm, stream, requestBatch2A, "2A");
+ doFetch(jsm, stream, requestBatch3, "3");
+ doFetch(jsm, stream, requestBatch3A, "3A");
+ doFetch(jsm, stream, requestBatch4, "4");
+ doFetch(jsm, stream, requestBatch4A, "4A");
+ doFetch(jsm, stream, requestBatch5, "5");
+ doFetch(jsm, stream, requestBatch5A, "5A");
+ doFetch(jsm, stream, requestBatch6, "6");
+ doFetch(jsm, stream, requestBatch6A, "6A");
+ doFetch(jsm, stream, requestMulti1, "M1");
+ doFetch(jsm, stream, requestMulti1A, "M1A");
+ doFetch(jsm, stream, requestMulti2, "M2");
+ doFetch(jsm, stream, requestMulti2A, "M2A");
+ doFetch(jsm, stream, requestMulti3, "M3");
+ doFetch(jsm, stream, requestMulti3A, "M3A");
+ doFetch(jsm, stream, requestMulti4, "M4");
+ doFetch(jsm, stream, requestMulti4A, "M4A");
+ doFetch(jsm, stream, requestMulti5, "M5");
+ doFetch(jsm, stream, requestMulti5A, "M5A");
+ doFetch(jsm, stream, requestMulti6, "M6");
+ doFetch(jsm, stream, requestMulti6A, "M6A");
+
+ // Get using queue.
+ doQueue(jsm, stream, requestBatch1, "1");
+ doQueue(jsm, stream, requestBatch1A, "1A");
+ doQueue(jsm, stream, requestBatch2, "2");
+ doQueue(jsm, stream, requestBatch2A, "2A");
+ doQueue(jsm, stream, requestBatch3, "3");
+ doQueue(jsm, stream, requestBatch3A, "3A");
+ doQueue(jsm, stream, requestBatch4, "4");
+ doQueue(jsm, stream, requestBatch4A, "4A");
+ doQueue(jsm, stream, requestBatch5, "5");
+ doQueue(jsm, stream, requestBatch5A, "5A");
+ doQueue(jsm, stream, requestBatch6, "6");
+ doQueue(jsm, stream, requestBatch6A, "6A");
+ doQueue(jsm, stream, requestMulti1, "M1");
+ doQueue(jsm, stream, requestMulti1A, "M1A");
+ doQueue(jsm, stream, requestMulti2, "M2");
+ doQueue(jsm, stream, requestMulti2A, "M2A");
+ doQueue(jsm, stream, requestMulti3, "M3");
+ doQueue(jsm, stream, requestMulti3A, "M3A");
+ doQueue(jsm, stream, requestMulti4, "M4");
+ doQueue(jsm, stream, requestMulti4A, "M4A");
+ doQueue(jsm, stream, requestMulti5, "M5");
+ doQueue(jsm, stream, requestMulti5A, "M5A");
+ doQueue(jsm, stream, requestMulti6, "M6");
+ doQueue(jsm, stream, requestMulti6A, "M6A");
+ });
+ }
+
+ private static String miString(MessageInfo mi) {
+ StringBuilder sb = JsonUtils.beginJson();
+ if (mi.isStatus()) {
+ JsonUtils.addField(sb, "status_code", mi.getStatus().getCode());
+ JsonUtils.addField(sb, "status_message", mi.getStatus().getMessage());
+ }
+ else if (mi.hasError()) {
+ JsonUtils.addField(sb, ERROR, mi.getError());
+ }
+ else {
+ JsonUtils.addField(sb, SEQ, mi.getSeq());
+ JsonUtils.addField(sb, LAST_SEQ, mi.getLastSeq());
+ JsonUtils.addFieldWhenGteMinusOne(sb, NUM_PENDING, mi.getNumPending());
+ JsonUtils.addField(sb, SUBJECT, mi.getSubject());
+ JsonUtils.addField(sb, TIME, mi.getTime());
+ }
+ return JsonUtils.endJson(sb).toString();
+ }
+
+// private static void debug(List list, MessageBatchGetRequest mbgr, String label) {
+// System.out.println(label + " | " + mbgr);
+// for (MessageInfo mi : list) {
+// System.out.println(miString(mi));
+// }
+// System.out.println();
+// }
+
+ private static void doHandler(JetStreamManagement jsm, String stream, MessageBatchGetRequest mbgr, String label) throws Exception {
+ List list = new ArrayList<>();
+ jsm.requestMessageBatch(stream, mbgr, list::add);
+ _verify(list, label, true);
+ }
+
+ private static void doFetch(JetStreamManagement jsm, String stream, MessageBatchGetRequest mbgr, String label) throws Exception {
+ List list = jsm.fetchMessageBatch(stream, mbgr);
+ _verify(list, label, false);
+ }
+
+ private static void doQueue(JetStreamManagement jsm, String stream, MessageBatchGetRequest mbgr, String label) throws Exception {
+ LinkedBlockingQueue queue = jsm.queueMessageBatch(stream, mbgr);
+ _verify(queueToList(queue), label, true);
+ }
+
+ @SuppressWarnings("DuplicateBranchesInSwitch")
+ private static void _verify(List list, String label, boolean lastIsEob) {
+ switch (label) {
+ case "1" : _verify(list, 1, 23, lastIsEob, 1, 2, 3); break;
+ case "1A" : _verify(list, 1, 3, lastIsEob, 1, 6, 11); break;
+ case "2" : _verify(list, 4, 20, lastIsEob, 4, 5, 6); break;
+ case "2A" : _verify(list, 6, 2, lastIsEob, 6, 11, 16); break;
+ case "3" : _verify(list, 6, 18, lastIsEob, 6, 7, 8); break;
+ case "3A" : _verify(list, 6, 2, lastIsEob, 6, 11, 16); break;
+ case "4" : _verify(list, 1, 23, lastIsEob, 1, 2); break;
+ case "4A" : _verify(list, 1, 3, lastIsEob, 1, 6); break;
+ case "5" : _verify(list, 4, 20, lastIsEob, 4, 5); break;
+ case "5A" : _verify(list, 6, 2, lastIsEob, 6, 11); break;
+ case "6" : _verify(list, 6, 18, lastIsEob, 6, 7); break;
+ case "6A" : _verify(list, 6, 2, lastIsEob, 6, 11); break;
+ case "M1" : _verify(list, 21, 0, lastIsEob, 21, 23, 25); break;
+ case "M1A" : _verify(list, 21, 2, lastIsEob, 21, 22, 23, 24, 25); break;
+ case "M2" : _verify(list, 18, 0, lastIsEob, 18, 20, 21); break;
+ case "M2A" : _verify(list, 18, 2, lastIsEob, 18, 19, 20, 21, 22); break;
+ case "M3" : _verify(list, 1, 0, lastIsEob, 1, 3, 5); break;
+ case "M3A" : _verify(list, 1, 2, lastIsEob, 1, 2, 3, 4, 5); break;
+ case "M4" : _verify(list, 21, 0, lastIsEob, 21, 23); break;
+ case "M4A" : _verify(list, 21, 2, lastIsEob, 21, 22); break;
+ case "M5" : _verify(list, 18, 0, lastIsEob, 18, 20); break;
+ case "M5A" : _verify(list, 18, 2, lastIsEob, 18, 19); break;
+ case "M6" : _verify(list, 1, 0, lastIsEob, 1, 3); break;
+ case "M6A" : _verify(list, 1, 2, lastIsEob, 1, 2); break;
+ }
+ }
+
+ private static void _verify(List list, long lastSeq1, long pending1, boolean lastIsEob, long... expected) {
+ assertEquals(lastIsEob ? expected.length + 1 : expected.length, list.size());
+ for (int x = 0; x < expected.length; x++) {
+ MessageInfo mi = list.get(x);
+ if (x == 1) {
+ assertEquals(pending1, mi.getNumPending());
+ assertEquals(lastSeq1, mi.getLastSeq());
+ }
+ assertEquals(expected[x], mi.getSeq());
+ verifyMessage(mi);
+ }
+ if (lastIsEob) {
+ verifyEob(list);
+ }
+ }
+
+ private static void verifyMessage(MessageInfo mi) {
+ assertTrue(mi.isMessage());
+ assertFalse(mi.isStatus());
+ assertFalse(mi.isEobStatus());
+ assertFalse(mi.isErrorStatus());
+ }
+
+ private static void verifyEob(List list) {
+ MessageInfo mi = list.get(list.size() - 1);
+ assertFalse(mi.isMessage());
+ assertTrue(mi.isStatus());
+ assertTrue(mi.isEobStatus());
+ assertFalse(mi.isErrorStatus());
+ }
+
+ private static List queueToList(LinkedBlockingQueue queue) throws InterruptedException {
+ List list = new ArrayList<>();
+ while (true) {
+ MessageInfo mi = queue.take();
+ list.add(mi);
+ if (!mi.isMessage()) {
+ break;
+ }
+ }
+ return list;
+ }
}
diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java
index 3a29ca946..d07d2c43d 100644
--- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java
+++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java
@@ -16,6 +16,7 @@
import io.nats.client.*;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
+import io.nats.client.api.PriorityPolicy;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.Status;
import io.nats.client.utils.TestBase;
@@ -724,6 +725,9 @@ public void testPullRequestOptionsBuilder() {
assertNull(pro.getExpiresIn());
assertNull(pro.getIdleHeartbeat());
assertFalse(pro.isNoWait());
+ assertNull(pro.getGroup());
+ assertEquals(-1, pro.getMinPending());
+ assertEquals(-1, pro.getMinAckPending());
pro = PullRequestOptions.noWait(21).build();
assertEquals(21, pro.getBatchSize());
@@ -754,6 +758,15 @@ public void testPullRequestOptionsBuilder() {
assertEquals(43, pro.getExpiresIn().toMillis());
assertEquals(21, pro.getIdleHeartbeat().toMillis());
assertFalse(pro.isNoWait());
+
+ pro = PullRequestOptions.builder(41)
+ .group("g")
+ .minPending(1)
+ .minAckPending(2)
+ .build();
+ assertEquals("g", pro.getGroup());
+ assertEquals(1, pro.getMinPending());
+ assertEquals(2, pro.getMinAckPending());
}
interface ConflictSetup {
@@ -1161,4 +1174,130 @@ private static Thread getReaderThread(AtomicInteger count, int stopCount, JetStr
readerThread.start();
return readerThread;
}
+
+ @Test
+ public void testOverflow() throws Exception {
+ ListenerForTesting l = new ListenerForTesting();
+ Options.Builder b = Options.builder().errorListener(l);
+ jsServer.run(b, TestBase::atLeast2_11, nc -> {
+ JetStreamManagement jsm = nc.jetStreamManagement();
+ TestingStreamContainer tsc = new TestingStreamContainer(jsm);
+ JetStream js = nc.jetStream();
+ jsPublish(js, tsc.subject(), 100);
+
+ // Setting PriorityPolicy requires at least one PriorityGroup to be set
+ ConsumerConfiguration ccNoGroup = ConsumerConfiguration.builder()
+ .priorityPolicy(PriorityPolicy.Overflow)
+ .build();
+ JetStreamApiException jsae = assertThrows(JetStreamApiException.class,
+ () -> jsm.addOrUpdateConsumer(tsc.stream, ccNoGroup));
+ assertEquals(10159, jsae.getApiErrorCode());
+
+ // Testing errors
+ String group = variant();
+ String consumer = variant();
+
+ ConsumerConfiguration cc = ConsumerConfiguration.builder()
+ .name(consumer)
+ .priorityPolicy(PriorityPolicy.Overflow)
+ .priorityGroups(group)
+ .filterSubjects(tsc.subject()).build();
+ jsm.addOrUpdateConsumer(tsc.stream, cc);
+
+ PullSubscribeOptions so = PullSubscribeOptions.fastBind(tsc.stream, consumer);
+ JetStreamSubscription sub = js.subscribe(null, so);
+
+ // 400 Bad Request - Priority Group missing
+ sub.pull(1);
+ assertThrows(JetStreamStatusException.class, () -> sub.nextMessage(1000));
+
+ // 400 Bad Request - Invalid Priority Group
+ sub.pull(PullRequestOptions.builder(5).group("bogus").build());
+ assertThrows(JetStreamStatusException.class, () -> sub.nextMessage(1000));
+
+ // Testing min ack pending
+ group = variant();
+ consumer = variant();
+
+ cc = ConsumerConfiguration.builder()
+ .name(consumer)
+ .priorityPolicy(PriorityPolicy.Overflow)
+ .priorityGroups(group)
+ .ackWait(60_000)
+ .filterSubjects(tsc.subject()).build();
+ jsm.addOrUpdateConsumer(tsc.stream, cc);
+
+ so = PullSubscribeOptions.fastBind(tsc.stream, consumer);
+ JetStreamSubscription subPrime = js.subscribe(null, so);
+ JetStreamSubscription subOver = js.subscribe(null, so);
+
+ PullRequestOptions proNoMin = PullRequestOptions.builder(5)
+ .group(group)
+ .build();
+
+ PullRequestOptions proOverA = PullRequestOptions.builder(5)
+ .group(group)
+ .minAckPending(5)
+ .build();
+
+ PullRequestOptions proOverB = PullRequestOptions.builder(5)
+ .group(group)
+ .minAckPending(10)
+ .build();
+
+ _overflowCheck(subPrime, proNoMin, true, 5);
+ _overflowCheck(subOver, proNoMin, true, 5);
+
+ _overflowCheck(subPrime, proNoMin, false, 5);
+ _overflowCheck(subOver, proOverA, true, 5);
+ _overflowCheck(subOver, proOverB, true, 0);
+
+ // Testing min pending
+ group = variant();
+ consumer = variant();
+
+ cc = ConsumerConfiguration.builder()
+ .name(consumer)
+ .priorityPolicy(PriorityPolicy.Overflow)
+ .priorityGroups(group)
+ .filterSubjects(tsc.subject()).build();
+ jsm.addOrUpdateConsumer(tsc.stream, cc);
+
+ so = PullSubscribeOptions.fastBind(tsc.stream, consumer);
+ subPrime = js.subscribe(null, so);
+ subOver = js.subscribe(null, so);
+
+ proNoMin = PullRequestOptions.builder(5)
+ .group(group)
+ .build();
+
+ proOverA = PullRequestOptions.builder(5)
+ .group(group)
+ .minPending(78)
+ .build();
+
+ _overflowCheck(subPrime, proNoMin, true, 5);
+ _overflowCheck(subOver, proNoMin, true, 5);
+ _overflowCheck(subOver, proOverA, true, 5);
+ _overflowCheck(subOver, proOverA, true, 5);
+ // exactly 80 messages now pending, gt or eq to pull min pending for 3 (80, 79, 78)
+ _overflowCheck(subOver, proOverA, true, 3);
+ // exactly 77 messages now pending lt pull min pending
+ _overflowCheck(subOver, proOverA, true, 0);
+ });
+ }
+
+ private static void _overflowCheck(JetStreamSubscription sub, PullRequestOptions pro, boolean ack, int expected) throws InterruptedException, JetStreamApiException, IOException {
+ sub.pull(pro);
+ int count = 0;
+ Message m = sub.nextMessage(1000);
+ while (m != null) {
+ count++;
+ if (ack) {
+ m.ack();
+ }
+ m = sub.nextMessage(100);
+ }
+ assertEquals(expected, count);
+ }
}
diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java
index 3bd146542..94ce3fb89 100644
--- a/src/test/java/io/nats/client/impl/SimplificationTests.java
+++ b/src/test/java/io/nats/client/impl/SimplificationTests.java
@@ -521,6 +521,16 @@ public void testFetchConsumeOptionsBuilder() throws IOException, ClassNotFoundEx
fco = FetchConsumeOptions.builder().max(1000, 100).thresholdPercent(50).build();
check_values(fco, 100, 1000, 50);
check_values(roundTripSerialize(fco), 100, 1000, 50);
+
+ fco = FetchConsumeOptions.builder().group("g").minPending(1).minAckPending(2).build();
+ assertEquals("g", fco.getGroup());
+ assertEquals(1, fco.getMinPending());
+ assertEquals(2, fco.getMinAckPending());
+
+ fco = roundTripSerialize(fco);
+ assertEquals("g", fco.getGroup());
+ assertEquals(1, fco.getMinPending());
+ assertEquals(2, fco.getMinAckPending());
}
private static void check_default_values(FetchConsumeOptions fco) {
@@ -541,6 +551,9 @@ private static void _check_values(FetchConsumeOptions fco, int maxMessages, int
assertEquals(maxMessages, fco.getMaxMessages());
assertEquals(maxBytes, fco.getMaxBytes());
assertEquals(thresholdPercent, fco.getThresholdPercent());
+ assertNull(fco.getGroup());
+ assertEquals(-1, fco.getMinPending());
+ assertEquals(-1, fco.getMinAckPending());
}
private static FetchConsumeOptions roundTripSerialize(FetchConsumeOptions fco) throws IOException, ClassNotFoundException {
@@ -596,6 +609,16 @@ public void testConsumeOptionsBuilder() throws IOException, ClassNotFoundExcepti
assertThrows(IllegalArgumentException.class,
() -> ConsumeOptions.builder().expiresIn(MIN_EXPIRES_MILLS - 1).build());
+
+ co = ConsumeOptions.builder().group("g").minPending(1).minAckPending(2).build();
+ assertEquals("g", co.getGroup());
+ assertEquals(1, co.getMinPending());
+ assertEquals(2, co.getMinAckPending());
+
+ co = roundTripSerialize(co);
+ assertEquals("g", co.getGroup());
+ assertEquals(1, co.getMinPending());
+ assertEquals(2, co.getMinAckPending());
}
private static void check_default_values(ConsumeOptions co) throws IOException, ClassNotFoundException {
@@ -610,6 +633,9 @@ private static void check_values(ConsumeOptions co, int batchSize, int batchByte
assertEquals(batchSize, co.getBatchSize());
assertEquals(batchBytes, co.getBatchBytes());
assertEquals(thresholdPercent, co.getThresholdPercent());
+ assertNull(co.getGroup());
+ assertEquals(-1, co.getMinPending());
+ assertEquals(-1, co.getMinAckPending());
}
private static ConsumeOptions roundTripSerialize(ConsumeOptions co) throws IOException, ClassNotFoundException {
@@ -1103,4 +1129,224 @@ private static Object roundTripSerialize(Serializable s) throws IOException, Cla
return new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())).readObject();
}
}
+
+ @Test
+ public void testOverflowFetch() throws Exception {
+ ListenerForTesting l = new ListenerForTesting();
+ Options.Builder b = Options.builder().errorListener(l);
+ jsServer.run(b, TestBase::atLeast2_11, nc -> {
+ JetStreamManagement jsm = nc.jetStreamManagement();
+ TestingStreamContainer tsc = new TestingStreamContainer(jsm);
+ JetStream js = nc.jetStream();
+ jsPublish(js, tsc.subject(), 100);
+
+ // Testing min ack pending
+ String group = variant();
+ String consumer = variant();
+
+ ConsumerConfiguration cc = ConsumerConfiguration.builder()
+ .name(consumer)
+ .priorityPolicy(PriorityPolicy.Overflow)
+ .priorityGroups(group)
+ .ackWait(60_000)
+ .filterSubjects(tsc.subject()).build();
+ jsm.addOrUpdateConsumer(tsc.stream, cc);
+
+ ConsumerContext ctxPrime = nc.getConsumerContext(tsc.stream, consumer);
+ ConsumerContext ctxOver = nc.getConsumerContext(tsc.stream, consumer);
+
+ FetchConsumeOptions fcoNoMin = FetchConsumeOptions.builder()
+ .maxMessages(5)
+ .expiresIn(2000)
+ .group(group)
+ .build();
+
+ FetchConsumeOptions fcoOverA = FetchConsumeOptions.builder()
+ .maxMessages(5)
+ .expiresIn(2000)
+ .group(group)
+ .minAckPending(5)
+ .build();
+
+ FetchConsumeOptions fcoOverB = FetchConsumeOptions.builder()
+ .maxMessages(5)
+ .expiresIn(2000)
+ .group(group)
+ .minAckPending(6)
+ .build();
+
+ _overflowFetch(ctxPrime, fcoNoMin, true, 5);
+ _overflowFetch(ctxOver, fcoNoMin, true, 5);
+
+ _overflowFetch(ctxPrime, fcoNoMin, false, 5);
+ _overflowFetch(ctxOver, fcoOverA, true, 5);
+ _overflowFetch(ctxOver, fcoOverB, true, 0);
+ });
+ }
+
+ private static void _overflowFetch(ConsumerContext cctx, FetchConsumeOptions fco, boolean ack, int expected) throws Exception {
+ try (FetchConsumer fc = cctx.fetch(fco)) {
+ int count = 0;
+ Message m = fc.nextMessage();
+ while (m != null) {
+ count++;
+ if (ack) {
+ m.ack();
+ }
+ m = fc.nextMessage();
+ }
+ assertEquals(expected, count);
+ }
+ }
+
+ @Test
+ public void testOverflowIterate() throws Exception {
+ ListenerForTesting l = new ListenerForTesting();
+ Options.Builder b = Options.builder().errorListener(l);
+ runInJsServer(b, TestBase::atLeast2_11, nc -> {
+ JetStreamManagement jsm = nc.jetStreamManagement();
+ TestingStreamContainer tsc = new TestingStreamContainer(jsm);
+ JetStream js = nc.jetStream();
+ jsPublish(js, tsc.subject(), 100);
+
+ // Testing min ack pending
+ String group = variant();
+ String consumer = variant();
+
+ ConsumerConfiguration cc = ConsumerConfiguration.builder()
+ .name(consumer)
+ .priorityPolicy(PriorityPolicy.Overflow)
+ .priorityGroups(group)
+ .ackWait(30_000)
+ .filterSubjects(tsc.subject()).build();
+ jsm.addOrUpdateConsumer(tsc.stream, cc);
+
+ ConsumerContext ctxPrime = nc.getConsumerContext(tsc.stream, consumer);
+ ConsumerContext ctxOver = nc.getConsumerContext(tsc.stream, consumer);
+
+ ConsumeOptions coPrime = ConsumeOptions.builder()
+ .group(group)
+ .build();
+
+ ConsumeOptions coOver = ConsumeOptions.builder()
+ .group(group)
+ .minAckPending(101)
+ .build();
+
+ // start the overflow consumer
+ AtomicLong primeCount = new AtomicLong();
+ AtomicLong overCount = new AtomicLong();
+ AtomicLong left = new AtomicLong(100);
+
+ Thread tOver = new Thread(() -> {
+ try {
+ IterableConsumer ic = ctxOver.iterate(coOver);
+ while (left.get() > 0 && !Thread.currentThread().isInterrupted()) {
+ Message m = ic.nextMessage(100);
+ if (m != null) {
+ m.ack();
+ overCount.incrementAndGet();
+ left.decrementAndGet();
+ }
+ }
+ }
+ catch (InterruptedException ignore) {
+ }
+ catch (Exception e) {
+ fail(e);
+ }
+ });
+ tOver.start();
+
+ Thread tPrime = new Thread(() -> {
+ try {
+ IterableConsumer ic = ctxPrime.iterate(coPrime);
+ while (left.get() > 0 && !Thread.currentThread().isInterrupted()) {
+ Message m = ic.nextMessage(100);
+ if (m != null) {
+ m.ack();
+ primeCount.incrementAndGet();
+ left.decrementAndGet();
+ }
+ }
+ }
+ catch (InterruptedException ignore) {
+ }
+ catch (Exception e) {
+ fail(e);
+ }
+ });
+ tPrime.start();
+
+ tPrime.join();
+ tOver.join();
+ assertEquals(100, primeCount.get());
+ assertEquals(0, overCount.get());
+ });
+ }
+
+ @Test
+ public void testOverflowConsume() throws Exception {
+ ListenerForTesting l = new ListenerForTesting();
+ Options.Builder b = Options.builder().errorListener(l);
+ runInJsServer(b, TestBase::atLeast2_11, nc -> {
+ JetStreamManagement jsm = nc.jetStreamManagement();
+ TestingStreamContainer tsc = new TestingStreamContainer(jsm);
+ JetStream js = nc.jetStream();
+ jsPublish(js, tsc.subject(), 1000);
+
+ // Testing min ack pending
+ String group = variant();
+ String consumer = variant();
+
+ ConsumerConfiguration cc = ConsumerConfiguration.builder()
+ .name(consumer)
+ .priorityPolicy(PriorityPolicy.Overflow)
+ .priorityGroups(group)
+ .ackWait(30_000)
+ .filterSubjects(tsc.subject()).build();
+ jsm.addOrUpdateConsumer(tsc.stream, cc);
+
+ ConsumerContext ctxPrime = nc.getConsumerContext(tsc.stream, consumer);
+ ConsumerContext ctxOver = nc.getConsumerContext(tsc.stream, consumer);
+
+ ConsumeOptions coPrime = ConsumeOptions.builder()
+ .group(group)
+ .build();
+
+ ConsumeOptions coOver = ConsumeOptions.builder()
+ .group(group)
+ .minAckPending(1001)
+ .build();
+
+ // start the overflow consumer
+ AtomicLong primeCount = new AtomicLong();
+ AtomicLong overCount = new AtomicLong();
+ AtomicLong left = new AtomicLong(500);
+
+ MessageHandler overHandler = m -> {
+ m.ack();
+ overCount.incrementAndGet();
+ left.decrementAndGet();
+ };
+
+ MessageHandler primeHandler = m -> {
+ m.ack();
+ primeCount.incrementAndGet();
+ left.decrementAndGet();
+ };
+
+ MessageConsumer mcOver = ctxOver.consume(coOver, overHandler);
+ MessageConsumer mcPrime = ctxPrime.consume(coPrime, primeHandler);
+
+ while (left.get() > 0) {
+ sleep(100);
+ }
+ mcOver.stop();
+ mcPrime.stop();
+
+ assertTrue(primeCount.get() > 0);
+ assertEquals(0, overCount.get());
+ });
+ }
}
diff --git a/src/test/resources/data/ConsumerConfiguration.json b/src/test/resources/data/ConsumerConfiguration.json
index 5a8737392..90a9bb8d6 100644
--- a/src/test/resources/data/ConsumerConfiguration.json
+++ b/src/test/resources/data/ConsumerConfiguration.json
@@ -26,5 +26,7 @@
"backoff": [1000000000, 2000000000, 3000000000],
"num_replicas": 5,
"mem_storage": true,
- "metadata":{"meta-test-key":"meta-test-value"}
+ "metadata":{"meta-test-key":"meta-test-value"},
+ "priority_groups": ["pgroup1", "pgroup2"],
+ "priority_policy": "overflow"
}