Skip to content

Commit

Permalink
more testing
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 5, 2024
1 parent ed87573 commit b7379bb
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/api/MessageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private MessageInfo(Message msg, Status status, String streamName, boolean fromD
Headers _headers = null;
String _stream = null;
long _lastSeq = -1;
long _numPending = -1;
long _numPending = 0;
Status _status = null;

if (status != null) {
Expand Down
54 changes: 32 additions & 22 deletions src/test/java/io/nats/client/impl/JetStreamManagementTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME;
import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT;
import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired;
import static io.nats.client.support.NatsJetStreamConstants.*;
import static io.nats.client.support.Status.NOT_FOUND_CODE;
import static io.nats.client.utils.ResourceUtils.dataAsString;
Expand Down Expand Up @@ -1546,7 +1547,7 @@ public void testCreateConsumerUpdateConsumer() throws Exception {
}

@Test
public void testBatchDirectGetErrors() throws Exception {
public void testBatchDirectGetErrorsAndStatuses() throws Exception {
assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch(null, 1));
assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch("", 1));
assertThrows(IllegalArgumentException.class, () -> MessageBatchGetRequest.batch(">", 0));
Expand All @@ -1566,10 +1567,11 @@ public void testBatchDirectGetErrors() throws Exception {
assertFalse(si.getConfiguration().getAllowDirect());

// Stream doesn't have AllowDirect enabled, will error.
assertThrows(IllegalArgumentException.class, () -> {
IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> {
MessageBatchGetRequest request = MessageBatchGetRequest.batch("subject", 1);
jsm.requestMessageBatch(streamNoDirect, request, mi -> {});
});
assertTrue(iae.getMessage().contains(JsAllowDirectRequired.id()));

String stream = variant();
subject = variant();
Expand All @@ -1595,35 +1597,43 @@ public void testBatchDirectGetErrors() throws Exception {
LinkedBlockingQueue<MessageInfo> queue = jsm.queueMessageBatch(stream, request);
verifyError(queueToList(queue), NOT_FOUND_CODE);

jsm.jetStream().publish(subject, dataBytes()); // so there are messages
Thread.sleep(2500); // for start_time
jsm.jetStream().publish(subject, dataBytes());

// Empty (Invalid) request errors.
// request = MessageBatchGetRequest.builder().build();
// verifyError(jsm.fetchMessageBatch(stream, request), BAD_JS_REQUEST_CODE);
//
// request = MessageBatchGetRequest.builder().batch(1).nextBySubject("not").build();
// verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);
//
// request = MessageBatchGetRequest.builder().batch(1).minSequence(99).build();
// subject not found
request = MessageBatchGetRequest.batch("invalid", 3);
verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);

request = MessageBatchGetRequest.multiLastForSubjects(Collections.singletonList("invalid"), 3);
verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);

// sequence larger
request = MessageBatchGetRequest.batch(subject, 3, 2);
verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);

List<String> subjects = Collections.singletonList(subject);

// batch, time after
// awaiting https://github.com/nats-io/nats-server/issues/6032
// ZonedDateTime time = ZonedDateTime.now().plusSeconds(10);
// request = MessageBatchGetRequest.batch(subject, 3, time);
// verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);

// DOESN'T WORK AS ASSUMED
// request = MessageBatchGetRequest.builder()
// .batch(1)
// .startTime(ZonedDateTime.now()).build();
// last for, time before
// awaiting https://github.com/nats-io/nats-server/issues/6077
// time = ZonedDateTime.now().minusSeconds(10);
// request = MessageBatchGetRequest.multiLastForSubjects(subjects, time);
// verifyError(jsm.fetchMessageBatch(stream, request), NOT_FOUND_CODE);
});
}

private static void verifyError(List<MessageInfo> list, int code) {
assertEquals(1, list.size());
MessageInfo miErr = list.get(0);
assertFalse(miErr.isMessage());
assertTrue(miErr.isStatus());
assertFalse(miErr.isEobStatus());
assertTrue(miErr.isErrorStatus());
assertEquals(code, miErr.getStatus().getCode());
MessageInfo mi = list.get(0);
assertFalse(mi.isMessage());
assertTrue(mi.isStatus());
assertFalse(mi.isEobStatus());
assertTrue(mi.isErrorStatus());
assertEquals(code, mi.getStatus().getCode());
}

@Test
Expand Down

0 comments on commit b7379bb

Please sign in to comment.