From dabca743b26fce5b9dcb87186f6106cfbf12a3d3 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Wed, 16 Oct 2024 13:31:32 -0400 Subject: [PATCH] Extract from main 2.11 branch things not specific to 2.11 Part 3 (#1243) * Extract 2.11 changes part 3 * remove tests too --- .../io/nats/client/JetStreamManagement.java | 43 ---- .../java/io/nats/client/api/MessageInfo.java | 44 ---- .../client/impl/NatsJetStreamManagement.java | 108 --------- .../client/impl/JetStreamManagementTests.java | 228 +----------------- 4 files changed, 4 insertions(+), 419 deletions(-) diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 6755e49d0..499c8ef4d 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -17,7 +17,6 @@ import java.io.IOException; import java.time.ZonedDateTime; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; /** * JetStream Management context for creation and access to streams and consumers in NATS. @@ -325,48 +324,6 @@ public interface JetStreamManagement { */ MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException; - /** - * Request a batch of messages using a {@link MessageBatchGetRequest}. - *

- * This API is currently EXPERIMENTAL and is subject to change. - * - * @param streamName the name of the stream - * @param messageBatchGetRequest the request details - * @return a list containing {@link MessageInfo} - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; - - /** - * Request a batch of messages using a {@link MessageBatchGetRequest}. - *

- * This API is currently EXPERIMENTAL and is subject to change. - * - * @param streamName the name of the stream - * @param messageBatchGetRequest the request details - * @return a queue used to asynchronously receive {@link MessageInfo} - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; - - /** - * Request a batch of messages using a {@link MessageBatchGetRequest}. - *

- * This API is currently EXPERIMENTAL and is subject to change. - * - * @param streamName the name of the stream - * @param messageBatchGetRequest the request details - * @param handler the handler used for receiving {@link MessageInfo} - * @throws IOException covers various communication issues with the NATS - * server such as timeout or interruption - * @throws JetStreamApiException the request had an error related to the data - */ - void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; - /** * Deletes a message, overwriting the message data with garbage * This can be considered an expensive (time-consuming) operation, but is more secure. diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java index 5ded6746f..dbdf0d7b6 100644 --- a/src/main/java/io/nats/client/api/MessageInfo.java +++ b/src/main/java/io/nats/client/api/MessageInfo.java @@ -32,11 +32,6 @@ */ public class MessageInfo extends ApiResponse { - /** - * Message returned as a response in {@link MessageBatchGetRequest} to signal end of data. - */ - public static final MessageInfo EOD = new MessageInfo(null, false); - private final boolean direct; private final String subject; private final long seq; @@ -45,7 +40,6 @@ public class MessageInfo extends ApiResponse { private final Headers headers; private final String stream; private final long lastSeq; - private final long numPending; /** * Create a Message Info @@ -57,25 +51,6 @@ public MessageInfo(Message msg) { this(msg, null, false); } - /** - * Create a Message Info - * This signature is public for testing purposes and is not intended to be used externally. - * @param error the error - * @param direct true if the object is being created from a get direct api call instead of the standard get message - */ - public MessageInfo(Error error, boolean direct) { - super(error); - this.direct = direct; - subject = null; - data = null; - seq = -1; - time = null; - headers = null; - stream = null; - lastSeq = -1; - numPending = -1; - } - /** * Create a Message Info * This signature is public for testing purposes and is not intended to be used externally. @@ -102,14 +77,6 @@ public MessageInfo(Message msg, String streamName, boolean direct) { else { lastSeq = JsonUtils.safeParseLong(tempLastSeq, -1); } - String tempNumPending = msgHeaders.getLast(NATS_NUM_PENDING); - if (tempNumPending == null) { - numPending = -1; - } - else { - // Num pending is +1 since it includes EOB message, correct that here. - numPending = Long.parseLong(tempNumPending) - 1; - } // these are control headers, not real headers so don't give them to the user. headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS); } @@ -121,7 +88,6 @@ else if (hasError()) { headers = null; stream = null; lastSeq = -1; - numPending = -1; } else { JsonValue mjv = readValue(jv, MESSAGE); @@ -133,7 +99,6 @@ else if (hasError()) { headers = hdrBytes == null ? null : new IncomingHeadersProcessor(hdrBytes).getHeaders(); stream = streamName; lastSeq = -1; - numPending = -1; } } @@ -193,14 +158,6 @@ public long getLastSeq() { return lastSeq; } - /** - * Amount of pending messages that can be requested with a subsequent batch request. - * @return number of pending messages - */ - public long getNumPending() { - return numPending; - } - @Override public String toString() { StringBuilder sb = JsonUtils.beginJsonPrefixed("\"MessageInfo\":"); @@ -217,7 +174,6 @@ public String toString() { JsonUtils.addField(sb, TIME, time); JsonUtils.addField(sb, STREAM, stream); JsonUtils.addField(sb, LAST_SEQ, lastSeq); - JsonUtils.addField(sb, NUM_PENDING, numPending); JsonUtils.addField(sb, SUBJECT, subject); JsonUtils.addField(sb, HDRS, headers); return JsonUtils.endJson(sb).toString(); diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 21ecf8a98..3d3dc3eeb 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -16,17 +16,12 @@ import io.nats.client.*; import io.nats.client.api.Error; import io.nats.client.api.*; -import io.nats.client.support.Status; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired; -import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable; import static io.nats.client.support.Validator.*; public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement { @@ -345,109 +340,6 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR } } - /** - * {@inheritDoc} - */ - @Override - public List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { - validateMessageBatchGetRequest(streamName, messageBatchGetRequest); - List results = new ArrayList<>(); - _requestMessageBatch(streamName, messageBatchGetRequest, msg -> { - if (msg != MessageInfo.EOD) { - results.add(msg); - } - }); - return results; - } - - /** - * {@inheritDoc} - */ - @Override - public LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { - validateMessageBatchGetRequest(streamName, messageBatchGetRequest); - final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); - conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add)); - return q; - } - - /** - * {@inheritDoc} - */ - @Override - public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { - validateMessageBatchGetRequest(streamName, messageBatchGetRequest); - _requestMessageBatch(streamName, messageBatchGetRequest, handler); - } - - public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { - Subscription sub = null; - try { - String replyTo = conn.createInbox(); - sub = conn.subscribe(replyTo); - - String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName)); - conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize()); - - long maxTimeMillis = getTimeout().toMillis(); - long timeLeft = maxTimeMillis; - long start = System.currentTimeMillis(); - while (true) { - Message msg = sub.nextMessage(timeLeft); - if (msg == null) { - break; - } - if (msg.isStatusMessage()) { - Status status = msg.getStatus(); - // Report error, otherwise successful status. - if (status.getCode() < 200 || status.getCode() > 299) { - MessageInfo messageInfo = new MessageInfo(Error.convert(status), true); - handler.onMessageInfo(messageInfo); - } - break; - } - - Headers headers = msg.getHeaders(); - if (headers == null || headers.getLast(NATS_NUM_PENDING) == null) { - throw JsDirectBatchGet211NotAvailable.instance(); - } - - MessageInfo messageInfo = new MessageInfo(msg, streamName, true); - handler.onMessageInfo(messageInfo); - timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); - } - } catch (InterruptedException e) { - // sub.nextMessage was fetching one message - // and data is not completely read - // so it seems like this is an error condition - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } finally { - try { - handler.onMessageInfo(MessageInfo.EOD); - } catch (Exception ignore) { - } - try { - //noinspection DataFlowIssue - sub.unsubscribe(); - } catch (Exception ignore) { - } - } - } - - private void validateMessageBatchGetRequest(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { - validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); - - if (!directBatchGet211Available) { - throw JsDirectBatchGet211NotAvailable.instance(); - } - - CachedStreamInfo csi = getCachedStreamInfo(streamName); - if (!csi.allowDirect) { - throw JsAllowDirectRequired.instance(); - } - } - /** * {@inheritDoc} */ diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 2012b6ee2..826a5f772 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -23,13 +23,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.time.Instant; -import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME; import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT; @@ -1550,222 +1548,4 @@ public void testCreateConsumerUpdateConsumer() throws Exception { assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject()); }); } - - @Test - public void testBatchDirectGet() throws Exception { - jsServer.run(TestBase::atLeast2_11, nc -> { - JetStream js = nc.jetStream(); - JetStreamManagement jsm = nc.jetStreamManagement(); - - TestingStreamContainer tsc = new TestingStreamContainer(nc); - assertFalse(tsc.si.getConfiguration().getAllowDirect()); - - List expected = Arrays.asList("foo", "bar", "baz"); - for (String data : expected) { - js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); - } - - List batch = new ArrayList<>(); - MessageInfoHandler handler = msg -> { - if (!msg.hasError() && msg != MessageInfo.EOD) { - batch.add(msg); - } - }; - - // Stream doesn't have AllowDirect enabled, will error. - assertThrows(IllegalArgumentException.class, () -> { - MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); - jsm.requestMessageBatch(tsc.stream, request, handler); - }); - - // Enable AllowDirect. - StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build(); - StreamInfo si = jsm.updateStream(sc); - assertTrue(si.getConfiguration().getAllowDirect()); - - // Empty request errors. - AtomicBoolean hasError = new AtomicBoolean(); - MessageInfoHandler errorHandler = msg -> { - hasError.compareAndSet(false, msg.hasError()); - }; - MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); - jsm.requestMessageBatch(tsc.stream, request, errorHandler); - assertTrue(hasError.get()); - List list = jsm.fetchMessageBatch(tsc.stream, request); - assertEquals(1, list.size()); - assertTrue(list.get(0).hasError()); - LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request); - assertTrue(queue.take().hasError()); - assertEquals(MessageInfo.EOD, queue.take()); - - // First batch gets first two messages. - request = MessageBatchGetRequest.builder() - .batch(2) - .subject(tsc.subject()) - .build(); - jsm.requestMessageBatch(tsc.stream, request, handler); - MessageInfo last = batch.get(batch.size() - 1); - assertEquals(1, last.getNumPending()); - assertEquals(2, last.getSeq()); - assertEquals(1, last.getLastSeq()); - - // Second batch gets last message. - request = MessageBatchGetRequest.builder(request) - .sequence(last.getSeq() + 1) - .build(); - jsm.requestMessageBatch(tsc.stream, request, handler); - - List actual = batch.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); - assertEquals(expected, actual); - - last = batch.get(batch.size() - 1); - assertEquals(0, last.getNumPending()); - assertEquals(3, last.getSeq()); - assertEquals(0, last.getLastSeq()); - }); - } - - @Test - public void testBatchDirectGetAlternatives() throws Exception { - jsServer.run(TestBase::atLeast2_11, nc -> { - JetStream js = nc.jetStream(); - JetStreamManagement jsm = nc.jetStreamManagement(); - - TestingStreamContainer tsc = new TestingStreamContainer(nc); - assertFalse(tsc.si.getConfiguration().getAllowDirect()); - - // Enable AllowDirect. - StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build(); - StreamInfo si = jsm.updateStream(sc); - assertTrue(si.getConfiguration().getAllowDirect()); - - List expected = Arrays.asList("foo", "bar", "baz"); - for (String data : expected) { - js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); - } - - // Request stays the same for all options. - MessageBatchGetRequest request = MessageBatchGetRequest.builder() - .batch(3) - .subject(tsc.subject()) - .build(); - - // Get using handler. - List batch = new ArrayList<>(); - MessageInfoHandler handler = msg -> { - if (!msg.hasError() && msg != MessageInfo.EOD) { - batch.add(msg); - } - }; - jsm.requestMessageBatch(tsc.stream, request, handler); - assertEquals(3, batch.size()); - MessageInfo last = batch.get(batch.size() - 1); - assertEquals(0, last.getNumPending()); - assertEquals(3, last.getSeq()); - assertEquals(2, last.getLastSeq()); - - // Get using queue. - batch.clear(); - LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request); - MessageInfo msg; - while ((msg = queue.take()) != MessageInfo.EOD) { - if (!msg.hasError()) { - batch.add(msg); - } - } - assertEquals(3, batch.size()); - last = batch.get(batch.size() - 1); - assertEquals(0, last.getNumPending()); - assertEquals(3, last.getSeq()); - assertEquals(2, last.getLastSeq()); - - // Get using fetch. - batch.clear(); - batch.addAll(jsm.fetchMessageBatch(tsc.stream, request)); - assertEquals(3, batch.size()); - last = batch.get(batch.size() - 1); - assertEquals(0, last.getNumPending()); - assertEquals(3, last.getSeq()); - assertEquals(2, last.getLastSeq()); - }); - } - - @Test - public void testBatchDirectGetMultiLast() throws Exception { - jsServer.run(TestBase::atLeast2_11, nc -> { - JetStream js = nc.jetStream(); - JetStreamManagement jsm = nc.jetStreamManagement(); - - String stream = stream(); - jsm.addStream(StreamConfiguration.builder() - .name(stream) - .subjects(stream + ".a.>") - .allowDirect(true) - .build()); - - String subjectAFoo = stream + ".a.foo"; - String subjectABar = stream + ".a.bar"; - String subjectABaz = stream + ".a.baz"; - js.publish(subjectAFoo, "foo".getBytes(StandardCharsets.UTF_8)); - js.publish(subjectABar, "bar".getBytes(StandardCharsets.UTF_8)); - js.publish(subjectABaz, "baz".getBytes(StandardCharsets.UTF_8)); - - MessageBatchGetRequest request = MessageBatchGetRequest.builder() - .multiLastForSubjects(subjectAFoo, subjectABaz) - .build(); - - List keys = new ArrayList<>(); - MessageInfoHandler handler = msg -> { - if (!msg.hasError() && msg != MessageInfo.EOD) { - keys.add(msg.getSubject()); - } - }; - jsm.requestMessageBatch(stream, request, handler); - assertEquals(2, keys.size()); - assertEquals(subjectAFoo, keys.get(0)); - assertEquals(subjectABaz, keys.get(1)); - }); - } - - @Test - public void testBatchDirectGetBuilder() { - // Request options. - MessageBatchGetRequest requestOptions = MessageBatchGetRequest.builder() - .maxBytes(1234) - .batch(2) - .build(); - assertEquals(1234, requestOptions.getMaxBytes()); - assertEquals(2, requestOptions.getBatch()); - assertEquals("{\"batch\":2,\"max_bytes\":1234}", requestOptions.toJson()); - - // Batch direct get - simple - ZonedDateTime time = Instant.EPOCH.atZone(ZoneOffset.UTC); - MessageBatchGetRequest simple = MessageBatchGetRequest.builder() - .sequence(1) - .startTime(time) - .subject("subject") - .build(); - assertEquals(1, simple.getSequence()); - assertEquals(time, simple.getStartTime()); - assertEquals("subject", simple.getSubject()); - assertEquals("{\"seq\":1,\"start_time\":\"1970-01-01T00:00:00.000000000Z\",\"next_by_subj\":\"subject\"}", simple.toJson()); - - // Batch direct get - multi last - List multiLastFor = Collections.singletonList("multi.last"); - MessageBatchGetRequest multiLast = MessageBatchGetRequest.builder() - .multiLastForSubjects("multi.last") - .upToSequence(1) - .upToTime(time) - .build(); - assertEquals(Collections.singletonList("multi.last"), multiLast.getMultiLastForSubjects()); - assertEquals(1, multiLast.getUpToSequence()); - assertEquals(time, multiLast.getUpToTime()); - assertEquals("{\"multi_last\":[\"multi.last\"],\"up_to_seq\":1,\"up_to_time\":\"1970-01-01T00:00:00.000000000Z\"}", multiLast.toJson()); - - MessageBatchGetRequest multiLastAlternative = MessageBatchGetRequest.builder() - .multiLastForSubjects(multiLastFor) - .build(); - assertEquals(multiLastFor, multiLastAlternative.getMultiLastForSubjects()); - assertEquals("{\"multi_last\":[\"multi.last\"]}", multiLastAlternative.toJson()); - } }