diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java index 4da39fa41..90a948b94 100644 --- a/src/main/java/io/nats/client/api/MessageInfo.java +++ b/src/main/java/io/nats/client/api/MessageInfo.java @@ -87,7 +87,7 @@ private MessageInfo(Message msg, Status status, String streamName, boolean fromD Headers _headers = null; String _stream = null; long _lastSeq = -1; - long _numPending = -1; + long _numPending = 0; Status _status = null; if (status != null) { diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 9fad18dcf..2c3e756aa 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -29,6 +29,7 @@ import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME; import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT; +import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired; import static io.nats.client.support.NatsJetStreamConstants.*; import static io.nats.client.support.Status.NOT_FOUND_CODE; import static io.nats.client.utils.ResourceUtils.dataAsString; @@ -1546,7 +1547,7 @@ public void testCreateConsumerUpdateConsumer() throws Exception { } @Test - public void testBatchDirectGetErrors() throws Exception { + public void testBatchDirectGetErrorsAndStatuses() throws Exception { assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch(null, 1)); assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch("", 1)); assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch(">", 0)); @@ -1566,10 +1567,11 @@ public void testBatchDirectGetErrors() throws Exception { assertFalse(si.getConfiguration().getAllowDirect()); // Stream doesn't have AllowDirect enabled, will error. - assertThrows(IllegalArgumentException.class, () -> { + IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> { MessageBatchGetRequest request = MessageBatchGetRequest.batch("subject", 1); jsm.requestMessageBatch(streamNoDirect, request, mi -> {}); }); + assertTrue(iae.getMessage().contains(JsAllowDirectRequired.id())); String stream = variant(); subject = variant(); @@ -1595,35 +1597,43 @@ public void testBatchDirectGetErrors() throws Exception { LinkedBlockingQueue queue = jsm.queueMessageBatch(stream, request); verifyError(queueToList(queue), NOT_FOUND_CODE); - jsm.jetStream().publish(subject, dataBytes()); // so there are messages - Thread.sleep(2500); // for start_time + jsm.jetStream().publish(subject, dataBytes()); - // Empty (Invalid) request errors. -// request = MessageBatchGetRequest.builder().build(); -// verifyError(jsm.fetchMessageBatch(stream, request), BAD_JS_REQUEST_CODE); -// -// request = MessageBatchGetRequest.builder().batch(1).nextBySubject("not").build(); -// verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); -// -// request = MessageBatchGetRequest.builder().batch(1).minSequence(99).build(); + // subject not found + request = MessageBatchGetRequest.batch("invalid", 3); + verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); + + request = MessageBatchGetRequest.multiLastForSubjects(Collections.singletonList("invalid"), 3); + verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); + + // sequence larger + request = MessageBatchGetRequest.batch(subject, 3, 2); + verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); + + List subjects = Collections.singletonList(subject); + + // batch, time after + // awaiting https://github.com/nats-io/nats-server/issues/6032 +// ZonedDateTime time = ZonedDateTime.now().plusSeconds(10); +// request = MessageBatchGetRequest.batch(subject, 3, time); // verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); -// DOESN'T WORK AS ASSUMED -// request = MessageBatchGetRequest.builder() -// .batch(1) -// .startTime(ZonedDateTime.now()).build(); + // last for, time before + // awaiting https://github.com/nats-io/nats-server/issues/6077 +// time = ZonedDateTime.now().minusSeconds(10); +// request = MessageBatchGetRequest.multiLastForSubjects(subjects, time); // verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE); }); } private static void verifyError(List list, int code) { assertEquals(1, list.size()); - MessageInfo miErr = list.get(0); - assertFalse(miErr.isMessage()); - assertTrue(miErr.isStatus()); - assertFalse(miErr.isEobStatus()); - assertTrue(miErr.isErrorStatus()); - assertEquals(code, miErr.getStatus().getCode()); + MessageInfo mi = list.get(0); + assertFalse(mi.isMessage()); + assertTrue(mi.isStatus()); + assertFalse(mi.isEobStatus()); + assertTrue(mi.isErrorStatus()); + assertEquals(code, mi.getStatus().getCode()); } @Test