Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove UnprocessedRequestException #349

Merged
merged 5 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions src/Connection/DefaultConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,14 @@ public function create(Request $request, Cancellation $cancellation): Connection
$connectContext->withConnectTimeout($request->getTcpConnectTimeout()),
$cancellation
);
} catch (Socket\ConnectException $e) {
throw new UnprocessedRequestException(new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e));
} catch (Socket\ConnectException $connectException) {
throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $connectException);
} catch (CancelledException) {
// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();

// Otherwise we ran into a timeout of our TimeoutCancellation
throw new UnprocessedRequestException(
new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' s', $authority))
);
throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' s', $authority));
}

$tlsHandshakeDuration = null;
Expand All @@ -126,48 +124,44 @@ public function create(Request $request, Cancellation $cancellation): Connection
if ($tlsState !== Socket\TlsState::Disabled) {
$socket->close();

throw new UnprocessedRequestException(
new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState->name . ')')
);
throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState->name . ')');
}

$socket->setupTls(new CompositeCancellation(
$cancellation,
new TimeoutCancellation($request->getTlsHandshakeTimeout())
));
} catch (StreamException $exception) {
} catch (StreamException $streamException) {
$socket->close();

throw new UnprocessedRequestException(new SocketException(\sprintf(
throw new SocketException(\sprintf(
"Connection to '%s' @ '%s' closed during TLS handshake",
$authority,
$socket->getRemoteAddress()->toString()
), 0, $exception));
), 0, $streamException);
} catch (CancelledException) {
$socket->close();

// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();

// Otherwise we ran into a timeout of our TimeoutCancellation
throw new UnprocessedRequestException(new TimeoutException(\sprintf(
throw new TimeoutException(\sprintf(
"TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' s',
$authority,
$socket->getRemoteAddress()->toString()
)));
));
}

$tlsInfo = $socket->getTlsInfo();
if ($tlsInfo === null) {
$socket->close();

throw new UnprocessedRequestException(
new SocketException(\sprintf(
"Socket closed after TLS handshake with '%s' @ '%s'",
$authority,
$socket->getRemoteAddress()->toString()
))
);
throw new SocketException(\sprintf(
"Socket closed after TLS handshake with '%s' @ '%s'",
$authority,
$socket->getRemoteAddress()->toString()
));
}

$tlsHandshakeDuration = now() - $tlsHandshakeStart;
Expand Down
2 changes: 1 addition & 1 deletion src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ private function writeRequest(
try {
$socket = $this->socket;
if ($socket === null) {
throw new UnprocessedRequestException(new SocketException('Socket closed before request started'));
throw new SocketException('Socket closed before request started');
}

events()->requestHeaderStart($request, $stream);
Expand Down
75 changes: 32 additions & 43 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use Amp\Future;
use Amp\Http\Client\Connection\HttpStream;
use Amp\Http\Client\Connection\Stream;
use Amp\Http\Client\Connection\UnprocessedRequestException;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Internal\EventInvoker;
use Amp\Http\Client\Internal\Phase;
Expand Down Expand Up @@ -133,9 +132,7 @@ public function initialize(Cancellation $cancellation): void
$this->initializeStarted = true;

if ($this->socket->isClosed()) {
throw new UnprocessedRequestException(
new SocketException('The socket closed before the connection could be initialized')
);
throw new SocketException('The socket closed before the connection could be initialized');
}

$this->settings = new DeferredFuture;
Expand All @@ -146,10 +143,10 @@ public function initialize(Cancellation $cancellation): void

try {
$future->await($cancellation);
} catch (CancelledException $exception) {
$exception = new SocketException('Connecting cancelled', 0, $exception);
} catch (CancelledException $cancelledException) {
$exception = new SocketException('Connecting cancelled', 0, $cancelledException);
$this->shutdown($exception);
throw new UnprocessedRequestException($exception);
throw $exception;
}
}

Expand Down Expand Up @@ -414,7 +411,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool

$cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($streamId): void {
if (isset($this->streams[$streamId])) {
$this->releaseStream($streamId, $exception);
$this->releaseStream($streamId, $exception, false);
}
});

Expand Down Expand Up @@ -685,7 +682,7 @@ static function () {
$pushId
): void {
if (isset($this->streams[$pushId])) {
$this->releaseStream($pushId, $exception);
$this->releaseStream($pushId, $exception, false);
}
});

Expand Down Expand Up @@ -735,12 +732,8 @@ public function handleStreamException(Http2StreamException $exception): void

$exception = new SocketException($exception->getMessage(), $code, $exception);

if ($code === Http2Parser::REFUSED_STREAM) {
$exception = new UnprocessedRequestException($exception);
}

if (isset($this->streams[$id])) {
$this->releaseStream($id, $exception);
$this->releaseStream($id, $exception, $code === Http2Parser::REFUSED_STREAM);
}
}

Expand Down Expand Up @@ -878,7 +871,7 @@ public function handleStreamEnd(int $streamId): void
$trailers->complete(new Trailers([]));
}

$this->releaseStream($streamId);
$this->releaseStream($streamId, null, false);
}

public function isIdle(): bool
Expand Down Expand Up @@ -917,20 +910,18 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
{
try {
if ($this->shutdown !== null) {
throw new UnprocessedRequestException(new SocketException(\sprintf(
throw new SocketException(\sprintf(
"Connection from '%s' to '%s' has already been shut down",
$this->socket->getLocalAddress()->toString(),
$this->socket->getRemoteAddress()->toString()
)));
));
}

if ($this->hasTimeout && !$this->ping()->await()) {
throw new UnprocessedRequestException(
new SocketException(\sprintf(
"Socket to '%s' missed responding to PINGs",
$this->socket->getRemoteAddress()->toString()
))
);
throw new SocketException(\sprintf(
"Socket to '%s' missed responding to PINGs",
$this->socket->getRemoteAddress()->toString()
));
}

RequestNormalizer::normalizeRequest($request);
Expand All @@ -949,16 +940,11 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
$request->setProtocolVersions(['2']);

if ($this->socket->isClosed()) {
throw new UnprocessedRequestException(
new SocketException(\sprintf(
"Socket to '%s' closed before the request could be sent",
$this->socket->getRemoteAddress()->toString()
))
);
throw new SocketException(\sprintf(
"Socket to '%s' closed before the request could be sent",
$this->socket->getRemoteAddress()->toString()
));
}

$body = $request->getBody()->getContent();
$chunk = $body->read($cancellation);
} catch (\Throwable $exception) {
throw $this->wrapException($exception, "Request initialization failed");
}
Expand All @@ -984,7 +970,7 @@ public function request(Request $request, Cancellation $cancellation, Stream $st

$cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($streamId): void {
if (isset($this->streams[$streamId])) {
$this->releaseStream($streamId, $exception);
$this->releaseStream($streamId, $exception, false);
}
});

Expand All @@ -996,6 +982,9 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
try {
events()->requestHeaderStart($request, $stream);

$body = $request->getBody()->getContent();
$chunk = $body->read($cancellation);

$headers = $this->hpack->encode($this->generateHeaders($request));
$flag = Http2Parser::END_HEADERS | ($chunk === null ? Http2Parser::END_STREAM : Http2Parser::NO_FLAG);

Expand Down Expand Up @@ -1060,7 +1049,7 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
}

if (isset($this->streams[$streamId])) {
$this->releaseStream($streamId, $exception);
$this->releaseStream($streamId, $exception, false);
}

throw $exception;
Expand Down Expand Up @@ -1318,7 +1307,7 @@ private function writeBufferedData(Http2Stream $stream): Future
return $stream->windowSizeIncrease->getFuture();
}

private function releaseStream(int $streamId, ?\Throwable $exception = null): void
private function releaseStream(int $streamId, ?\Throwable $exception, bool $unprocessed): void
{
\assert(isset($this->streams[$streamId]));

Expand Down Expand Up @@ -1357,6 +1346,10 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo
$streamId,
\pack("N", Http2Parser::CANCEL)
)->ignore();

if ($unprocessed) {
events()->requestRejected($stream->request);
}
}

if (!$this->streams && !$this->socket->isClosed() && $this->socket instanceof ResourceStream) {
Expand Down Expand Up @@ -1423,19 +1416,15 @@ private function shutdown(?HttpException $reason = null, ?int $lastId = null): v

if ($this->settings !== null) {
$message = "Connection closed before HTTP/2 settings could be received";
$this->settings->error(new UnprocessedRequestException(new SocketException($message, 0, $reason)));
$this->settings->error(new SocketException($message, 0, $reason));
$this->settings = null;
}

$exception = $reason;
foreach ($this->streams as $id => $stream) {
if ($lastId !== null && $id > $lastId) {
$exception = $exception instanceof UnprocessedRequestException
? $exception
: new UnprocessedRequestException($reason);
}
$unprocessed = $lastId !== null && $id > $lastId;

$this->releaseStream($id, $exception);
$this->releaseStream($id, $exception, $unprocessed);
}

$previous = $reason->getPrevious();
Expand Down Expand Up @@ -1576,7 +1565,7 @@ private function createStreamTimeoutWatcher(int $streamId, float $timeout, strin

$watcher = EventLoop::delay($timeout, function () use ($streamId, $timeout, $message): void {
\assert(isset($this->streams[$streamId]), 'Stream watcher invoked after stream closed');
$this->releaseStream($streamId, new TimeoutException($message));
$this->releaseStream($streamId, new TimeoutException($message), false);
});

EventLoop::unreference($watcher);
Expand Down
22 changes: 0 additions & 22 deletions src/Connection/UnprocessedRequestException.php

This file was deleted.

5 changes: 5 additions & 0 deletions src/EventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public function requestFailed(Request $request, HttpException $exception): void;
*/
public function requestEnd(Request $request, Response $response): void;

/**
* Called when the request is rejected by the server and not yet processed.
*/
public function requestRejected(Request $request): void;

/**
* Called before an application interceptor is invoked.
*/
Expand Down
5 changes: 5 additions & 0 deletions src/EventListener/LogHttpArchive.php
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,9 @@ public function push(Request $request): void
{
// nothing to do
}

public function requestRejected(Request $request): void
{
// nothing to do
}
}
15 changes: 7 additions & 8 deletions src/Interceptor/RetryRequests.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Http\Client\ApplicationInterceptor;
use Amp\Http\Client\Connection\UnprocessedRequestException;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Http\Client\SocketException;

final class RetryRequests implements ApplicationInterceptor
{
Expand All @@ -33,15 +32,15 @@ public function request(

do {
try {
// TODO: Clone requests here
return $httpClient->request($request, $cancellation);
} catch (UnprocessedRequestException $exception) {
// Request was deemed retryable by connection, so carry on.
} catch (SocketException $exception) {
if (!$request->isIdempotent()) {
throw $exception;
} catch (HttpException $exception) {
if ($request->isIdempotent() || $request->isUnprocessed()) {
// Request was deemed retryable by connection, so carry on.
continue;
}

// Request can safely be retried.
throw $exception;
}
} while ($attempt++ <= $this->retryLimit);

Expand Down
12 changes: 12 additions & 0 deletions src/Internal/EventInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ public function requestEnd(Request $request, Response $response): void
$this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestEnd($request, $response));
}

public function requestRejected(Request $request): void
{
$previousPhase = self::getPhase($request);
if ($previousPhase === Phase::Complete || $previousPhase === Phase::Failed) {
throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Failed');
}

$this->requestPhase[$request] = Phase::Rejected;

$this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestRejected($request));
}

public function connectionAcquired(Request $request, Connection $connection): void
{
$previousPhase = self::getPhase($request);
Expand Down
1 change: 1 addition & 0 deletions src/Internal/Phase.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ enum Phase
case ResponseBody;
case Complete;
case Failed;
case Rejected;
}
Loading