diff --git a/src/main/java/io/nats/client/api/MessageBatchGetRequest.java b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java index e54a5af6e..67453e7ac 100644 --- a/src/main/java/io/nats/client/api/MessageBatchGetRequest.java +++ b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java @@ -14,11 +14,9 @@ package io.nats.client.api; import io.nats.client.support.JsonSerializable; +import io.nats.client.support.Validator; import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.List; import static io.nats.client.support.ApiConstants.*; @@ -30,23 +28,82 @@ public class MessageBatchGetRequest implements JsonSerializable { private final int batch; + private final String nextBySubject; private final int maxBytes; private final long minSequence; private final ZonedDateTime startTime; - private final String nextBySubject; private final List multiLastBySubjects; private final long upToSequence; private final ZonedDateTime upToTime; - MessageBatchGetRequest(Builder b) { - this.batch = b.batch; - this.maxBytes = b.maxBytes; - this.minSequence = b.minSequence; - this.startTime = b.startTime; - this.nextBySubject = b.nextBySubject; - this.multiLastBySubjects = b.multiLastBySubjects; - this.upToSequence = b.upToSequence; - this.upToTime = b.upToTime; + // batch constructor + private MessageBatchGetRequest(String subject, + int batch, + int maxBytes, + long minSequence, + ZonedDateTime startTime) + { + Validator.required(subject, "Subject"); + Validator.validateGtZero(batch, "Batch"); + + this.nextBySubject = subject; + this.batch = batch; + this.maxBytes = maxBytes; + this.startTime = startTime; + this.multiLastBySubjects = null; + this.upToSequence = -1; + this.upToTime = null; + + this.minSequence = startTime == null && minSequence < 1 ? 1 : minSequence; + } + + public static MessageBatchGetRequest batch(String subject, int batch) { + return new MessageBatchGetRequest(subject, batch, -1, -1, null); + } + + public static MessageBatchGetRequest batch(String subject, int batch, long minSequence) { + return new MessageBatchGetRequest(subject, batch, -1, minSequence, null); + } + + public static MessageBatchGetRequest batch(String subject, int batch, ZonedDateTime startTime) { + return new MessageBatchGetRequest(subject, batch, -1, -1, startTime); + } + + public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes) { + return new MessageBatchGetRequest(subject, batch, maxBytes, -1, null); + } + + public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes, long minSequence) { + return new MessageBatchGetRequest(subject, batch, maxBytes, minSequence, null); + } + + public static MessageBatchGetRequest batchBytes(String subject, int batch, int maxBytes, ZonedDateTime startTime) { + return new MessageBatchGetRequest(subject, batch, maxBytes, -1, startTime); + } + + // multi for constructor + private MessageBatchGetRequest(List subjects, long upToSequence, ZonedDateTime upToTime) { + Validator.required(subjects, "Subjects"); + batch = -1; + nextBySubject = null; + this.maxBytes = -1; + this.minSequence = -1; + this.startTime = null; + this.multiLastBySubjects = subjects; + this.upToSequence = upToSequence; + this.upToTime = upToTime; + } + + public static MessageBatchGetRequest multiLastForSubjects(List subjects) { + return new MessageBatchGetRequest(subjects, -1, null); + } + + public static MessageBatchGetRequest multiLastForSubjects(List subjects, long upToSequence) { + return new MessageBatchGetRequest(subjects, upToSequence, null); + } + + public static MessageBatchGetRequest multiLastForSubjects(List subjects, ZonedDateTime upToTime) { + return new MessageBatchGetRequest(subjects, -1, upToTime); } /** @@ -123,195 +180,16 @@ public String toJson() { addField(sb, BATCH, batch); addField(sb, MAX_BYTES, maxBytes); addField(sb, START_TIME, startTime); + addField(sb, SEQ, minSequence); addField(sb, NEXT_BY_SUBJECT, nextBySubject); addStrings(sb, MULTI_LAST, multiLastBySubjects); addField(sb, UP_TO_SEQ, upToSequence); addField(sb, UP_TO_TIME, upToTime); - - // THIS IS A WORKAROUND https://github.com/nats-io/nats-server/issues/6026 - if (minSequence < 1) { - if (maxBytes > 0) { - addField(sb, SEQ, 1); - } - } - else { - addField(sb, SEQ, minSequence); - } return endJson(sb).toString(); } - /** - * Creates a builder for the request. - * @return Builder - */ - public static Builder builder() { - return new Builder(); - } - - /** - * Creates a builder for the request. - * @param req the {@link MessageBatchGetRequest} - * @return Builder - */ - public static Builder builder(MessageBatchGetRequest req) { - return req == null ? new Builder() : new Builder(req); - } - - /** - * {@link MessageBatchGetRequest} is created using a Builder. The builder supports chaining and will - * create a default set of options if no methods are calls. - *

{@code MessageBatchGetRequest.builder().build()} will create a default {@link MessageBatchGetRequest}. - */ - public static class Builder { - private int batch = -1; - private int maxBytes = -1; - private long minSequence = -1; - private ZonedDateTime startTime = null; - private String nextBySubject = null; - private List multiLastBySubjects = new ArrayList<>(); - private long upToSequence = -1; - private ZonedDateTime upToTime = null; - - /** - * Construct the builder - */ - public Builder() { - } - - /** - * Construct the builder and initialize values with the existing {@link MessageBatchGetRequest} - * @param req the {@link MessageBatchGetRequest} to clone - */ - public Builder(MessageBatchGetRequest req) { - if (req != null) { - this.batch = req.batch; - this.maxBytes = req.maxBytes; - this.minSequence = req.minSequence; - this.startTime = req.startTime; - this.nextBySubject = req.nextBySubject; - this.multiLastBySubjects = req.multiLastBySubjects; - this.upToSequence = req.upToSequence; - this.upToTime = req.upToTime; - } - } - - /** - * Set the maximum amount of messages to be returned for this request. - * @param batch the batch size - * @return Builder - */ - public Builder batch(int batch) { - this.batch = batch < 1 ? -1 : batch; - return this; - } - - /** - * Maximum amount of returned bytes for this request. - * Limits the amount of returned messages to not exceed this. - * @param maxBytes the maximum bytes - * @return Builder - */ - public Builder maxBytes(int maxBytes) { - this.maxBytes = maxBytes < 1 ? -1 : maxBytes; - return this; - } - - /** - * Minimum sequence for returned messages. - * All returned messages will have a sequence equal to or higher than this. - * @param sequence the minimum message sequence - * @return Builder - */ - public Builder minSequence(long sequence) { - this.minSequence = sequence < 1 ? -1 : sequence; - return this; - } - - /** - * Minimum start time for returned messages. - * All returned messages will have a start time equal to or higher than this. - * @param startTime the minimum message start time - * @return Builder - */ - public Builder startTime(ZonedDateTime startTime) { - this.startTime = startTime; - return this; - } - - /** - * Subject used to filter messages that should be returned. - * @param nextBySubject the subject to filter - * @return Builder - */ - public Builder nextBySubject(String nextBySubject) { - if (!multiLastBySubjects.isEmpty()) { - throw new IllegalArgumentException("nextBySubject cannot be used when multiLastBySubjects is used."); - } - this.nextBySubject = nextBySubject; - return this; - } - - /** - * Subjects filter used, these can include wildcards. - * Will get the last messages matching the subjects. - * @param subjects the subjects to get the last messages for - * @return Builder - */ - public Builder multiLastBySubjects(String... subjects) { - if (nextBySubject != null) { - throw new IllegalArgumentException("multiLastBySubjects cannot be used when nextBySubject is used."); - } - this.multiLastBySubjects.clear(); - if (subjects != null && subjects.length > 0) { - this.multiLastBySubjects.addAll(Arrays.asList(subjects)); - } - return this; - } - - /** - * Subjects filter used, these can include wildcards. - * Will get the last messages matching the subjects. - * @param subjects the subjects to get the last messages for - * @return Builder - */ - public Builder multiLastBySubjects(Collection subjects) { - if (nextBySubject != null) { - throw new IllegalArgumentException("multiLastBySubjects cannot be used when nextBySubject is used."); - } - this.multiLastBySubjects.clear(); - if (subjects != null && !subjects.isEmpty()) { - this.multiLastBySubjects.addAll(subjects); - } - return this; - } - - /** - * Only return messages up to this sequence. - * If not set, will be last sequence for the stream. - * @param upToSequence the maximum message sequence to return results for - * @return Builder - */ - public Builder upToSequence(long upToSequence) { - this.upToSequence = upToSequence < 1 ? -1 : upToSequence; - return this; - } - - /** - * Only return messages up to this time. - * @param upToTime the maximum message time to return results for - * @return Builder - */ - public Builder upToTime(ZonedDateTime upToTime) { - this.upToTime = upToTime; - return this; - } - - /** - * Build the {@link MessageBatchGetRequest}. - * @return MessageBatchGetRequest - */ - public MessageBatchGetRequest build() { - return new MessageBatchGetRequest(this); - } + @Override + public String toString() { + return toJson(); } } diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index c31344153..9fad18dcf 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -15,7 +15,7 @@ import io.nats.client.*; import io.nats.client.api.*; -import io.nats.client.support.*; +import io.nats.client.support.DateTimeUtils; import io.nats.client.utils.TestBase; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; @@ -24,14 +24,12 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.ZonedDateTime; -import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME; import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT; import static io.nats.client.support.NatsJetStreamConstants.*; -import static io.nats.client.support.Status.BAD_JS_REQUEST_CODE; import static io.nats.client.support.Status.NOT_FOUND_CODE; import static io.nats.client.utils.ResourceUtils.dataAsString; import static org.junit.jupiter.api.Assertions.*; @@ -1547,112 +1545,13 @@ public void testCreateConsumerUpdateConsumer() throws Exception { }); } - private static List queueToList(LinkedBlockingQueue queue) throws InterruptedException { - List list = new ArrayList<>(); - while (true) { - MessageInfo mi = queue.take(); - list.add(mi); - if (!mi.isMessage()) { - break; - } - } - return list; - } - - private static void verifyRequest1(List list, boolean lastIsEob) { - assertEquals(lastIsEob ? 4 : 3, list.size()); - MessageInfo mi = list.get(2); - assertEquals(2, mi.getNumPending()); - assertEquals(3, mi.getSeq()); - assertEquals(2, mi.getLastSeq()); - verifyMessage(mi); - verifyEob(list, lastIsEob, 3); - } - - private static void verifyRequest2(List list, boolean lastIsEob) { - assertEquals(lastIsEob ? 3 : 2, list.size()); - MessageInfo mi = list.get(1); - assertEquals(0, mi.getNumPending()); - assertEquals(5, mi.getSeq()); - assertEquals(4, mi.getLastSeq()); - - verifyMessage(mi); - verifyEob(list, lastIsEob, 2); - } - - private static void verifyMessage(MessageInfo mi) { - assertTrue(mi.isMessage()); - assertFalse(mi.isStatus()); - assertFalse(mi.isEobStatus()); - assertFalse(mi.isErrorStatus()); - } - - private static void verifyEob(List list, boolean lastIsEob, int ix) { - if (lastIsEob) { - MessageInfo mi = list.get(ix); - assertFalse(mi.isMessage()); - assertTrue(mi.isStatus()); - assertTrue(mi.isEobStatus()); - assertFalse(mi.isErrorStatus()); - } - } - - @Test - public void testBatchDirectGet() throws Exception { - jsServer.run(TestBase::atLeast2_11, nc -> { - JetStream js = nc.jetStream(); - JetStreamManagement jsm = nc.jetStreamManagement(); - - String stream = variant(); - String subject = variant(); - StreamConfiguration sc = StreamConfiguration.builder() - .name(stream) - .storageType(StorageType.Memory) - .subjects(subject) - .allowDirect(true) - .build(); - StreamInfo si = jsm.addStream(sc); - assertTrue(si.getConfiguration().getAllowDirect()); - - // Requests stay the same for all options. - MessageBatchGetRequest request1 = MessageBatchGetRequest.builder() - .batch(3) - .nextBySubject(subject) - .build(); - MessageBatchGetRequest request2 = MessageBatchGetRequest.builder() - .batch(3) - .minSequence(4) - .nextBySubject(subject) - .build(); - - for (int x = 0; x < 5; x++) { - js.publish(subject, ("" + x).getBytes(StandardCharsets.UTF_8)); - } - - // Get using handler. - List list = new ArrayList<>(); - jsm.requestMessageBatch(stream, request1, list::add); - verifyRequest1(list, true); - - list.clear(); - jsm.requestMessageBatch(stream, request2, list::add); - verifyRequest2(list, true); - - // Get using fetch. - verifyRequest1(jsm.fetchMessageBatch(stream, request1), false); - verifyRequest2(jsm.fetchMessageBatch(stream, request2), false); - - // Get using queue. - LinkedBlockingQueue queue = jsm.queueMessageBatch(stream, request1); - verifyRequest1(queueToList(queue), true); - - queue = jsm.queueMessageBatch(stream, request2); - verifyRequest2(queueToList(queue), true); - }); - } - @Test public void testBatchDirectGetErrors() throws Exception { + assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch(null, 1)); + assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch("", 1)); + assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch(">", 0)); + assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.multiLastForSubjects(null)); + jsServer.run(TestBase::atLeast2_11, nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); @@ -1668,7 +1567,7 @@ public void testBatchDirectGetErrors() throws Exception { // Stream doesn't have AllowDirect enabled, will error. assertThrows(IllegalArgumentException.class, () -> { - MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); + MessageBatchGetRequest request = MessageBatchGetRequest.batch("subject", 1); jsm.requestMessageBatch(streamNoDirect, request, mi -> {}); }); @@ -1682,10 +1581,7 @@ public void testBatchDirectGetErrors() throws Exception { .build(); jsm.addStream(sc); - MessageBatchGetRequest request = MessageBatchGetRequest.builder() - .batch(3) - .nextBySubject(subject) - .build(); + MessageBatchGetRequest request = MessageBatchGetRequest.batch(subject, 3); // no messages yet - handler List list = new ArrayList<>(); @@ -1703,14 +1599,14 @@ public void testBatchDirectGetErrors() throws Exception { Thread.sleep(2500); // for start_time // Empty (Invalid) request errors. - request = MessageBatchGetRequest.builder().build(); - verifyError(jsm.fetchMessageBatch(stream, request), BAD_JS_REQUEST_CODE); - - request = MessageBatchGetRequest.builder().batch(1).nextBySubject("not").build(); - verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); - - request = MessageBatchGetRequest.builder().batch(1).minSequence(99).build(); - verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); +// request = MessageBatchGetRequest.builder().build(); +// verifyError(jsm.fetchMessageBatch(stream, request), BAD_JS_REQUEST_CODE); +// +// request = MessageBatchGetRequest.builder().batch(1).nextBySubject("not").build(); +// verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); +// +// request = MessageBatchGetRequest.builder().batch(1).minSequence(99).build(); +// verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); // DOESN'T WORK AS ASSUMED // request = MessageBatchGetRequest.builder() @@ -1731,230 +1627,338 @@ private static void verifyError(List list, int code) { } @Test - public void testBatchDirectGetMultiLast() throws Exception { + public void testBatchDirectGet() throws Exception { jsServer.run(TestBase::atLeast2_11, nc -> { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); - String stream = stream(); - jsm.addStream(StreamConfiguration.builder() + String stream = variant(); + String subject = variant(); + StreamConfiguration sc = StreamConfiguration.builder() .name(stream) - .subjects(stream + ".a.>") + .storageType(StorageType.Memory) + .subjects(subject + ".>") .allowDirect(true) - .build()); + .build(); + StreamInfo si = jsm.addStream(sc); + assertTrue(si.getConfiguration().getAllowDirect()); - String subjectAFoo = stream + ".a.foo"; - String subjectABar = stream + ".a.bar"; - String subjectABaz = stream + ".a.baz"; - js.publish(subjectAFoo, "foo".getBytes(StandardCharsets.UTF_8)); - js.publish(subjectABar, "bar".getBytes(StandardCharsets.UTF_8)); - js.publish(subjectABaz, "baz".getBytes(StandardCharsets.UTF_8)); + byte[] payload = new byte[1000]; + for (int per = 1; per <= 5; per++) { + for (char c = 'A'; c <= 'E'; c++) { + String s = subject + "." + c; + js.publish(s, payload); + Thread.sleep(10); // make sure there are no duplicate times + } + Thread.sleep(2500); + } + ZonedDateTime time = jsm.getMessage(stream, 6).getTime().minusSeconds(1); - MessageBatchGetRequest request = MessageBatchGetRequest.builder() - .multiLastBySubjects(subjectAFoo, subjectABaz) - .build(); + String subjectAll = subject + ".>"; + String subjectA = subject + ".A"; + String subjectD = subject + ".D"; - List list = jsm.fetchMessageBatch(stream, request); - assertEquals(2, list.size()); - assertEquals(subjectAFoo, list.get(0).getSubject()); - assertEquals(subjectABaz, list.get(1).getSubject()); + // Requests stay the same for all options. + MessageBatchGetRequest requestBatch1 = MessageBatchGetRequest.batch(subjectAll, 3); + MessageBatchGetRequest requestBatch1A = MessageBatchGetRequest.batch(subjectA, 3); + MessageBatchGetRequest requestBatch2 = MessageBatchGetRequest.batch(subjectAll, 3, 4); + MessageBatchGetRequest requestBatch2A = MessageBatchGetRequest.batch(subjectA, 3, 4); + MessageBatchGetRequest requestBatch3 = MessageBatchGetRequest.batch(subjectAll, 3, time); + MessageBatchGetRequest requestBatch3A = MessageBatchGetRequest.batch(subjectA, 3, time); + MessageBatchGetRequest requestBatch4 = MessageBatchGetRequest.batchBytes(subjectAll, 3, 2002); + MessageBatchGetRequest requestBatch4A = MessageBatchGetRequest.batchBytes(subjectA, 3, 2002); + MessageBatchGetRequest requestBatch5 = MessageBatchGetRequest.batchBytes(subjectAll, 3, 2002, 4); + MessageBatchGetRequest requestBatch5A = MessageBatchGetRequest.batchBytes(subjectA, 3, 2002, 4); + MessageBatchGetRequest requestBatch6 = MessageBatchGetRequest.batchBytes(subjectAll, 3, 2002, time); + MessageBatchGetRequest requestBatch6A = MessageBatchGetRequest.batchBytes(subjectA, 3, 2002, time); + + List subjectAllList = Collections.singletonList(subjectAll); + List subjectsList = Arrays.asList(subjectA, subjectD); + MessageBatchGetRequest requestMulti1 = MessageBatchGetRequest.multiLastForSubjects(subjectsList); + MessageBatchGetRequest requestMulti1All = MessageBatchGetRequest.multiLastForSubjects(subjectAllList); + MessageBatchGetRequest requestMulti2 = MessageBatchGetRequest.multiLastForSubjects(subjectsList, 23); + MessageBatchGetRequest requestMulti2All = MessageBatchGetRequest.multiLastForSubjects(subjectAllList, 23); + MessageBatchGetRequest requestMulti3 = MessageBatchGetRequest.multiLastForSubjects(subjectsList, time); + MessageBatchGetRequest requestMulti3All = MessageBatchGetRequest.multiLastForSubjects(subjectAllList, time); + + // Get using handler. + List list = new ArrayList<>(); + jsm.requestMessageBatch(stream, requestBatch1, list::add); + verifyBatch1(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch1A, list::add); + verifyBatch1A(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch2, list::add); + verifyBatch2(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch2A, list::add); + verifyBatch2A(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch3, list::add); + verifyBatch3(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch3A, list::add); + verifyBatch3A(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch4, list::add); + verifyBatch4(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch4A, list::add); + verifyBatch4A(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch5, list::add); + verifyBatch5(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch5A, list::add); + verifyBatch5A(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch6, list::add); + verifyBatch6(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestBatch6A, list::add); + verifyBatch6A(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestMulti1, list::add); + verifyMulti1(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestMulti1All, list::add); + verifyMulti1All(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestMulti2, list::add); + verifyMulti2(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestMulti2All, list::add); + verifyMulti2All(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestMulti3, list::add); + verifyMulti3(list, true); + + list.clear(); + jsm.requestMessageBatch(stream, requestMulti3All, list::add); + verifyMulti3All(list, true); + + // Get using fetch. + verifyBatch1(jsm.fetchMessageBatch(stream, requestBatch1), false); + verifyBatch1A(jsm.fetchMessageBatch(stream, requestBatch1A), false); + verifyBatch2(jsm.fetchMessageBatch(stream, requestBatch2), false); + verifyBatch2A(jsm.fetchMessageBatch(stream, requestBatch2A), false); + verifyBatch3(jsm.fetchMessageBatch(stream, requestBatch3), false); + verifyBatch3A(jsm.fetchMessageBatch(stream, requestBatch3A), false); + verifyBatch4(jsm.fetchMessageBatch(stream, requestBatch4), false); + verifyBatch4A(jsm.fetchMessageBatch(stream, requestBatch4A), false); + verifyBatch5(jsm.fetchMessageBatch(stream, requestBatch5), false); + verifyBatch5A(jsm.fetchMessageBatch(stream, requestBatch5A), false); + verifyBatch6(jsm.fetchMessageBatch(stream, requestBatch6), false); + verifyBatch6A(jsm.fetchMessageBatch(stream, requestBatch6A), false); + + verifyMulti1(jsm.fetchMessageBatch(stream, requestMulti1), false); + verifyMulti1All(jsm.fetchMessageBatch(stream, requestMulti1All), false); + verifyMulti2(jsm.fetchMessageBatch(stream, requestMulti2), false); + verifyMulti2All(jsm.fetchMessageBatch(stream, requestMulti2All), false); + verifyMulti3(jsm.fetchMessageBatch(stream, requestMulti3), false); + verifyMulti3All(jsm.fetchMessageBatch(stream, requestMulti3All), false); + + // Get using queue. + LinkedBlockingQueue queue = jsm.queueMessageBatch(stream, requestBatch1); + verifyBatch1(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch1A); + verifyBatch1A(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch2); + verifyBatch2(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch2A); + verifyBatch2A(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch3); + verifyBatch3(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch3A); + verifyBatch3A(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch4); + verifyBatch4(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch4A); + verifyBatch4A(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch5); + verifyBatch5(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch5A); + verifyBatch5A(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch6); + verifyBatch6(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestBatch6A); + verifyBatch6A(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestMulti1); + verifyMulti1(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestMulti1All); + verifyMulti1All(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestMulti2); + verifyMulti2(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestMulti2All); + verifyMulti2All(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestMulti3); + verifyMulti3(queueToList(queue), true); + + queue = jsm.queueMessageBatch(stream, requestMulti3All); + verifyMulti3All(queueToList(queue), true); }); } - @Test - public void testBatchDirectGetMiscConstraints() throws Exception { - jsServer.run(TestBase::atLeast2_11, nc -> { - JetStream js = nc.jetStream(); - JetStreamManagement jsm = nc.jetStreamManagement(); + private static List queueToList(LinkedBlockingQueue queue) throws InterruptedException { + List list = new ArrayList<>(); + while (true) { + MessageInfo mi = queue.take(); + list.add(mi); + if (!mi.isMessage()) { + break; + } + } + return list; + } - String stream = stream(); - String subject = subject(); - jsm.addStream(StreamConfiguration.builder() - .name(stream) - .subjects(subject) - .allowDirect(true) - .build()); + private static void showList(List list, String label) { + System.out.println(label); + for (MessageInfo mi : list) { + System.out.println(mi); + } + System.out.println(); + } - List messages = new ArrayList<>(); - Dispatcher d = nc.createDispatcher(); - js.subscribe(subject, d, messages::add, true); - - String dataA = "Aaaaaaaaaa"; - String dataB = "Bbbbbbbbbb"; - String dataC = "Cccccccccc"; - String dataD = "Dddddddddd"; - String dataE = "Eeeeeeeeee"; - js.publish(subject, dataA.getBytes()); - sleep(500); - js.publish(subject, dataB.getBytes()); - js.publish(subject, dataC.getBytes()); - js.publish(subject, dataD.getBytes()); - js.publish(subject, dataE.getBytes()); - - sleep(500); // wait for all the messages to be received - - // min sequence - MessageBatchGetRequest request = MessageBatchGetRequest.builder() - .batch(5) - .minSequence(2) - .build(); - List list = jsm.fetchMessageBatch(stream, request); - assertEquals(4, list.size()); - assertEquals(dataB, new String(list.get(0).getData())); - assertEquals(dataC, new String(list.get(1).getData())); - assertEquals(dataD, new String(list.get(2).getData())); - assertEquals(dataE, new String(list.get(3).getData())); - - // start time - request = MessageBatchGetRequest.builder() - .batch(5) - .startTime(messages.get(0).metaData().timestamp().plus(1, ChronoUnit.MILLIS)) - .build(); - list = jsm.fetchMessageBatch(stream, request); - assertEquals(4, list.size()); - assertEquals(dataB, new String(list.get(0).getData())); - assertEquals(dataC, new String(list.get(1).getData())); - assertEquals(dataD, new String(list.get(2).getData())); - assertEquals(dataE, new String(list.get(3).getData())); - - // maxBytes this is an empty request -// request = MessageBatchGetRequest.builder() -// .batch(5) -// .maxBytes(20) -// .build(); -// list = jsm.fetchMessageBatch(stream, request); -// assertEquals(1, list.size()); -// assertEquals(dataA, new String(list.get(0).getData())); -// assertEquals(1, list.get(0).getSeq()); - -// TODO Not a feature yet - // up to sequence -// request = MessageBatchGetRequest.builder() -// .batch(5) -// .upToSequence(3) -// .build(); -// list = jsm.fetchMessageBatch(stream, request); -// assertEquals(3, list.size()); -// assertEquals(dataA, new String(list.get(0).getData())); -// assertEquals(dataB, new String(list.get(1).getData())); -// assertEquals(dataC, new String(list.get(2).getData())); - -// TODO Not a feature yet - // up to time -// request = MessageBatchGetRequest.builder() -// .batch(5) -// .upToTime(messages.get(2).metaData().timestamp().plus(1, ChronoUnit.MILLIS)) -// .build(); -// list = jsm.fetchMessageBatch(stream, request); -// assertEquals(2, list.size()); -// assertEquals(dataA, new String(list.get(0).getData())); -// assertEquals(dataB, new String(list.get(1).getData())); - }); + private static void verifyBatch1(List list, boolean lastIsEob) { + _verifyBatch(list, 3, lastIsEob, 23, 2, 1); } - @Test - public void testBatchDirectGetBuilder() throws Exception { - // Default / empty - MessageBatchGetRequest mbgr = MessageBatchGetRequest.builder().build(); - verifyBuilder(mbgr, -1, -1, -1, null, null, null, -1, null, "{}"); - - mbgr = MessageBatchGetRequest.builder() - .batch(0) - .maxBytes(0) - .minSequence(0) - .build(); - verifyBuilder(mbgr, -1, -1, -1, null, null, null, -1, null, "{}"); + private static void verifyBatch1A(List list, boolean lastIsEob) { + _verifyBatch(list, 3, lastIsEob, 3, 6, 1); + } - mbgr = MessageBatchGetRequest.builder() - .batch(Integer.MIN_VALUE) - .maxBytes(Integer.MIN_VALUE) - .minSequence(Long.MIN_VALUE) - .build(); - verifyBuilder(mbgr, -1, -1, -1, null, null, null, -1, null, "{}"); - - ZonedDateTime time = ZonedDateTime.now(); - String timeStr = DateTimeUtils.toRfc3339(time); - - mbgr = MessageBatchGetRequest.builder() - .batch(42) - .maxBytes(43) - .minSequence(44) - .startTime(time) - .nextBySubject("subject") - .upToSequence(45) - .upToTime(time) - .build(); - verifyBuilder(mbgr, 42, 43, 44, time, "subject", null, 45, time, - "{\"batch\":42" + - ",\"max_bytes\":43" + - ",\"start_time\":\"" + timeStr + "\"" + - ",\"next_by_subj\":\"subject\"" + - ",\"up_to_seq\":45" + - ",\"up_to_time\":\"" + timeStr + "\"" + - ",\"seq\":44" + - "}" - ); - - // Batch direct get - multi last - array - List lastBySubjects = Collections.singletonList("multi.last"); - mbgr = MessageBatchGetRequest.builder() - .multiLastBySubjects("multi.last") - .build(); - verifyBuilder(mbgr, -1, -1, -1, null, null, lastBySubjects, -1, null, - "{\"multi_last\":[\"multi.last\"]}"); + private static void verifyBatch2(List list, boolean lastIsEob) { + _verifyBatch(list, 3, lastIsEob, 20, 5, 4); + } - // Batch direct get - multi last - collection - mbgr = MessageBatchGetRequest.builder() - .multiLastBySubjects(lastBySubjects) - .build(); - verifyBuilder(mbgr, -1, -1, -1, null, null, lastBySubjects, -1, null, - "{\"multi_last\":[\"multi.last\"]}"); - - assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.builder().nextBySubject("nbs").multiLastBySubjects("mlbs")); - assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.builder().nextBySubject("nbs").multiLastBySubjects(lastBySubjects)); - assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.builder().multiLastBySubjects("mlbs").nextBySubject("nbs")); - } - - public void verifyBuilder(MessageBatchGetRequest mbgr, - int batch, - int maxBytes, - long minSequence, - ZonedDateTime startTime, - String nextBySubject, - List multiLastBySubjects, - long upToSequence, - ZonedDateTime upToTime, - String json) throws JsonParseException { - assertEquals(batch, mbgr.getBatch()); - assertEquals(maxBytes, mbgr.getMaxBytes()); - assertEquals(minSequence, mbgr.getMinSequence()); - if (startTime == null) { - assertNull(mbgr.getStartTime()); - } - else { - assertEquals(startTime.withZoneSameInstant(ZONE_ID_GMT), mbgr.getStartTime().withZoneSameInstant(ZONE_ID_GMT)); - } - assertEquals(nextBySubject, mbgr.getNextBySubject()); - assertTrue(Validator.listsAreEquivalent(multiLastBySubjects, mbgr.getMultiLastBySubjects())); - assertEquals(upToSequence, mbgr.getUpToSequence()); - if (upToTime == null) { - assertNull(mbgr.getUpToTime()); + private static void verifyBatch2A(List list, boolean lastIsEob) { + _verifyBatch(list, 3, lastIsEob, 2, 11, 6); + } + + private static void verifyBatch3(List list, boolean lastIsEob) { + _verifyBatch(list, 3, lastIsEob, 18, 7, 6); + } + + private static void verifyBatch3A(List list, boolean lastIsEob) { + _verifyBatch(list, 3, lastIsEob, 2, 11, 6); + } + + private static void verifyBatch4(List list, boolean lastIsEob) { + _verifyBatch(list, 2, lastIsEob, 23, 2, 1); + } + + private static void verifyBatch4A(List list, boolean lastIsEob) { + _verifyBatch(list, 2, lastIsEob, 3, 6, 1); + } + + private static void verifyBatch5(List list, boolean lastIsEob) { + _verifyBatch(list, 2, lastIsEob, 20, 5, 4); + } + + private static void verifyBatch5A(List list, boolean lastIsEob) { + _verifyBatch(list, 2, lastIsEob, 2, 11, 6); + } + + private static void verifyBatch6(List list, boolean lastIsEob) { + _verifyBatch(list, 2, lastIsEob, 18, 7, 6); + } + + private static void verifyBatch6A(List list, boolean lastIsEob) { + _verifyBatch(list, 2, lastIsEob, 2, 11, 6); + } + + private void verifyMulti1(List list, boolean lastIsEob) { + _verifyMulti(list, 2, lastIsEob, 21, 24); + } + + private void verifyMulti1All(List list, boolean lastIsEob) { + _verifyMulti(list, 5, lastIsEob, 21, 22, 23, 24, 25); + } + + private void verifyMulti2(List list, boolean lastIsEob) { + _verifyMulti(list, 2, lastIsEob, 19, 21); + } + + private void verifyMulti2All(List list, boolean lastIsEob) { + _verifyMulti(list, 5, lastIsEob, 19, 20, 21, 22, 23); + } + + private void verifyMulti3(List list, boolean lastIsEob) { + _verifyMulti(list, 2, lastIsEob, 1, 4); + } + + private void verifyMulti3All(List list, boolean lastIsEob) { + _verifyMulti(list, 5, lastIsEob, 1, 2, 3, 4, 5); + } + + private static void _verifyMulti(List list, int sizeNoEob, boolean lastIsEob, long... seq) { + assertEquals(lastIsEob ? sizeNoEob + 1 : sizeNoEob, list.size()); + for (int x = 0; x < seq.length; x++) { + assertEquals(seq[x], list.get(x).getSeq()); + verifyMessage(list.get(x)); } - else { - assertEquals(upToTime.withZoneSameInstant(ZONE_ID_GMT), mbgr.getUpToTime().withZoneSameInstant(ZONE_ID_GMT)); + if (lastIsEob) { + verifyEob(list); } - if (json != null) { - if (json.equals("{}")) { - assertEquals(json, mbgr.toJson()); - } - else { - JsonValue expected = JsonParser.parse(json); - JsonValue actual = JsonParser.parse(mbgr.toJson()); - assertEquals(expected.map.size(), actual.map.size()); - for (String key : expected.map.keySet()) { - assertEquals(expected.map.get(key).toString(), actual.map.get(key).toString()); - } - } + } + + private static void _verifyBatch(List list, int sizeNoEob, boolean lastIsEob, long pending, long seq, long lastSeq) { + assertEquals(lastIsEob ? sizeNoEob + 1 : sizeNoEob, list.size()); + MessageInfo mi = list.get(1); + assertEquals(pending, mi.getNumPending()); + assertEquals(seq, mi.getSeq()); + assertEquals(lastSeq, mi.getLastSeq()); + verifyMessage(mi); + if (lastIsEob) { + verifyEob(list); } } + + private static void verifyMessage(MessageInfo mi) { + assertTrue(mi.isMessage()); + assertFalse(mi.isStatus()); + assertFalse(mi.isEobStatus()); + assertFalse(mi.isErrorStatus()); + } + + private static void verifyEob(List list) { + MessageInfo mi = list.get(list.size() - 1); + assertFalse(mi.isMessage()); + assertTrue(mi.isStatus()); + assertTrue(mi.isEobStatus()); + assertFalse(mi.isErrorStatus()); + } }