Skip to content

Commit

Permalink
tuned request message batch
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 26, 2024
1 parent 592f596 commit 96271a1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
22 changes: 8 additions & 14 deletions src/main/java/io/nats/client/api/MessageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
*/
public class MessageInfo extends ApiResponse<MessageInfo> {

private final boolean fromDirect;
private final String subject;
private final long seq;
private final byte[] data;
Expand All @@ -54,27 +53,23 @@ public MessageInfo(Message msg) {
* Create a Message Info
* @param msg the message
* @param streamName the stream name if known
* @param fromDirect true if the object is being created from a get fromDirect api call instead of the standard get message
* @param parseDirect true if the object is being created from a direct api call instead of get message
*/
public MessageInfo(Message msg, String streamName, boolean fromDirect) {
this(msg, null, streamName, fromDirect);
public MessageInfo(Message msg, String streamName, boolean parseDirect) {
this(msg, null, streamName, parseDirect);
}


/**
* Create a Message Info
* @param status the status
* @param streamName the stream name if known
* @param fromDirect whether this was called from a direct get
*/
public MessageInfo(Status status, String streamName, boolean fromDirect) {
this(null, status, streamName, fromDirect);
public MessageInfo(Status status, String streamName) {
this(null, status, streamName, false);
}

private MessageInfo(Message msg, Status status, String streamName, boolean fromDirect) {
super(fromDirect ? null : msg);

this.fromDirect = fromDirect;
private MessageInfo(Message msg, Status status, String streamName, boolean parseDirect) {
super(parseDirect ? null : msg);

// working vars because the object vars are final
String _subject = null;
Expand All @@ -91,7 +86,7 @@ private MessageInfo(Message msg, Status status, String streamName, boolean fromD
_status = status;
_stream = streamName;
}
else if (fromDirect) {
else if (parseDirect) {
Headers msgHeaders = msg.getHeaders();
_subject = msgHeaders.getLast(NATS_SUBJECT);
_data = msg.getData();
Expand Down Expand Up @@ -260,7 +255,6 @@ else if (hasError()) {
JsonUtils.addField(sb, "data_length", data.length);
}
JsonUtils.addField(sb, HDRS, headers);
JsonUtils.addField(sb, "from_direct", fromDirect);
}
return JsonUtils.endJson(sb).toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ else if (msg.isStatusMessage()) {
if (errorOrNonEob != null) {
// All error or non eob statuses, always send, but it is the last message to the caller
sendEob = false;
handler.onMessageInfo(new MessageInfo(Status.TIMEOUT_OR_NO_MESSAGES, streamName, true));
handler.onMessageInfo(new MessageInfo(errorOrNonEob, streamName));
return false; // should not time out before eob
}

Expand All @@ -426,11 +426,12 @@ else if (msg.isStatusMessage()) {
// and data is not completely read
// so it seems like this is an error condition
Thread.currentThread().interrupt();
throw new RuntimeException(e);
sendEob = false;
return false;
} finally {
if (sendEob) {
try {
handler.onMessageInfo(new MessageInfo(Status.EOB, streamName, true));
handler.onMessageInfo(new MessageInfo(Status.EOB, streamName));
}
catch (RuntimeException ignore) { /* user handler runtime error */ }
}
Expand Down

0 comments on commit 96271a1

Please sign in to comment.