Skip to content

Commit

Permalink
validate queue name and test cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 14, 2023
1 parent d827d1f commit 764ce45
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 134 deletions.
41 changes: 4 additions & 37 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.nats.client.support.NatsConstants.*;
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
import static io.nats.client.support.Validator.validateNotNull;
import static io.nats.client.support.Validator.*;
import static java.nio.charset.StandardCharsets.UTF_8;

class NatsConnection implements Connection {
Expand Down Expand Up @@ -854,18 +852,7 @@ private void checkIfNeedsHeaderSupport(Headers headers) {
*/
@Override
public Subscription subscribe(String subject) {

if (subject == null || subject.length() == 0) {
throw new IllegalArgumentException("Subject is required in subscribe");
}

Pattern pattern = Pattern.compile("\\s");
Matcher matcher = pattern.matcher(subject);

if (matcher.find()) {
throw new IllegalArgumentException("Subject cannot contain whitespace");
}

validateSubject(subject, true);
return createSubscription(subject, null, null, null);
}

Expand All @@ -874,28 +861,8 @@ public Subscription subscribe(String subject) {
*/
@Override
public Subscription subscribe(String subject, String queueName) {

if (subject == null || subject.length() == 0) {
throw new IllegalArgumentException("Subject is required in subscribe");
}

Pattern pattern = Pattern.compile("\\s");
Matcher smatcher = pattern.matcher(subject);

if (smatcher.find()) {
throw new IllegalArgumentException("Subject cannot contain whitespace");
}

if (queueName == null || queueName.length() == 0) {
throw new IllegalArgumentException("QueueName is required in subscribe");
}

Matcher qmatcher = pattern.matcher(queueName);

if (qmatcher.find()) {
throw new IllegalArgumentException("Queue names cannot contain whitespace");
}

validateSubject(subject, true);
validateQueueName(queueName, true);
return createSubscription(subject, queueName, null, null);
}

Expand Down
41 changes: 11 additions & 30 deletions src/main/java/io/nats/client/impl/NatsDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.nats.client.support.Validator.*;

class NatsDispatcher extends NatsConsumer implements Dispatcher, Runnable {

private MessageQueue incoming;
Expand Down Expand Up @@ -187,53 +189,32 @@ void remove(NatsSubscription sub) {
}

public Dispatcher subscribe(String subject) {
if (subject == null || subject.length() == 0) {
throw new IllegalArgumentException("Subject is required in subscribe");
}

validateSubject(subject, true);
this.subscribeImplCore(subject, null, null);
return this;
}
NatsSubscription subscribeReturningSubscription(String subject) {
if (subject == null || subject.length() == 0) {
throw new IllegalArgumentException("Subject is required in subscribe");
}

NatsSubscription subscribeReturningSubscription(String subject) {
validateSubject(subject, true);
return this.subscribeImplCore(subject, null, null);
}

public Subscription subscribe(String subject, MessageHandler handler) {
if (subject == null || subject.length() == 0) {
throw new IllegalArgumentException("Subject is required in subscribe");
}

if (handler == null) {
throw new IllegalArgumentException("MessageHandler is required in subscribe");
}
validateSubject(subject, true);
required(handler, "Handler");
return this.subscribeImplCore(subject, null, handler);
}

public Dispatcher subscribe(String subject, String queueName) {
if (subject == null || subject.length() == 0) {
throw new IllegalArgumentException("Subject is required in subscribe");
}

if (queueName == null || queueName.length() == 0) {
throw new IllegalArgumentException("QueueName is required in subscribe");
}
validateSubject(subject, true);
validateQueueName(queueName, true);
this.subscribeImplCore(subject, queueName, null);
return this;
}

public Subscription subscribe(String subject, String queueName, MessageHandler handler) {
if (subject == null || subject.length() == 0) {
throw new IllegalArgumentException("Subject is required in subscribe");
}

if (queueName == null || queueName.length() == 0) {
throw new IllegalArgumentException("QueueName is required in subscribe");
}

validateSubject(subject, true);
validateQueueName(queueName, true);
if (handler == null) {
throw new IllegalArgumentException("MessageHandler is required in subscribe");
}
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ private String lookupStreamSubject(String stream) throws IOException, JetStreamA
*/
@Override
public JetStreamSubscription subscribe(String subscribeSubject) throws IOException, JetStreamApiException {
subscribeSubject = validateSubject(subscribeSubject, true);
return createSubscription(subscribeSubject, null, null, null, null, null, false);
}

Expand All @@ -570,6 +571,7 @@ public JetStreamSubscription subscribe(String subscribeSubject) throws IOExcepti
*/
@Override
public JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOptions options) throws IOException, JetStreamApiException {
subscribeSubject = validateSubject(subscribeSubject, false);
return createSubscription(subscribeSubject, options, null, null, null, null, false);
}

Expand All @@ -578,7 +580,8 @@ public JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOpt
*/
@Override
public JetStreamSubscription subscribe(String subscribeSubject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException {
queue = validateQueueName(emptyAsNull(queue), false);
subscribeSubject = validateSubject(subscribeSubject, false);
validateQueueName(queue, false);
return createSubscription(subscribeSubject, options, null, queue, null, null, false);
}

Expand All @@ -587,6 +590,7 @@ public JetStreamSubscription subscribe(String subscribeSubject, String queue, Pu
*/
@Override
public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException {
subscribeSubject = validateSubject(subscribeSubject, false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
return createSubscription(subscribeSubject, null, null, null, (NatsDispatcher) dispatcher, handler, autoAck);
Expand All @@ -597,6 +601,7 @@ public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispa
*/
@Override
public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
subscribeSubject = validateSubject(subscribeSubject, false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
return createSubscription(subscribeSubject, options, null, null, (NatsDispatcher) dispatcher, handler, autoAck);
Expand All @@ -607,7 +612,8 @@ public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispa
*/
@Override
public JetStreamSubscription subscribe(String subscribeSubject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
queue = validateQueueName(emptyAsNull(queue), false);
subscribeSubject = validateSubject(subscribeSubject, false);
validateQueueName(queue, false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
return createSubscription(subscribeSubject, options, null, queue, (NatsDispatcher) dispatcher, handler, autoAck);
Expand All @@ -618,6 +624,7 @@ public JetStreamSubscription subscribe(String subscribeSubject, String queue, Di
*/
@Override
public JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException {
subscribeSubject = validateSubject(subscribeSubject, false);
validateNotNull(options, "Pull Subscribe Options");
return createSubscription(subscribeSubject, null, options, null, null, null, false);
}
Expand All @@ -627,6 +634,7 @@ public JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOpt
*/
@Override
public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException {
subscribeSubject = validateSubject(subscribeSubject, false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
validateNotNull(options, "Pull Subscribe Options");
Expand Down
48 changes: 23 additions & 25 deletions src/main/java/io/nats/client/support/Validator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,30 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static io.nats.client.support.NatsConstants.DOT;
import static io.nats.client.support.NatsJetStreamConstants.MAX_HISTORY_PER_KEY;

@SuppressWarnings("UnusedReturnValue")
public abstract class Validator {
private Validator() {
} /* ensures cannot be constructed */

public static String validateSubject(String s, boolean required) {
return validateSubject(s, "Subject", required);
}

public static String validateSubject(String subject, String label, boolean required, boolean cantEndWithGt) {
subject = validateSubject(subject, label, required);
if (cantEndWithGt && subject.endsWith(".>")) {
throw new IllegalArgumentException(label + " last segment cannot be '>'");
}
return subject;
}
private Validator() {} /* ensures cannot be constructed */

/*
cannot contain spaces \r \n \t
cannot start or with subject token delimiter .
some things don't allow it to end greater
*/
public static String validateSubject(String subject, String label, boolean required) {
if (emptyAsNull(subject) == null) {
public static String validateSubjectTerm(String subject, String label, boolean required) {
subject = emptyAsNull(subject);
if (subject == null) {
if (required) {
throw new IllegalArgumentException(label + " cannot be null or empty.");
}
return null;
}

subject = subject.trim();
String[] segments = subject.split("\\.");
for (int seg = 0; seg < segments.length; seg++) {
String segment = segments[seg];
Expand All @@ -69,7 +58,8 @@ public static String validateSubject(String subject, String label, boolean requi
case 32:
case '\r':
case '\n':
throw new IllegalArgumentException(label + " cannot contain space, carriage return or linefeed character");
case '\t':
throw new IllegalArgumentException(label + " cannot contain space, tab, carriage return or linefeed character");
case '*':
case '>':
if (sl != 1) {
Expand All @@ -83,12 +73,24 @@ public static String validateSubject(String subject, String label, boolean requi
return subject;
}

public static String validateSubject(String s, boolean required) {
return validateSubjectTerm(s, "Subject", required);
}

public static String validateSubject(String subject, String label, boolean required, boolean cantEndWithGt) {
subject = validateSubjectTerm(subject, label, required);
if (subject != null && cantEndWithGt && subject.endsWith(".>")) {
throw new IllegalArgumentException(label + " last segment cannot be '>'");
}
return subject;
}

public static String validateReplyTo(String s, boolean required) {
return validatePrintableExceptWildGt(s, "Reply To", required);
}

public static String validateQueueName(String s, boolean required) {
return validatePrintableExceptWildDotGt(s, "Queue", required);
return validateSubjectTerm(s, "QueueName", required);
}

public static String validateStreamName(String s, boolean required) {
Expand Down Expand Up @@ -151,10 +153,6 @@ public static String validateMustMatchIfBothSupplied(String s1, String s2, NatsJ
throw err.instance();
}

interface Check {
String check();
}

public static String required(String s, String label) {
if (emptyAsNull(s) == null) {
throw new IllegalArgumentException(label + " cannot be null or empty.");
Expand Down Expand Up @@ -188,14 +186,14 @@ public static void required(Map<?, ?> m, String label) {
}
}

public static String _validate(String s, boolean required, String label, Check check) {
public static String _validate(String s, boolean required, String label, Supplier<String> customValidate) {
if (emptyAsNull(s) == null) {
if (required) {
throw new IllegalArgumentException(label + " cannot be null or empty.");
}
return null;
}
return check.check();
return customValidate.get();
}

public static String validateMaxLength(String s, int maxLength, boolean required, String label) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/service/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.JsonValue;
import io.nats.client.support.Validator;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -26,7 +27,6 @@
import static io.nats.client.support.JsonValueUtils.readString;
import static io.nats.client.support.JsonValueUtils.readStringStringMap;
import static io.nats.client.support.Validator.validateIsRestrictedTerm;
import static io.nats.client.support.Validator.validateSubject;

/**
* Endpoint encapsulates the name, subject and metadata for a {@link ServiceEndpoint}.
Expand Down Expand Up @@ -85,7 +85,7 @@ public Endpoint(String name, String subject, Map<String, String> metadata) {
this.subject = this.name;
}
else {
this.subject = validateSubject(subject, "Endpoint Subject", false);
this.subject = Validator.validateSubjectTerm(subject, "Endpoint Subject", false);
}
}
else {
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/nats/service/Group.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

package io.nats.service;

import io.nats.client.support.Validator;

import java.util.Objects;

import static io.nats.client.support.NatsConstants.DOT;
import static io.nats.client.support.Validator.emptyAsNull;
import static io.nats.client.support.Validator.validateSubject;

/**
* Group is way to organize endpoints by serving as a common prefix to all endpoints registered in it.
Expand All @@ -41,7 +42,7 @@ public Group(String name) {
throw new IllegalArgumentException("Group name cannot contain '>'.");
}

this.name = validateSubject(name, "Group name", false);
this.name = Validator.validateSubjectTerm(name, "Group name", false);
}

/**
Expand Down
Loading

0 comments on commit 764ce45

Please sign in to comment.