Skip to content

Commit

Permalink
Merged branch 'jetty-12.0.x' into 'jetty-12.1.x'.
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Nov 22, 2024
2 parents 6b85dab + f93c75f commit c56a24f
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,25 +407,18 @@ public void onFailure(int request, Throwable failure)
@Override
public void close()
{
if (stream != null)
try
{
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
if (stream != null)
{
ThreadPool.executeImmediately(getExecutor(), () ->
{
try
{
task.run();
}
finally
{
super.close();
}
});
return;
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
task.run();
}
}
super.close();
finally
{
super.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,31 @@ public void onStreamTimeout(Stream stream, TimeoutException timeout, Promise<Boo
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout on {}", stream, timeout);
HTTP2Channel.Server channel = (HTTP2Channel.Server)((HTTP2Stream)stream).getAttachment();
if (channel != null)
if (channel == null)
{
promise.succeeded(false);
return;
}
channel.onTimeout(timeout, (task, timedOut) ->
{
channel.onTimeout(timeout, (task, timedOut) ->
if (task == null)
{
if (task != null)
offerTask(task, true);
promise.succeeded(timedOut);
return;
}
ThreadPool.executeImmediately(getExecutor(), () ->
{
try
{
task.run();
promise.succeeded(timedOut);
}
catch (Throwable x)
{
promise.failed(x);
}
});
}
else
{
promise.succeeded(false);
}
});
}

public void onStreamFailure(Stream stream, Throwable failure, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,6 @@ public void testServerIdleTimeoutIsEnforcedForQueuedRequest() throws Exception
@Override
public boolean handle(Request request, Response response, Callback callback)
{
System.err.println("processing request " + request.getHttpURI().getPath());
requests.incrementAndGet();
handled.incrementAndGet();
phaser.get().countDown();
Expand Down Expand Up @@ -732,24 +731,24 @@ public void onHeaders(Stream stream, HeadersFrame frame)
}

// Send one more request to consume the whole session flow control window.
CountDownLatch resetLatch = new CountDownLatch(1);
CountDownLatch extraLatch = new CountDownLatch(1);
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener()
{
@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
public void onHeaders(Stream stream, HeadersFrame frame)
{
callback.succeeded();
resetLatch.countDown();
responses.incrementAndGet();
extraLatch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(((HTTP2Session)client).updateSendWindow(0));
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);

assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(extraLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));

// Wait for WINDOW_UPDATEs to be processed by the client.
await().atMost(5, TimeUnit.SECONDS).until(() -> ((HTTP2Session)client).updateSendWindow(0), Matchers.greaterThan(0));
Expand All @@ -758,7 +757,9 @@ public void onReset(Stream stream, ResetFrame frame, Callback callback)
await().atMost(5, TimeUnit.SECONDS).until(handled::get, is(0));
assertThat(requests.get(), is(count - 1));

await().atMost(5, TimeUnit.SECONDS).until(responses::get, is(count - 1));
// The first 2 requests are handled normally and responded with 200, the last 2 are
// not handled due to timeout while queued, but they are responded anyway with a 500.
await().atMost(5, TimeUnit.SECONDS).until(responses::get, is(count + 1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.http3.server;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.http.HttpFields;
Expand All @@ -35,6 +36,7 @@
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -153,27 +155,36 @@ public void onTrailer(Stream.Server stream, HeadersFrame frame)
public void onIdleTimeout(Stream.Server stream, TimeoutException timeout, Promise<Boolean> promise)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
getConnection().onIdleTimeout((HTTP3Stream)stream, timeout, (task, timedOut) ->
getConnection().onIdleTimeout(http3Stream, timeout, (task, timedOut) ->
{
if (task != null)
if (task == null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, true);
promise.succeeded(timedOut);
return;
}
promise.succeeded(timedOut);
Executor executor = http3Stream.getSession().getProtocolSession().getQuicSession().getExecutor();
ThreadPool.executeImmediately(executor, () ->
{
try
{
task.run();
promise.succeeded(timedOut);
}
catch (Throwable x)
{
promise.failed(x);
}
});
});
}

@Override
public void onFailure(Stream.Server stream, long error, Throwable failure)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure);
if (task != null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, true);
}
Runnable task = getConnection().onFailure(http3Stream, failure);
Executor executor = http3Stream.getSession().getProtocolSession().getQuicSession().getExecutor();
ThreadPool.executeImmediately(executor, task);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,10 +641,16 @@ public boolean onIdleExpired(TimeoutException timeout)
@Override
public void close()
{
Runnable task = _httpChannel.onClose();
if (task != null)
task.run();
super.close();
try
{
Runnable task = _httpChannel.onClose();
if (task != null)
task.run();
}
finally
{
super.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ static void executeImmediately(Executor executor, Runnable task)
if (task == null)
return;

Invocable.InvocationType invocationType = Invocable.getInvocationType(task);
if (invocationType == Invocable.InvocationType.NON_BLOCKING)
{
task.run();
return;
}
if (invocationType == Invocable.InvocationType.EITHER)
{
Invocable.invokeNonBlocking(task);
return;
}

if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task))
return;

Expand All @@ -112,21 +124,13 @@ static void executeImmediately(Executor executor, Runnable task)
return;
}

switch (Invocable.getInvocationType(task))
try
{
new Thread(task, "jetty-immediate-executor").start();
}
catch (Throwable ignored)
{
case NON_BLOCKING -> task.run();
case EITHER -> Invocable.invokeNonBlocking(task);
default ->
{
try
{
new Thread(task).start();
}
catch (Throwable ignored)
{
task.run();
}
}
task.run();
}
}
}

0 comments on commit c56a24f

Please sign in to comment.