From 7ebdd78a2a12a406615d5be3aa91829b56bb41d6 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Fri, 22 Sep 2023 12:51:05 -0400 Subject: [PATCH] Consumer Limits Stream Configuration (#979) --- .../io/nats/client/api/ConsumerLimits.java | 137 ++++++++++++++++++ .../nats/client/api/StreamConfiguration.java | 41 ++++-- .../io/nats/client/support/ApiConstants.java | 1 + .../client/api/StreamConfigurationTests.java | 45 +++++- .../resources/data/StreamConfiguration.json | 4 + 5 files changed, 214 insertions(+), 14 deletions(-) create mode 100644 src/main/java/io/nats/client/api/ConsumerLimits.java diff --git a/src/main/java/io/nats/client/api/ConsumerLimits.java b/src/main/java/io/nats/client/api/ConsumerLimits.java new file mode 100644 index 000000000..c866fbd7c --- /dev/null +++ b/src/main/java/io/nats/client/api/ConsumerLimits.java @@ -0,0 +1,137 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import io.nats.client.support.JsonSerializable; +import io.nats.client.support.JsonUtils; +import io.nats.client.support.JsonValue; + +import java.time.Duration; + +import static io.nats.client.api.ConsumerConfiguration.*; +import static io.nats.client.support.ApiConstants.INACTIVE_THRESHOLD; +import static io.nats.client.support.ApiConstants.MAX_ACK_PENDING; +import static io.nats.client.support.JsonUtils.beginJson; +import static io.nats.client.support.JsonUtils.endJson; +import static io.nats.client.support.JsonValueUtils.readInteger; +import static io.nats.client.support.JsonValueUtils.readNanos; + +/** + * ConsumerLimits + */ +public class ConsumerLimits implements JsonSerializable { + private final Duration inactiveThreshold; + private final Integer maxAckPending; + + static ConsumerLimits optionalInstance(JsonValue vConsumerLimits) { + return vConsumerLimits == null ? null : new ConsumerLimits(vConsumerLimits); + } + + ConsumerLimits(JsonValue vConsumerLimits) { + inactiveThreshold = readNanos(vConsumerLimits, INACTIVE_THRESHOLD); + maxAckPending = readInteger(vConsumerLimits, MAX_ACK_PENDING); + } + + ConsumerLimits(ConsumerLimits.Builder b) { + this.inactiveThreshold = b.inactiveThreshold; + this.maxAckPending = b.maxAckPending; + } + + /** + * Get the amount of time before the consumer is deemed inactive. + * @return the inactive threshold + */ + public Duration getInactiveThreshold() { + return inactiveThreshold; + } + + /** + * Gets the maximum ack pending configuration. + * @return maximum ack pending. + */ + public long getMaxAckPending() { + return getOrUnset(maxAckPending); + } + + public String toJson() { + StringBuilder sb = beginJson(); + JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold); + JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending); + return endJson(sb).toString(); + } + + /** + * Creates a builder for a consumer limits object. + * @return the builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * ConsumerLimits can be created using a Builder. + */ + public static class Builder { + private Duration inactiveThreshold; + private Integer maxAckPending; + + /** + * sets the amount of time before the consumer is deemed inactive. + * @param inactiveThreshold the threshold duration + * @return Builder + */ + public Builder inactiveThreshold(Duration inactiveThreshold) { + this.inactiveThreshold = normalize(inactiveThreshold); + return this; + } + + /** + * sets the amount of time before the consumer is deemed inactive. + * @param inactiveThreshold the threshold duration in milliseconds + * @return Builder + */ + public Builder inactiveThreshold(long inactiveThreshold) { + this.inactiveThreshold = normalizeDuration(inactiveThreshold); + return this; + } + + /** + * Sets the maximum ack pending or null to unset / clear. + * @param maxAckPending maximum pending acknowledgements. + * @return Builder + */ + public Builder maxAckPending(Long maxAckPending) { + this.maxAckPending = normalize(maxAckPending, STANDARD_MIN); + return this; + } + + /** + * Sets the maximum ack pending. + * @param maxAckPending maximum pending acknowledgements. + * @return Builder + */ + public Builder maxAckPending(long maxAckPending) { + this.maxAckPending = normalize(maxAckPending, STANDARD_MIN); + return this; + } + + /** + * Build a ConsumerLimits object + * @return the ConsumerLimits + */ + public ConsumerLimits build() { + return new ConsumerLimits(this); + } + } +} diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index a29fe0c6a..933784abf 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -57,6 +57,7 @@ public class StreamConfiguration implements JsonSerializable { private final Placement placement; private final Republish republish; private final SubjectTransform subjectTransform; + private final ConsumerLimits consumerLimits; private final Mirror mirror; private final List sources; private final boolean sealed; @@ -91,6 +92,7 @@ static StreamConfiguration instance(JsonValue v) { builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT))); builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH))); builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM))); + builder.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS))); builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR))); builder.sources(Source.optionalListOf(readValue(v, SOURCES))); builder.sealed(readBoolean(v, SEALED)); @@ -127,6 +129,7 @@ static StreamConfiguration instance(JsonValue v) { this.placement = b.placement; this.republish = b.republish; this.subjectTransform = b.subjectTransform; + this.consumerLimits = b.consumerLimits; this.mirror = b.mirror; this.sources = b.sources; this.sealed = b.sealed; @@ -168,20 +171,12 @@ public String toJson() { addField(sb, TEMPLATE_OWNER, templateOwner); addField(sb, DISCARD, discardPolicy.toString()); addFieldAsNanos(sb, DUPLICATE_WINDOW, duplicateWindow); - if (placement != null) { - addField(sb, PLACEMENT, placement); - } - if (republish != null) { - addField(sb, REPUBLISH, republish); - } - if (subjectTransform != null) { - addField(sb, SUBJECT_TRANSFORM, subjectTransform); - } - if (mirror != null) { - addField(sb, MIRROR, mirror); - } + addField(sb, PLACEMENT, placement); + addField(sb, REPUBLISH, republish); + addField(sb, SUBJECT_TRANSFORM, subjectTransform); + addField(sb, CONSUMER_LIMITS, consumerLimits); + addField(sb, MIRROR, mirror); addJsons(sb, SOURCES, sources); - addFldWhenTrue(sb, SEALED, sealed); addFldWhenTrue(sb, ALLOW_ROLLUP_HDRS, allowRollup); addFldWhenTrue(sb, ALLOW_DIRECT, allowDirect); @@ -357,6 +352,14 @@ public SubjectTransform getSubjectTransform() { return subjectTransform; } + /** + * Get the consumerLimits configuration. May be null. + * @return the consumerLimits object + */ + public ConsumerLimits getConsumerLimits() { + return consumerLimits; + } + /** * The mirror definition for this stream * @return the mirror @@ -526,6 +529,7 @@ public static class Builder { private Placement placement = null; private Republish republish = null; private SubjectTransform subjectTransform = null; + private ConsumerLimits consumerLimits = null; private Mirror mirror = null; private final List sources = new ArrayList<>(); private boolean sealed = false; @@ -569,6 +573,7 @@ public Builder(StreamConfiguration sc) { this.placement = sc.placement; this.republish = sc.republish; this.subjectTransform = sc.subjectTransform; + this.consumerLimits = sc.consumerLimits; this.mirror = sc.mirror; sources(sc.sources); this.sealed = sc.sealed; @@ -847,6 +852,16 @@ public Builder subjectTransform(SubjectTransform subjectTransform) { return this; } + /** + * Sets the consumerLimits config object + * @param consumerLimits the consumerLimits config object + * @return Builder + */ + public Builder consumerLimits(ConsumerLimits consumerLimits) { + this.consumerLimits = consumerLimits; + return this; + } + /** * Sets the mirror object * @param mirror the mirror object diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index 697ba7d40..3fa834f80 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -40,6 +40,7 @@ public interface ApiConstants { String CONNECT_URLS = "connect_urls"; String CONSUMER_COUNT = "consumer_count"; String CONSUMER_SEQ = "consumer_seq"; + String CONSUMER_LIMITS = "consumer_limits"; String CONSUMERS = "consumers"; String CREATED = "created"; String CURRENT = "current"; diff --git a/src/test/java/io/nats/client/api/StreamConfigurationTests.java b/src/test/java/io/nats/client/api/StreamConfigurationTests.java index 676c7842a..96b0e3f03 100644 --- a/src/test/java/io/nats/client/api/StreamConfigurationTests.java +++ b/src/test/java/io/nats/client/api/StreamConfigurationTests.java @@ -28,6 +28,7 @@ import static io.nats.client.api.CompressionOption.None; import static io.nats.client.api.CompressionOption.S2; +import static io.nats.client.api.ConsumerConfiguration.*; import static org.junit.jupiter.api.Assertions.*; public class StreamConfigurationTests extends JetStreamTestBase { @@ -105,7 +106,8 @@ public void testConstruction() { .denyPurge(testSc.getDenyPurge()) .discardNewPerSubject(testSc.isDiscardNewPerSubject()) .metadata(metaData) - .firstSequence(82942); + .firstSequence(82942) + .consumerLimits(testSc.getConsumerLimits()); validate(builder.build(), false); validate(builder.addSources((Source)null).build(), false); @@ -461,6 +463,10 @@ private void validate(StreamConfiguration sc, boolean serverTest) { assertNotNull(sc.getSubjectTransform()); assertEquals("st.>", sc.getSubjectTransform().getSource()); assertEquals("stdest.>", sc.getSubjectTransform().getDestination()); + + assertNotNull(sc.getConsumerLimits()); + assertEquals(Duration.ofSeconds(50), sc.getConsumerLimits().getInactiveThreshold()); + assertEquals(42, sc.getConsumerLimits().getMaxAckPending()); } } @@ -520,6 +526,43 @@ public void testSubjectTransform() { assertNull(st.getDestination()); } + @Test + public void testConsumerLimits() { + ConsumerLimits cl = ConsumerLimits.builder().build(); + assertEquals(null, cl.getInactiveThreshold()); + assertEquals(INTEGER_UNSET, cl.getMaxAckPending()); + + cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(0)).build(); + assertEquals(Duration.ZERO, cl.getInactiveThreshold()); + + cl = ConsumerLimits.builder().inactiveThreshold(0L).build(); + assertEquals(Duration.ZERO, cl.getInactiveThreshold()); + + cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(1)).build(); + assertEquals(Duration.ofMillis(1), cl.getInactiveThreshold()); + + cl = ConsumerLimits.builder().inactiveThreshold(1L).build(); + assertEquals(Duration.ofMillis(1), cl.getInactiveThreshold()); + + cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(-1)).build(); + assertEquals(DURATION_UNSET, cl.getInactiveThreshold()); + + cl = ConsumerLimits.builder().inactiveThreshold(-1).build(); + assertEquals(DURATION_UNSET, cl.getInactiveThreshold()); + + cl = ConsumerLimits.builder().maxAckPending(STANDARD_MIN).build(); + assertEquals(STANDARD_MIN, cl.getMaxAckPending()); + + cl = ConsumerLimits.builder().maxAckPending(INTEGER_UNSET).build(); + assertEquals(INTEGER_UNSET, cl.getMaxAckPending()); + + cl = ConsumerLimits.builder().maxAckPending(-2).build(); + assertEquals(INTEGER_UNSET, cl.getMaxAckPending()); + + cl = ConsumerLimits.builder().maxAckPending(Long.MAX_VALUE).build(); + assertEquals(Integer.MAX_VALUE, cl.getMaxAckPending()); + } + @Test public void testExternal() { External e = External.builder().api("api").deliver("deliver").build(); diff --git a/src/test/resources/data/StreamConfiguration.json b/src/test/resources/data/StreamConfiguration.json index 98d2b3826..eba567175 100644 --- a/src/test/resources/data/StreamConfiguration.json +++ b/src/test/resources/data/StreamConfiguration.json @@ -38,6 +38,10 @@ "src": "st.>", "dest": "stdest.>" }, + "consumer_limits": { + "inactive_threshold": 50000000000, + "max_ack_pending": 42 + }, "mirror": { "name": "eman", "opt_start_seq": 736,