Skip to content

Commit

Permalink
Merge #3290 into 1.2.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jun 12, 2024
2 parents c355a48 + 203d185 commit d65f31e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,21 +109,29 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) {
return;
}
try {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
Connection connection = Connection.from(ctx.channel());
ChannelOperations<?, ?> ops = connection.as(ChannelOperations.class);
if (ops != null) {
ops.onInboundNext(ctx, msg);
}
else {
if (log.isDebugEnabled()) {
if (msg instanceof DecoderResultProvider) {
DecoderResult decoderResult = ((DecoderResultProvider) msg).decoderResult();
if (decoderResult.isFailure()) {
if (msg instanceof DecoderResultProvider) {
DecoderResult decoderResult = ((DecoderResultProvider) msg).decoderResult();
if (decoderResult.isFailure()) {
if (log.isDebugEnabled()) {
log.debug(format(ctx.channel(), "Decoding failed."), decoderResult.cause());
}

//"FutureReturnValueIgnored" this is deliberate
ctx.close();
listener.onUncaughtException(connection, decoderResult.cause());
}
}

if (log.isDebugEnabled()) {
log.debug(format(ctx.channel(), "No ChannelOperation attached."));
}

ReferenceCountUtil.release(msg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,11 +683,10 @@ protected void onOutboundError(Throwable err) {
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
if (response.decoderResult()
.isFailure()) {
onInboundError(response.decoderResult()
.cause());
if (response.decoderResult().isFailure()) {
onInboundError(response.decoderResult().cause());
ReferenceCountUtil.release(msg);
terminate();
return;
}
if (HttpResponseStatus.CONTINUE.equals(response.status())) {
Expand Down Expand Up @@ -751,6 +750,13 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {

if (msg instanceof LastHttpContent) {
LastHttpContent lastHttpContent = (LastHttpContent) msg;
if (lastHttpContent.decoderResult().isFailure()) {
onInboundError(lastHttpContent.decoderResult().cause());
lastHttpContent.release();
terminate();
return;
}

if (is100Continue) {
lastHttpContent.release();
channel().read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3346,4 +3346,64 @@ private void doTestIssue1943(HttpProtocol protocol) {
.block(Duration.ofSeconds(5));
}
}

@Test
void testIssue3285NoOperations() throws Exception {
testIssue3285("HTTP/1.1 200 OK\r\nContent-Length:4\r\n\r\ntest\r\n\r\nsomething\r\n\r\n", null);
}

@Test
void testIssue3285LastContent() throws Exception {
testIssue3285("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\ntest\r\n\r\n", NumberFormatException.class);
}

@Test
void testIssue3285HttpResponse() throws Exception {
testIssue3285("HTTP/1 200 OK\r\n\r\n", IllegalArgumentException.class);
}

void testIssue3285(String serverResponse, @Nullable Class<? extends Throwable> expectedException) throws Exception {
disposableServer =
TcpServer.create()
.host("localhost")
.port(0)
.wiretap(true)
.handle((in, out) -> in.receive().flatMap(b -> out.sendString(Mono.just(serverResponse))))
.bindNow();

CountDownLatch latch = new CountDownLatch(2);
ConnectionProvider provider = ConnectionProvider.create("testIssue3285", 1);
HttpClient client = createHttpClientForContextWithAddress(provider)
.doOnRequest((req, conn) -> conn.channel().closeFuture().addListener(f -> latch.countDown()));

try (LogTracker logTracker = new LogTracker("reactor.netty.channel.ChannelOperationsHandler", 2, "Decoding failed.")) {
testIssue3285SendRequest(client, expectedException);

testIssue3285SendRequest(client, expectedException);

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();

if (expectedException == null) {
assertThat(logTracker.latch.await(5, TimeUnit.SECONDS)).isTrue();
}
}
}

static void testIssue3285SendRequest(HttpClient client, @Nullable Class<? extends Throwable> exception) {
Mono<String> response =
client.get()
.uri("/")
.responseSingle((res, bytes) -> bytes.asString());
if (exception != null) {
response.as(StepVerifier::create)
.expectError(exception)
.verify(Duration.ofSeconds(5));
}
else {
response.as(StepVerifier::create)
.expectNext("test")
.expectComplete()
.verify(Duration.ofSeconds(5));
}
}
}

0 comments on commit d65f31e

Please sign in to comment.