diff --git a/README.md b/README.md
index 5fb7add1b..13f20dc22 100644
--- a/README.md
+++ b/README.md
@@ -4,11 +4,11 @@
### A [Java](http://java.com) client for the [NATS messaging system](https://nats.io).
-**Current Release**: 2.16.14 **Current Snapshot**: 2.16.15-SNAPSHOT
+**Current Release**: 2.16.14 **Current Snapshot**: 2.17.0-SNAPSHOT
[![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.nats/jnats/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.nats/jnats)
-[![Javadoc](http://javadoc.io/badge/io.nats/jnats.svg?branch=master)](http://javadoc.io/doc/io.nats/jnats?branch=master)
+[![javadoc](https://javadoc.io/badge2/io.nats/jnats/javadoc.svg)](https://javadoc.io/doc/io.nats/jnats)
[![Coverage Status](https://coveralls.io/repos/github/nats-io/nats.java/badge.svg?branch=main)](https://coveralls.io/github/nats-io/nats.java?branch=main)
[![Build Main Badge](https://github.com/nats-io/nats.java/actions/workflows/build-main.yml/badge.svg?event=push)](https://github.com/nats-io/nats.java/actions/workflows/build-main.yml)
[![Release Badge](https://github.com/nats-io/nats.java/actions/workflows/build-release.yml/badge.svg?event=release)](https://github.com/nats-io/nats.java/actions/workflows/build-release.yml)
@@ -51,7 +51,30 @@ Version 2.5.0 adds some back pressure to publish calls to alleviate issues when
Previous versions are still available in the repo.
-#### Version 2.16.14 Options properties improvements
+#### Version 2.17.0: Server 2.10 support. Subject and Queue Name Validation
+
+With the release of the 2.10 server, the client has been updated to support new features.
+The most important new feature is the ability to have multipler filter subjects for any single JetStream consumer.
+```java
+ConsumerConfiguration cc = ConsumerConfiguration.builder()
+ ...
+ .filterSubjects("subject1", "subject2")
+ .build();
+```
+
+For subjects, up until now, the client has been very strict when validating subject names for consumer subject filters and subscriptions.
+It only allowed printable ascii characters except for `*`, `>`, `.`, `\\` and `/`. This restriction has been changed to the following:
+* cannot contain spaces \r \n \t
+* cannot start or end with subject token delimiter .
+* cannot have empty segments
+
+**This means that UTF characters are now allowed in this client.**
+
+
+For queue names, there has been inconsistent validation, if any. Queue names now require the same validation as subjects.
+**Important** We realize this may affect existing applications, but need to require consistency across clients
+
+#### Version 2.16.14: Options properties improvements
In this release, support was added to
* support properties keys with or without the prefix 'io.nats.client.'
@@ -60,7 +83,7 @@ In this release, support was added to
For details on the other features, see the "Options" sections
-#### Version 2.16.12 Max Payload Check
+#### Version 2.16.12: Max Payload Check
As of version 2.16.12, there is no longer client side checking
1. that a message payload is less than the server configuration (Core and JetStream publishes)
@@ -71,7 +94,7 @@ Please see unit test for examples of this behavior.
and
`testMaxPayloadJs` in [JetStreamPubTests.cs](src/test/java/io/nats/client/impl/JetStreamPubTests.java)
-#### Version 2.16.8 Websocket Support
+#### Version 2.16.8: Websocket Support
As of version 2.16.8 Websocket (`ws` and `wss`) protocols are supported for connecting to the server.
For instance, your server bootstrap url might be `ws://my-nats-host:80` or `wss://my-nats-host:443`.
@@ -82,7 +105,7 @@ for more information.
If you use secure websockets (wss), your connection must be securely configured in the same way you would configure a `tls` connection.
-#### Version 2.16.0 Consumer Create
+#### Version 2.16.0: Consumer Create
This release by default will use a new JetStream consumer create API when interacting with nats-server version 2.9.0 or higher.
This changes the subjects used by the client to create consumers, which might in some cases require changes in access and import/export configuration.
@@ -277,6 +300,39 @@ Also, there is a detailed [OCSP Example](https://github.com/nats-io/java-nats-ex
The current version of this client supports subjects with ASCII printable characters and wildcards when subscribing.
+### JetStream Subscribe Subject
+
+With the introduction of simplification, the various original subscribe methods available will eventually be deprecated.
+But since they are available, we need to address the concept of the subscribe subject.
+
+All the subscribe methods take a "subject" as the first parameter. We call this the subscribe subject.
+The subject could be something like `my.subject` or `my.star.*` or `my.gt.>` or even `>`.
+This parameter is used and validated in different ways depending on the context of the call,
+including looking up the stream, if stream is not provided via subscribe options.
+
+The subscribe subject could be used to make a simple subscription. In this case it is required.
+An ephemeral consumer will be created for that subject, assuming that subject can be looked up in a stream.
+```java
+JetStream js = nc.jetStream();
+JetStreamSubscription sub = subscribe(subject)
+```
+
+If subscribe call has either a PushSubscribeOptions or PullSubscribeOptions that have a ConsumerConfiguration
+with one or more filter subjects, the subscribe subject is optional since we can use the first filter subject as
+the subscribe subject.
+
+```java
+PushSubscribeOptions pso = ConsumerConfiguration.builder().filterSubject("my.subject").buildPushSubscribeOptions();
+js.subscribe(null, pso);
+```
+
+The other time you can skip the subject parameter is when you "bind". Since the stream name and consumer name are
+part of the bind, the subject will be retrieved from the consumer looked-up via the bind stream and consumer name information.
+
+#### On-the-fly Subscribes
+
+On the fly subscribes rely on
+
### NKey-based Challenge Response Authentication
The NATS server is adding support for a challenge response authentication scheme based on [NKeys](https://github.com/nats-io/nkeys). Version 2.2.0 of
@@ -851,6 +907,7 @@ You can however set the deliver policy which will be used to start the subscript
| JsSubPushAsyncCantSetPending | SUB | 90021 | Pending limits must be set directly on the dispatcher. |
| JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available. |
| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. |
+| JsMultipleFilterSubjects210NotAvailable | CON | 90303 | Multiple filter subjects not available until server version 2.10.0. |
| OsObjectNotFound | OS | 90201 | The object was not found. |
| OsObjectIsDeleted | OS | 90202 | The object is deleted. |
| OsObjectAlreadyExists | OS | 90203 | An object with that name already exists. |
diff --git a/build.gradle b/build.gradle
index 496e2bfe2..1baae35d0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -13,7 +13,7 @@ plugins {
id 'signing'
}
-def jarVersion = "2.16.15"
+def jarVersion = "2.17.0"
def isRelease = System.getenv("BUILD_EVENT") == "release"
def bs = System.getenv("BRANCH_SNAPSHOT")
diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java
index b26d40d5e..3d6e6535b 100644
--- a/src/main/java/io/nats/client/JetStream.java
+++ b/src/main/java/io/nats/client/JetStream.java
@@ -358,13 +358,13 @@ public interface JetStream {
*
See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
- * @param subject the subject to subscribe to
+ * @param subscribeSubject the subject to subscribe to
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject) throws IOException, JetStreamApiException;
/**
* Create a synchronous subscription to the specified subject.
@@ -375,14 +375,15 @@ public interface JetStream {
*
See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
- * @param subject the subject to subscribe to
+ * @param subscribeSubject the subject to subscribe to.
+ * Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param options optional subscription options
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create a synchronous subscription to the specified subject.
@@ -393,7 +394,8 @@ public interface JetStream {
*
See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
- * @param subject the subject to subscribe to
+ * @param subscribeSubject the subject to subscribe to
+ * Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param queue the optional queue group to join
* @param options optional subscription options
* @return The subscription
@@ -401,14 +403,14 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
*
- * @param subject The subject to subscribe to
+ * @param subscribeSubject The subject to subscribe to
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param autoAck Whether to auto ack
@@ -417,14 +419,14 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException;
/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
- *
- * @param subject The subject to subscribe to.
+ * @param subscribeSubject The subject to subscribe to.
+ * Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param autoAck Whether to auto ack
@@ -434,14 +436,15 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
*
- * @param subject The subject to subscribe to.
+ * @param subscribeSubject The subject to subscribe to.
+ * Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param queue the optional queue group to join
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
@@ -452,22 +455,24 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create a subscription to the specified subject in the mode of pull, with additional options
- * @param subject The subject to subscribe to
+ * @param subscribeSubject The subject to subscribe to
+ * Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param options pull subscription options
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create an asynchronous subscription to the specified subject in the mode of pull, with additional options
- * @param subject The subject to subscribe to
+ * @param subscribeSubject The subject to subscribe to
+ * Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param options pull subscription options
@@ -476,7 +481,7 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Get a stream context for a specific named stream. Verifies that the stream exists.
diff --git a/src/main/java/io/nats/client/api/ConsumerConfiguration.java b/src/main/java/io/nats/client/api/ConsumerConfiguration.java
index 25f81e54a..8a16c76d1 100644
--- a/src/main/java/io/nats/client/api/ConsumerConfiguration.java
+++ b/src/main/java/io/nats/client/api/ConsumerConfiguration.java
@@ -66,7 +66,7 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final String name;
protected final String deliverSubject;
protected final String deliverGroup;
- protected final String filterSubject;
+ protected final List filterSubjects;
protected final String sampleFrequency;
protected final ZonedDateTime startTime;
protected final Duration ackWait;
@@ -96,7 +96,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.name = cc.name;
this.deliverSubject = cc.deliverSubject;
this.deliverGroup = cc.deliverGroup;
- this.filterSubject = cc.filterSubject;
+ this.filterSubjects = cc.filterSubjects;
this.sampleFrequency = cc.sampleFrequency;
this.startTime = cc.startTime;
this.ackWait = cc.ackWait;
@@ -128,7 +128,13 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
name = readString(v, NAME);
deliverSubject = readString(v, DELIVER_SUBJECT);
deliverGroup = readString(v, DELIVER_GROUP);
- filterSubject = readString(v, FILTER_SUBJECT);
+ String tempFs = readString(v, FILTER_SUBJECT);
+ if (tempFs != null) {
+ filterSubjects = Collections.singletonList(tempFs);
+ }
+ else {
+ filterSubjects = readStringList(v, FILTER_SUBJECTS);
+ }
sampleFrequency = readString(v, SAMPLE_FREQ);
startTime = readDate(v, OPT_START_TIME);
@@ -166,7 +172,7 @@ protected ConsumerConfiguration(Builder b)
this.name = b.name;
this.startTime = b.startTime;
this.ackWait = b.ackWait;
- this.filterSubject = b.filterSubject;
+ this.filterSubjects = b.filterSubjects;
this.sampleFrequency = b.sampleFrequency;
this.deliverSubject = b.deliverSubject;
this.deliverGroup = b.deliverGroup;
@@ -210,7 +216,12 @@ public String toJson() {
JsonUtils.addFieldAsNanos(sb, ACK_WAIT, ackWait);
JsonUtils.addFieldWhenGtZero(sb, MAX_DELIVER, maxDeliver);
JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
- JsonUtils.addField(sb, FILTER_SUBJECT, filterSubject);
+ if (filterSubjects.size() > 1) {
+ JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
+ }
+ else if (filterSubjects.size() == 1) {
+ JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0));
+ }
JsonUtils.addField(sb, REPLAY_POLICY, GetOrDefault(replayPolicy).toString());
JsonUtils.addField(sb, SAMPLE_FREQ, sampleFrequency);
JsonUtils.addFieldWhenGtZero(sb, RATE_LIMIT_BPS, rateLimit);
@@ -318,11 +329,19 @@ public long getMaxDeliver() {
}
/**
- * Gets the max filter subject of this consumer configuration.
- * @return the filter subject.
+ * Gets the first filter subject of this consumer configuration. Will be null if there are none.
+ * @return the first filter subject.
*/
public String getFilterSubject() {
- return filterSubject;
+ return filterSubjects.isEmpty() ? null : filterSubjects.get(0);
+ }
+
+ /**
+ * Gets the filter subjects as a list. May be empty, but won't be null
+ * @return the list
+ */
+ public List getFilterSubjects() {
+ return filterSubjects;
}
/**
@@ -616,7 +635,7 @@ public static class Builder {
private String name;
private String deliverSubject;
private String deliverGroup;
- private String filterSubject;
+ private List filterSubjects = new ArrayList<>();
private String sampleFrequency;
private ZonedDateTime startTime;
@@ -654,7 +673,7 @@ public Builder(ConsumerConfiguration cc) {
this.name = cc.name;
this.deliverSubject = cc.deliverSubject;
this.deliverGroup = cc.deliverGroup;
- this.filterSubject = cc.filterSubject;
+ this.filterSubjects = new ArrayList<>(cc.filterSubjects);
this.sampleFrequency = cc.sampleFrequency;
this.startTime = cc.startTime;
@@ -833,7 +852,37 @@ public Builder maxDeliver(long maxDeliver) {
* @return Builder
*/
public Builder filterSubject(String filterSubject) {
- this.filterSubject = emptyAsNull(filterSubject);
+ this.filterSubjects.clear();
+ if (!nullOrEmpty(filterSubject)) {
+ this.filterSubjects.add(filterSubject);
+ }
+ return this;
+ }
+
+
+ /**
+ * Sets the filter subjects of the ConsumerConfiguration.
+ * @param filterSubjects the array of filter subjects
+ * @return Builder
+ */
+ public Builder filterSubjects(String... filterSubjects) {
+ return filterSubjects(Arrays.asList(filterSubjects));
+ }
+
+ /**
+ * Sets the filter subjects of the ConsumerConfiguration.
+ * @param filterSubjects the list of filter subjects
+ * @return Builder
+ */
+ public Builder filterSubjects(List filterSubjects) {
+ this.filterSubjects.clear();
+ if (filterSubjects != null) {
+ for (String fs : filterSubjects) {
+ if (!nullOrEmpty(fs)) {
+ this.filterSubjects.add(fs);
+ }
+ }
+ }
return this;
}
@@ -1218,7 +1267,7 @@ public String toString() {
", ackPolicy=" + ackPolicy +
", ackWait=" + ackWait +
", maxDeliver=" + maxDeliver +
- ", filterSubject='" + filterSubject + '\'' +
+ ", filterSubjects='" + String.join(",", filterSubjects) + '\'' +
", replayPolicy=" + replayPolicy +
", sampleFrequency='" + sampleFrequency + '\'' +
", rateLimit=" + rateLimit +
diff --git a/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java b/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java
index d877faba3..9cdc06567 100644
--- a/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java
+++ b/src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java
@@ -14,14 +14,18 @@
package io.nats.client.api;
import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
-import static io.nats.client.support.Validator.emptyOrNullAs;
+import static io.nats.client.support.Validator.emptyAsNull;
public class OrderedConsumerConfiguration {
public static String DEFAULT_FILTER_SUBJECT = ">";
- private String filterSubject;
+ private final List filterSubjects;
private DeliverPolicy deliverPolicy;
private Long startSequence;
private ZonedDateTime startTime;
@@ -35,7 +39,8 @@ public class OrderedConsumerConfiguration {
*/
public OrderedConsumerConfiguration() {
startSequence = ConsumerConfiguration.LONG_UNSET;
- filterSubject = DEFAULT_FILTER_SUBJECT;
+ filterSubjects = new ArrayList<>();
+ filterSubjects.add(DEFAULT_FILTER_SUBJECT);
}
/**
@@ -44,7 +49,34 @@ public OrderedConsumerConfiguration() {
* @return Builder
*/
public OrderedConsumerConfiguration filterSubject(String filterSubject) {
- this.filterSubject = emptyOrNullAs(filterSubject, DEFAULT_FILTER_SUBJECT);
+ return filterSubjects(Collections.singletonList(filterSubject));
+ }
+
+ /**
+ * Sets the filter subjects of the OrderedConsumerConfiguration.
+ * @param filterSubject the filter subject
+ * @return Builder
+ */
+ public OrderedConsumerConfiguration filterSubjects(String... filterSubject) {
+ return filterSubjects(Arrays.asList(filterSubject));
+ }
+
+ /**
+ * Sets the filter subject of the OrderedConsumerConfiguration.
+ * @param filterSubjects one or more filter subjects
+ * @return Builder
+ */
+ public OrderedConsumerConfiguration filterSubjects(List filterSubjects) {
+ this.filterSubjects.clear();
+ for (String fs : filterSubjects) {
+ String fsean = emptyAsNull(fs);
+ if (fsean != null) {
+ this.filterSubjects.add(fsean);
+ }
+ }
+ if (this.filterSubjects.isEmpty()) {
+ this.filterSubjects.add(DEFAULT_FILTER_SUBJECT);
+ }
return this;
}
@@ -100,7 +132,11 @@ public OrderedConsumerConfiguration headersOnly(Boolean headersOnly) {
}
public String getFilterSubject() {
- return filterSubject;
+ return filterSubjects.get(0);
+ }
+
+ public List getFilterSubjects() {
+ return filterSubjects;
}
public DeliverPolicy getDeliverPolicy() {
diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java
index 8bb4a9599..8aaba3605 100644
--- a/src/main/java/io/nats/client/impl/NatsConnection.java
+++ b/src/main/java/io/nats/client/impl/NatsConnection.java
@@ -38,12 +38,10 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import static io.nats.client.support.NatsConstants.*;
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
-import static io.nats.client.support.Validator.validateNotNull;
+import static io.nats.client.support.Validator.*;
import static java.nio.charset.StandardCharsets.UTF_8;
class NatsConnection implements Connection {
@@ -854,18 +852,7 @@ private void checkIfNeedsHeaderSupport(Headers headers) {
*/
@Override
public Subscription subscribe(String subject) {
-
- if (subject == null || subject.length() == 0) {
- throw new IllegalArgumentException("Subject is required in subscribe");
- }
-
- Pattern pattern = Pattern.compile("\\s");
- Matcher matcher = pattern.matcher(subject);
-
- if (matcher.find()) {
- throw new IllegalArgumentException("Subject cannot contain whitespace");
- }
-
+ validateSubject(subject, true);
return createSubscription(subject, null, null, null);
}
@@ -874,28 +861,8 @@ public Subscription subscribe(String subject) {
*/
@Override
public Subscription subscribe(String subject, String queueName) {
-
- if (subject == null || subject.length() == 0) {
- throw new IllegalArgumentException("Subject is required in subscribe");
- }
-
- Pattern pattern = Pattern.compile("\\s");
- Matcher smatcher = pattern.matcher(subject);
-
- if (smatcher.find()) {
- throw new IllegalArgumentException("Subject cannot contain whitespace");
- }
-
- if (queueName == null || queueName.length() == 0) {
- throw new IllegalArgumentException("QueueName is required in subscribe");
- }
-
- Matcher qmatcher = pattern.matcher(queueName);
-
- if (qmatcher.find()) {
- throw new IllegalArgumentException("Queue names cannot contain whitespace");
- }
-
+ validateSubject(subject, true);
+ validateQueueName(queueName, true);
return createSubscription(subject, queueName, null, null);
}
diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java
index 2e9f2dc9c..9d2000cde 100644
--- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java
+++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java
@@ -61,7 +61,7 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
ordered = true;
consumerName = null;
originalOrderedCc = ConsumerConfiguration.builder()
- .filterSubject(config.getFilterSubject())
+ .filterSubjects(config.getFilterSubjects())
.deliverPolicy(config.getDeliverPolicy())
.startSequence(config.getStartSequence())
.startTime(config.getStartTime())
diff --git a/src/main/java/io/nats/client/impl/NatsDispatcher.java b/src/main/java/io/nats/client/impl/NatsDispatcher.java
index fd6d5fa38..e2dffff38 100644
--- a/src/main/java/io/nats/client/impl/NatsDispatcher.java
+++ b/src/main/java/io/nats/client/impl/NatsDispatcher.java
@@ -23,6 +23,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import static io.nats.client.support.Validator.*;
+
class NatsDispatcher extends NatsConsumer implements Dispatcher, Runnable {
private MessageQueue incoming;
@@ -187,53 +189,32 @@ void remove(NatsSubscription sub) {
}
public Dispatcher subscribe(String subject) {
- if (subject == null || subject.length() == 0) {
- throw new IllegalArgumentException("Subject is required in subscribe");
- }
-
+ validateSubject(subject, true);
this.subscribeImplCore(subject, null, null);
return this;
}
- NatsSubscription subscribeReturningSubscription(String subject) {
- if (subject == null || subject.length() == 0) {
- throw new IllegalArgumentException("Subject is required in subscribe");
- }
+ NatsSubscription subscribeReturningSubscription(String subject) {
+ validateSubject(subject, true);
return this.subscribeImplCore(subject, null, null);
}
public Subscription subscribe(String subject, MessageHandler handler) {
- if (subject == null || subject.length() == 0) {
- throw new IllegalArgumentException("Subject is required in subscribe");
- }
-
- if (handler == null) {
- throw new IllegalArgumentException("MessageHandler is required in subscribe");
- }
+ validateSubject(subject, true);
+ required(handler, "Handler");
return this.subscribeImplCore(subject, null, handler);
}
public Dispatcher subscribe(String subject, String queueName) {
- if (subject == null || subject.length() == 0) {
- throw new IllegalArgumentException("Subject is required in subscribe");
- }
-
- if (queueName == null || queueName.length() == 0) {
- throw new IllegalArgumentException("QueueName is required in subscribe");
- }
+ validateSubject(subject, true);
+ validateQueueName(queueName, true);
this.subscribeImplCore(subject, queueName, null);
return this;
}
public Subscription subscribe(String subject, String queueName, MessageHandler handler) {
- if (subject == null || subject.length() == 0) {
- throw new IllegalArgumentException("Subject is required in subscribe");
- }
-
- if (queueName == null || queueName.length() == 0) {
- throw new IllegalArgumentException("QueueName is required in subscribe");
- }
-
+ validateSubject(subject, true);
+ validateQueueName(queueName, true);
if (handler == null) {
throw new IllegalArgumentException("MessageHandler is required in subscribe");
}
diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java
index eaccd8e37..72d56b9cc 100644
--- a/src/main/java/io/nats/client/impl/NatsJetStream.java
+++ b/src/main/java/io/nats/client/impl/NatsJetStream.java
@@ -234,22 +234,27 @@ MessageManager createMessageManager(
MessageManagerFactory _pullOrderedMessageManagerFactory =
(mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new OrderedPullMessageManager(mmConn, mmJs, mmStream, mmSo, mmCc, mmSyncMode);
- JetStreamSubscription createSubscription(String subject,
+ JetStreamSubscription createSubscription(String userSubscribeSubject,
+ PushSubscribeOptions pushSubscribeOptions,
+ PullSubscribeOptions pullSubscribeOptions,
String queueName,
NatsDispatcher dispatcher,
MessageHandler userHandler,
- boolean isAutoAck,
- PushSubscribeOptions pushSubscribeOptions,
- PullSubscribeOptions pullSubscribeOptions
+ boolean isAutoAck
) throws IOException, JetStreamApiException {
- // 1. Prepare for all the validation
+ // Parameter notes. For those relating to the callers, you can see all the callers further down in this source file.
+ // - pull subscribe callers guarantee that pullSubscribeOptions is not null
+ // - qgroup is always null with pull callers
+ // - callers only ever provide one of the subscribe options
+
+ // 1. Initial prep and validation
boolean isPullMode = pullSubscribeOptions != null;
SubscribeOptions so;
String stream;
- String qgroup;
ConsumerConfiguration userCC;
+ String deliverGroup = null; // push might set this
if (isPullMode) {
so = pullSubscribeOptions; // options must have already been checked to be non-null
@@ -257,13 +262,12 @@ JetStreamSubscription createSubscription(String subject,
userCC = so.getConsumerConfiguration();
- qgroup = null; // just to make compiler happy both paths set variable
validateNotSupplied(userCC.getDeliverGroup(), JsSubPullCantHaveDeliverGroup);
validateNotSupplied(userCC.getDeliverSubject(), JsSubPullCantHaveDeliverSubject);
}
else {
so = pushSubscribeOptions == null ? DEFAULT_PUSH_OPTS : pushSubscribeOptions;
- stream = so.getStream(); // might be null, that's ok (see directBind)
+ stream = so.getStream();
userCC = so.getConsumerConfiguration();
@@ -272,8 +276,8 @@ JetStreamSubscription createSubscription(String subject,
if (userCC.maxBytesWasSet()) { throw JsSubPushCantHaveMaxBytes.instance(); }
// figure out the queue name
- qgroup = validateMustMatchIfBothSupplied(userCC.getDeliverGroup(), queueName, JsSubQueueDeliverGroupMismatch);
- if (so.isOrdered() && qgroup != null) {
+ deliverGroup = validateMustMatchIfBothSupplied(userCC.getDeliverGroup(), queueName, JsSubQueueDeliverGroupMismatch);
+ if (so.isOrdered() && deliverGroup != null) {
throw JsSubOrderedNotAllowOnQueues.instance();
}
@@ -285,26 +289,47 @@ JetStreamSubscription createSubscription(String subject,
}
}
- // 2A. Flow Control / heartbeat not always valid
+ // 1B. Flow Control / heartbeat not always valid
if (userCC.getIdleHeartbeat() != null && userCC.getIdleHeartbeat().toMillis() > 0) {
if (isPullMode) {
throw JsSubFcHbNotValidPull.instance();
}
- if (qgroup != null) {
+ if (deliverGroup != null) {
throw JsSubFcHbNotValidQueue.instance();
}
}
- // 2B. Did they tell me what stream? No? look it up.
- final String fnlStream;
+ // 2. figure out user provided subjects and prepare the userCcFilterSubjects
+ userSubscribeSubject = emptyAsNull(userSubscribeSubject);
+ List userCcFilterSubjects = new ArrayList<>();
+ if (userCC.getFilterSubject() == null) { // empty filterSubjects gives null
+ // userCC.filterSubjects empty, populate userCcFilterSubjects w/userSubscribeSubject if possible
+ if (userSubscribeSubject != null) {
+ userCcFilterSubjects.add(userSubscribeSubject);
+ }
+ }
+ else {
+ // userCC.filterSubjects not empty, validate them
+ userCcFilterSubjects.addAll(userCC.getFilterSubjects());
+ // If userSubscribeSubject is provided it must be one of the filter subjects.
+ if (userSubscribeSubject != null && !userCcFilterSubjects.contains(userSubscribeSubject)) {
+ throw JsSubSubjectDoesNotMatchFilter.instance();
+ }
+ }
+
+ // 3. Did they tell me what stream? No? look it up.
+ final String settledStream;
if (stream == null) {
- fnlStream = lookupStreamBySubject(subject);
- if (fnlStream == null) {
+ if (userCcFilterSubjects.isEmpty()) {
+ throw new IllegalArgumentException("Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject.");
+ }
+ settledStream = lookupStreamBySubject(userCcFilterSubjects.get(0));
+ if (settledStream == null) {
throw JsSubNoMatchingStreamForSubject.instance();
}
}
else {
- fnlStream = stream;
+ settledStream = stream;
}
ConsumerConfiguration serverCC = null;
@@ -314,9 +339,10 @@ JetStreamSubscription createSubscription(String subject,
}
String inboxDeliver = userCC.getDeliverSubject();
- // 3. Does this consumer already exist?
+ // 4. Does this consumer already exist? FastBind bypasses the lookup;
+ // the dev better know what they are doing...
if (!so.isFastBind() && consumerName != null) {
- ConsumerInfo serverInfo = lookupConsumerInfo(fnlStream, consumerName);
+ ConsumerInfo serverInfo = lookupConsumerInfo(settledStream, consumerName);
if (serverInfo != null) { // the consumer for that durable already exists
serverCC = serverInfo.getConsumerConfiguration();
@@ -341,7 +367,7 @@ else if (nullOrEmpty(serverCC.getDeliverSubject())) {
if (serverCC.getDeliverGroup() == null) {
// lookedUp was null, means existing consumer is not a queue consumer
- if (qgroup == null) {
+ if (deliverGroup == null) {
// ok fine, no queue requested and the existing consumer is also not a queue consumer
// we must check if the consumer is in use though
if (serverInfo.isPushBound()) {
@@ -352,18 +378,21 @@ else if (nullOrEmpty(serverCC.getDeliverSubject())) {
throw JsSubExistingConsumerNotQueue.instance();
}
}
- else if (qgroup == null) {
+ else if (deliverGroup == null) {
throw JsSubExistingConsumerIsQueue.instance();
}
- else if (!serverCC.getDeliverGroup().equals(qgroup)) {
+ else if (!serverCC.getDeliverGroup().equals(deliverGroup)) {
throw JsSubExistingQueueDoesNotMatchRequestedQueue.instance();
}
- // durable already exists, make sure the filter subject matches
- if (nullOrEmpty(subject)) { // allowed if they had given both stream and durable
- subject = userCC.getFilterSubject();
+ // consumer already exists, make sure the filter subject matches
+ // subscribeSubject, if supplied came from the user directly
+ // or in the userCC or might not have been in either place
+ if (userCcFilterSubjects.isEmpty()) {
+ // still also might be null, which the server treats as >
+ userCcFilterSubjects = serverCC.getFilterSubjects();
}
- else if (!isFilterMatch(subject, serverCC.getFilterSubject(), fnlStream)) {
+ else if (!listsAreEquivalent(userCcFilterSubjects, serverCC.getFilterSubjects())) {
throw JsSubSubjectDoesNotMatchFilter.instance();
}
@@ -374,59 +403,60 @@ else if (so.isBind()) {
}
}
- // 4. If pull or no deliver subject (inbox) provided or found, make an inbox.
- final String fnlInboxDeliver;
+ // 5. If pull or no deliver subject (inbox) provided or found, make an inbox.
+ final String settledInboxDeliver;
if (isPullMode) {
- fnlInboxDeliver = conn.createInbox() + ".*";
+ settledInboxDeliver = conn.createInbox() + ".*";
}
else if (inboxDeliver == null) {
- fnlInboxDeliver = conn.createInbox();
+ settledInboxDeliver = conn.createInbox();
}
else {
- fnlInboxDeliver = inboxDeliver;
+ settledInboxDeliver = inboxDeliver;
}
- // 5. If consumer does not exist, create and settle on the config. Name will have to wait
+ // 6. If consumer does not exist, create and settle on the config. Name will have to wait
// If the consumer exists, I know what the settled info is
final String settledConsumerName;
- final ConsumerConfiguration settledServerCC;
+ final ConsumerConfiguration settledCC;
if (so.isFastBind() || serverCC != null) {
- settledServerCC = serverCC;
- settledConsumerName = so.getName();
+ settledCC = serverCC;
+ settledConsumerName = so.getName(); // will never be null in this case
}
else {
ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC);
// Pull mode doesn't maintain a deliver subject. It's actually an error if we send it.
if (!isPullMode) {
- ccBuilder.deliverSubject(fnlInboxDeliver);
+ ccBuilder.deliverSubject(settledInboxDeliver);
}
- if (userCC.getFilterSubject() == null) {
- ccBuilder.filterSubject(subject);
- }
+ // userCC.filterSubjects might have originally been empty
+ // but there might have been a userSubscribeSubject,
+ // so this makes sure it's resolved either way
+ ccBuilder.filterSubjects(userCcFilterSubjects);
- ccBuilder.deliverGroup(qgroup);
+ ccBuilder.deliverGroup(deliverGroup);
- settledServerCC = ccBuilder.build();
- settledConsumerName = null;
+ settledCC = ccBuilder.build();
+ settledConsumerName = null; // the server will give us a name
}
- // 6. create the subscription. lambda needs final or effectively final vars
+ // 7. create the subscription. lambda needs final or effectively final vars
final MessageManager mm;
final NatsSubscriptionFactory subFactory;
if (isPullMode) {
MessageManagerFactory mmFactory = so.isOrdered() ? _pullOrderedMessageManagerFactory : _pullMessageManagerFactory;
- mm = mmFactory.createMessageManager(conn, this, fnlStream, so, settledServerCC, false, dispatcher == null);
+ mm = mmFactory.createMessageManager(conn, this, settledStream, so, settledCC, false, dispatcher == null);
subFactory = (sid, lSubject, lQgroup, lConn, lDispatcher)
- -> new NatsJetStreamPullSubscription(sid, lSubject, lConn, lDispatcher, this, fnlStream, settledConsumerName, mm);
+ -> new NatsJetStreamPullSubscription(sid, lSubject, lConn, lDispatcher, this, settledStream, settledConsumerName, mm);
}
else {
MessageManagerFactory mmFactory = so.isOrdered() ? _pushOrderedMessageManagerFactory : _pushMessageManagerFactory;
- mm = mmFactory.createMessageManager(conn, this, fnlStream, so, settledServerCC, false, dispatcher == null);
+ mm = mmFactory.createMessageManager(conn, this, settledStream, so, settledCC, false, dispatcher == null);
subFactory = (sid, lSubject, lQgroup, lConn, lDispatcher) -> {
NatsJetStreamSubscription nsub = new NatsJetStreamSubscription(sid, lSubject, lQgroup, lConn, lDispatcher,
- this, fnlStream, settledConsumerName, mm);
+ this, settledStream, settledConsumerName, mm);
if (lDispatcher == null) {
nsub.setPendingLimits(so.getPendingMessageLimit(), so.getPendingByteLimit());
}
@@ -435,16 +465,16 @@ else if (inboxDeliver == null) {
}
NatsJetStreamSubscription sub;
if (dispatcher == null) {
- sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, subFactory);
+ sub = (NatsJetStreamSubscription) conn.createSubscription(settledInboxDeliver, deliverGroup, null, subFactory);
}
else {
- AsyncMessageHandler handler = new AsyncMessageHandler(mm, userHandler, isAutoAck, settledServerCC);
- sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(fnlInboxDeliver, qgroup, handler, subFactory);
+ AsyncMessageHandler handler = new AsyncMessageHandler(mm, userHandler, isAutoAck, settledCC);
+ sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(settledInboxDeliver, deliverGroup, handler, subFactory);
}
- // 7. The consumer might need to be created, do it here
+ // 8. The consumer might need to be created, do it here
if (settledConsumerName == null) {
- _createConsumerUnsubscribeOnException(fnlStream, settledServerCC, sub);
+ _createConsumerUnsubscribeOnException(settledStream, settledCC, sub);
}
return sub;
@@ -484,14 +514,14 @@ public List getChanges(ConsumerConfiguration serverCc) {
if (startTime != null && !startTime.equals(serverCcc.startTime)) { changes.add("startTime"); }
- if (filterSubject != null && !filterSubject.equals(serverCcc.filterSubject)) { changes.add("filterSubject"); }
+ if (!filterSubjects.isEmpty() && !listsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); }
if (description != null && !description.equals(serverCcc.description)) { changes.add("description"); }
if (sampleFrequency != null && !sampleFrequency.equals(serverCcc.sampleFrequency)) { changes.add("sampleFrequency"); }
if (deliverSubject != null && !deliverSubject.equals(serverCcc.deliverSubject)) { changes.add("deliverSubject"); }
if (deliverGroup != null && !deliverGroup.equals(serverCcc.deliverGroup)) { changes.add("deliverGroup"); }
- if (backoff != null && !listsAreEqual(backoff, serverCcc.backoff, true)) { changes.add("backoff"); }
- if (metadata != null && !mapsAreEqual(metadata, serverCcc.metadata, true)) { changes.add("metadata"); }
+ if (backoff != null && !listsAreEquivalent(backoff, serverCcc.backoff)) { changes.add("backoff"); }
+ if (metadata != null && !mapsAreEquivalent(metadata, serverCcc.metadata)) { changes.add("metadata"); }
// do not need to check Durable because the original is retrieved by the durable name
@@ -521,24 +551,6 @@ public void onMessage(Message msg) throws InterruptedException {
}
}
- private boolean isFilterMatch(String subscribeSubject, String filterSubject, String stream) throws IOException, JetStreamApiException {
-
- // subscribeSubject guaranteed to not be null
- // filterSubject may be null or empty or have value
-
- if (subscribeSubject.equals(filterSubject)) {
- return true;
- }
-
- if (nullOrEmpty(filterSubject) || filterSubject.equals(">")) {
- // lookup stream subject returns null if there is not exactly one subject
- String streamSubject = lookupStreamSubject(stream);
- return subscribeSubject.equals(streamSubject);
- }
-
- return false;
- }
-
private String lookupStreamSubject(String stream) throws IOException, JetStreamApiException {
StreamInfo si = _getStreamInfo(stream, null);
List streamSubjects = si.getConfiguration().getSubjects();
@@ -549,88 +561,84 @@ private String lookupStreamSubject(String stream) throws IOException, JetStreamA
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException {
- validateSubject(subject, true);
- return createSubscription(subject, null, null, null, false, null, null);
+ public JetStreamSubscription subscribe(String subscribeSubject) throws IOException, JetStreamApiException {
+ subscribeSubject = validateSubject(subscribeSubject, true);
+ return createSubscription(subscribeSubject, null, null, null, null, null, false);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException {
- validateSubject(subject, isSubjectRequired(options));
- return createSubscription(subject, null, null, null, false, options, null);
+ public JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOptions options) throws IOException, JetStreamApiException {
+ subscribeSubject = validateSubject(subscribeSubject, false);
+ return createSubscription(subscribeSubject, options, null, null, null, null, false);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException {
- validateSubject(subject, isSubjectRequired(options));
- queue = emptyAsNull(validateQueueName(queue, false));
- return createSubscription(subject, queue, null, null, false, options, null);
+ public JetStreamSubscription subscribe(String subscribeSubject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException {
+ subscribeSubject = validateSubject(subscribeSubject, false);
+ validateQueueName(queue, false);
+ return createSubscription(subscribeSubject, options, null, queue, null, null, false);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException {
- validateSubject(subject, true);
+ public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException {
+ subscribeSubject = validateSubject(subscribeSubject, false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
- return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, autoAck, null, null);
+ return createSubscription(subscribeSubject, null, null, null, (NatsDispatcher) dispatcher, handler, autoAck);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
- validateSubject(subject, isSubjectRequired(options));
+ public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
+ subscribeSubject = validateSubject(subscribeSubject, false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
- return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, autoAck, options, null);
+ return createSubscription(subscribeSubject, options, null, null, (NatsDispatcher) dispatcher, handler, autoAck);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
- validateSubject(subject, isSubjectRequired(options));
- queue = emptyAsNull(validateQueueName(queue, false));
+ public JetStreamSubscription subscribe(String subscribeSubject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
+ subscribeSubject = validateSubject(subscribeSubject, false);
+ validateQueueName(queue, false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
- return createSubscription(subject, queue, (NatsDispatcher) dispatcher, handler, autoAck, options, null);
+ return createSubscription(subscribeSubject, options, null, queue, (NatsDispatcher) dispatcher, handler, autoAck);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException {
- validateSubject(subject, isSubjectRequired(options));
+ public JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException {
+ subscribeSubject = validateSubject(subscribeSubject, false);
validateNotNull(options, "Pull Subscribe Options");
- return createSubscription(subject, null, null, null, false, null, options);
+ return createSubscription(subscribeSubject, null, options, null, null, null, false);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException {
- validateSubject(subject, isSubjectRequired(options));
+ public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException {
+ subscribeSubject = validateSubject(subscribeSubject, false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
validateNotNull(options, "Pull Subscribe Options");
- return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, false, null, options);
- }
-
- private boolean isSubjectRequired(SubscribeOptions options) {
- return options == null || !options.isBind();
+ return createSubscription(subscribeSubject, null, options, null, (NatsDispatcher) dispatcher, handler, false);
}
/**
diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
index bf87f3342..64e011d82 100644
--- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
+++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
@@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import static io.nats.client.support.NatsJetStreamClientError.JsConsumerCreate290NotAvailable;
+import static io.nats.client.support.NatsJetStreamClientError.JsMultipleFilterSubjects210NotAvailable;
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
class NatsJetStreamImpl implements NatsJetStreamConstants {
@@ -44,6 +45,7 @@ public CachedStreamInfo(StreamInfo si) {
final NatsConnection conn;
final JetStreamOptions jso;
final boolean consumerCreate290Available;
+ final boolean multipleSubjectFilter210Available;
// ----------------------------------------------------------------------------------------------------
// Create / Init
@@ -59,6 +61,7 @@ public CachedStreamInfo(StreamInfo si) {
jso = JetStreamOptions.builder(jsOptions).requestTimeout(rt).build();
consumerCreate290Available = conn.getInfo().isSameOrNewerThanVersion("2.9.0") && !jso.isOptOut290ConsumerCreate();
+ multipleSubjectFilter210Available = conn.getInfo().isNewerVersionThan("2.9.99");
}
// ----------------------------------------------------------------------------------------------------
@@ -76,10 +79,16 @@ ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config) th
if (consumerName != null && !consumerCreate290Available) {
throw JsConsumerCreate290NotAvailable.instance();
}
- String durable = config.getDurable();
+ boolean multipleFilterSubject = config.getFilterSubjects().size() > 1;
+ if (multipleFilterSubject && !multipleSubjectFilter210Available) {
+ throw JsMultipleFilterSubjects210NotAvailable.instance();
+ }
+
+ String durable = config.getDurable();
String subj;
- if (consumerCreate290Available) {
+ // new consumer create not available before 290 and can't be used with multiple filter subjects
+ if (consumerCreate290Available && !multipleFilterSubject) {
if (consumerName == null) {
// if both consumerName and durable are null, generate a name
consumerName = durable == null ? generateConsumerName() : durable;
diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java
index 03dffe1e8..2c5821f4c 100644
--- a/src/main/java/io/nats/client/support/ApiConstants.java
+++ b/src/main/java/io/nats/client/support/ApiConstants.java
@@ -69,6 +69,7 @@ public interface ApiConstants {
String EXTERNAL = "external";
String FILTER = "filter";
String FILTER_SUBJECT = "filter_subject";
+ String FILTER_SUBJECTS = "filter_subjects";
String FIRST_SEQ = "first_seq";
String FIRST_TS = "first_ts";
String FLOW_CONTROL = "flow_control";
diff --git a/src/main/java/io/nats/client/support/JsonValueUtils.java b/src/main/java/io/nats/client/support/JsonValueUtils.java
index cfc26e366..03672b0f4 100644
--- a/src/main/java/io/nats/client/support/JsonValueUtils.java
+++ b/src/main/java/io/nats/client/support/JsonValueUtils.java
@@ -145,7 +145,7 @@ public static List listOf(JsonValue v, Function provider) {
public static List optionalListOf(JsonValue v, Function provider) {
List list = listOf(v, provider);
- return list.size() == 0 ? null : list;
+ return list.isEmpty() ? null : list;
}
public static List readStringList(JsonValue jsonValue, String key) {
@@ -156,7 +156,7 @@ public static List readStringListIgnoreEmpty(JsonValue jsonValue, String
return read(jsonValue, key, v -> listOf(v, jv -> {
if (jv.string != null) {
String s = jv.string.trim();
- if (s.length() > 0) {
+ if (!s.isEmpty()) {
return s;
}
}
@@ -182,7 +182,7 @@ public static List readNanosList(JsonValue jsonValue, String key, bool
return l == null ? null : Duration.ofNanos(l);
})
);
- return list.size() == 0 && nullIfEmpty ? null : list;
+ return list.isEmpty() && nullIfEmpty ? null : list;
}
public static byte[] readBytes(JsonValue jsonValue, String key) {
diff --git a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
index d991ff000..4250635e8 100644
--- a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
+++ b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
@@ -68,6 +68,7 @@ public class NatsJetStreamClientError {
public static final NatsJetStreamClientError JsConsumerCreate290NotAvailable = new NatsJetStreamClientError(CON, 90301, "Name field not valid when v2.9.0 consumer create api is not available.");
public static final NatsJetStreamClientError JsConsumerNameDurableMismatch = new NatsJetStreamClientError(CON, 90302, "Name must match durable if both are supplied.");
+ public static final NatsJetStreamClientError JsMultipleFilterSubjects210NotAvailable = new NatsJetStreamClientError(CON, 90303, "Multiple filter subjects not available until server version 2.10.0.");
@Deprecated // Fixed spelling error
public static final NatsJetStreamClientError JsSubFcHbHbNotValidQueue = new NatsJetStreamClientError(SUB, 90006, "Flow Control and/or heartbeat is not valid in queue mode.");
diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java
index e0a55c4ad..2c70f9b1c 100644
--- a/src/main/java/io/nats/client/support/Validator.java
+++ b/src/main/java/io/nats/client/support/Validator.java
@@ -17,37 +17,73 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import static io.nats.client.support.NatsConstants.DOT;
import static io.nats.client.support.NatsJetStreamConstants.MAX_HISTORY_PER_KEY;
+@SuppressWarnings("UnusedReturnValue")
public abstract class Validator {
- private Validator() {
- } /* ensures cannot be constructed */
-
- public static String validateSubject(String s, boolean required) {
- return validateSubject(s, "Subject", required, false);
- }
-
- public static String validateSubject(String subject, String label, boolean required, boolean cantEndWithGt) {
- if (emptyAsNull(subject) == null) {
+ private Validator() {} /* ensures cannot be constructed */
+
+ /*
+ cannot contain spaces \r \n \t
+ cannot start or end with subject token delimiter .
+ some things don't allow it to end greater
+ */
+ public static String validateSubjectTerm(String subject, String label, boolean required) {
+ subject = emptyAsNull(subject);
+ if (subject == null) {
if (required) {
throw new IllegalArgumentException(label + " cannot be null or empty.");
}
return null;
}
+ if (subject.endsWith(".")) {
+ throw new IllegalArgumentException(label + " cannot end with '.'");
+ }
+
String[] segments = subject.split("\\.");
- for (int x = 0; x < segments.length; x++) {
- String segment = segments[x];
- if (segment.equals(">")) {
- if (cantEndWithGt || x != segments.length - 1) { // if it can end with gt, gt must be last segment
- throw new IllegalArgumentException(label + " cannot contain '>'");
+ for (int seg = 0; seg < segments.length; seg++) {
+ String segment = segments[seg];
+ int sl = segment.length();
+ if (sl == 0) {
+ if (seg == 0) {
+ throw new IllegalArgumentException(label + " cannot start with '.'");
+ }
+ throw new IllegalArgumentException(label + " segment cannot be empty");
+ }
+ else {
+ for (int m = 0; m < sl; m++) {
+ char c = segment.charAt(m);
+ switch (c) {
+ case 32:
+ case '\r':
+ case '\n':
+ case '\t':
+ throw new IllegalArgumentException(label + " cannot contain space, tab, carriage return or linefeed character");
+ case '*':
+ case '>':
+ if (sl != 1) {
+ throw new IllegalArgumentException(label + " wildcard improperly placed.");
+ }
+ break;
+ }
}
}
- else if (!segment.equals("*") && notPrintable(segment)) {
- throw new IllegalArgumentException(label + " must be printable characters only.");
- }
+ }
+ return subject;
+ }
+
+ public static String validateSubject(String s, boolean required) {
+ return validateSubjectTerm(s, "Subject", required);
+ }
+
+ public static String validateSubject(String subject, String label, boolean required, boolean cantEndWithGt) {
+ subject = validateSubjectTerm(subject, label, required);
+ if (subject != null && cantEndWithGt && subject.endsWith(".>")) {
+ throw new IllegalArgumentException(label + " last segment cannot be '>'");
}
return subject;
}
@@ -57,7 +93,7 @@ public static String validateReplyTo(String s, boolean required) {
}
public static String validateQueueName(String s, boolean required) {
- return validatePrintableExceptWildDotGt(s, "Queue", required);
+ return validateSubjectTerm(s, "QueueName", required);
}
public static String validateStreamName(String s, boolean required) {
@@ -120,10 +156,6 @@ public static String validateMustMatchIfBothSupplied(String s1, String s2, NatsJ
throw err.instance();
}
- interface Check {
- String check();
- }
-
public static String required(String s, String label) {
if (emptyAsNull(s) == null) {
throw new IllegalArgumentException(label + " cannot be null or empty.");
@@ -157,14 +189,14 @@ public static void required(Map, ?> m, String label) {
}
}
- public static String _validate(String s, boolean required, String label, Check check) {
+ public static String _validate(String s, boolean required, String label, Supplier customValidate) {
if (emptyAsNull(s) == null) {
if (required) {
throw new IllegalArgumentException(label + " cannot be null or empty.");
}
return null;
}
- return check.check();
+ return customValidate.get();
}
public static String validateMaxLength(String s, int maxLength, boolean required, String label) {
@@ -367,7 +399,7 @@ public static long validateGtEqZero(long l, String label) {
// Helpers
// ----------------------------------------------------------------------------------------------------
public static boolean nullOrEmpty(String s) {
- return s == null || s.trim().length() == 0;
+ return s == null || s.trim().isEmpty();
}
public static boolean notPrintable(String s) {
@@ -561,42 +593,40 @@ public static boolean isSemVer(String s) {
return SEMVER_PATTERN.matcher(s).find();
}
- public static boolean listsAreEqual(List l1, List l2, boolean nullSecondEqualsEmptyFirst)
+ public static boolean listsAreEquivalent(List l1, List l2)
{
- if (l1 == null)
- {
- return l2 == null;
- }
+ int s1 = l1 == null ? 0 : l1.size();
+ int s2 = l2 == null ? 0 : l2.size();
- if (l2 == null)
- {
- return nullSecondEqualsEmptyFirst && l1.size() == 0;
+ if (s1 != s2) {
+ return false;
}
- return l1.equals(l2);
+ if (s1 > 0) {
+ for (T t : l1) {
+ if (!l2.contains(t)) {
+ return false;
+ }
+ }
+ }
+ return true;
}
-
- public static boolean mapsAreEqual(Map m1, Map m2, boolean nullSecondEqualsEmptyFirst)
+ public static boolean mapsAreEquivalent(Map m1, Map m2)
{
- if (m1 == null)
- {
- return m2 == null;
- }
+ int s1 = m1 == null ? 0 : m1.size();
+ int s2 = m2 == null ? 0 : m2.size();
- if (m2 == null)
- {
- return nullSecondEqualsEmptyFirst && m1.size() == 0;
- }
-
- if (m1.size() != m2.size()) {
+ if (s1 != s2) {
return false;
}
- for (Map.Entry entry : m1.entrySet())
- {
- if (!entry.getValue().equals(m2.get(entry.getKey()))) {
- return false;
+ if (s1 > 0) {
+ for (Map.Entry entry : m1.entrySet())
+ {
+ if (!entry.getValue().equals(m2.get(entry.getKey()))) {
+ return false;
+ }
}
}
diff --git a/src/main/java/io/nats/service/Endpoint.java b/src/main/java/io/nats/service/Endpoint.java
index 339d7aaab..1914cca38 100644
--- a/src/main/java/io/nats/service/Endpoint.java
+++ b/src/main/java/io/nats/service/Endpoint.java
@@ -16,6 +16,7 @@
import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.JsonValue;
+import io.nats.client.support.Validator;
import java.util.HashMap;
import java.util.Map;
@@ -26,7 +27,6 @@
import static io.nats.client.support.JsonValueUtils.readString;
import static io.nats.client.support.JsonValueUtils.readStringStringMap;
import static io.nats.client.support.Validator.validateIsRestrictedTerm;
-import static io.nats.client.support.Validator.validateSubject;
/**
* Endpoint encapsulates the name, subject and metadata for a {@link ServiceEndpoint}.
@@ -85,7 +85,7 @@ public Endpoint(String name, String subject, Map metadata) {
this.subject = this.name;
}
else {
- this.subject = validateSubject(subject, "Endpoint Subject", false, false);
+ this.subject = Validator.validateSubjectTerm(subject, "Endpoint Subject", false);
}
}
else {
diff --git a/src/main/java/io/nats/service/Group.java b/src/main/java/io/nats/service/Group.java
index c6d65ac39..d6da0a8f7 100644
--- a/src/main/java/io/nats/service/Group.java
+++ b/src/main/java/io/nats/service/Group.java
@@ -13,10 +13,12 @@
package io.nats.service;
+import io.nats.client.support.Validator;
+
import java.util.Objects;
import static io.nats.client.support.NatsConstants.DOT;
-import static io.nats.client.support.Validator.validateSubject;
+import static io.nats.client.support.Validator.emptyAsNull;
/**
* Group is way to organize endpoints by serving as a common prefix to all endpoints registered in it.
@@ -27,11 +29,20 @@ public class Group {
/**
* Construct a group.
- * Group names and subjects are considered 'Restricted Terms' and must only contain A-Z, a-z, 0-9, '-' or '_'
+ * Group names are considered 'Restricted Terms' and must only contain A-Z, a-z, 0-9, '-' or '_'
* @param name the group name
*/
public Group(String name) {
- this.name = validateSubject(name, "Group Name", true, true);
+ name = emptyAsNull(name);
+ if (name == null) {
+ throw new IllegalArgumentException("Group name cannot be null or empty.");
+ }
+
+ if (name.contains(">")) {
+ throw new IllegalArgumentException("Group name cannot contain '>'.");
+ }
+
+ this.name = Validator.validateSubjectTerm(name, "Group name", false);
}
/**
diff --git a/src/main/javadoc/overview.html b/src/main/javadoc/overview.html
index 4abff4ca7..adf37e3ca 100644
--- a/src/main/javadoc/overview.html
+++ b/src/main/javadoc/overview.html
@@ -1,5 +1,5 @@