Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Main for server v2.11 #1239

Open
wants to merge 48 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
62a73b3
Direct Batch EOD and loop changes
scottf Oct 10, 2024
c83faff
Direct Batch EOD and loop changes
scottf Oct 10, 2024
aaecbff
Direct Batch EOD and loop changes
scottf Oct 10, 2024
1ee791b
Consumer Priority Group Overflow (#1233)
scottf Oct 10, 2024
ad60f23
Merge branch 'main' into main-2-11
scottf Oct 10, 2024
cfa5cbc
Merge branch 'main' into main-2-11
scottf Oct 10, 2024
3dbc5fb
Merge branch 'main' into main-2-11
scottf Oct 11, 2024
d4667b4
Merge branch 'main' into main-2-11
scottf Oct 15, 2024
2b21962
Merge branch 'main' into main-2-11
scottf Oct 18, 2024
93510f2
Refactoring based on testing
scottf Oct 18, 2024
6b8d3df
Removed debug
scottf Oct 21, 2024
ad6694b
Removed debug
scottf Oct 21, 2024
f32791d
Support setting [min] sequence when needed
scottf Oct 21, 2024
6b82bca
In progress waiting for server PR's
scottf Oct 28, 2024
aa97394
Merge branch 'main' into main-2-11
scottf Oct 31, 2024
9cf1e16
Merge branch 'main' into main-2-11
scottf Nov 1, 2024
788c160
Server main branch has all 2.11 features
scottf Nov 1, 2024
7a9e0a5
Merge branch 'main' into main-2-11
scottf Nov 1, 2024
e0df8c0
Fixing removal
scottf Nov 1, 2024
ed87573
MessageBatchGetRequest reform based on adr
scottf Nov 5, 2024
b7379bb
more testing
scottf Nov 5, 2024
e242724
multi last for with batch
scottf Nov 7, 2024
3bfa65c
Merge branch 'refs/heads/main' into main-2-11
scottf Nov 14, 2024
6e9c067
Merge branch 'main' into main-2-11
scottf Nov 14, 2024
c97c535
Added MessageInfo getMessage(String streamName, MessageGetRequest mes…
scottf Nov 15, 2024
151b309
Merge branch 'main' into main-2-11
scottf Nov 15, 2024
050f106
Merge branch 'main' into main-2-11
scottf Nov 15, 2024
9c13f6a
Merge branch 'main' into main-2-11
scottf Nov 22, 2024
6a405ba
Update Doc
scottf Nov 24, 2024
0318b12
tuned request message batch
scottf Nov 25, 2024
592f596
Merge branch 'main' into main-2-11
scottf Nov 26, 2024
96271a1
tuned request message batch
scottf Nov 26, 2024
0aeb173
Merge branch 'main' into main-2-11
scottf Dec 3, 2024
e37d659
Merge branch 'main' into main-2-11
scottf Dec 4, 2024
e34817a
Merge branch 'main' into main-2-11
scottf Dec 4, 2024
800a684
Merge branch 'main' into main-2-11
scottf Dec 18, 2024
c1d1b58
Merge branch 'main' into main-2-11
scottf Jan 6, 2025
6884f7e
Merge branch 'main' into main-2-11
scottf Jan 9, 2025
5108a14
Merge branch 'main' into main-2-11
scottf Jan 16, 2025
b2ca330
Removed Direct Batch to Orbit
scottf Feb 13, 2025
c371e64
Merge branch 'main' into main-2-11
scottf Feb 17, 2025
e95930a
Merge branch 'main' into main-2-11
scottf Feb 18, 2025
af57d78
Merge branch 'main' into main-2-11
scottf Feb 19, 2025
0b11ac0
Merge branch 'main' into main-2-11
scottf Feb 20, 2025
804808c
Add Message TTL Stream Configuration (#1280)
scottf Feb 24, 2025
2b795b0
Merge branch 'main' into main-2-11
scottf Feb 24, 2025
18d8a43
Merge branch 'main' into main-2-11
scottf Feb 25, 2025
8f3d5fc
Fix tests after merge
scottf Feb 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Direct Batch EOD and loop changes
  • Loading branch information
scottf committed Oct 10, 2024
commit 62a73b33791e067edce5fa2122ffcfbc0bcf8582
4 changes: 4 additions & 0 deletions src/main/java/io/nats/client/api/Error.java
Original file line number Diff line number Diff line change
@@ -97,6 +97,10 @@ public static Error convert(Status status) {
return new Error(status.getCode(), NOT_SET, status.getMessage());
}

public static Error convert(Exception e) {
return new Error(500, NOT_SET, e.getMessage());
}

public static final Error JsBadRequestErr = new Error(400, 10003, "bad request");
public static final Error JsNoMessageFoundErr = new Error(404, 10037, "no message found");
}
40 changes: 19 additions & 21 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
@@ -351,12 +351,8 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
@Override
public List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
List<MessageInfo> results = new ArrayList<>();
_requestMessageBatch(streamName, messageBatchGetRequest, msg -> {
if (msg != MessageInfo.EOD) {
results.add(msg);
}
});
final List<MessageInfo> results = new ArrayList<>();
_requestMessageBatch(streamName, messageBatchGetRequest, false, results::add);
return results;
}

@@ -367,7 +363,7 @@ public List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetReq
public LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
final LinkedBlockingQueue<MessageInfo> q = new LinkedBlockingQueue<>();
conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add));
conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, true, q::add));
return q;
}

@@ -377,32 +373,31 @@ public LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, Mes
@Override
public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
_requestMessageBatch(streamName, messageBatchGetRequest, handler);
_requestMessageBatch(streamName, messageBatchGetRequest, true, handler);
}

public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) {
public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, boolean sendEod, MessageInfoHandler handler) {
Subscription sub = null;

// the default end of data will be a normal end of data (vs status or exception)
try {
String replyTo = conn.createInbox();
sub = conn.subscribe(replyTo);

String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName));
conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize());

long maxTimeMillis = getTimeout().toMillis();
long timeLeft = maxTimeMillis;
long start = System.currentTimeMillis();
while (true) {
Message msg = sub.nextMessage(timeLeft);
Message msg = sub.nextMessage(getTimeout());
if (msg == null) {
break;
}
if (msg.isStatusMessage()) {
Status status = msg.getStatus();
// Report error, otherwise successful status.
if (status.getCode() < 200 || status.getCode() > 299) {
MessageInfo messageInfo = new MessageInfo(Error.convert(status), true);
handler.onMessageInfo(messageInfo);
if (status.getCode() != Status.EOB) {
handler.onMessageInfo(new MessageInfo(Error.convert(status), true));
sendEod = false; // the error is the EOD since we always end on error
}
break;
}
@@ -414,18 +409,21 @@ public void _requestMessageBatch(String streamName, MessageBatchGetRequest messa

MessageInfo messageInfo = new MessageInfo(msg, streamName, true);
handler.onMessageInfo(messageInfo);
timeLeft = maxTimeMillis - (System.currentTimeMillis() - start);
}
} catch (InterruptedException e) {
}
catch (InterruptedException e) {
// sub.nextMessage was fetching one message
// and data is not completely read
// so it seems like this is an error condition
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
try {
handler.onMessageInfo(MessageInfo.EOD);
} catch (Exception ignore) {
if (sendEod) {
try {
handler.onMessageInfo(MessageInfo.EOD);
}
catch (Exception ignore) {
}
}
try {
//noinspection DataFlowIssue
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/Status.java
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ public class Status {
public static final int NOT_FOUND_CODE = 404;
public static final int REQUEST_TIMEOUT_CODE = 408;
public static final int CONFLICT_CODE = 409;
public static final int EOB = 204;

public static String BAD_REQUEST = "Bad Request"; // 400
public static String NO_MESSAGES = "No Messages"; // 404
Original file line number Diff line number Diff line change
@@ -1583,20 +1583,21 @@ public void testBatchDirectGet() throws Exception {
StreamInfo si = jsm.updateStream(sc);
assertTrue(si.getConfiguration().getAllowDirect());

// Empty request errors.
// Empty (Invalid) request errors.
AtomicBoolean hasError = new AtomicBoolean();
MessageInfoHandler errorHandler = msg -> {
hasError.compareAndSet(false, msg.hasError());
};
MessageBatchGetRequest request = MessageBatchGetRequest.builder().build();
jsm.requestMessageBatch(tsc.stream, request, errorHandler);
assertTrue(hasError.get());
// fetch
List<MessageInfo> list = jsm.fetchMessageBatch(tsc.stream, request);
assertEquals(1, list.size());
assertTrue(list.get(0).hasError());
// queue
LinkedBlockingQueue<MessageInfo> queue = jsm.queueMessageBatch(tsc.stream, request);
assertTrue(queue.take().hasError());
assertEquals(MessageInfo.EOD, queue.take());

// First batch gets first two messages.
request = MessageBatchGetRequest.builder()
Loading