From 15cb79d13c805c9cc7c436c89222a7551c326d74 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Mon, 4 Sep 2023 10:41:15 -0400 Subject: [PATCH] stream/consumer info timestamps, stream configuration (#962) --- .../java/io/nats/client/api/ConsumerInfo.java | 15 ++++++ .../nats/client/api/StreamConfiguration.java | 26 ++++++++- .../java/io/nats/client/api/StreamInfo.java | 28 +++++++--- .../java/io/nats/client/api/StreamState.java | 3 +- .../io/nats/client/support/ApiConstants.java | 1 + .../io/nats/client/support/JsonUtils.java | 14 +++++ .../io/nats/client/api/ConsumerInfoTests.java | 11 +--- .../client/api/StreamConfigurationTests.java | 4 +- .../io/nats/client/api/StreamInfoTests.java | 6 +-- .../client/impl/JetStreamManagementTests.java | 54 +++++++++++++++---- .../nats/client/support/JsonUtilsTests.java | 9 ++++ src/test/resources/data/ConsumerInfo.json | 1 + .../resources/data/StreamConfiguration.json | 1 + src/test/resources/data/StreamInfo.json | 4 +- 14 files changed, 142 insertions(+), 35 deletions(-) diff --git a/src/main/java/io/nats/client/api/ConsumerInfo.java b/src/main/java/io/nats/client/api/ConsumerInfo.java index 4838fcfc5..1a1276f69 100644 --- a/src/main/java/io/nats/client/api/ConsumerInfo.java +++ b/src/main/java/io/nats/client/api/ConsumerInfo.java @@ -38,6 +38,7 @@ public class ConsumerInfo extends ApiResponse { private final long numRedelivered; private final ClusterInfo clusterInfo; private final boolean pushBound; + private final ZonedDateTime timestamp; public ConsumerInfo(Message msg) { this(parseMessage(msg)); @@ -61,6 +62,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) { clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER)); pushBound = readBoolean(jv, PUSH_BOUND); + + timestamp = readDate(jv, TIMESTAMP); } public ConsumerConfiguration getConsumerConfiguration() { @@ -75,6 +78,10 @@ public String getStreamName() { return stream; } + /** + * Gets the creation time of the consumer. + * @return the creation date and time. + */ public ZonedDateTime getCreationTime() { return created; } @@ -111,6 +118,14 @@ public boolean isPushBound() { return pushBound; } + /** + * Gets the server time the info was gathered + * @return the server gathered timed + */ + public ZonedDateTime getTimestamp() { + return timestamp; + } + public long getCalculatedPending() { return numPending + delivered.getConsumerSequence(); } diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index b060eb058..f25a861fe 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -65,6 +65,7 @@ public class StreamConfiguration implements JsonSerializable { private final boolean denyPurge; private final boolean discardNewPerSubject; private final Map metadata; + private final long firstSequence; static StreamConfiguration instance(JsonValue v) { Builder builder = new Builder(); @@ -96,7 +97,7 @@ static StreamConfiguration instance(JsonValue v) { builder.denyPurge(readBoolean(v, DENY_PURGE)); builder.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT)); builder.metadata(readStringStringMap(v, METADATA)); - + builder.firstSequence(readLong(v, FIRST_SEQ, 1)); return builder.build(); } @@ -130,6 +131,7 @@ static StreamConfiguration instance(JsonValue v) { this.denyPurge = b.denyPurge; this.discardNewPerSubject = b.discardNewPerSubject; this.metadata = b.metadata; + this.firstSequence = b.firstSequence; } /** @@ -176,6 +178,7 @@ public String toJson() { addFldWhenTrue(sb, DENY_PURGE, denyPurge); addFldWhenTrue(sb, DISCARD_NEW_PER_SUBJECT, discardNewPerSubject); addField(sb, METADATA, metadata); + addFieldWhenGreaterThan(sb, FIRST_SEQ, firstSequence, 1); return endJson(sb).toString(); } @@ -407,6 +410,14 @@ public Map getMetadata() { return metadata; } + /** + * The first sequence used in the stream. + * @return the first sequence + */ + public long getFirstSequence() { + return firstSequence; + } + @Override public String toString() { return "StreamConfiguration{" + @@ -432,6 +443,7 @@ public String toString() { ", denyDelete=" + denyDelete + ", denyPurge=" + denyPurge + ", discardNewPerSubject=" + discardNewPerSubject + + ", firstSequence=" + firstSequence + ", " + mirror + ", " + placement + ", sources=" + sources + @@ -493,6 +505,7 @@ public static class Builder { private boolean denyPurge = false; private boolean discardNewPerSubject = false; private Map metadata; + private long firstSequence = 1; /** * Default Builder @@ -535,6 +548,7 @@ public Builder(StreamConfiguration sc) { if (sc.metadata != null) { this.metadata = new HashMap<>(sc.metadata); } + this.firstSequence = sc.firstSequence; } } @@ -937,6 +951,16 @@ public Builder metadata(Map metadata) { return this; } + /** + * Sets the first sequence to be used. 1 is the default. All values less than 2 are treated as 1. + * @param firstSeq specify the first_seq in the stream config when creating the stream. + * @return Builder + */ + public Builder firstSequence(long firstSeq) { + this.firstSequence = firstSeq > 1 ? firstSeq : 1; + return this; + } + /** * Builds the StreamConfiguration * @return a stream configuration. diff --git a/src/main/java/io/nats/client/api/StreamInfo.java b/src/main/java/io/nats/client/api/StreamInfo.java index 58d81b4fd..56ecec54a 100644 --- a/src/main/java/io/nats/client/api/StreamInfo.java +++ b/src/main/java/io/nats/client/api/StreamInfo.java @@ -29,12 +29,13 @@ */ public class StreamInfo extends ApiResponse { - private final ZonedDateTime created; + private final ZonedDateTime createTime; private final StreamConfiguration config; - private final StreamState state; + private final StreamState streamState; private final ClusterInfo clusterInfo; private final MirrorInfo mirrorInfo; private final List sourceInfos; + private final ZonedDateTime timestamp; public StreamInfo(Message msg) { this(parseUnchecked(msg.getData())); @@ -42,14 +43,15 @@ public StreamInfo(Message msg) { public StreamInfo(JsonValue vStreamInfo) { super(vStreamInfo); - created = readDate(jv, CREATED); + createTime = readDate(jv, CREATED); config = StreamConfiguration.instance(readValue(jv, CONFIG)); - state = new StreamState(readValue(jv, STATE)); + streamState = new StreamState(readValue(jv, STATE)); clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER)); mirrorInfo = MirrorInfo.optionalInstance(readValue(jv, MIRROR)); sourceInfos = SourceInfo.optionalListOf(readValue(jv, SOURCES)); + timestamp = readDate(jv, TIMESTAMP); } - + /** * Gets the stream configuration. * @return the stream configuration. @@ -63,7 +65,7 @@ public StreamConfiguration getConfiguration() { * @return the stream state */ public StreamState getStreamState() { - return state; + return streamState; } /** @@ -71,7 +73,7 @@ public StreamState getStreamState() { * @return the creation date and time. */ public ZonedDateTime getCreateTime() { - return created; + return createTime; } public MirrorInfo getMirrorInfo() { @@ -85,4 +87,16 @@ public List getSourceInfos() { public ClusterInfo getClusterInfo() { return clusterInfo; } + + public StreamConfiguration getConfig() { + return config; + } + + /** + * Gets the server time the info was gathered + * @return the server gathered timed + */ + public ZonedDateTime getTimestamp() { + return timestamp; + } } diff --git a/src/main/java/io/nats/client/api/StreamState.java b/src/main/java/io/nats/client/api/StreamState.java index be1230dfb..37027e549 100644 --- a/src/main/java/io/nats/client/api/StreamState.java +++ b/src/main/java/io/nats/client/api/StreamState.java @@ -69,8 +69,7 @@ public long getByteCount() { } /** - * Gets the first sequence number of the stream. - * + * Gets the first sequence number of the stream. May be 0 if there are no messages. * @return a sequence number */ public long getFirstSequence() { diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index b8c10db3b..03dffe1e8 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -183,6 +183,7 @@ public interface ApiConstants { String TEMPLATE_OWNER = "template_owner"; String TIERS = "tiers"; String TIME = "time"; + String TIMESTAMP = "ts"; String TLS = "tls_required"; String TOTAL = "total"; String TYPE = "type"; diff --git a/src/main/java/io/nats/client/support/JsonUtils.java b/src/main/java/io/nats/client/support/JsonUtils.java index ff37ce44d..a1275a0b2 100644 --- a/src/main/java/io/nats/client/support/JsonUtils.java +++ b/src/main/java/io/nats/client/support/JsonUtils.java @@ -247,6 +247,20 @@ public static void addFieldWhenGteMinusOne(StringBuilder sb, String fname, Long } } + /** + * Appends a json field to a string builder. + * @param sb string builder + * @param fname fieldname + * @param value field value + */ + public static void addFieldWhenGreaterThan(StringBuilder sb, String fname, Long value, long gt) { + if (value != null && value > gt) { + sb.append(Q); + jsonEncode(sb, fname); + sb.append(QCOLON).append(value).append(COMMA); + } + } + /** * Appends a json field to a string builder. * @param sb string builder diff --git a/src/test/java/io/nats/client/api/ConsumerInfoTests.java b/src/test/java/io/nats/client/api/ConsumerInfoTests.java index 289318cd0..76a9f27b8 100644 --- a/src/test/java/io/nats/client/api/ConsumerInfoTests.java +++ b/src/test/java/io/nats/client/api/ConsumerInfoTests.java @@ -28,20 +28,13 @@ public class ConsumerInfoTests { static JsonValue vConsumerInfo = JsonParser.parseUnchecked(dataAsString("ConsumerInfo.json")); - @Test - public void testTime() { - long start = System.currentTimeMillis(); - for (int x = 0; x < 1_000_000; x++) { - new ConsumerInfo(vConsumerInfo); - } - System.out.println(System.currentTimeMillis() - start); - } - @Test public void testConsumerInfo() { ConsumerInfo ci = new ConsumerInfo(vConsumerInfo); assertEquals("foo-stream", ci.getStreamName()); assertEquals("foo-consumer", ci.getName()); + assertEquals(DateTimeUtils.parseDateTime("2020-11-05T19:33:21.163377Z"), ci.getCreationTime()); + assertEquals(DateTimeUtils.parseDateTime("2023-08-29T19:33:21.163377Z"), ci.getTimestamp()); SequencePair sp = ci.getDelivered(); assertEquals(1, sp.getConsumerSequence()); diff --git a/src/test/java/io/nats/client/api/StreamConfigurationTests.java b/src/test/java/io/nats/client/api/StreamConfigurationTests.java index 218c02bca..5a2c55e7c 100644 --- a/src/test/java/io/nats/client/api/StreamConfigurationTests.java +++ b/src/test/java/io/nats/client/api/StreamConfigurationTests.java @@ -98,7 +98,8 @@ public void testConstruction() { .denyDelete(testSc.getDenyDelete()) .denyPurge(testSc.getDenyPurge()) .discardNewPerSubject(testSc.isDiscardNewPerSubject()) - .metadata(metaData); + .metadata(metaData) + .firstSequence(82942); validate(builder.build(), false); validate(builder.addSources((Source)null).build(), false); @@ -426,6 +427,7 @@ private void validate(StreamConfiguration sc, boolean serverTest) { assertEquals(1, sc.getMetadata().size()); assertEquals("meta-bar", sc.getMetadata().get("meta-foo")); + assertEquals(82942, sc.getFirstSequence()); } } diff --git a/src/test/java/io/nats/client/api/StreamInfoTests.java b/src/test/java/io/nats/client/api/StreamInfoTests.java index 0f948f006..8a2e6dd4f 100644 --- a/src/test/java/io/nats/client/api/StreamInfoTests.java +++ b/src/test/java/io/nats/client/api/StreamInfoTests.java @@ -19,7 +19,6 @@ import org.junit.jupiter.api.Test; import java.time.Duration; -import java.time.ZonedDateTime; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -42,8 +41,8 @@ public void testStreamInfo() { } private void validateStreamInfo(StreamInfo si) { - ZonedDateTime zdt = DateTimeUtils.parseDateTime("2021-01-25T20:09:10.6225191Z"); - assertEquals(zdt, si.getCreateTime()); + assertEquals(DateTimeUtils.parseDateTime("2021-01-25T20:09:10.6225191Z"), si.getCreateTime()); + assertEquals(DateTimeUtils.parseDateTime("2023-08-29T19:33:21.163377Z"), si.getTimestamp()); StreamConfiguration sc = si.getConfiguration(); assertEquals("streamName", sc.getName()); @@ -51,6 +50,7 @@ private void validateStreamInfo(StreamInfo si) { assertEquals("sub0", sc.getSubjects().get(0)); assertEquals("sub1", sc.getSubjects().get(1)); assertEquals("x.>", sc.getSubjects().get(2)); + assertEquals(82942, sc.getFirstSequence()); assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy()); assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy()); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 22c202a30..4343b7487 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -81,6 +81,21 @@ public void testStreamCreate() throws Exception { assertEquals(0, ss.getFirstSequence()); assertEquals(0, ss.getLastSequence()); assertEquals(0, ss.getConsumerCount()); + + if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { + JetStream js = nc.jetStream(); + String stream = stream(); + sc = StreamConfiguration.builder() + .name(stream) + .storageType(StorageType.Memory) + .firstSequence(42) + .subjects("test-first-seq").build(); + si = jsm.addStream(sc); + assertNotNull(si.getTimestamp()); + assertEquals(42, si.getConfiguration().getFirstSequence()); + PublishAck pa = js.publish("test-first-seq", null); + assertEquals(42, pa.getSeqno()); + } }); } @@ -302,39 +317,50 @@ public void testGetStreamInfo() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(STREAM)); + JetStream js = nc.jetStream(); + String[] subjects = new String[6]; for (int x = 0; x < 5; x++) { subjects[x] = subject(x + 1); } subjects[5] = "foo.>"; - createMemoryStream(jsm, STREAM, subjects); - StreamInfo si = jsm.getStreamInfo(STREAM); - assertEquals(STREAM, si.getConfiguration().getName()); + String stream = stream(); + createMemoryStream(jsm, stream, subjects); + + StreamInfo si = jsm.getStreamInfo(stream); + assertEquals(stream, si.getConfiguration().getName()); assertEquals(0, si.getStreamState().getSubjectCount()); assertEquals(0, si.getStreamState().getSubjects().size()); assertEquals(0, si.getStreamState().getDeletedCount()); assertEquals(0, si.getStreamState().getDeleted().size()); + if (nc.getServerInfo().isOlderThanVersion("2.10")) { + assertNull(si.getTimestamp()); + } + else { + assertNotNull(si.getTimestamp()); + } + assertEquals(1, si.getConfiguration().getFirstSequence()); + List packs = new ArrayList<>(); - JetStream js = nc.jetStream(); for (int x = 0; x < 5; x++) { jsPublish(js, subject(x + 1), x + 1); PublishAck pa = jsPublish(js, subject(x + 1), data(x + 2)); packs.add(pa); - jsm.deleteMessage(STREAM, pa.getSeqno()); + jsm.deleteMessage(stream, pa.getSeqno()); } jsPublish(js, "foo.bar", 6); - si = jsm.getStreamInfo(STREAM); - assertEquals(STREAM, si.getConfiguration().getName()); + si = jsm.getStreamInfo(stream); + assertEquals(stream, si.getConfiguration().getName()); assertEquals(6, si.getStreamState().getSubjectCount()); assertEquals(0, si.getStreamState().getSubjects().size()); assertEquals(5, si.getStreamState().getDeletedCount()); assertEquals(0, si.getStreamState().getDeleted().size()); - si = jsm.getStreamInfo(STREAM, StreamInfoOptions.builder().allSubjects().deletedDetails().build()); - assertEquals(STREAM, si.getConfiguration().getName()); + si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().allSubjects().deletedDetails().build()); + assertEquals(stream, si.getConfiguration().getName()); assertEquals(6, si.getStreamState().getSubjectCount()); List list = si.getStreamState().getSubjects(); assertNotNull(list); @@ -361,7 +387,7 @@ public void testGetStreamInfo() throws Exception { jsPublish(js, "foo.baz", 2); sleep(100); - si = jsm.getStreamInfo(STREAM, StreamInfoOptions.builder().filterSubjects("foo.>").deletedDetails().build()); + si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().filterSubjects("foo.>").deletedDetails().build()); assertEquals(7, si.getStreamState().getSubjectCount()); list = si.getStreamState().getSubjects(); assertNotNull(list); @@ -377,7 +403,7 @@ public void testGetStreamInfo() throws Exception { assertNotNull(s); assertEquals(2, s.getCount()); - si = jsm.getStreamInfo(STREAM, StreamInfoOptions.builder().filterSubjects(subject(5)).build()); + si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().filterSubjects(subject(5)).build()); list = si.getStreamState().getSubjects(); assertNotNull(list); assertEquals(1, list.size()); @@ -910,6 +936,12 @@ public void testGetConsumerInfo() throws Exception { assertEquals(STREAM, ci.getStreamName()); assertEquals(DURABLE, ci.getName()); assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(STREAM, durable(999))); + if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) { + assertNotNull(ci.getTimestamp()); + } + else { + assertNull(ci.getTimestamp()); + } }); } diff --git a/src/test/java/io/nats/client/support/JsonUtilsTests.java b/src/test/java/io/nats/client/support/JsonUtilsTests.java index 1ea381627..f64f40eed 100644 --- a/src/test/java/io/nats/client/support/JsonUtilsTests.java +++ b/src/test/java/io/nats/client/support/JsonUtilsTests.java @@ -241,6 +241,15 @@ public void testAddFields() { addField(sb, "zdt", DateTimeUtils.gmtNow()); assertEquals(180, sb.length()); + + addFieldWhenGreaterThan(sb, "xgt", 0L, 1); + assertEquals(180, sb.length()); + + addFieldWhenGreaterThan(sb, "xgt", 1L, 1); + assertEquals(180, sb.length()); + + addFieldWhenGreaterThan(sb, "xgt", 2L, 1); + assertEquals(188, sb.length()); } static final String EXPECTED_LIST_JSON = "{\"a1\":[\"one\"],\"a2\":[\"two\",\"too\"],\"l1\":[\"one\"],\"l2\":[\"two\",\"too\"],\"j1\":[{\"filter\":\"sub1\",\"keep\":421}],\"j2\":[{\"filter\":\"sub2\",\"seq\":732},{\"filter\":\"sub3\"}],\"d1\":[1000000],\"d2\":[2000000,3000000]}"; diff --git a/src/test/resources/data/ConsumerInfo.json b/src/test/resources/data/ConsumerInfo.json index 0a540d2c3..0abc6702e 100644 --- a/src/test/resources/data/ConsumerInfo.json +++ b/src/test/resources/data/ConsumerInfo.json @@ -3,6 +3,7 @@ "stream_name": "foo-stream", "name": "foo-consumer", "created": "2020-11-05T19:33:21.163377Z", + "ts": "2023-08-29T19:33:21.163377Z", "config": { "durable_name": "foo-consumer", "deliver_subject": "bar", diff --git a/src/test/resources/data/StreamConfiguration.json b/src/test/resources/data/StreamConfiguration.json index 331dcdafb..c034e6da4 100644 --- a/src/test/resources/data/StreamConfiguration.json +++ b/src/test/resources/data/StreamConfiguration.json @@ -23,6 +23,7 @@ "allow_direct": true, "mirror_direct": true, "metadata":{"meta-foo":"meta-bar"}, + "first_seq": 82942, "placement": { "cluster": "clstr", "tags": ["tag1", "tag2"] diff --git a/src/test/resources/data/StreamInfo.json b/src/test/resources/data/StreamInfo.json index 0d84120d9..2ea6ec4a1 100644 --- a/src/test/resources/data/StreamInfo.json +++ b/src/test/resources/data/StreamInfo.json @@ -23,9 +23,11 @@ "ptag1", "ptag2" ] - } + }, + "first_seq": 82942 }, "created": "2021-01-25T20:09:10.6225191Z", + "ts": "2023-08-29T19:33:21.163377Z", "state": { "messages": 11, "bytes": 12,