diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/WebsocketTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/WebsocketTest.java index 84186f3a98..8b16e502e4 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/WebsocketTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/WebsocketTest.java @@ -32,14 +32,18 @@ import java.util.function.Predicate; import java.util.stream.Stream; +import io.netty.buffer.DefaultByteBufHolder; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.ClientCookieEncoder; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; @@ -49,6 +53,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.util.CharsetUtil; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; @@ -80,6 +85,7 @@ import reactor.util.annotation.Nullable; import reactor.util.function.Tuple2; +import static io.netty.handler.codec.http.websocketx.WebSocketCloseStatus.ABNORMAL_CLOSURE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -1517,6 +1523,55 @@ public void testIssue3036(HttpProtocol[] serverProtocols, HttpProtocol[] clientP assertThat(responseHeaders.get()).contains("permessage-deflate"); } + @Test + void testIssue3295() throws Exception { + AtomicReference serverError = new AtomicReference<>(); + CountDownLatch serverLatch = new CountDownLatch(1); + disposableServer = + createServer() + .handle((req, res) -> res.sendWebsocket((in, out) -> + in.aggregateFrames(10) + .receiveFrames() + .doOnError(t -> { + serverError.set(t); + serverLatch.countDown(); + }) + .cast(BinaryWebSocketFrame.class) + .map(DefaultByteBufHolder::content) + .then())) + .bindNow(); + + AtomicReference clientStatus = new AtomicReference<>(); + AtomicReference connection = new AtomicReference<>(); + CountDownLatch clientLatch = new CountDownLatch(1); + byte[] content1 = "Content1".getBytes(CharsetUtil.UTF_8); + byte[] content2 = "Content2".getBytes(CharsetUtil.UTF_8); + byte[] content3 = "Content3".getBytes(CharsetUtil.UTF_8); + createClient(disposableServer.port()) + .websocket() + .handle((in, out) -> { + in.withConnection(connection::set); + in.receiveCloseStatus().subscribe(s -> { + clientStatus.set(s); + clientLatch.countDown(); + }); + return out.sendObject(Flux.just( + new BinaryWebSocketFrame(false, 0, Unpooled.wrappedBuffer(content1)), + new ContinuationWebSocketFrame(false, 0, Unpooled.wrappedBuffer(content2)), + new ContinuationWebSocketFrame(true, 0, Unpooled.wrappedBuffer(content3)))); + }) + .then() + .block(Duration.ofSeconds(5)); + + assertThat(serverLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(serverError.get()).isNotNull().isInstanceOf(TooLongFrameException.class); + + assertThat(clientLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(clientStatus.get()).isNotNull().isEqualTo(ABNORMAL_CLOSURE); + assertThat(connection.get()).isNotNull(); + assertThat(connection.get().channel().isActive()).isFalse(); + } + static Stream http11CompatibleProtocols() { return Stream.of( Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null),