diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperationsHandler.java b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperationsHandler.java index ff5077bbb0..1842004f7a 100755 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperationsHandler.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperationsHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -109,21 +109,29 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) { return; } try { - ChannelOperations ops = ChannelOperations.get(ctx.channel()); + Connection connection = Connection.from(ctx.channel()); + ChannelOperations ops = connection.as(ChannelOperations.class); if (ops != null) { ops.onInboundNext(ctx, msg); } else { - if (log.isDebugEnabled()) { - if (msg instanceof DecoderResultProvider) { - DecoderResult decoderResult = ((DecoderResultProvider) msg).decoderResult(); - if (decoderResult.isFailure()) { + if (msg instanceof DecoderResultProvider) { + DecoderResult decoderResult = ((DecoderResultProvider) msg).decoderResult(); + if (decoderResult.isFailure()) { + if (log.isDebugEnabled()) { log.debug(format(ctx.channel(), "Decoding failed."), decoderResult.cause()); } + + //"FutureReturnValueIgnored" this is deliberate + ctx.close(); + listener.onUncaughtException(connection, decoderResult.cause()); } + } + if (log.isDebugEnabled()) { log.debug(format(ctx.channel(), "No ChannelOperation attached.")); } + ReferenceCountUtil.release(msg); } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java index cedbbad2de..d114afb0e1 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java @@ -683,11 +683,10 @@ protected void onOutboundError(Throwable err) { protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; - if (response.decoderResult() - .isFailure()) { - onInboundError(response.decoderResult() - .cause()); + if (response.decoderResult().isFailure()) { + onInboundError(response.decoderResult().cause()); ReferenceCountUtil.release(msg); + terminate(); return; } if (HttpResponseStatus.CONTINUE.equals(response.status())) { @@ -751,6 +750,13 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { if (msg instanceof LastHttpContent) { LastHttpContent lastHttpContent = (LastHttpContent) msg; + if (lastHttpContent.decoderResult().isFailure()) { + onInboundError(lastHttpContent.decoderResult().cause()); + lastHttpContent.release(); + terminate(); + return; + } + if (is100Continue) { lastHttpContent.release(); channel().read(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 5b8eafc93e..973d2f33d8 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -3346,4 +3346,64 @@ private void doTestIssue1943(HttpProtocol protocol) { .block(Duration.ofSeconds(5)); } } + + @Test + void testIssue3285NoOperations() throws Exception { + testIssue3285("HTTP/1.1 200 OK\r\nContent-Length:4\r\n\r\ntest\r\n\r\nsomething\r\n\r\n", null); + } + + @Test + void testIssue3285LastContent() throws Exception { + testIssue3285("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\ntest\r\n\r\n", NumberFormatException.class); + } + + @Test + void testIssue3285HttpResponse() throws Exception { + testIssue3285("HTTP/1 200 OK\r\n\r\n", IllegalArgumentException.class); + } + + void testIssue3285(String serverResponse, @Nullable Class expectedException) throws Exception { + disposableServer = + TcpServer.create() + .host("localhost") + .port(0) + .wiretap(true) + .handle((in, out) -> in.receive().flatMap(b -> out.sendString(Mono.just(serverResponse)))) + .bindNow(); + + CountDownLatch latch = new CountDownLatch(2); + ConnectionProvider provider = ConnectionProvider.create("testIssue3285", 1); + HttpClient client = createHttpClientForContextWithAddress(provider) + .doOnRequest((req, conn) -> conn.channel().closeFuture().addListener(f -> latch.countDown())); + + try (LogTracker logTracker = new LogTracker("reactor.netty.channel.ChannelOperationsHandler", 2, "Decoding failed.")) { + testIssue3285SendRequest(client, expectedException); + + testIssue3285SendRequest(client, expectedException); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + + if (expectedException == null) { + assertThat(logTracker.latch.await(5, TimeUnit.SECONDS)).isTrue(); + } + } + } + + static void testIssue3285SendRequest(HttpClient client, @Nullable Class exception) { + Mono response = + client.get() + .uri("/") + .responseSingle((res, bytes) -> bytes.asString()); + if (exception != null) { + response.as(StepVerifier::create) + .expectError(exception) + .verify(Duration.ofSeconds(5)); + } + else { + response.as(StepVerifier::create) + .expectNext("test") + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + } }