Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #12266 - InvocationType improvements and cleanups. #12551

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,6 @@ public void failed(Throwable x)
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
sbordet marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void onFillable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
Expand All @@ -43,6 +44,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);

private final Callback fillableCallback = new FillableCallback();
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
Expand Down Expand Up @@ -161,7 +163,7 @@ public void clearAttributes()
public void onOpen()
{
super.onOpen();
fillInterested();
fillInterested(fillableCallback);
}

@Override
Expand Down Expand Up @@ -189,7 +191,7 @@ public void onFillable()
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
fillInterested(fillableCallback);
return;
}
else
Expand Down Expand Up @@ -305,7 +307,7 @@ void onCompleted(Throwable failure)
{
releaseInputBuffer();
if (failure == null)
fillInterested();
fillInterested(fillableCallback);
else
getFlusher().shutdown();
}
Expand Down Expand Up @@ -407,25 +409,41 @@ public void onFailure(int request, Throwable failure)
@Override
public void close()
{
if (stream != null)
try
{
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
if (stream != null)
{
ThreadPool.executeImmediately(getExecutor(), () ->
{
try
{
task.run();
}
finally
{
super.close();
}
});
return;
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
task.run();
}
}
super.close();
finally
{
super.close();
}
}

private class FillableCallback implements Callback
{
private final InvocationType invocationType = getConnector().getServer().getInvocationType();

@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return invocationType;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,6 @@ public void failed(Throwable x)
close();
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}

private static class ConnectionListener implements Connection.Listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ else if (filled == 0)
}
networkBuffer = null;
if (interested)
getEndPoint().fillInterested(fillableCallback);
fillInterested(fillableCallback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,31 @@ public void onStreamTimeout(Stream stream, TimeoutException timeout, Promise<Boo
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout on {}", stream, timeout);
HTTP2Channel.Server channel = (HTTP2Channel.Server)((HTTP2Stream)stream).getAttachment();
if (channel != null)
if (channel == null)
{
promise.succeeded(false);
return;
}
channel.onTimeout(timeout, (task, timedOut) ->
{
channel.onTimeout(timeout, (task, timedOut) ->
if (task == null)
{
if (task != null)
offerTask(task, true);
promise.succeeded(timedOut);
return;
}
ThreadPool.executeImmediately(getExecutor(), () ->
{
try
{
task.run();
promise.succeeded(timedOut);
}
catch (Throwable x)
{
promise.failed(x);
}
});
}
else
{
promise.succeeded(false);
}
});
}

public void onStreamFailure(Stream stream, Throwable failure, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,6 @@ public void testServerIdleTimeoutIsEnforcedForQueuedRequest() throws Exception
@Override
public boolean handle(Request request, Response response, Callback callback)
{
System.err.println("processing request " + request.getHttpURI().getPath());
requests.incrementAndGet();
handled.incrementAndGet();
phaser.get().countDown();
Expand Down Expand Up @@ -732,24 +731,24 @@ public void onHeaders(Stream stream, HeadersFrame frame)
}

// Send one more request to consume the whole session flow control window.
CountDownLatch resetLatch = new CountDownLatch(1);
CountDownLatch extraLatch = new CountDownLatch(1);
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener()
{
@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
public void onHeaders(Stream stream, HeadersFrame frame)
{
callback.succeeded();
resetLatch.countDown();
responses.incrementAndGet();
extraLatch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(((HTTP2Session)client).updateSendWindow(0));
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);

assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(extraLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));

// Wait for WINDOW_UPDATEs to be processed by the client.
await().atMost(5, TimeUnit.SECONDS).until(() -> ((HTTP2Session)client).updateSendWindow(0), Matchers.greaterThan(0));
Expand All @@ -758,7 +757,9 @@ public void onReset(Stream stream, ResetFrame frame, Callback callback)
await().atMost(5, TimeUnit.SECONDS).until(handled::get, is(0));
assertThat(requests.get(), is(count - 1));

await().atMost(5, TimeUnit.SECONDS).until(responses::get, is(count - 1));
// The first 2 requests are handled normally and responded with 200, the last 2 are
// not handled due to timeout while queued, but they are responded anyway with a 500.
await().atMost(5, TimeUnit.SECONDS).until(responses::get, is(count + 1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.http3.server;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.http.HttpFields;
Expand All @@ -35,6 +36,7 @@
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -153,27 +155,36 @@ public void onTrailer(Stream.Server stream, HeadersFrame frame)
public void onIdleTimeout(Stream.Server stream, TimeoutException timeout, Promise<Boolean> promise)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
getConnection().onIdleTimeout((HTTP3Stream)stream, timeout, (task, timedOut) ->
getConnection().onIdleTimeout(http3Stream, timeout, (task, timedOut) ->
{
if (task != null)
if (task == null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, true);
promise.succeeded(timedOut);
return;
}
promise.succeeded(timedOut);
Executor executor = http3Stream.getSession().getProtocolSession().getQuicSession().getExecutor();
ThreadPool.executeImmediately(executor, () ->
{
try
{
task.run();
gregw marked this conversation as resolved.
Show resolved Hide resolved
promise.succeeded(timedOut);
}
catch (Throwable x)
{
promise.failed(x);
}
});
});
}

@Override
public void onFailure(Stream.Server stream, long error, Throwable failure)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure);
if (task != null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, true);
}
Runnable task = getConnection().onFailure(http3Stream, failure);
Executor executor = http3Stream.getSession().getProtocolSession().getQuicSession().getExecutor();
ThreadPool.executeImmediately(executor, task);
}
}
}
Loading
Loading