From aaa65b229bd4c4e76b4b7f1cb9bb1092d1e32b72 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 20 Sep 2023 12:46:49 -0400 Subject: [PATCH 1/2] Subject Transform Stream Configuration --- .../java/io/nats/client/api/Republish.java | 6 +- .../nats/client/api/StreamConfiguration.java | 30 ++++- .../io/nats/client/api/SubjectTransform.java | 124 ++++++++++++++++++ .../io/nats/client/support/ApiConstants.java | 1 + .../client/api/StreamConfigurationTests.java | 31 +++-- .../resources/data/StreamConfiguration.json | 6 +- 6 files changed, 180 insertions(+), 18 deletions(-) create mode 100644 src/main/java/io/nats/client/api/SubjectTransform.java diff --git a/src/main/java/io/nats/client/api/Republish.java b/src/main/java/io/nats/client/api/Republish.java index c1ae6e672..56697936d 100644 --- a/src/main/java/io/nats/client/api/Republish.java +++ b/src/main/java/io/nats/client/api/Republish.java @@ -23,7 +23,7 @@ import static io.nats.client.support.JsonValueUtils.readString; /** - * Republish directives to consider + * Republish Configuration */ public class Republish implements JsonSerializable { private final String source; @@ -47,6 +47,8 @@ static Republish optionalInstance(JsonValue vRepublish) { * @param headersOnly Whether to RePublish only headers (no body) */ public Republish(String source, String destination, boolean headersOnly) { + Validator.required(source, "Source"); + Validator.required(destination, "Destination"); this.source = source; this.destination = destination; this.headersOnly = headersOnly; @@ -134,8 +136,6 @@ public Builder headersOnly(Boolean headersOnly) { * @return the Placement */ public Republish build() { - Validator.required(source, "Source"); - Validator.required(destination, "Destination"); return new Republish(source, destination, headersOnly); } } diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index edf219e1a..a29fe0c6a 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -56,6 +56,7 @@ public class StreamConfiguration implements JsonSerializable { private final Duration duplicateWindow; private final Placement placement; private final Republish republish; + private final SubjectTransform subjectTransform; private final Mirror mirror; private final List sources; private final boolean sealed; @@ -89,6 +90,7 @@ static StreamConfiguration instance(JsonValue v) { builder.subjects(readStringList(v, SUBJECTS)); builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT))); builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH))); + builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM))); builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR))); builder.sources(Source.optionalListOf(readValue(v, SOURCES))); builder.sealed(readBoolean(v, SEALED)); @@ -124,6 +126,7 @@ static StreamConfiguration instance(JsonValue v) { this.duplicateWindow = b.duplicateWindow; this.placement = b.placement; this.republish = b.republish; + this.subjectTransform = b.subjectTransform; this.mirror = b.mirror; this.sources = b.sources; this.sealed = b.sealed; @@ -171,6 +174,9 @@ public String toJson() { if (republish != null) { addField(sb, REPUBLISH, republish); } + if (subjectTransform != null) { + addField(sb, SUBJECT_TRANSFORM, subjectTransform); + } if (mirror != null) { addField(sb, MIRROR, mirror); } @@ -343,6 +349,14 @@ public Republish getRepublish() { return republish; } + /** + * Get the subjectTransform configuration. May be null. + * @return the subjectTransform object + */ + public SubjectTransform getSubjectTransform() { + return subjectTransform; + } + /** * The mirror definition for this stream * @return the mirror @@ -511,6 +525,7 @@ public static class Builder { private Duration duplicateWindow = Duration.ZERO; private Placement placement = null; private Republish republish = null; + private SubjectTransform subjectTransform = null; private Mirror mirror = null; private final List sources = new ArrayList<>(); private boolean sealed = false; @@ -553,6 +568,7 @@ public Builder(StreamConfiguration sc) { this.duplicateWindow = sc.duplicateWindow; this.placement = sc.placement; this.republish = sc.republish; + this.subjectTransform = sc.subjectTransform; this.mirror = sc.mirror; sources(sc.sources); this.sealed = sc.sealed; @@ -812,8 +828,8 @@ public Builder placement(Placement placement) { } /** - * Sets the republish directive object - * @param republish the republish directive object + * Sets the republish config object + * @param republish the republish config object * @return Builder */ public Builder republish(Republish republish) { @@ -821,6 +837,16 @@ public Builder republish(Republish republish) { return this; } + /** + * Sets the subjectTransform config object + * @param subjectTransform the subjectTransform config object + * @return Builder + */ + public Builder subjectTransform(SubjectTransform subjectTransform) { + this.subjectTransform = subjectTransform; + return this; + } + /** * Sets the mirror object * @param mirror the mirror object diff --git a/src/main/java/io/nats/client/api/SubjectTransform.java b/src/main/java/io/nats/client/api/SubjectTransform.java new file mode 100644 index 000000000..f93da22fd --- /dev/null +++ b/src/main/java/io/nats/client/api/SubjectTransform.java @@ -0,0 +1,124 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import io.nats.client.support.JsonSerializable; +import io.nats.client.support.JsonValue; +import io.nats.client.support.Validator; + +import static io.nats.client.support.ApiConstants.DEST; +import static io.nats.client.support.ApiConstants.SRC; +import static io.nats.client.support.JsonUtils.*; +import static io.nats.client.support.JsonValueUtils.readString; + +/** + * SubjectTransform + */ +public class SubjectTransform implements JsonSerializable { + private final String source; + private final String destination; + + static SubjectTransform optionalInstance(JsonValue vSubjectTransform) { + return vSubjectTransform == null ? null : new SubjectTransform(vSubjectTransform); + } + + SubjectTransform(JsonValue vSubjectTransform) { + source = readString(vSubjectTransform, SRC); + destination = readString(vSubjectTransform, DEST); + } + + /** + * Construct a 'SubjectTransform' object + * @param source the Published Subject-matching filter + * @param destination the SubjectTransform Subject template + */ + public SubjectTransform(String source, String destination) { + source = Validator.emptyAsNull(source); + destination = Validator.emptyAsNull(destination); + if (source == null) { + if (destination == null) { + throw new IllegalArgumentException("Source and/or destination is required."); + } + source = ">"; + } + this.source = source; + this.destination = destination; + } + + /** + * Get source, the Published Subject-matching filter + * @return the source + */ + public String getSource() { + return source; + } + + /** + * Get destination, the SubjectTransform Subject template + * @return the destination + */ + public String getDestination() { + return destination; + } + + public String toJson() { + StringBuilder sb = beginJson(); + addField(sb, SRC, source); + addField(sb, DEST, destination); + return endJson(sb).toString(); + } + + /** + * Creates a builder for a placements object. + * @return the builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Placement can be created using a Builder. + */ + public static class Builder { + private String source; + private String destination; + + /** + * Set the Published Subject-matching filter + * @param source the source + * @return the builder + */ + public Builder source(String source) { + this.source = source; + return this; + } + /** + * Set the SubjectTransform Subject template + * @param destination the destination + * @return the builder + */ + public Builder destination(String destination) { + this.destination = destination; + return this; + } + + /** + * Build a Placement object + * @return the Placement + */ + public SubjectTransform build() { + return new SubjectTransform(source, destination); + } + } +} diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index c489f5e46..697ba7d40 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -178,6 +178,7 @@ public interface ApiConstants { String STREAM = "stream"; String STREAMS = "streams"; String SUBJECT = "subject"; + String SUBJECT_TRANSFORM = "subject_transform"; String SUBJECTS = "subjects"; String SUBJECTS_FILTER = "subjects_filter"; String SUCCESS = "success"; diff --git a/src/test/java/io/nats/client/api/StreamConfigurationTests.java b/src/test/java/io/nats/client/api/StreamConfigurationTests.java index 0c227bc70..2ec9acff4 100644 --- a/src/test/java/io/nats/client/api/StreamConfigurationTests.java +++ b/src/test/java/io/nats/client/api/StreamConfigurationTests.java @@ -55,7 +55,7 @@ public void testRoundTrip() throws Exception { .compressionOption(compressionOption) .build(); JetStreamManagement jsm = nc.jetStreamManagement(); - validate(jsm.addStream(sc).getConfiguration(), true, compressionOption); + validate(jsm.addStream(sc).getConfiguration(), true); }); } @@ -63,13 +63,13 @@ public void testRoundTrip() throws Exception { public void testConstruction() { StreamConfiguration testSc = getTestConfiguration(); // from json - validate(testSc, false, S2); + validate(testSc, false); // test toJson - validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false, S2); + validate(StreamConfiguration.instance(JsonParser.parseUnchecked(testSc.toJson())), false); // copy constructor - validate(StreamConfiguration.builder(testSc).build(), false, S2); + validate(StreamConfiguration.builder(testSc).build(), false); Map metaData = new HashMap<>(); metaData.put("meta-foo", "meta-bar"); @@ -94,6 +94,7 @@ public void testConstruction() { .duplicateWindow(testSc.getDuplicateWindow()) .placement(testSc.getPlacement()) .republish(testSc.getRepublish()) + .subjectTransform(testSc.getSubjectTransform()) .mirror(testSc.getMirror()) .sources(testSc.getSources()) .sealed(testSc.getSealed()) @@ -105,14 +106,14 @@ public void testConstruction() { .discardNewPerSubject(testSc.isDiscardNewPerSubject()) .metadata(metaData) .firstSequence(82942); - validate(builder.build(), false, S2); - validate(builder.addSources((Source)null).build(), false, S2); + validate(builder.build(), false); + validate(builder.addSources((Source)null).build(), false); List sources = new ArrayList<>(testSc.getSources()); sources.add(null); Source copy = new Source(JsonParser.parseUnchecked(sources.get(0).toJson())); sources.add(copy); - validate(builder.addSources(sources).build(), false, S2); + validate(builder.addSources(sources).build(), false); // covering add a single source sources = new ArrayList<>(testSc.getSources()); @@ -122,7 +123,7 @@ public void testConstruction() { builder.addSource(source); } builder.addSource(sources.get(0)); - validate(builder.build(), false, S2); + validate(builder.build(), false); // equals and hashcode coverage External external = copy.getExternal(); @@ -386,15 +387,14 @@ public void testDiscardPolicy() { assertEquals(DiscardPolicy.Old, builder.build().getDiscardPolicy()); } - private void validate(StreamConfiguration sc, boolean serverTest, CompressionOption compressionOption) { + private void validate(StreamConfiguration sc, boolean serverTest) { assertEquals("sname", sc.getName()); assertEquals("blah blah", sc.getDescription()); - assertEquals(3, sc.getSubjects().size()); + assertEquals(4, sc.getSubjects().size()); assertEquals("foo", sc.getSubjects().get(0)); assertEquals("bar", sc.getSubjects().get(1)); assertEquals("repub.>", sc.getSubjects().get(2)); - - assertSame(compressionOption, sc.getCompressionOption()); + assertEquals("st.>", sc.getSubjects().get(3)); assertSame(RetentionPolicy.Interest, sc.getRetentionPolicy()); assertEquals(730, sc.getMaxConsumers()); @@ -455,6 +455,13 @@ private void validate(StreamConfiguration sc, boolean serverTest, CompressionOpt assertEquals(1, sc.getMetadata().size()); assertEquals("meta-bar", sc.getMetadata().get("meta-foo")); assertEquals(82942, sc.getFirstSequence()); + + assertSame(S2, sc.getCompressionOption()); + + assertNotNull(sc.getSubjectTransform()); + assertEquals("st.>", sc.getSubjectTransform().getSource()); + assertEquals("stdest.>", sc.getSubjectTransform().getDestination()); + assertTrue(sc.getRepublish().isHeadersOnly()); } } diff --git a/src/test/resources/data/StreamConfiguration.json b/src/test/resources/data/StreamConfiguration.json index bb704ed43..98d2b3826 100644 --- a/src/test/resources/data/StreamConfiguration.json +++ b/src/test/resources/data/StreamConfiguration.json @@ -1,7 +1,7 @@ { "name": "sname", "description": "blah blah", - "subjects": ["foo", "bar", "repub.>"], + "subjects": ["foo", "bar", "repub.>", "st.>"], "retention": "interest", "compression": "s2", "max_consumers": 730, @@ -34,6 +34,10 @@ "dest": "dest.>", "headers_only": true }, + "subject_transform": { + "src": "st.>", + "dest": "stdest.>" + }, "mirror": { "name": "eman", "opt_start_seq": 736, From 68e04379891b3ddcd5fa4cb882e1281a7e2cd837 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 20 Sep 2023 15:08:04 -0400 Subject: [PATCH 2/2] Subject Transform Stream Configuration checking validation --- .../java/io/nats/client/api/Republish.java | 4 +-- .../io/nats/client/api/SubjectTransform.java | 15 ++-------- .../client/api/StreamConfigurationTests.java | 28 +++++++++++++------ 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/nats/client/api/Republish.java b/src/main/java/io/nats/client/api/Republish.java index 56697936d..dcf5832c4 100644 --- a/src/main/java/io/nats/client/api/Republish.java +++ b/src/main/java/io/nats/client/api/Republish.java @@ -42,7 +42,7 @@ static Republish optionalInstance(JsonValue vRepublish) { /** * Construct a 'republish' object - * @param source the Published Subject-matching filter + * @param source the Published subject matching filter * @param destination the RePublish Subject template * @param headersOnly Whether to RePublish only headers (no body) */ @@ -55,7 +55,7 @@ public Republish(String source, String destination, boolean headersOnly) { } /** - * Get source, the Published Subject-matching filter + * Get source, the Published subject matching filter * @return the source */ public String getSource() { diff --git a/src/main/java/io/nats/client/api/SubjectTransform.java b/src/main/java/io/nats/client/api/SubjectTransform.java index f93da22fd..c63e0ae52 100644 --- a/src/main/java/io/nats/client/api/SubjectTransform.java +++ b/src/main/java/io/nats/client/api/SubjectTransform.java @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at: @@ -15,7 +15,6 @@ import io.nats.client.support.JsonSerializable; import io.nats.client.support.JsonValue; -import io.nats.client.support.Validator; import static io.nats.client.support.ApiConstants.DEST; import static io.nats.client.support.ApiConstants.SRC; @@ -40,24 +39,16 @@ static SubjectTransform optionalInstance(JsonValue vSubjectTransform) { /** * Construct a 'SubjectTransform' object - * @param source the Published Subject-matching filter + * @param source the subject matching filter * @param destination the SubjectTransform Subject template */ public SubjectTransform(String source, String destination) { - source = Validator.emptyAsNull(source); - destination = Validator.emptyAsNull(destination); - if (source == null) { - if (destination == null) { - throw new IllegalArgumentException("Source and/or destination is required."); - } - source = ">"; - } this.source = source; this.destination = destination; } /** - * Get source, the Published Subject-matching filter + * Get source, the subject matching filter * @return the source */ public String getSource() { diff --git a/src/test/java/io/nats/client/api/StreamConfigurationTests.java b/src/test/java/io/nats/client/api/StreamConfigurationTests.java index 2ec9acff4..676c7842a 100644 --- a/src/test/java/io/nats/client/api/StreamConfigurationTests.java +++ b/src/test/java/io/nats/client/api/StreamConfigurationTests.java @@ -461,7 +461,6 @@ private void validate(StreamConfiguration sc, boolean serverTest) { assertNotNull(sc.getSubjectTransform()); assertEquals("st.>", sc.getSubjectTransform().getSource()); assertEquals("stdest.>", sc.getSubjectTransform().getDestination()); - assertTrue(sc.getRepublish().isHeadersOnly()); } } @@ -499,15 +498,26 @@ public void testRepublish() { assertThrows(IllegalArgumentException.class, () -> Republish.builder().source("src.>").build()); assertThrows(IllegalArgumentException.class, () -> Republish.builder().destination("dest.>").build()); - Republish p = Republish.builder().source("src.>").destination("dest.>").build(); - assertEquals("src.>", p.getSource()); - assertEquals("dest.>", p.getDestination()); - assertFalse(p.isHeadersOnly()); + Republish r = Republish.builder().source("src.>").destination("dest.>").build(); + assertEquals("src.>", r.getSource()); + assertEquals("dest.>", r.getDestination()); + assertFalse(r.isHeadersOnly()); - p = Republish.builder().source("src.>").destination("dest.>").headersOnly(true).build(); - assertEquals("src.>", p.getSource()); - assertEquals("dest.>", p.getDestination()); - assertTrue(p.isHeadersOnly()); + r = Republish.builder().source("src.>").destination("dest.>").headersOnly(true).build(); + assertEquals("src.>", r.getSource()); + assertEquals("dest.>", r.getDestination()); + assertTrue(r.isHeadersOnly()); + } + + @Test + public void testSubjectTransform() { + SubjectTransform st = SubjectTransform.builder().source("src.>").destination("dest.>").build(); + assertEquals("src.>", st.getSource()); + assertEquals("dest.>", st.getDestination()); + + st = SubjectTransform.builder().build(); + assertNull(st.getSource()); + assertNull(st.getDestination()); } @Test