Skip to content

Commit

Permalink
Stream Configuration Compression Option (#976)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Sep 20, 2023
1 parent c9e6762 commit 6c4a13a
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 11 deletions.
48 changes: 48 additions & 0 deletions src/main/java/io/nats/client/api/CompressionOption.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import java.util.HashMap;
import java.util.Map;

/**
* Stream compression policies.
*/
public enum CompressionOption {
None("none"),
S2("s2");

private final String policy;

CompressionOption(String p) {
policy = p;
}

@Override
public String toString() {
return policy;
}

private static final Map<String, CompressionOption> strEnumHash = new HashMap<>();

static {
for (CompressionOption env : CompressionOption.values()) {
strEnumHash.put(env.toString(), env);
}
}

public static CompressionOption get(String value) {
return strEnumHash.get(value);
}
}
27 changes: 27 additions & 0 deletions src/main/java/io/nats/client/api/StreamConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class StreamConfiguration implements JsonSerializable {
private final String description;
private final List<String> subjects;
private final RetentionPolicy retentionPolicy;
private final CompressionOption compressionOption;
private final long maxConsumers;
private final long maxMsgs;
private final long maxMsgsPerSubject;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class StreamConfiguration implements JsonSerializable {
static StreamConfiguration instance(JsonValue v) {
Builder builder = new Builder();
builder.retentionPolicy(RetentionPolicy.get(readString(v, RETENTION)));
builder.compressionOption(CompressionOption.get(readString(v, COMPRESSION)));
builder.storageType(StorageType.get(readString(v, STORAGE)));
builder.discardPolicy(DiscardPolicy.get(readString(v, DISCARD)));
builder.name(readString(v, NAME));
Expand Down Expand Up @@ -107,6 +109,7 @@ static StreamConfiguration instance(JsonValue v) {
this.description = b.description;
this.subjects = b.subjects;
this.retentionPolicy = b.retentionPolicy;
this.compressionOption = b.compressionOption;
this.maxConsumers = b.maxConsumers;
this.maxMsgs = b.maxMsgs;
this.maxMsgsPerSubject = b.maxMsgsPerSubject;
Expand Down Expand Up @@ -147,6 +150,9 @@ public String toJson() {
JsonUtils.addField(sb, DESCRIPTION, description);
addStrings(sb, SUBJECTS, subjects);
addField(sb, RETENTION, retentionPolicy.toString());
if (compressionOption != CompressionOption.None) {
addField(sb, COMPRESSION, compressionOption.toString());
}
addField(sb, MAX_CONSUMERS, maxConsumers);
addField(sb, MAX_MSGS, maxMsgs);
addField(sb, MAX_MSGS_PER_SUB, maxMsgsPerSubject);
Expand Down Expand Up @@ -223,6 +229,14 @@ public RetentionPolicy getRetentionPolicy() {
return retentionPolicy;
}

/**
* Gets the compression option for this stream configuration.
* @return the compression option for this stream.
*/
public CompressionOption getCompressionOption() {
return compressionOption;
}

/**
* Gets the maximum number of consumers for this stream configuration.
* @return the maximum number of consumers for this stream.
Expand Down Expand Up @@ -425,6 +439,7 @@ public String toString() {
", description='" + description + '\'' +
", subjects=" + subjects +
", retentionPolicy=" + retentionPolicy +
", compressionOption=" + compressionOption +
", maxConsumers=" + maxConsumers +
", maxMsgs=" + maxMsgs +
", maxMsgsPerSubject=" + maxMsgsPerSubject +
Expand Down Expand Up @@ -481,6 +496,7 @@ public static class Builder {
private String description = null;
private final List<String> subjects = new ArrayList<>();
private RetentionPolicy retentionPolicy = RetentionPolicy.Limits;
private CompressionOption compressionOption = CompressionOption.None;
private long maxConsumers = -1;
private long maxMsgs = -1;
private long maxMsgsPerSubject = -1;
Expand Down Expand Up @@ -522,6 +538,7 @@ public Builder(StreamConfiguration sc) {
this.description = sc.description;
subjects(sc.subjects);
this.retentionPolicy = sc.retentionPolicy;
this.compressionOption = sc.compressionOption;
this.maxConsumers = sc.maxConsumers;
this.maxMsgs = sc.maxMsgs;
this.maxMsgsPerSubject = sc.maxMsgsPerSubject;
Expand Down Expand Up @@ -630,6 +647,16 @@ public Builder retentionPolicy(RetentionPolicy policy) {
return this;
}

/**
* Sets the compression option in the StreamConfiguration.
* @param compressionOption the compression option of the StreamConfiguration
* @return Builder
*/
public Builder compressionOption(CompressionOption compressionOption) {
this.compressionOption = compressionOption == null ? CompressionOption.None : compressionOption;
return this;
}

/**
* Sets the maximum number of consumers in the StreamConfiguration.
* @param maxConsumers the maximum number of consumers
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface ApiConstants {
String CLIENT_IP = "client_ip";
String CLUSTER = "cluster";
String CODE = "code";
String COMPRESSION = "compression";
String CONFIG = "config";
String CONNECT_URLS = "connect_urls";
String CONSUMER_COUNT = "consumer_count";
Expand Down
49 changes: 38 additions & 11 deletions src/test/java/io/nats/client/api/StreamConfigurationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.time.ZonedDateTime;
import java.util.*;

import static io.nats.client.api.CompressionOption.None;
import static io.nats.client.api.CompressionOption.S2;
import static org.junit.jupiter.api.Assertions.*;

public class StreamConfigurationTests extends JetStreamTestBase {
Expand All @@ -40,6 +42,7 @@ private StreamConfiguration getTestConfiguration() {
@Test
public void testRoundTrip() throws Exception {
runInJsServer(si -> si.isNewerVersionThan("2.8.4"), nc -> {
CompressionOption compressionOption = nc.getServerInfo().isOlderThanVersion("2.10.0") ? None : S2;
StreamConfiguration sc = StreamConfiguration.builder(getTestConfiguration())
.mirror(null)
.sources()
Expand All @@ -49,23 +52,24 @@ public void testRoundTrip() throws Exception {
.allowDirect(false)
.mirrorDirect(false)
.sealed(false)
.compressionOption(compressionOption)
.build();
JetStreamManagement jsm = nc.jetStreamManagement();
validate(jsm.addStream(sc).getConfiguration(), true);
validate(jsm.addStream(sc).getConfiguration(), true, compressionOption);
});
}

@Test
public void testConstruction() {
StreamConfiguration testSc = getTestConfiguration();
// from json
validate(testSc, false);
validate(testSc, false, S2);

// test toJson
validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false);
validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false, S2);

// copy constructor
validate(StreamConfiguration.builder(testSc).build(), false);
validate(StreamConfiguration.builder(testSc).build(), false, S2);

Map<String, String> metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar");

Expand All @@ -75,6 +79,7 @@ public void testConstruction() {
.description(testSc.getDescription())
.subjects(testSc.getSubjects())
.retentionPolicy(testSc.getRetentionPolicy())
.compressionOption(testSc.getCompressionOption())
.maxConsumers(testSc.getMaxConsumers())
.maxMessages(testSc.getMaxMsgs())
.maxMessagesPerSubject(testSc.getMaxMsgsPerSubject())
Expand All @@ -100,14 +105,14 @@ public void testConstruction() {
.discardNewPerSubject(testSc.isDiscardNewPerSubject())
.metadata(metaData)
.firstSequence(82942);
validate(builder.build(), false);
validate(builder.addSources((Source)null).build(), false);
validate(builder.build(), false, S2);
validate(builder.addSources((Source)null).build(), false, S2);

List<Source> sources = new ArrayList<>(testSc.getSources());
sources.add(null);
Source copy = new Source(JsonParser.parseUnchecked(sources.get(0).toJson()));
sources.add(copy);
validate(builder.addSources(sources).build(), false);
validate(builder.addSources(sources).build(), false, S2);

// covering add a single source
sources = new ArrayList<>(testSc.getSources());
Expand All @@ -117,7 +122,7 @@ public void testConstruction() {
builder.addSource(source);
}
builder.addSource(sources.get(0));
validate(builder.build(), false);
validate(builder.build(), false, S2);

// equals and hashcode coverage
External external = copy.getExternal();
Expand Down Expand Up @@ -327,14 +332,34 @@ public void testRetentionPolicy() {
StreamConfiguration.Builder builder = StreamConfiguration.builder();
assertEquals(RetentionPolicy.Limits, builder.build().getRetentionPolicy());

builder.retentionPolicy(RetentionPolicy.Limits);
assertEquals(RetentionPolicy.Limits, builder.build().getRetentionPolicy());

builder.retentionPolicy(null);
assertEquals(RetentionPolicy.Limits, builder.build().getRetentionPolicy());

builder.retentionPolicy(RetentionPolicy.Interest);
assertEquals(RetentionPolicy.Interest, builder.build().getRetentionPolicy());

builder.retentionPolicy(RetentionPolicy.WorkQueue);
assertEquals(RetentionPolicy.WorkQueue, builder.build().getRetentionPolicy());
}

builder.retentionPolicy(null);
assertEquals(RetentionPolicy.Limits, builder.build().getRetentionPolicy());
@Test
public void testCompressionOption() {
StreamConfiguration.Builder builder = StreamConfiguration.builder();
assertEquals(None, builder.build().getCompressionOption());

builder.compressionOption(None);
assertEquals(None, builder.build().getCompressionOption());

builder.compressionOption(null);
assertEquals(None, builder.build().getCompressionOption());
assertFalse(builder.build().toJson().contains("\"compression\""));

builder.compressionOption(S2);
assertEquals(S2, builder.build().getCompressionOption());
assertTrue(builder.build().toJson().contains("\"compression\":\"s2\""));
}

@Test
Expand All @@ -361,14 +386,16 @@ public void testDiscardPolicy() {
assertEquals(DiscardPolicy.Old, builder.build().getDiscardPolicy());
}

private void validate(StreamConfiguration sc, boolean serverTest) {
private void validate(StreamConfiguration sc, boolean serverTest, CompressionOption compressionOption) {
assertEquals("sname", sc.getName());
assertEquals("blah blah", sc.getDescription());
assertEquals(3, sc.getSubjects().size());
assertEquals("foo", sc.getSubjects().get(0));
assertEquals("bar", sc.getSubjects().get(1));
assertEquals("repub.>", sc.getSubjects().get(2));

assertSame(compressionOption, sc.getCompressionOption());

assertSame(RetentionPolicy.Interest, sc.getRetentionPolicy());
assertEquals(730, sc.getMaxConsumers());
assertEquals(731, sc.getMaxMsgs());
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/data/StreamConfiguration.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"description": "blah blah",
"subjects": ["foo", "bar", "repub.>"],
"retention": "interest",
"compression": "s2",
"max_consumers": 730,
"max_msgs": 731,
"max_msgs_per_subject": 741,
Expand Down

0 comments on commit 6c4a13a

Please sign in to comment.