diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/ProxyProtocolClientConnectionFactory.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/ProxyProtocolClientConnectionFactory.java
index 4eef6cfdb79..5f864b33d74 100644
--- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/ProxyProtocolClientConnectionFactory.java
+++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/ProxyProtocolClientConnectionFactory.java
@@ -460,7 +460,7 @@ public Connection newConnection(EndPoint endPoint, Map context)
protected abstract ProxyProtocolConnection newProxyProtocolConnection(EndPoint endPoint, Map context);
- protected abstract static class ProxyProtocolConnection extends AbstractConnection implements Callback
+ protected abstract static class ProxyProtocolConnection extends AbstractConnection.NonBlocking implements Callback
{
static final Logger LOG = LoggerFactory.getLogger(ProxyProtocolConnection.class);
@@ -508,12 +508,6 @@ public void failed(Throwable x)
promise.failed(x);
}
- @Override
- public InvocationType getInvocationType()
- {
- return InvocationType.NON_BLOCKING;
- }
-
@Override
public void onFillable()
{
diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java
index 9a78993bdc4..f991af3d9ab 100644
--- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java
+++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java
@@ -34,6 +34,7 @@
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
+import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
@@ -43,6 +44,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);
+ private final Callback fillableCallback = new FillableCallback();
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
@@ -161,7 +163,7 @@ public void clearAttributes()
public void onOpen()
{
super.onOpen();
- fillInterested();
+ fillInterested(fillableCallback);
}
@Override
@@ -189,7 +191,7 @@ public void onFillable()
else if (read == 0)
{
releaseInputBuffer();
- fillInterested();
+ fillInterested(fillableCallback);
return;
}
else
@@ -305,7 +307,7 @@ void onCompleted(Throwable failure)
{
releaseInputBuffer();
if (failure == null)
- fillInterested();
+ fillInterested(fillableCallback);
else
getFlusher().shutdown();
}
@@ -421,4 +423,27 @@ public void close()
super.close();
}
}
+
+ private class FillableCallback implements Callback
+ {
+ private final InvocationType invocationType = getConnector().getServer().getInvocationType();
+
+ @Override
+ public void succeeded()
+ {
+ onFillable();
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ onFillInterestedFailed(x);
+ }
+
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return invocationType;
+ }
+ }
}
diff --git a/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
index 289d8bce024..c9487fde74c 100644
--- a/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
+++ b/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
@@ -174,12 +174,6 @@ public void failed(Throwable x)
close();
promise.failed(x);
}
-
- @Override
- public InvocationType getInvocationType()
- {
- return InvocationType.NON_BLOCKING;
- }
}
private static class ConnectionListener implements Connection.Listener
diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
index 3d6f03dca95..c421acac6de 100644
--- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
+++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
@@ -432,7 +432,7 @@ else if (filled == 0)
}
networkBuffer = null;
if (interested)
- getEndPoint().fillInterested(fillableCallback);
+ fillInterested(fillableCallback);
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
index df299d5e349..adbb86cc54e 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
@@ -36,10 +36,10 @@ public abstract class AbstractConnection implements Connection, Invocable
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnection.class);
private final List _listeners = new CopyOnWriteArrayList<>();
+ private final Callback _fillableCallback = new FillableCallback();
private final long _created = System.currentTimeMillis();
private final EndPoint _endPoint;
private final Executor _executor;
- private final Callback _readCallback;
private int _inputBufferSize = 2048;
protected AbstractConnection(EndPoint endPoint, Executor executor)
@@ -48,16 +48,6 @@ protected AbstractConnection(EndPoint endPoint, Executor executor)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endPoint;
_executor = executor;
- _readCallback = new ReadCallback();
- }
-
- @Deprecated
- @Override
- public InvocationType getInvocationType()
- {
- // TODO consider removing the #fillInterested method from the connection and only use #fillInterestedCallback
- // so a connection need not be Invocable
- return Invocable.super.getInvocationType();
}
@Override
@@ -90,25 +80,27 @@ protected Executor getExecutor()
}
/**
- * Utility method to be called to register read interest.
- * After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
- * will be called back as appropriate.
+ * Registers read interest using the default {@link Callback} with {@link Invocable.InvocationType#BLOCKING}.
+ * When read readiness is signaled, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
+ * will be invoked.
+ * This method should be used sparingly, mainly from {@link #onOpen()}, and {@link #fillInterested(Callback)}
+ * should be preferred instead, passing a {@link Callback} that specifies the {@link Invocable.InvocationType}
+ * for each specific case where read interest needs to be registered.
*
+ * @see #fillInterested(Callback)
* @see #onFillable()
+ * @see #onFillInterestedFailed(Throwable)
*/
public void fillInterested()
{
- if (LOG.isDebugEnabled())
- LOG.debug("fillInterested {}", this);
- getEndPoint().fillInterested(_readCallback);
+ fillInterested(_fillableCallback);
}
/**
- * Utility method to be called to register read interest.
- * After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
- * will be called back as appropriate.
+ * Registers read interest with the given callback.
+ * When read readiness is signaled, the callback will be completed.
*
- * @see #onFillable()
+ * @param callback the callback to complete when read readiness is signaled
*/
public void fillInterested(Callback callback)
{
@@ -130,7 +122,7 @@ public boolean isFillInterested()
/**
* Callback method invoked when the endpoint is ready to be read.
*
- * @see #fillInterested()
+ * @see #fillInterested(Callback)
*/
public abstract void onFillable();
@@ -139,7 +131,7 @@ public boolean isFillInterested()
*
* @param cause the exception that caused the failure
*/
- protected void onFillInterestedFailed(Throwable cause)
+ public void onFillInterestedFailed(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("onFillInterestedFailed {}", this, cause);
@@ -286,7 +278,39 @@ public String toConnectionString()
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
- private class ReadCallback implements Callback, Invocable
+ public abstract static class NonBlocking extends AbstractConnection
+ {
+ private final Callback _nonBlockingReadCallback = new NonBlockingFillableCallback();
+
+ public NonBlocking(EndPoint endPoint, Executor executor)
+ {
+ super(endPoint, executor);
+ }
+
+ /**
+ * Registers read interest using the default {@link Callback} with {@link Invocable.InvocationType#NON_BLOCKING}.
+ * When read readiness is signaled, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
+ * will be invoked.
+ * This method should be used sparingly, and {@link #fillInterested(Callback)}
+ * should be preferred instead, passing a {@link Callback} that specifies the {@link Invocable.InvocationType}
+ * for each specific case where read interest needs to be registered.
*/
+ @Override
+ public void fillInterested()
+ {
+ fillInterested(_nonBlockingReadCallback);
+ }
+
+ private class NonBlockingFillableCallback extends FillableCallback
+ {
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return InvocationType.NON_BLOCKING;
+ }
+ }
+ }
+
+ private class FillableCallback implements Callback
{
@Override
public void succeeded()
@@ -305,11 +329,5 @@ public String toString()
{
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
}
-
- @Override
- public InvocationType getInvocationType()
- {
- return AbstractConnection.this.getInvocationType();
- }
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/NegotiatingClientConnection.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/NegotiatingClientConnection.java
index 9691c1ddc3d..75a88ffb005 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/NegotiatingClientConnection.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/NegotiatingClientConnection.java
@@ -22,7 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class NegotiatingClientConnection extends AbstractConnection
+public abstract class NegotiatingClientConnection extends AbstractConnection.NonBlocking
{
private static final Logger LOG = LoggerFactory.getLogger(NegotiatingClientConnection.class);
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/DetectorConnectionFactory.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/DetectorConnectionFactory.java
index 5a7fb4b3fea..1c3d791db2d 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/DetectorConnectionFactory.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/DetectorConnectionFactory.java
@@ -137,7 +137,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
return configure(new DetectorConnection(endPoint, connector), connector, endPoint);
}
- private class DetectorConnection extends AbstractConnection implements Connection.UpgradeFrom, Connection.UpgradeTo
+ private class DetectorConnection extends AbstractConnection.NonBlocking implements Connection.UpgradeFrom, Connection.UpgradeTo
{
private final Connector _connector;
private final RetainableByteBuffer _buffer;
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/NegotiatingServerConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/NegotiatingServerConnection.java
index 59b1f6eb981..7c9db3ee871 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/NegotiatingServerConnection.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/NegotiatingServerConnection.java
@@ -25,7 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class NegotiatingServerConnection extends AbstractConnection
+public abstract class NegotiatingServerConnection extends AbstractConnection.NonBlocking
{
private static final Logger LOG = LoggerFactory.getLogger(NegotiatingServerConnection.class);
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java
index a660220853b..cd2dec0b0d8 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java
@@ -136,7 +136,7 @@ public Connection newConnection(Connector connector, EndPoint endp)
return configure(new ProxyProtocolV1Connection(endp, connector, nextConnectionFactory), connector, endp);
}
- private static class ProxyProtocolV1Connection extends AbstractConnection implements Connection.UpgradeFrom, Connection.UpgradeTo
+ private static class ProxyProtocolV1Connection extends AbstractConnection.NonBlocking implements Connection.UpgradeFrom, Connection.UpgradeTo
{
// 0 1 2 3 4 5 6
// 98765432109876543210987654321
@@ -451,7 +451,7 @@ public Connection newConnection(Connector connector, EndPoint endp)
return configure(new ProxyProtocolV2Connection(endp, connector, nextConnectionFactory), connector, endp);
}
- private class ProxyProtocolV2Connection extends AbstractConnection implements Connection.UpgradeFrom, Connection.UpgradeTo
+ private class ProxyProtocolV2Connection extends AbstractConnection.NonBlocking implements Connection.UpgradeFrom, Connection.UpgradeTo
{
private static final int HEADER_LENGTH = 16;
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java
index 985cd618caf..a5d2dcd9b7d 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java
@@ -513,7 +513,7 @@ protected void connectionFailed(SelectableChannel channel, final Throwable ex, f
}
}
- protected static class ConnectContext
+ public static class ConnectContext
{
private final ConcurrentMap context = new ConcurrentHashMap<>();
private final Request request;
@@ -659,7 +659,7 @@ protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
}
}
- private abstract static class TunnelConnection extends AbstractConnection
+ private abstract static class TunnelConnection extends AbstractConnection.NonBlocking
{
private final IteratingCallback pipe = new ProxyIteratingCallback();
private final ByteBufferPool bufferPool;
@@ -781,6 +781,12 @@ protected void onCompleteFailure(Throwable cause)
buffer = Retainable.release(buffer);
}
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return InvocationType.NON_BLOCKING;
+ }
+
private void disconnect(Throwable x)
{
TunnelConnection.this.close(x);
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
index 39ac531a7b8..980dee65636 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
@@ -445,11 +445,16 @@ private Runnable onFailure(Throwable x, boolean remote)
// If not handled, then we just fail the request callback
if (!_handled && _handling == null)
{
+ if (LOG.isDebugEnabled())
+ LOG.debug("failing request not yet handled {} {}", _request, this);
Callback callback = _request._callback;
task = () -> callback.failed(x);
}
else
{
+ if (LOG.isDebugEnabled())
+ LOG.debug("failing request {} {}", _request, this);
+
// Set the failure to arrange for any subsequent reads or demands to fail.
if (_readFailure == null)
_readFailure = Content.Chunk.from(x, true);
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
index 560efcdfe0b..23239eabb04 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
@@ -88,6 +88,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
private static final ThreadLocal __currentConnection = new ThreadLocal<>();
private static final AtomicLong __connectionIdGenerator = new AtomicLong();
+ private final Callback _fillableCallback = new FillableCallback();
private final TunnelSupport _tunnelSupport = new TunnelSupportOverHTTP1();
private final AtomicLong _streamIdGenerator = new AtomicLong();
private final long _id;
@@ -146,12 +147,6 @@ public HttpConnection(HttpConfiguration configuration, Connector connector, EndP
LOG.debug("New HTTP Connection {}", this);
}
- @Override
- public InvocationType getInvocationType()
- {
- return getServer().getInvocationType();
- }
-
protected HttpGenerator newHttpGenerator()
{
HttpGenerator generator = new HttpGenerator();
@@ -443,7 +438,7 @@ public void onFillable()
// If we have already released the request buffer, then use fill interest before allocating another
if (_requestBuffer == null)
{
- fillInterested();
+ fillInterested(_fillableCallback);
break;
}
}
@@ -451,7 +446,7 @@ else if (filled == 0)
{
assert isRequestBufferEmpty();
releaseRequestBuffer();
- fillInterested();
+ fillInterested(_fillableCallback);
break;
}
else if (filled < 0)
@@ -623,7 +618,7 @@ private boolean upgrade(HttpStreamOverHTTP1 stream)
}
@Override
- protected void onFillInterestedFailed(Throwable cause)
+ public void onFillInterestedFailed(Throwable cause)
{
_parser.close();
super.onFillInterestedFailed(cause);
@@ -658,7 +653,7 @@ public void onOpen()
{
super.onOpen();
if (isRequestBufferEmpty())
- fillInterested();
+ fillInterested(_fillableCallback);
else
getExecutor().execute(this);
}
@@ -1684,4 +1679,27 @@ public String getReason()
return getMessage();
}
}
+
+ private class FillableCallback implements Callback
+ {
+ private final InvocationType _invocationType = getServer().getInvocationType();
+
+ @Override
+ public void succeeded()
+ {
+ onFillable();
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ onFillInterestedFailed(x);
+ }
+
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return _invocationType;
+ }
+ }
}
diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java
index 7f76a6ba8a2..14e8d3d0690 100644
--- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java
+++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java
@@ -13,6 +13,7 @@
package org.eclipse.jetty.test.client.transport;
+import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -25,6 +26,7 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -32,6 +34,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ThreadStarvationTest extends AbstractTest
@@ -91,9 +94,101 @@ public boolean handle(Request request, Response response, Callback callback) thr
});
// Finish the request, the server should be able to process it.
- content.write(false, UTF_8.encode("123456789"), Callback.NOOP);
content.close();
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
+
+ @ParameterizedTest
+ @MethodSource("transports")
+ public void testIdleTimeoutStarvation(Transport transport) throws Exception
+ {
+ long idleTimeout = 1000;
+ AtomicReference handlerThreadRef = new AtomicReference<>();
+ CountDownLatch serverExceptionLatch = new CountDownLatch(1);
+ prepareServer(transport, new Handler.Abstract()
+ {
+ @Override
+ public boolean handle(Request request, Response response, Callback callback) throws Exception
+ {
+ handlerThreadRef.set(Thread.currentThread());
+ try
+ {
+ while (true)
+ {
+ Content.Chunk chunk = request.read();
+ if (chunk == null)
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ // The lambda passed to demand() has invocationType==BLOCKING.
+ request.demand(latch::countDown);
+ // Block here until more chunks are available.
+ assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
+ continue;
+ }
+ if (Content.Chunk.isFailure(chunk))
+ throw IO.rethrow(chunk.getFailure());
+ chunk.release();
+ if (chunk.isLast())
+ break;
+ }
+ callback.succeeded();
+ return true;
+ }
+ catch (IOException x)
+ {
+ serverExceptionLatch.countDown();
+ throw x;
+ }
+ }
+ });
+ // Leave only 1 thread available to handle requests.
+ // 1 acceptor (0 for H3), 1 selector, 1 available.
+ int maxThreads = transport == Transport.H3 ? 2 : 3;
+ QueuedThreadPool serverThreads = (QueuedThreadPool)server.getThreadPool();
+ serverThreads.setReservedThreads(0);
+ serverThreads.setDetailedDump(true);
+ serverThreads.setMinThreads(maxThreads);
+ serverThreads.setMaxThreads(maxThreads);
+ setStreamIdleTimeout(idleTimeout);
+ server.start();
+
+ await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(1, serverThreads.getReadyThreads()));
+
+ startClient(transport);
+
+ // Send one request that will block the last thread on the server.
+ CountDownLatch responseLatch = new CountDownLatch(1);
+ AsyncRequestContent content = new AsyncRequestContent();
+ client.newRequest(newURI(transport))
+ .method(HttpMethod.POST)
+ .body(content)
+ .timeout(2 * idleTimeout, TimeUnit.MILLISECONDS)
+ .send(result ->
+ {
+ // The response should arrive correctly,
+ // it is the request that failed.
+ assertTrue(result.isFailed());
+ assertNull(result.getResponseFailure());
+ assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, result.getResponse().getStatus());
+ responseLatch.countDown();
+ });
+
+ // Wait for the request to block on the server.
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread thread = handlerThreadRef.get();
+ if (thread == null)
+ return false;
+ return switch (thread.getState())
+ {
+ case WAITING, TIMED_WAITING -> true;
+ default -> false;
+ };
+ });
+
+ // The idle timeout should wake up the blocked read.
+ assertTrue(serverExceptionLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
+ assertTrue(responseLatch.await(3 * idleTimeout, TimeUnit.MILLISECONDS), server.dump());
+ }
}
diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java
index 504115b5ed0..26ebfd861d7 100644
--- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java
+++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java
@@ -63,8 +63,8 @@ public void runWithoutBlocking(Runnable task, Executor executor)
/**
* Invoking the task does not block the invoker thread,
* and the invocation must be performed immediately in the invoker thread.
- * This invocation type is suitable for tasks that can not be deferred and is
- * guaranteed to never block the invoker thread.
+ * This invocation type is suitable for tasks that cannot be deferred and
+ * guarantee that their code never blocks the invoker thread.
*/
NON_BLOCKING
{
@@ -74,20 +74,20 @@ public void runWithoutBlocking(Runnable task, Executor ignored)
}
},
/**
- * Invoking the task may act either as a {@code BLOCKING} task if invoked directly; or as a {@code NON_BLOCKING}
- * task if invoked via {@link Invocable#invokeNonBlocking(Runnable)}. The implementation of the task must check
- * {@link Invocable#isNonBlockingInvocation()} to determine how it was called.
- *
- * This invocation type is suitable for tasks that have multiple subtasks, some of which that cannot be deferred
- * mixed with other subtasks that can be.
- * An invoker which has an {@code EITHER} task must call it immediately, either directly, so that it may block; or
- * via {@link Invocable#invokeNonBlocking(Runnable)} so that it may not.
+ *
Invoking the task may act either as a {@code BLOCKING} task if invoked directly;
+ * or as a {@code NON_BLOCKING} task if invoked via {@link Invocable#invokeNonBlocking(Runnable)}.
+ * The implementation of the task must check {@link Invocable#isNonBlockingInvocation()}
+ * to determine how it was called.
+ * This invocation type is suitable for tasks that have multiple subtasks,
+ * some of which that cannot be deferred mixed with other subtasks that can be.
+ * An invoker which has an {@code EITHER} task must call it immediately, either
+ * directly, so that it may block; or via {@link Invocable#invokeNonBlocking(Runnable)}
+ * so that it may not.
* The invoker cannot defer the task execution, and specifically it must not
- * queue the {@code EITHER} task in a thread pool.
- *
- * See the {@link org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy} for an example of
- * both an invoker of {@code EITHER} tasks, and as an implementation of an {@code EITHER} task, when used in a
- * chain of {@link ExecutionStrategy}s.
+ * queue the {@code EITHER} task in a thread pool.
+ * See the {@link org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy}
+ * for an example of both an invoker of {@code EITHER} tasks, and as an implementation
+ * of an {@code EITHER} task, when used in a chain of {@link ExecutionStrategy}s.
*/
EITHER
{
@@ -98,23 +98,24 @@ public void runWithoutBlocking(Runnable task, Executor ignored)
};
/**
- * Run or Execute the task according to the InvocationType without blocking the caller:
+ * Runs or executes the task according to the {@code InvocationType},
+ * without blocking the caller:
*
* - {@link InvocationType#NON_BLOCKING}
* - The task is run directly
* - {@link InvocationType#BLOCKING}
- * - The task is executed by the passed executor
+ * - The task is submitted to the given {@link Executor}
* - {@link InvocationType#EITHER}
* - The task is invoked via {@link Invocable#invokeNonBlocking(Runnable)}
*
* @param task The task to run
- * @param executor The executor to use if necessary
+ * @param executor The {@link Executor} to use if necessary
*/
public abstract void runWithoutBlocking(Runnable task, Executor executor);
}
/**
- * A task with an {@link InvocationType}.
+ * A {@link Runnable} task with an {@link InvocationType}.
*/
interface Task extends Invocable, Runnable
{
@@ -178,7 +179,7 @@ public void run()
@Override
public String toString()
{
- return String.format("%s@%x[%s|%s]", getClass().getSimpleName(), hashCode(), getInvocationType(), task);
+ return String.format("%s[%s]", super.toString(), task);
}
}
@@ -197,7 +198,7 @@ static Task from(InvocationType type, Runnable task)
}
/**
- * Test if the current thread has been tagged as non blocking
+ * Tests if the current thread has been tagged as non-blocking.
*
* @return True if the task the current thread is running has
* indicated that it will not block.
@@ -231,7 +232,7 @@ static void invokeNonBlocking(Runnable task)
* Combine two invocation type.
* @param it1 A type
* @param it2 Another type
- * @return The combination of both type, where any tendency to block overrules any non blocking.
+ * @return The combination of both type, where any tendency to block overrules any non-blocking.
*/
static InvocationType combine(InvocationType it1, InvocationType it2)
{
@@ -258,7 +259,7 @@ static InvocationType combineTypes(InvocationType... it)
}
/**
- * Get the invocation type of an Object.
+ * Get the {@code InvocationType} of the given object.
*
* @param o The object to check the invocation type of.
* @return If the object is an Invocable, it is coerced and the {@link #getInvocationType()}