Skip to content

Commit

Permalink
Direct Batch EOD and loop changes
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Oct 10, 2024
1 parent 30b264c commit 62a73b3
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 23 deletions.
4 changes: 4 additions & 0 deletions src/main/java/io/nats/client/api/Error.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public static Error convert(Status status) {
return new Error(status.getCode(), NOT_SET, status.getMessage());
}

public static Error convert(Exception e) {
return new Error(500, NOT_SET, e.getMessage());
}

public static final Error JsBadRequestErr = new Error(400, 10003, "bad request");
public static final Error JsNoMessageFoundErr = new Error(404, 10037, "no message found");
}
40 changes: 19 additions & 21 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,8 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
@Override
public List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
List<MessageInfo> results = new ArrayList<>();
_requestMessageBatch(streamName, messageBatchGetRequest, msg -> {
if (msg != MessageInfo.EOD) {
results.add(msg);
}
});
final List<MessageInfo> results = new ArrayList<>();
_requestMessageBatch(streamName, messageBatchGetRequest, false, results::add);
return results;
}

Expand All @@ -367,7 +363,7 @@ public List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetReq
public LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
final LinkedBlockingQueue<MessageInfo> q = new LinkedBlockingQueue<>();
conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add));
conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, true, q::add));
return q;
}

Expand All @@ -377,32 +373,31 @@ public LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, Mes
@Override
public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
_requestMessageBatch(streamName, messageBatchGetRequest, handler);
_requestMessageBatch(streamName, messageBatchGetRequest, true, handler);
}

public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) {
public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, boolean sendEod, MessageInfoHandler handler) {
Subscription sub = null;

// the default end of data will be a normal end of data (vs status or exception)
try {
String replyTo = conn.createInbox();
sub = conn.subscribe(replyTo);

String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName));
conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize());

long maxTimeMillis = getTimeout().toMillis();
long timeLeft = maxTimeMillis;
long start = System.currentTimeMillis();
while (true) {
Message msg = sub.nextMessage(timeLeft);
Message msg = sub.nextMessage(getTimeout());
if (msg == null) {
break;
}
if (msg.isStatusMessage()) {
Status status = msg.getStatus();
// Report error, otherwise successful status.
if (status.getCode() < 200 || status.getCode() > 299) {
MessageInfo messageInfo = new MessageInfo(Error.convert(status), true);
handler.onMessageInfo(messageInfo);
if (status.getCode() != Status.EOB) {
handler.onMessageInfo(new MessageInfo(Error.convert(status), true));
sendEod = false; // the error is the EOD since we always end on error
}
break;
}
Expand All @@ -414,18 +409,21 @@ public void _requestMessageBatch(String streamName, MessageBatchGetRequest messa

MessageInfo messageInfo = new MessageInfo(msg, streamName, true);
handler.onMessageInfo(messageInfo);
timeLeft = maxTimeMillis - (System.currentTimeMillis() - start);
}
} catch (InterruptedException e) {
}
catch (InterruptedException e) {
// sub.nextMessage was fetching one message
// and data is not completely read
// so it seems like this is an error condition
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
try {
handler.onMessageInfo(MessageInfo.EOD);
} catch (Exception ignore) {
if (sendEod) {
try {
handler.onMessageInfo(MessageInfo.EOD);
}
catch (Exception ignore) {
}
}
try {
//noinspection DataFlowIssue
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class Status {
public static final int NOT_FOUND_CODE = 404;
public static final int REQUEST_TIMEOUT_CODE = 408;
public static final int CONFLICT_CODE = 409;
public static final int EOB = 204;

public static String BAD_REQUEST = "Bad Request"; // 400
public static String NO_MESSAGES = "No Messages"; // 404
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1583,20 +1583,21 @@ public void testBatchDirectGet() throws Exception {
StreamInfo si = jsm.updateStream(sc);
assertTrue(si.getConfiguration().getAllowDirect());

// Empty request errors.
// Empty (Invalid) request errors.
AtomicBoolean hasError = new AtomicBoolean();
MessageInfoHandler errorHandler = msg -> {
hasError.compareAndSet(false, msg.hasError());
};
MessageBatchGetRequest request = MessageBatchGetRequest.builder().build();
jsm.requestMessageBatch(tsc.stream, request, errorHandler);
assertTrue(hasError.get());
// fetch
List<MessageInfo> list = jsm.fetchMessageBatch(tsc.stream, request);
assertEquals(1, list.size());
assertTrue(list.get(0).hasError());
// queue
LinkedBlockingQueue<MessageInfo> queue = jsm.queueMessageBatch(tsc.stream, request);
assertTrue(queue.take().hasError());
assertEquals(MessageInfo.EOD, queue.take());

// First batch gets first two messages.
request = MessageBatchGetRequest.builder()
Expand Down

0 comments on commit 62a73b3

Please sign in to comment.