Skip to content

Commit

Permalink
Consumer Limits Stream Configuration (#979)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Sep 22, 2023
1 parent f666550 commit 7ebdd78
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 14 deletions.
137 changes: 137 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerLimits.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.JsonValue;

import java.time.Duration;

import static io.nats.client.api.ConsumerConfiguration.*;
import static io.nats.client.support.ApiConstants.INACTIVE_THRESHOLD;
import static io.nats.client.support.ApiConstants.MAX_ACK_PENDING;
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.readInteger;
import static io.nats.client.support.JsonValueUtils.readNanos;

/**
* ConsumerLimits
*/
public class ConsumerLimits implements JsonSerializable {
private final Duration inactiveThreshold;
private final Integer maxAckPending;

static ConsumerLimits optionalInstance(JsonValue vConsumerLimits) {
return vConsumerLimits == null ? null : new ConsumerLimits(vConsumerLimits);
}

ConsumerLimits(JsonValue vConsumerLimits) {
inactiveThreshold = readNanos(vConsumerLimits, INACTIVE_THRESHOLD);
maxAckPending = readInteger(vConsumerLimits, MAX_ACK_PENDING);
}

ConsumerLimits(ConsumerLimits.Builder b) {
this.inactiveThreshold = b.inactiveThreshold;
this.maxAckPending = b.maxAckPending;
}

/**
* Get the amount of time before the consumer is deemed inactive.
* @return the inactive threshold
*/
public Duration getInactiveThreshold() {
return inactiveThreshold;
}

/**
* Gets the maximum ack pending configuration.
* @return maximum ack pending.
*/
public long getMaxAckPending() {
return getOrUnset(maxAckPending);
}

public String toJson() {
StringBuilder sb = beginJson();
JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold);
JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
return endJson(sb).toString();
}

/**
* Creates a builder for a consumer limits object.
* @return the builder.
*/
public static Builder builder() {
return new Builder();
}

/**
* ConsumerLimits can be created using a Builder.
*/
public static class Builder {
private Duration inactiveThreshold;
private Integer maxAckPending;

/**
* sets the amount of time before the consumer is deemed inactive.
* @param inactiveThreshold the threshold duration
* @return Builder
*/
public Builder inactiveThreshold(Duration inactiveThreshold) {
this.inactiveThreshold = normalize(inactiveThreshold);
return this;
}

/**
* sets the amount of time before the consumer is deemed inactive.
* @param inactiveThreshold the threshold duration in milliseconds
* @return Builder
*/
public Builder inactiveThreshold(long inactiveThreshold) {
this.inactiveThreshold = normalizeDuration(inactiveThreshold);
return this;
}

/**
* Sets the maximum ack pending or null to unset / clear.
* @param maxAckPending maximum pending acknowledgements.
* @return Builder
*/
public Builder maxAckPending(Long maxAckPending) {
this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
return this;
}

/**
* Sets the maximum ack pending.
* @param maxAckPending maximum pending acknowledgements.
* @return Builder
*/
public Builder maxAckPending(long maxAckPending) {
this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
return this;
}

/**
* Build a ConsumerLimits object
* @return the ConsumerLimits
*/
public ConsumerLimits build() {
return new ConsumerLimits(this);
}
}
}
41 changes: 28 additions & 13 deletions src/main/java/io/nats/client/api/StreamConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class StreamConfiguration implements JsonSerializable {
private final Placement placement;
private final Republish republish;
private final SubjectTransform subjectTransform;
private final ConsumerLimits consumerLimits;
private final Mirror mirror;
private final List<Source> sources;
private final boolean sealed;
Expand Down Expand Up @@ -91,6 +92,7 @@ static StreamConfiguration instance(JsonValue v) {
builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT)));
builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH)));
builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)));
builder.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS)));
builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR)));
builder.sources(Source.optionalListOf(readValue(v, SOURCES)));
builder.sealed(readBoolean(v, SEALED));
Expand Down Expand Up @@ -127,6 +129,7 @@ static StreamConfiguration instance(JsonValue v) {
this.placement = b.placement;
this.republish = b.republish;
this.subjectTransform = b.subjectTransform;
this.consumerLimits = b.consumerLimits;
this.mirror = b.mirror;
this.sources = b.sources;
this.sealed = b.sealed;
Expand Down Expand Up @@ -168,20 +171,12 @@ public String toJson() {
addField(sb, TEMPLATE_OWNER, templateOwner);
addField(sb, DISCARD, discardPolicy.toString());
addFieldAsNanos(sb, DUPLICATE_WINDOW, duplicateWindow);
if (placement != null) {
addField(sb, PLACEMENT, placement);
}
if (republish != null) {
addField(sb, REPUBLISH, republish);
}
if (subjectTransform != null) {
addField(sb, SUBJECT_TRANSFORM, subjectTransform);
}
if (mirror != null) {
addField(sb, MIRROR, mirror);
}
addField(sb, PLACEMENT, placement);
addField(sb, REPUBLISH, republish);
addField(sb, SUBJECT_TRANSFORM, subjectTransform);
addField(sb, CONSUMER_LIMITS, consumerLimits);
addField(sb, MIRROR, mirror);
addJsons(sb, SOURCES, sources);

addFldWhenTrue(sb, SEALED, sealed);
addFldWhenTrue(sb, ALLOW_ROLLUP_HDRS, allowRollup);
addFldWhenTrue(sb, ALLOW_DIRECT, allowDirect);
Expand Down Expand Up @@ -357,6 +352,14 @@ public SubjectTransform getSubjectTransform() {
return subjectTransform;
}

/**
* Get the consumerLimits configuration. May be null.
* @return the consumerLimits object
*/
public ConsumerLimits getConsumerLimits() {
return consumerLimits;
}

/**
* The mirror definition for this stream
* @return the mirror
Expand Down Expand Up @@ -526,6 +529,7 @@ public static class Builder {
private Placement placement = null;
private Republish republish = null;
private SubjectTransform subjectTransform = null;
private ConsumerLimits consumerLimits = null;
private Mirror mirror = null;
private final List<Source> sources = new ArrayList<>();
private boolean sealed = false;
Expand Down Expand Up @@ -569,6 +573,7 @@ public Builder(StreamConfiguration sc) {
this.placement = sc.placement;
this.republish = sc.republish;
this.subjectTransform = sc.subjectTransform;
this.consumerLimits = sc.consumerLimits;
this.mirror = sc.mirror;
sources(sc.sources);
this.sealed = sc.sealed;
Expand Down Expand Up @@ -847,6 +852,16 @@ public Builder subjectTransform(SubjectTransform subjectTransform) {
return this;
}

/**
* Sets the consumerLimits config object
* @param consumerLimits the consumerLimits config object
* @return Builder
*/
public Builder consumerLimits(ConsumerLimits consumerLimits) {
this.consumerLimits = consumerLimits;
return this;
}

/**
* Sets the mirror object
* @param mirror the mirror object
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public interface ApiConstants {
String CONNECT_URLS = "connect_urls";
String CONSUMER_COUNT = "consumer_count";
String CONSUMER_SEQ = "consumer_seq";
String CONSUMER_LIMITS = "consumer_limits";
String CONSUMERS = "consumers";
String CREATED = "created";
String CURRENT = "current";
Expand Down
45 changes: 44 additions & 1 deletion src/test/java/io/nats/client/api/StreamConfigurationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import static io.nats.client.api.CompressionOption.None;
import static io.nats.client.api.CompressionOption.S2;
import static io.nats.client.api.ConsumerConfiguration.*;
import static org.junit.jupiter.api.Assertions.*;

public class StreamConfigurationTests extends JetStreamTestBase {
Expand Down Expand Up @@ -105,7 +106,8 @@ public void testConstruction() {
.denyPurge(testSc.getDenyPurge())
.discardNewPerSubject(testSc.isDiscardNewPerSubject())
.metadata(metaData)
.firstSequence(82942);
.firstSequence(82942)
.consumerLimits(testSc.getConsumerLimits());
validate(builder.build(), false);
validate(builder.addSources((Source)null).build(), false);

Expand Down Expand Up @@ -461,6 +463,10 @@ private void validate(StreamConfiguration sc, boolean serverTest) {
assertNotNull(sc.getSubjectTransform());
assertEquals("st.>", sc.getSubjectTransform().getSource());
assertEquals("stdest.>", sc.getSubjectTransform().getDestination());

assertNotNull(sc.getConsumerLimits());
assertEquals(Duration.ofSeconds(50), sc.getConsumerLimits().getInactiveThreshold());
assertEquals(42, sc.getConsumerLimits().getMaxAckPending());
}
}

Expand Down Expand Up @@ -520,6 +526,43 @@ public void testSubjectTransform() {
assertNull(st.getDestination());
}

@Test
public void testConsumerLimits() {
ConsumerLimits cl = ConsumerLimits.builder().build();
assertEquals(null, cl.getInactiveThreshold());
assertEquals(INTEGER_UNSET, cl.getMaxAckPending());

cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(0)).build();
assertEquals(Duration.ZERO, cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(0L).build();
assertEquals(Duration.ZERO, cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(1)).build();
assertEquals(Duration.ofMillis(1), cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(1L).build();
assertEquals(Duration.ofMillis(1), cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(-1)).build();
assertEquals(DURATION_UNSET, cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(-1).build();
assertEquals(DURATION_UNSET, cl.getInactiveThreshold());

cl = ConsumerLimits.builder().maxAckPending(STANDARD_MIN).build();
assertEquals(STANDARD_MIN, cl.getMaxAckPending());

cl = ConsumerLimits.builder().maxAckPending(INTEGER_UNSET).build();
assertEquals(INTEGER_UNSET, cl.getMaxAckPending());

cl = ConsumerLimits.builder().maxAckPending(-2).build();
assertEquals(INTEGER_UNSET, cl.getMaxAckPending());

cl = ConsumerLimits.builder().maxAckPending(Long.MAX_VALUE).build();
assertEquals(Integer.MAX_VALUE, cl.getMaxAckPending());
}

@Test
public void testExternal() {
External e = External.builder().api("api").deliver("deliver").build();
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/data/StreamConfiguration.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
"src": "st.>",
"dest": "stdest.>"
},
"consumer_limits": {
"inactive_threshold": 50000000000,
"max_ack_pending": 42
},
"mirror": {
"name": "eman",
"opt_start_seq": 736,
Expand Down

0 comments on commit 7ebdd78

Please sign in to comment.