From 6c4a13ae80cb0bd334f60967d3150dccbba72d2e Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Wed, 20 Sep 2023 10:17:11 -0400 Subject: [PATCH] Stream Configuration Compression Option (#976) --- .../io/nats/client/api/CompressionOption.java | 48 ++++++++++++++++++ .../nats/client/api/StreamConfiguration.java | 27 ++++++++++ .../io/nats/client/support/ApiConstants.java | 1 + .../client/api/StreamConfigurationTests.java | 49 ++++++++++++++----- .../resources/data/StreamConfiguration.json | 1 + 5 files changed, 115 insertions(+), 11 deletions(-) create mode 100644 src/main/java/io/nats/client/api/CompressionOption.java diff --git a/src/main/java/io/nats/client/api/CompressionOption.java b/src/main/java/io/nats/client/api/CompressionOption.java new file mode 100644 index 000000000..531c6950a --- /dev/null +++ b/src/main/java/io/nats/client/api/CompressionOption.java @@ -0,0 +1,48 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import java.util.HashMap; +import java.util.Map; + +/** + * Stream compression policies. + */ +public enum CompressionOption { + None("none"), + S2("s2"); + + private final String policy; + + CompressionOption(String p) { + policy = p; + } + + @Override + public String toString() { + return policy; + } + + private static final Map strEnumHash = new HashMap<>(); + + static { + for (CompressionOption env : CompressionOption.values()) { + strEnumHash.put(env.toString(), env); + } + } + + public static CompressionOption get(String value) { + return strEnumHash.get(value); + } +} diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index f25a861fe..edf219e1a 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -41,6 +41,7 @@ public class StreamConfiguration implements JsonSerializable { private final String description; private final List subjects; private final RetentionPolicy retentionPolicy; + private final CompressionOption compressionOption; private final long maxConsumers; private final long maxMsgs; private final long maxMsgsPerSubject; @@ -70,6 +71,7 @@ public class StreamConfiguration implements JsonSerializable { static StreamConfiguration instance(JsonValue v) { Builder builder = new Builder(); builder.retentionPolicy(RetentionPolicy.get(readString(v, RETENTION))); + builder.compressionOption(CompressionOption.get(readString(v, COMPRESSION))); builder.storageType(StorageType.get(readString(v, STORAGE))); builder.discardPolicy(DiscardPolicy.get(readString(v, DISCARD))); builder.name(readString(v, NAME)); @@ -107,6 +109,7 @@ static StreamConfiguration instance(JsonValue v) { this.description = b.description; this.subjects = b.subjects; this.retentionPolicy = b.retentionPolicy; + this.compressionOption = b.compressionOption; this.maxConsumers = b.maxConsumers; this.maxMsgs = b.maxMsgs; this.maxMsgsPerSubject = b.maxMsgsPerSubject; @@ -147,6 +150,9 @@ public String toJson() { JsonUtils.addField(sb, DESCRIPTION, description); addStrings(sb, SUBJECTS, subjects); addField(sb, RETENTION, retentionPolicy.toString()); + if (compressionOption != CompressionOption.None) { + addField(sb, COMPRESSION, compressionOption.toString()); + } addField(sb, MAX_CONSUMERS, maxConsumers); addField(sb, MAX_MSGS, maxMsgs); addField(sb, MAX_MSGS_PER_SUB, maxMsgsPerSubject); @@ -223,6 +229,14 @@ public RetentionPolicy getRetentionPolicy() { return retentionPolicy; } + /** + * Gets the compression option for this stream configuration. + * @return the compression option for this stream. + */ + public CompressionOption getCompressionOption() { + return compressionOption; + } + /** * Gets the maximum number of consumers for this stream configuration. * @return the maximum number of consumers for this stream. @@ -425,6 +439,7 @@ public String toString() { ", description='" + description + '\'' + ", subjects=" + subjects + ", retentionPolicy=" + retentionPolicy + + ", compressionOption=" + compressionOption + ", maxConsumers=" + maxConsumers + ", maxMsgs=" + maxMsgs + ", maxMsgsPerSubject=" + maxMsgsPerSubject + @@ -481,6 +496,7 @@ public static class Builder { private String description = null; private final List subjects = new ArrayList<>(); private RetentionPolicy retentionPolicy = RetentionPolicy.Limits; + private CompressionOption compressionOption = CompressionOption.None; private long maxConsumers = -1; private long maxMsgs = -1; private long maxMsgsPerSubject = -1; @@ -522,6 +538,7 @@ public Builder(StreamConfiguration sc) { this.description = sc.description; subjects(sc.subjects); this.retentionPolicy = sc.retentionPolicy; + this.compressionOption = sc.compressionOption; this.maxConsumers = sc.maxConsumers; this.maxMsgs = sc.maxMsgs; this.maxMsgsPerSubject = sc.maxMsgsPerSubject; @@ -630,6 +647,16 @@ public Builder retentionPolicy(RetentionPolicy policy) { return this; } + /** + * Sets the compression option in the StreamConfiguration. + * @param compressionOption the compression option of the StreamConfiguration + * @return Builder + */ + public Builder compressionOption(CompressionOption compressionOption) { + this.compressionOption = compressionOption == null ? CompressionOption.None : compressionOption; + return this; + } + /** * Sets the maximum number of consumers in the StreamConfiguration. * @param maxConsumers the maximum number of consumers diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index 2c5821f4c..c489f5e46 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -35,6 +35,7 @@ public interface ApiConstants { String CLIENT_IP = "client_ip"; String CLUSTER = "cluster"; String CODE = "code"; + String COMPRESSION = "compression"; String CONFIG = "config"; String CONNECT_URLS = "connect_urls"; String CONSUMER_COUNT = "consumer_count"; diff --git a/src/test/java/io/nats/client/api/StreamConfigurationTests.java b/src/test/java/io/nats/client/api/StreamConfigurationTests.java index 5a2c55e7c..0c227bc70 100644 --- a/src/test/java/io/nats/client/api/StreamConfigurationTests.java +++ b/src/test/java/io/nats/client/api/StreamConfigurationTests.java @@ -26,6 +26,8 @@ import java.time.ZonedDateTime; import java.util.*; +import static io.nats.client.api.CompressionOption.None; +import static io.nats.client.api.CompressionOption.S2; import static org.junit.jupiter.api.Assertions.*; public class StreamConfigurationTests extends JetStreamTestBase { @@ -40,6 +42,7 @@ private StreamConfiguration getTestConfiguration() { @Test public void testRoundTrip() throws Exception { runInJsServer(si -> si.isNewerVersionThan("2.8.4"), nc -> { + CompressionOption compressionOption = nc.getServerInfo().isOlderThanVersion("2.10.0") ? None : S2; StreamConfiguration sc = StreamConfiguration.builder(getTestConfiguration()) .mirror(null) .sources() @@ -49,9 +52,10 @@ public void testRoundTrip() throws Exception { .allowDirect(false) .mirrorDirect(false) .sealed(false) + .compressionOption(compressionOption) .build(); JetStreamManagement jsm = nc.jetStreamManagement(); - validate(jsm.addStream(sc).getConfiguration(), true); + validate(jsm.addStream(sc).getConfiguration(), true, compressionOption); }); } @@ -59,13 +63,13 @@ public void testRoundTrip() throws Exception { public void testConstruction() { StreamConfiguration testSc = getTestConfiguration(); // from json - validate(testSc, false); + validate(testSc, false, S2); // test toJson - validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false); + validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false, S2); // copy constructor - validate(StreamConfiguration.builder(testSc).build(), false); + validate(StreamConfiguration.builder(testSc).build(), false, S2); Map metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar"); @@ -75,6 +79,7 @@ public void testConstruction() { .description(testSc.getDescription()) .subjects(testSc.getSubjects()) .retentionPolicy(testSc.getRetentionPolicy()) + .compressionOption(testSc.getCompressionOption()) .maxConsumers(testSc.getMaxConsumers()) .maxMessages(testSc.getMaxMsgs()) .maxMessagesPerSubject(testSc.getMaxMsgsPerSubject()) @@ -100,14 +105,14 @@ public void testConstruction() { .discardNewPerSubject(testSc.isDiscardNewPerSubject()) .metadata(metaData) .firstSequence(82942); - validate(builder.build(), false); - validate(builder.addSources((Source)null).build(), false); + validate(builder.build(), false, S2); + validate(builder.addSources((Source)null).build(), false, S2); List sources = new ArrayList<>(testSc.getSources()); sources.add(null); Source copy = new Source(JsonParser.parseUnchecked(sources.get(0).toJson())); sources.add(copy); - validate(builder.addSources(sources).build(), false); + validate(builder.addSources(sources).build(), false, S2); // covering add a single source sources = new ArrayList<>(testSc.getSources()); @@ -117,7 +122,7 @@ public void testConstruction() { builder.addSource(source); } builder.addSource(sources.get(0)); - validate(builder.build(), false); + validate(builder.build(), false, S2); // equals and hashcode coverage External external = copy.getExternal(); @@ -327,14 +332,34 @@ public void testRetentionPolicy() { StreamConfiguration.Builder builder = StreamConfiguration.builder(); assertEquals(RetentionPolicy.Limits, builder.build().getRetentionPolicy()); + builder.retentionPolicy(RetentionPolicy.Limits); + assertEquals(RetentionPolicy.Limits, builder.build().getRetentionPolicy()); + + builder.retentionPolicy(null); + assertEquals(RetentionPolicy.Limits, builder.build().getRetentionPolicy()); + builder.retentionPolicy(RetentionPolicy.Interest); assertEquals(RetentionPolicy.Interest, builder.build().getRetentionPolicy()); builder.retentionPolicy(RetentionPolicy.WorkQueue); assertEquals(RetentionPolicy.WorkQueue, builder.build().getRetentionPolicy()); + } - builder.retentionPolicy(null); - assertEquals(RetentionPolicy.Limits, builder.build().getRetentionPolicy()); + @Test + public void testCompressionOption() { + StreamConfiguration.Builder builder = StreamConfiguration.builder(); + assertEquals(None, builder.build().getCompressionOption()); + + builder.compressionOption(None); + assertEquals(None, builder.build().getCompressionOption()); + + builder.compressionOption(null); + assertEquals(None, builder.build().getCompressionOption()); + assertFalse(builder.build().toJson().contains("\"compression\"")); + + builder.compressionOption(S2); + assertEquals(S2, builder.build().getCompressionOption()); + assertTrue(builder.build().toJson().contains("\"compression\":\"s2\"")); } @Test @@ -361,7 +386,7 @@ public void testDiscardPolicy() { assertEquals(DiscardPolicy.Old, builder.build().getDiscardPolicy()); } - private void validate(StreamConfiguration sc, boolean serverTest) { + private void validate(StreamConfiguration sc, boolean serverTest, CompressionOption compressionOption) { assertEquals("sname", sc.getName()); assertEquals("blah blah", sc.getDescription()); assertEquals(3, sc.getSubjects().size()); @@ -369,6 +394,8 @@ private void validate(StreamConfiguration sc, boolean serverTest) { assertEquals("bar", sc.getSubjects().get(1)); assertEquals("repub.>", sc.getSubjects().get(2)); + assertSame(compressionOption, sc.getCompressionOption()); + assertSame(RetentionPolicy.Interest, sc.getRetentionPolicy()); assertEquals(730, sc.getMaxConsumers()); assertEquals(731, sc.getMaxMsgs()); diff --git a/src/test/resources/data/StreamConfiguration.json b/src/test/resources/data/StreamConfiguration.json index c034e6da4..bb704ed43 100644 --- a/src/test/resources/data/StreamConfiguration.json +++ b/src/test/resources/data/StreamConfiguration.json @@ -3,6 +3,7 @@ "description": "blah blah", "subjects": ["foo", "bar", "repub.>"], "retention": "interest", + "compression": "s2", "max_consumers": 730, "max_msgs": 731, "max_msgs_per_subject": 741,