diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java index 5d93897e3cb..6ed702c6ea7 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java @@ -463,8 +463,8 @@ public final void doResume() { return; } paused = false; - if (pending != null) { - assert !read; + if (pending != null && !pending.isEmpty()) { + boolean end = !read; read = true; try { Object msg; @@ -472,7 +472,9 @@ public final void doResume() { handleMessage(msg); } } finally { - endReadAndFlush(); + if (end) { + endReadAndFlush(); + } if (pending.isEmpty() && !autoRead) { autoRead = true; chctx.channel().config().setAutoRead(true); diff --git a/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java b/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java index 525e02d4016..5ea96989b42 100644 --- a/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java @@ -192,9 +192,7 @@ public Future start() throws Exception { .compose(HttpClientRequest::send) .await(); } catch (Throwable e) { - if (e instanceof InterruptedException) { - interruptedThreads.add(Thread.currentThread()); - } + interruptedThreads.add(Thread.currentThread()); } }); } diff --git a/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java b/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java index 14dec4106f6..91e7daad049 100644 --- a/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java @@ -634,4 +634,30 @@ public void testPauseWhenResuming() { ch.runPendingTasks(); assertEquals(2, count.get()); } + + @Test + public void testResumeWhenReadInProgress() { + MessageFactory factory = new MessageFactory(); + EmbeddedChannel ch = new EmbeddedChannel(); + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(VertxHandler.create(chctx -> new TestConnection(chctx))); + TestConnection connection = (TestConnection) pipeline.get(VertxHandler.class).getConnection(); + AtomicInteger count = new AtomicInteger(); + connection.handler = event -> count.incrementAndGet(); + connection.pause(); + pipeline.fireChannelRead(factory.next()); + assertEquals(0, count.get()); + Object expected = new Object(); + connection.write(expected, false, ch.newPromise()); + connection.resume(); + assertEquals(0, count.get()); + assertTrue(ch.hasPendingTasks()); + ch.runPendingTasks(); + assertEquals(1, count.get()); + Object outbound = ch.readOutbound(); + assertNull(outbound); + pipeline.fireChannelReadComplete(); + outbound = ch.readOutbound(); + assertSame(expected, outbound); + } }