Skip to content

Commit

Permalink
Multiple Filter Subjects and Subject validation changes (#965)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Sep 19, 2023
1 parent 47532a0 commit c9e6762
Show file tree
Hide file tree
Showing 33 changed files with 980 additions and 543 deletions.
69 changes: 63 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

### A [Java](http://java.com) client for the [NATS messaging system](https://nats.io).

**Current Release**: 2.16.14   **Current Snapshot**: 2.16.15-SNAPSHOT
**Current Release**: 2.16.14   **Current Snapshot**: 2.17.0-SNAPSHOT

[![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.nats/jnats/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.nats/jnats)
[![Javadoc](http://javadoc.io/badge/io.nats/jnats.svg?branch=master)](http://javadoc.io/doc/io.nats/jnats?branch=master)
[![javadoc](https://javadoc.io/badge2/io.nats/jnats/javadoc.svg)](https://javadoc.io/doc/io.nats/jnats)
[![Coverage Status](https://coveralls.io/repos/github/nats-io/nats.java/badge.svg?branch=main)](https://coveralls.io/github/nats-io/nats.java?branch=main)
[![Build Main Badge](https://github.com/nats-io/nats.java/actions/workflows/build-main.yml/badge.svg?event=push)](https://github.com/nats-io/nats.java/actions/workflows/build-main.yml)
[![Release Badge](https://github.com/nats-io/nats.java/actions/workflows/build-release.yml/badge.svg?event=release)](https://github.com/nats-io/nats.java/actions/workflows/build-release.yml)
Expand Down Expand Up @@ -51,7 +51,30 @@ Version 2.5.0 adds some back pressure to publish calls to alleviate issues when

Previous versions are still available in the repo.

#### Version 2.16.14 Options properties improvements
#### Version 2.17.0: Server 2.10 support. Subject and Queue Name Validation

With the release of the 2.10 server, the client has been updated to support new features.
The most important new feature is the ability to have multipler filter subjects for any single JetStream consumer.
```java
ConsumerConfiguration cc = ConsumerConfiguration.builder()
...
.filterSubjects("subject1", "subject2")
.build();
```

For subjects, up until now, the client has been very strict when validating subject names for consumer subject filters and subscriptions.
It only allowed printable ascii characters except for `*`, `>`, `.`, `\\` and `/`. This restriction has been changed to the following:
* cannot contain spaces \r \n \t
* cannot start or end with subject token delimiter .
* cannot have empty segments

**This means that UTF characters are now allowed in this client.**


For queue names, there has been inconsistent validation, if any. Queue names now require the same validation as subjects.
**Important** We realize this may affect existing applications, but need to require consistency across clients

#### Version 2.16.14: Options properties improvements

In this release, support was added to
* support properties keys with or without the prefix 'io.nats.client.'
Expand All @@ -60,7 +83,7 @@ In this release, support was added to

For details on the other features, see the "Options" sections

#### Version 2.16.12 Max Payload Check
#### Version 2.16.12: Max Payload Check

As of version 2.16.12, there is no longer client side checking
1. that a message payload is less than the server configuration (Core and JetStream publishes)
Expand All @@ -71,7 +94,7 @@ Please see unit test for examples of this behavior.
and
`testMaxPayloadJs` in [JetStreamPubTests.cs](src/test/java/io/nats/client/impl/JetStreamPubTests.java)

#### Version 2.16.8 Websocket Support
#### Version 2.16.8: Websocket Support

As of version 2.16.8 Websocket (`ws` and `wss`) protocols are supported for connecting to the server.
For instance, your server bootstrap url might be `ws://my-nats-host:80` or `wss://my-nats-host:443`.
Expand All @@ -82,7 +105,7 @@ for more information.

If you use secure websockets (wss), your connection must be securely configured in the same way you would configure a `tls` connection.

#### Version 2.16.0 Consumer Create
#### Version 2.16.0: Consumer Create

This release by default will use a new JetStream consumer create API when interacting with nats-server version 2.9.0 or higher.
This changes the subjects used by the client to create consumers, which might in some cases require changes in access and import/export configuration.
Expand Down Expand Up @@ -277,6 +300,39 @@ Also, there is a detailed [OCSP Example](https://github.com/nats-io/java-nats-ex

The current version of this client supports subjects with ASCII printable characters and wildcards when subscribing.

### JetStream Subscribe Subject

With the introduction of simplification, the various original subscribe methods available will eventually be deprecated.
But since they are available, we need to address the concept of the subscribe subject.

All the subscribe methods take a "subject" as the first parameter. We call this the subscribe subject.
The subject could be something like `my.subject` or `my.star.*` or `my.gt.>` or even `>`.
This parameter is used and validated in different ways depending on the context of the call,
including looking up the stream, if stream is not provided via subscribe options.

The subscribe subject could be used to make a simple subscription. In this case it is required.
An ephemeral consumer will be created for that subject, assuming that subject can be looked up in a stream.
```java
JetStream js = nc.jetStream();
JetStreamSubscription sub = subscribe(subject)
```

If subscribe call has either a PushSubscribeOptions or PullSubscribeOptions that have a ConsumerConfiguration
with one or more filter subjects, the subscribe subject is optional since we can use the first filter subject as
the subscribe subject.

```java
PushSubscribeOptions pso = ConsumerConfiguration.builder().filterSubject("my.subject").buildPushSubscribeOptions();
js.subscribe(null, pso);
```

The other time you can skip the subject parameter is when you "bind". Since the stream name and consumer name are
part of the bind, the subject will be retrieved from the consumer looked-up via the bind stream and consumer name information.

#### On-the-fly Subscribes

On the fly subscribes rely on

### NKey-based Challenge Response Authentication

The NATS server is adding support for a challenge response authentication scheme based on [NKeys](https://github.com/nats-io/nkeys). Version 2.2.0 of
Expand Down Expand Up @@ -851,6 +907,7 @@ You can however set the deliver policy which will be used to start the subscript
| JsSubPushAsyncCantSetPending | SUB | 90021 | Pending limits must be set directly on the dispatcher. |
| JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available. |
| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. |
| JsMultipleFilterSubjects210NotAvailable | CON | 90303 | Multiple filter subjects not available until server version 2.10.0. |
| OsObjectNotFound | OS | 90201 | The object was not found. |
| OsObjectIsDeleted | OS | 90202 | The object is deleted. |
| OsObjectAlreadyExists | OS | 90203 | An object with that name already exists. |
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
id 'signing'
}

def jarVersion = "2.16.15"
def jarVersion = "2.17.0"

def isRelease = System.getenv("BUILD_EVENT") == "release"
def bs = System.getenv("BRANCH_SNAPSHOT")
Expand Down
39 changes: 22 additions & 17 deletions src/main/java/io/nats/client/JetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,13 @@ public interface JetStream {
* <p>See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
* @param subject the subject to subscribe to
* @param subscribeSubject the subject to subscribe to
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subscribeSubject) throws IOException, JetStreamApiException;

/**
* Create a synchronous subscription to the specified subject.
Expand All @@ -375,14 +375,15 @@ public interface JetStream {
* <p>See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
* @param subject the subject to subscribe to
* @param subscribeSubject the subject to subscribe to.
* Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param options optional subscription options
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOptions options) throws IOException, JetStreamApiException;

/**
* Create a synchronous subscription to the specified subject.
Expand All @@ -393,22 +394,23 @@ public interface JetStream {
* <p>See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
* @param subject the subject to subscribe to
* @param subscribeSubject the subject to subscribe to
* Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param queue the optional queue group to join
* @param options optional subscription options
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subscribeSubject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException;

/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
*
* @param subject The subject to subscribe to
* @param subscribeSubject The subject to subscribe to
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param autoAck Whether to auto ack
Expand All @@ -417,14 +419,14 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException;

/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
*
* @param subject The subject to subscribe to.
* @param subscribeSubject The subject to subscribe to.
* Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param autoAck Whether to auto ack
Expand All @@ -434,14 +436,15 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;

/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
*
* @param subject The subject to subscribe to.
* @param subscribeSubject The subject to subscribe to.
* Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param queue the optional queue group to join
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
Expand All @@ -452,22 +455,24 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subscribeSubject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;

/**
* Create a subscription to the specified subject in the mode of pull, with additional options
* @param subject The subject to subscribe to
* @param subscribeSubject The subject to subscribe to
* Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param options pull subscription options
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException;

/**
* Create an asynchronous subscription to the specified subject in the mode of pull, with additional options
* @param subject The subject to subscribe to
* @param subscribeSubject The subject to subscribe to
* Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param options pull subscription options
Expand All @@ -476,7 +481,7 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException;

/**
* Get a stream context for a specific named stream. Verifies that the stream exists.
Expand Down
Loading

0 comments on commit c9e6762

Please sign in to comment.