diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index f56e05d0..f1f1721b 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -722,7 +722,25 @@ public function handleStreamReset(int $streamId, int $errorCode): void return; } - $this->handleStreamException(new Http2StreamException("Stream closed by server", $streamId, $errorCode)); + $message = match ($errorCode) { + Http2Parser::GRACEFUL_SHUTDOWN => 'Graceful shutdown', + Http2Parser::PROTOCOL_ERROR => 'Protocol error', + Http2Parser::INTERNAL_ERROR => 'Internal error', + Http2Parser::FLOW_CONTROL_ERROR => 'Flow control error', + Http2Parser::SETTINGS_TIMEOUT => 'Settings timeout', + Http2Parser::STREAM_CLOSED => 'Stream closed', + Http2Parser::FRAME_SIZE_ERROR => 'Frame size error', + Http2Parser::REFUSED_STREAM => 'Stream refused', + Http2Parser::CANCEL => 'Stream cancelled', + Http2Parser::COMPRESSION_ERROR => 'Compression error', + Http2Parser::CONNECT_ERROR => 'Connect error', + Http2Parser::ENHANCE_YOUR_CALM => 'Enhance your calm', + Http2Parser::INADEQUATE_SECURITY => 'Inadequate security', + Http2Parser::HTTP_1_1_REQUIRED => 'HTTP/1.1 required', + default => 'Unknown error' . $errorCode + }; + + $this->handleStreamException(new Http2StreamException("Stream closed by server: $message", $streamId, $errorCode)); } public function handleStreamException(Http2StreamException $exception): void diff --git a/src/Internal/EventInvoker.php b/src/Internal/EventInvoker.php index b95b3892..ca6a9cea 100644 --- a/src/Internal/EventInvoker.php +++ b/src/Internal/EventInvoker.php @@ -26,11 +26,18 @@ public static function getPhase(Request $request): Phase return self::get()->requestPhase[$request] ?? Phase::Unprocessed; } + public static function isRejected(Request $request): bool + { + return self::get()->requestRejected[$request] ?? false; + } + private \WeakMap $requestPhase; + private \WeakMap $requestRejected; public function __construct() { $this->requestPhase = new \WeakMap(); + $this->requestRejected = new \WeakMap(); } private function invoke(Request $request, \Closure $closure): void @@ -47,6 +54,10 @@ public function requestStart(Request $request): void throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Blocked'); } + if (self::isRejected($request)) { + throw new \Error('Request has been rejected by the server. Use a new request for retries.'); + } + $this->requestPhase[$request] = Phase::Blocked; $this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestStart($request)); @@ -55,7 +66,7 @@ public function requestStart(Request $request): void public function requestFailed(Request $request, \Throwable $exception): void { $previousPhase = self::getPhase($request); - if (\in_array($previousPhase, [Phase::Complete, Phase::Failed, Phase::Rejected], true)) { + if (\in_array($previousPhase, [Phase::Complete, Phase::Failed], true)) { throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Failed'); } @@ -67,7 +78,7 @@ public function requestFailed(Request $request, \Throwable $exception): void public function requestEnd(Request $request, Response $response): void { $previousPhase = self::getPhase($request); - if ($previousPhase !== Phase::Blocked && $previousPhase !== Phase::ResponseHeaders && $previousPhase !== Phase::ResponseBody) { + if (!\in_array($previousPhase, [Phase::Blocked, Phase::ResponseHeaders, Phase::ResponseBody], true)) { throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Complete'); } @@ -78,12 +89,7 @@ public function requestEnd(Request $request, Response $response): void public function requestRejected(Request $request): void { - $previousPhase = self::getPhase($request); - if (\in_array($previousPhase, [Phase::Complete, Phase::Failed, Phase::Rejected], true)) { - throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to Failed'); - } - - $this->requestPhase[$request] = Phase::Rejected; + $this->requestRejected[$request] = true; $this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestRejected($request)); } @@ -119,6 +125,10 @@ public function requestHeaderStart(Request $request, Stream $stream): void throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to RequestHeaders'); } + if (self::isRejected($request)) { + throw new \Error('Request has been rejected by the server. Use a new request for retries.'); + } + $this->requestPhase[$request] = Phase::RequestHeaders; $this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestHeaderStart($request, $stream)); diff --git a/src/Request.php b/src/Request.php index 676709ae..45dd7d06 100644 --- a/src/Request.php +++ b/src/Request.php @@ -87,6 +87,10 @@ public function getEventListeners(): array */ public function isUnprocessed(): bool { + if (EventInvoker::isRejected($this)) { + return true; + } + return \in_array(EventInvoker::getPhase($this), [Phase::Unprocessed, Phase::Blocked, Phase::Connected, Phase::Rejected], true); } diff --git a/test/Connection/Http2ConnectionTest.php b/test/Connection/Http2ConnectionTest.php index 3cf980f5..859773ea 100644 --- a/test/Connection/Http2ConnectionTest.php +++ b/test/Connection/Http2ConnectionTest.php @@ -493,6 +493,36 @@ public function testServerPushingOddStream(): void $response->getBody()->buffer(); } + public function testServerStreamRefuse(): void + { + [$server, $client] = Socket\createSocketPair(); + + $connection = new Http2Connection($client, 0, null); + $server->write(self::packFrame('', Http2Parser::SETTINGS, 0)); + $connection->initialize(); + + $request = new Request('http://localhost/'); + events()->requestStart($request); + $stream = $connection->getStream($request); + + async(static function () use ($server) { + delay(0.1); + + $server->write(self::packFrame(\pack("N", Http2Parser::REFUSED_STREAM), Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, 1)); + }); + + try { + $stream->request($request, new NullCancellation()); + + self::fail('SocketException expected'); + } catch (SocketException $socketException) { + events()->requestFailed($request, $socketException); + + $this->assertSame('Stream closed by server: Stream refused', $socketException->getMessage()); + $this->assertTrue($request->isUnprocessed()); + } + } + /** * @throws Socket\SocketException * @throws \Amp\ByteStream\ClosedException