Skip to content

Commit

Permalink
Fix event phase issue for rejected requests
Browse files Browse the repository at this point in the history
Fixes #351.
  • Loading branch information
kelunik committed Sep 3, 2023
1 parent 35e8ecb commit 8621dbb
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 9 deletions.
20 changes: 19 additions & 1 deletion src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,25 @@ public function handleStreamReset(int $streamId, int $errorCode): void
return;
}

$this->handleStreamException(new Http2StreamException("Stream closed by server", $streamId, $errorCode));
$message = match ($errorCode) {
Http2Parser::GRACEFUL_SHUTDOWN => 'Graceful shutdown',
Http2Parser::PROTOCOL_ERROR => 'Protocol error',
Http2Parser::INTERNAL_ERROR => 'Internal error',
Http2Parser::FLOW_CONTROL_ERROR => 'Flow control error',
Http2Parser::SETTINGS_TIMEOUT => 'Settings timeout',
Http2Parser::STREAM_CLOSED => 'Stream closed',
Http2Parser::FRAME_SIZE_ERROR => 'Frame size error',
Http2Parser::REFUSED_STREAM => 'Stream refused',
Http2Parser::CANCEL => 'Stream cancelled',
Http2Parser::COMPRESSION_ERROR => 'Compression error',
Http2Parser::CONNECT_ERROR => 'Connect error',
Http2Parser::ENHANCE_YOUR_CALM => 'Enhance your calm',
Http2Parser::INADEQUATE_SECURITY => 'Inadequate security',
Http2Parser::HTTP_1_1_REQUIRED => 'HTTP/1.1 required',
default => 'Unknown error' . $errorCode
};

$this->handleStreamException(new Http2StreamException("Stream closed by server: $message", $streamId, $errorCode));
}

public function handleStreamException(Http2StreamException $exception): void
Expand Down
26 changes: 18 additions & 8 deletions src/Internal/EventInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ public static function getPhase(Request $request): Phase
return self::get()->requestPhase[$request] ?? Phase::Unprocessed;
}

public static function isRejected(Request $request): bool
{
return self::get()->requestRejected[$request] ?? false;
}

private \WeakMap $requestPhase;
private \WeakMap $requestRejected;

public function __construct()
{
$this->requestPhase = new \WeakMap();
$this->requestRejected = new \WeakMap();
}

private function invoke(Request $request, \Closure $closure): void
Expand All @@ -47,6 +54,10 @@ public function requestStart(Request $request): void
throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Blocked');
}

if (self::isRejected($request)) {
throw new \Error('Request has been rejected by the server. Use a new request for retries.');
}

$this->requestPhase[$request] = Phase::Blocked;

$this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestStart($request));
Expand All @@ -55,7 +66,7 @@ public function requestStart(Request $request): void
public function requestFailed(Request $request, \Throwable $exception): void
{
$previousPhase = self::getPhase($request);
if (\in_array($previousPhase, [Phase::Complete, Phase::Failed, Phase::Rejected], true)) {
if (\in_array($previousPhase, [Phase::Complete, Phase::Failed], true)) {
throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Failed');
}

Expand All @@ -67,7 +78,7 @@ public function requestFailed(Request $request, \Throwable $exception): void
public function requestEnd(Request $request, Response $response): void
{
$previousPhase = self::getPhase($request);
if ($previousPhase !== Phase::Blocked && $previousPhase !== Phase::ResponseHeaders && $previousPhase !== Phase::ResponseBody) {
if (!\in_array($previousPhase, [Phase::Blocked, Phase::ResponseHeaders, Phase::ResponseBody], true)) {
throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Complete');
}

Expand All @@ -78,12 +89,7 @@ public function requestEnd(Request $request, Response $response): void

public function requestRejected(Request $request): void
{
$previousPhase = self::getPhase($request);
if (\in_array($previousPhase, [Phase::Complete, Phase::Failed, Phase::Rejected], true)) {
throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Failed');
}

$this->requestPhase[$request] = Phase::Rejected;
$this->requestRejected[$request] = true;

$this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestRejected($request));
}
Expand Down Expand Up @@ -119,6 +125,10 @@ public function requestHeaderStart(Request $request, Stream $stream): void
throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to RequestHeaders');
}

if (self::isRejected($request)) {
throw new \Error('Request has been rejected by the server. Use a new request for retries.');
}

$this->requestPhase[$request] = Phase::RequestHeaders;

$this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestHeaderStart($request, $stream));
Expand Down
4 changes: 4 additions & 0 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public function getEventListeners(): array
*/
public function isUnprocessed(): bool
{
if (EventInvoker::isRejected($this)) {
return true;
}

return \in_array(EventInvoker::getPhase($this), [Phase::Unprocessed, Phase::Blocked, Phase::Connected, Phase::Rejected], true);
}

Expand Down
30 changes: 30 additions & 0 deletions test/Connection/Http2ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,36 @@ public function testServerPushingOddStream(): void
$response->getBody()->buffer();
}

public function testServerStreamRefuse(): void
{
[$server, $client] = Socket\createSocketPair();

$connection = new Http2Connection($client, 0, null);
$server->write(self::packFrame('', Http2Parser::SETTINGS, 0));
$connection->initialize();

$request = new Request('http://localhost/');
events()->requestStart($request);
$stream = $connection->getStream($request);

async(static function () use ($server) {
delay(0.1);

$server->write(self::packFrame(\pack("N", Http2Parser::REFUSED_STREAM), Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, 1));
});

try {
$stream->request($request, new NullCancellation());

self::fail('SocketException expected');
} catch (SocketException $socketException) {
events()->requestFailed($request, $socketException);

$this->assertSame('Stream closed by server: Stream refused', $socketException->getMessage());
$this->assertTrue($request->isUnprocessed());
}
}

/**
* @throws Socket\SocketException
* @throws \Amp\ByteStream\ClosedException
Expand Down

0 comments on commit 8621dbb

Please sign in to comment.