From 5ae457f055e21270fa1e34134d834a33656cbd5e Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 21 Sep 2023 18:04:49 -0400 Subject: [PATCH 1/2] Revert remove client side check against server info max_payload when publishing --- README.md | 12 +----- src/main/java/io/nats/client/Options.java | 23 ++++++----- .../io/nats/client/impl/NatsConnection.java | 9 ++++ .../java/io/nats/client/OptionsTests.java | 31 +++++++++----- .../java/io/nats/client/PublishTests.java | 41 +++++++++++++++++++ src/test/resources/max_payload.conf | 1 + 6 files changed, 84 insertions(+), 33 deletions(-) create mode 100644 src/test/resources/max_payload.conf diff --git a/README.md b/README.md index 13f20dc22..d3aba310d 100644 --- a/README.md +++ b/README.md @@ -83,17 +83,6 @@ In this release, support was added to For details on the other features, see the "Options" sections -#### Version 2.16.12: Max Payload Check - -As of version 2.16.12, there is no longer client side checking -1. that a message payload is less than the server configuration (Core and JetStream publishes) -2. is less than the stream configuration (JetStream publishes) - -Please see unit test for examples of this behavior. -`testMaxPayload` in [PublishTests.java](src/test/java/io/nats/client/PublishTests.java) -and -`testMaxPayloadJs` in [JetStreamPubTests.cs](src/test/java/io/nats/client/impl/JetStreamPubTests.java) - #### Version 2.16.8: Websocket Support As of version 2.16.8 Websocket (`ws` and `wss`) protocols are supported for connecting to the server. @@ -247,6 +236,7 @@ When options are built, the SSLContext will be accepted or created in the follow | io.nats.client.norandomize | Property used to configure noRandomize. | | io.nats.client.noResolveHostnames | Property used to configure noResolveHostnames. | | io.nats.client.reportNoResponders | Property used to configure reportNoResponders. | +| io.nats.clientsidelimitchecks | Property use to configure clientsidelimitchecks. | | io.nats.client.url | Property used to configure server. The value can be a comma-separated list of server URLs. | | io.nats.client.servers | Property used to configure servers. The value can be a comma-separated list of server URLs. | | io.nats.client.password | Property used to configure userinfo password. | diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 718991714..2961b13f3 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -318,6 +318,10 @@ public class Options { * Property used to configure a builder from a Properties object. {@value}, see {@link Builder#reportNoResponders() reportNoResponders}. */ public static final String PROP_REPORT_NO_RESPONDERS = PFX + "reportNoResponders"; + /** + * Property used to configure a builder from a Properties object. {@value}, see {@link Builder#clientSideLimitChecks() clientSideLimitChecks}. + */ + public static final String PROP_CLIENT_SIDE_LIMIT_CHECKS = PFX + "clientsidelimitchecks"; /** * Property used to configure a builder from a Properties object. {@value}, * see {@link Builder#servers(String[]) servers}. The value can be a comma-separated list of server URLs. @@ -423,12 +427,6 @@ public class Options { * Property used to set the path to a credentials file to be used in a FileAuthHandler */ public static final String PROP_CREDENTIAL_PATH = PFX + "credential.path"; - /** - * Property used to configure a builder from a Properties object. {@value}, see {@link Builder#clientSideLimitChecks() clientSideLimitChecks}. - * @deprecated Client Side Limit checks are no longer performed. - */ - @Deprecated - public static final String PROP_CLIENT_SIDE_LIMIT_CHECKS = PFX + "clientsidelimitchecks"; /** * This property is used to enable support for UTF8 subjects. See {@link Builder#supportUTF8Subjects() supportUTF8Subjects()} * @deprecated only plain ascii subjects are supported @@ -557,6 +555,7 @@ public class Options { private final boolean noEcho; private final boolean noHeaders; private final boolean noNoResponders; + private final boolean clientSideLimitChecks; private final int maxMessagesInOutgoingQueue; private final boolean discardMessagesWhenOutgoingQueueFull; private final boolean ignoreDiscoveredServers; @@ -661,6 +660,7 @@ public static class Builder { private boolean noEcho = false; private boolean noHeaders = false; private boolean noNoResponders = false; + private boolean clientSideLimitChecks = true; private String inboxPrefix = DEFAULT_INBOX_PREFIX; private int maxMessagesInOutgoingQueue = DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE; private boolean discardMessagesWhenOutgoingQueueFull = DEFAULT_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL; @@ -762,6 +762,7 @@ public Builder properties(Properties props) { booleanProperty(props, PROP_NO_ECHO, b -> this.noEcho = b); booleanProperty(props, PROP_NO_HEADERS, b -> this.noHeaders = b); booleanProperty(props, PROP_NO_NORESPONDERS, b -> this.noNoResponders = b); + booleanProperty(props, PROP_CLIENT_SIDE_LIMIT_CHECKS, b -> this.clientSideLimitChecks = b); booleanProperty(props, PROP_PEDANTIC, b -> this.pedantic = b); intProperty(props, PROP_MAX_RECONNECT, DEFAULT_MAX_RECONNECT, i -> this.maxReconnect = i); @@ -896,12 +897,12 @@ public Builder noNoResponders() { } /** - * @deprecated Client Side Limit checks are no longer performed. + * Set client side limit checks. Default is true * @param checks the checks flag * @return the Builder for chaining */ - @Deprecated public Builder clientSideLimitChecks(boolean checks) { + this.clientSideLimitChecks = checks; return this; } @@ -1602,6 +1603,7 @@ public Builder(Options o) { this.noEcho = o.noEcho; this.noHeaders = o.noHeaders; this.noNoResponders = o.noNoResponders; + this.clientSideLimitChecks = o.clientSideLimitChecks; this.inboxPrefix = o.inboxPrefix; this.traceConnection = o.traceConnection; this.maxMessagesInOutgoingQueue = o.maxMessagesInOutgoingQueue; @@ -1661,6 +1663,7 @@ private Options(Builder b) { this.noEcho = b.noEcho; this.noHeaders = b.noHeaders; this.noNoResponders = b.noNoResponders; + this.clientSideLimitChecks = b.clientSideLimitChecks; this.inboxPrefix = b.inboxPrefix; this.traceConnection = b.traceConnection; this.maxMessagesInOutgoingQueue = b.maxMessagesInOutgoingQueue; @@ -1840,12 +1843,10 @@ public boolean isNoNoResponders() { } /** - * @deprecated Client Side Limit checks are no longer performed. * @return clientSideLimitChecks flag */ - @Deprecated public boolean clientSideLimitChecks() { - return false; + return clientSideLimitChecks; } /** diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 8aaba3605..c4b738763 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -822,6 +822,7 @@ public void publish(Message message) { void publishInternal(String subject, String replyTo, Headers headers, byte[] data) { checkIfNeedsHeaderSupport(headers); + checkPayloadSize(data); if (isClosed()) { throw new IllegalStateException("Connection is Closed"); @@ -847,6 +848,12 @@ private void checkIfNeedsHeaderSupport(Headers headers) { } } + private void checkPayloadSize(byte[] body) { + if (options.clientSideLimitChecks() && body != null && body.length > this.getMaxPayload() && this.getMaxPayload() > 0) { + throw new IllegalArgumentException( + "Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload()); + } + } /** * {@inheritDoc} */ @@ -1120,6 +1127,8 @@ public CompletableFuture request(Message message) { } CompletableFuture requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction) { + checkPayloadSize(data); + if (isClosed()) { throw new IllegalStateException("Connection is Closed"); } else if (isDraining()) { diff --git a/src/test/java/io/nats/client/OptionsTests.java b/src/test/java/io/nats/client/OptionsTests.java index 6a8160402..2ea22a4b9 100644 --- a/src/test/java/io/nats/client/OptionsTests.java +++ b/src/test/java/io/nats/client/OptionsTests.java @@ -14,10 +14,7 @@ package io.nats.client; import io.nats.client.ConnectionListener.Events; -import io.nats.client.impl.DataPort; -import io.nats.client.impl.ErrorListenerLoggerImpl; -import io.nats.client.impl.TestHandler; -import io.nats.client.impl.TestStatisticsCollector; +import io.nats.client.impl.*; import io.nats.client.support.HttpRequest; import io.nats.client.support.NatsUri; import io.nats.client.utils.CloseOnUpgradeAttempt; @@ -354,23 +351,35 @@ private static void _testPropertiesSSLOptions(Options o) { @SuppressWarnings("deprecation") @Test public void testDeprecated() { - // clientSideLimitChecks, supportUTF8Subjects are deprecated and always returns false + // supportUTF8Subjects are deprecated and always returns false Options o = new Options.Builder().build(); - assertFalse(o.clientSideLimitChecks()); assertFalse(o.supportUTF8Subjects()); - o = new Options.Builder().clientSideLimitChecks(true).supportUTF8Subjects().build(); - assertFalse(o.clientSideLimitChecks()); + o = new Options.Builder().supportUTF8Subjects().build(); assertFalse(o.supportUTF8Subjects()); Properties props = new Properties(); - props.setProperty(Options.PROP_CLIENT_SIDE_LIMIT_CHECKS, "true"); props.setProperty(Options.PROP_UTF8_SUBJECTS, "true"); o = new Options.Builder(props).build(); - assertFalse(o.clientSideLimitChecks()); assertFalse(o.supportUTF8Subjects()); } + @Test + public void testBuilderCoverageOptions() { + Options o = new Options.Builder().build(); + assertTrue(o.clientSideLimitChecks()); + assertNull(o.getServerPool()); // there is a default provider + + o = new Options.Builder().clientSideLimitChecks(true).build(); + assertTrue(o.clientSideLimitChecks()); + o = new Options.Builder() + .clientSideLimitChecks(false) + .serverPool(new NatsServerPool()) + .build(); + assertFalse(o.clientSideLimitChecks()); + assertNotNull(o.getServerPool()); + } + @Test public void testProperties() throws Exception { Properties props = new Properties(); @@ -492,7 +501,7 @@ private static void _testPropertiesCoverageOptions(Options o) { assertNull(o.getSslContext()); assertTrue(o.isNoHeaders()); assertTrue(o.isNoNoResponders()); - assertFalse(o.clientSideLimitChecks()); // clientSideLimitChecks is deprecated and always returns false + assertTrue(o.clientSideLimitChecks()); assertTrue(o.isIgnoreDiscoveredServers()); assertTrue(o.isNoResolveHostnames()); } diff --git a/src/test/java/io/nats/client/PublishTests.java b/src/test/java/io/nats/client/PublishTests.java index 4b2001d68..6f493ae46 100644 --- a/src/test/java/io/nats/client/PublishTests.java +++ b/src/test/java/io/nats/client/PublishTests.java @@ -18,10 +18,12 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.net.SocketException; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static io.nats.client.support.NatsConstants.*; @@ -64,6 +66,45 @@ public void testThrowsWithoutSubject() { }); } + @Test + public void testThrowsIfTooBig() throws Exception { + try (NatsTestServer ts = new NatsTestServer("src/test/resources/max_payload.conf", false, false)) + { + Connection nc = Nats.connect(ts.getURI()); + assertSame(Connection.Status.CONNECTED, nc.getStatus(), "Connected Status"); + + byte[] body = new byte[1001]; + assertThrows(IllegalArgumentException.class, () -> nc.publish("subject", null, null, body)); + nc.close(); + + AtomicBoolean mpv = new AtomicBoolean(false); + AtomicBoolean se = new AtomicBoolean(false); + ErrorListener el = new ErrorListener() { + @Override + public void errorOccurred(Connection conn, String error) { + mpv.set(error.contains("Maximum Payload Violation")); + } + + @Override + public void exceptionOccurred(Connection conn, Exception exp) { + se.set(exp instanceof SocketException); + } + }; + Options options = Options.builder() + .server(ts.getURI()) + .clientSideLimitChecks(false) + .errorListener(el) + .build(); + Connection nc2 = Nats.connect(options); + assertSame(Connection.Status.CONNECTED, nc2.getStatus(), "Connected Status"); + nc2.publish("subject", null, null, body); + + sleep(100); + assertTrue(mpv.get()); + assertTrue(se.get()); + } + } + @Test public void testThrowsIfheadersNotSupported() { assertThrows(IllegalArgumentException.class, () -> { diff --git a/src/test/resources/max_payload.conf b/src/test/resources/max_payload.conf new file mode 100644 index 000000000..d1dcd5427 --- /dev/null +++ b/src/test/resources/max_payload.conf @@ -0,0 +1 @@ +max_payload: 1000 From 540e8318be2f27240268bb720977f3565914bad7 Mon Sep 17 00:00:00 2001 From: scottf Date: Fri, 22 Sep 2023 10:11:38 -0400 Subject: [PATCH 2/2] proper exceptions with reverted check --- src/test/java/io/nats/client/PublishTests.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/nats/client/PublishTests.java b/src/test/java/io/nats/client/PublishTests.java index 6f493ae46..6671cfb6e 100644 --- a/src/test/java/io/nats/client/PublishTests.java +++ b/src/test/java/io/nats/client/PublishTests.java @@ -238,17 +238,27 @@ public void testMaxPayload() throws Exception { nc.publish("mptest", new byte[maxPayload-1]); nc.publish("mptest", new byte[maxPayload]); }); + try { - runInServer(standardOptionsBuilder().noReconnect(), nc -> { + runInServer(standardOptionsBuilder().noReconnect().clientSideLimitChecks(false), nc -> { int maxPayload = (int)nc.getServerInfo().getMaxPayload(); for (int x = 1; x < 1000; x++) { nc.publish("mptest", new byte[maxPayload + x]); } }); + fail("Expecting IllegalStateException"); } - catch (IllegalStateException e) { - return; + catch (IllegalStateException ignore) {} + + try { + runInServer(standardOptionsBuilder().noReconnect(), nc -> { + int maxPayload = (int)nc.getServerInfo().getMaxPayload(); + for (int x = 1; x < 1000; x++) { + nc.publish("mptest", new byte[maxPayload + x]); + } + }); + fail("Expecting IllegalArgumentException"); } - fail("Expecting connection to be closed"); + catch (IllegalArgumentException ignore) {} } }