diff --git a/src/main/java/io/nats/client/api/Error.java b/src/main/java/io/nats/client/api/Error.java index 3ec3124b9..61d700b1f 100644 --- a/src/main/java/io/nats/client/api/Error.java +++ b/src/main/java/io/nats/client/api/Error.java @@ -97,6 +97,10 @@ public static Error convert(Status status) { return new Error(status.getCode(), NOT_SET, status.getMessage()); } + public static Error convert(Exception e) { + return new Error(500, NOT_SET, e.getMessage()); + } + public static final Error JsBadRequestErr = new Error(400, 10003, "bad request"); public static final Error JsNoMessageFoundErr = new Error(404, 10037, "no message found"); } diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 21ecf8a98..82affa481 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -351,12 +351,8 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR @Override public List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); - List results = new ArrayList<>(); - _requestMessageBatch(streamName, messageBatchGetRequest, msg -> { - if (msg != MessageInfo.EOD) { - results.add(msg); - } - }); + final List results = new ArrayList<>(); + _requestMessageBatch(streamName, messageBatchGetRequest, false, results::add); return results; } @@ -367,7 +363,7 @@ public List fetchMessageBatch(String streamName, MessageBatchGetReq public LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); - conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add)); + conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, true, q::add)); return q; } @@ -377,11 +373,13 @@ public LinkedBlockingQueue queueMessageBatch(String streamName, Mes @Override public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); - _requestMessageBatch(streamName, messageBatchGetRequest, handler); + _requestMessageBatch(streamName, messageBatchGetRequest, true, handler); } - public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { + public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, boolean sendEod, MessageInfoHandler handler) { Subscription sub = null; + + // the default end of data will be a normal end of data (vs status or exception) try { String replyTo = conn.createInbox(); sub = conn.subscribe(replyTo); @@ -389,20 +387,17 @@ public void _requestMessageBatch(String streamName, MessageBatchGetRequest messa String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName)); conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize()); - long maxTimeMillis = getTimeout().toMillis(); - long timeLeft = maxTimeMillis; - long start = System.currentTimeMillis(); while (true) { - Message msg = sub.nextMessage(timeLeft); + Message msg = sub.nextMessage(getTimeout()); if (msg == null) { break; } if (msg.isStatusMessage()) { Status status = msg.getStatus(); // Report error, otherwise successful status. - if (status.getCode() < 200 || status.getCode() > 299) { - MessageInfo messageInfo = new MessageInfo(Error.convert(status), true); - handler.onMessageInfo(messageInfo); + if (status.getCode() != Status.EOB) { + handler.onMessageInfo(new MessageInfo(Error.convert(status), true)); + sendEod = false; // the error is the EOD since we always end on error } break; } @@ -414,18 +409,21 @@ public void _requestMessageBatch(String streamName, MessageBatchGetRequest messa MessageInfo messageInfo = new MessageInfo(msg, streamName, true); handler.onMessageInfo(messageInfo); - timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); } - } catch (InterruptedException e) { + } + catch (InterruptedException e) { // sub.nextMessage was fetching one message // and data is not completely read // so it seems like this is an error condition Thread.currentThread().interrupt(); throw new RuntimeException(e); } finally { - try { - handler.onMessageInfo(MessageInfo.EOD); - } catch (Exception ignore) { + if (sendEod) { + try { + handler.onMessageInfo(MessageInfo.EOD); + } + catch (Exception ignore) { + } } try { //noinspection DataFlowIssue diff --git a/src/main/java/io/nats/client/support/Status.java b/src/main/java/io/nats/client/support/Status.java index ecdf0ab79..dd2de4705 100644 --- a/src/main/java/io/nats/client/support/Status.java +++ b/src/main/java/io/nats/client/support/Status.java @@ -27,6 +27,7 @@ public class Status { public static final int NOT_FOUND_CODE = 404; public static final int REQUEST_TIMEOUT_CODE = 408; public static final int CONFLICT_CODE = 409; + public static final int EOB = 204; public static String BAD_REQUEST = "Bad Request"; // 400 public static String NO_MESSAGES = "No Messages"; // 404 diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 757b79f2d..61cb322d0 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -1583,7 +1583,7 @@ public void testBatchDirectGet() throws Exception { StreamInfo si = jsm.updateStream(sc); assertTrue(si.getConfiguration().getAllowDirect()); - // Empty request errors. + // Empty (Invalid) request errors. AtomicBoolean hasError = new AtomicBoolean(); MessageInfoHandler errorHandler = msg -> { hasError.compareAndSet(false, msg.hasError()); @@ -1591,12 +1591,13 @@ public void testBatchDirectGet() throws Exception { MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); jsm.requestMessageBatch(tsc.stream, request, errorHandler); assertTrue(hasError.get()); + // fetch List list = jsm.fetchMessageBatch(tsc.stream, request); assertEquals(1, list.size()); assertTrue(list.get(0).hasError()); + // queue LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request); assertTrue(queue.take().hasError()); - assertEquals(MessageInfo.EOD, queue.take()); // First batch gets first two messages. request = MessageBatchGetRequest.builder()