diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml index 439e796..bf6b38a 100644 --- a/.github/dependabot.yaml +++ b/.github/dependabot.yaml @@ -8,3 +8,7 @@ updates: interval: monthly labels: - autosubmit + groups: + github-actions: + patterns: + - "*" diff --git a/.github/workflows/health.yaml b/.github/workflows/health.yaml new file mode 100644 index 0000000..9f47a03 --- /dev/null +++ b/.github/workflows/health.yaml @@ -0,0 +1,15 @@ +name: Health + +on: + pull_request: + branches: [ master ] + types: [opened, synchronize, reopened, labeled, unlabeled] + +jobs: + health: + uses: dart-lang/ecosystem/.github/workflows/health.yaml@main + with: + coverage_web: false + sdk: dev + permissions: + pull-requests: write diff --git a/.github/workflows/no-response.yml b/.github/workflows/no-response.yml new file mode 100644 index 0000000..ab1ac49 --- /dev/null +++ b/.github/workflows/no-response.yml @@ -0,0 +1,37 @@ +# A workflow to close issues where the author hasn't responded to a request for +# more information; see https://github.com/actions/stale. + +name: No Response + +# Run as a daily cron. +on: + schedule: + # Every day at 8am + - cron: '0 8 * * *' + +# All permissions not specified are set to 'none'. +permissions: + issues: write + pull-requests: write + +jobs: + no-response: + runs-on: ubuntu-latest + if: ${{ github.repository_owner == 'dart-lang' }} + steps: + - uses: actions/stale@28ca1036281a5e5922ead5184a1bbf96e5fc984e + with: + # Don't automatically mark inactive issues+PRs as stale. + days-before-stale: -1 + # Close needs-info issues and PRs after 14 days of inactivity. + days-before-close: 14 + stale-issue-label: "needs-info" + close-issue-message: > + Without additional information we're not able to resolve this issue. + Feel free to add more info or respond to any questions above and we + can reopen the case. Thanks for your contribution! + stale-pr-label: "needs-info" + close-pr-message: > + Without additional information we're not able to resolve this PR. + Feel free to add more info or respond to any questions above. + Thanks for your contribution! diff --git a/.github/workflows/post_summaries.yaml b/.github/workflows/post_summaries.yaml new file mode 100644 index 0000000..e082efe --- /dev/null +++ b/.github/workflows/post_summaries.yaml @@ -0,0 +1,17 @@ +name: Comment on the pull request + +on: + # Trigger this workflow after the Health workflow completes. This workflow will have permissions to + # do things like create comments on the PR, even if the original workflow couldn't. + workflow_run: + workflows: + - Health + - Publish + types: + - completed + +jobs: + upload: + uses: dart-lang/ecosystem/.github/workflows/post_summaries.yaml@main + permissions: + pull-requests: write diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml new file mode 100644 index 0000000..27157a0 --- /dev/null +++ b/.github/workflows/publish.yaml @@ -0,0 +1,17 @@ +# A CI configuration to auto-publish pub packages. + +name: Publish + +on: + pull_request: + branches: [ master ] + push: + tags: [ 'v[0-9]+.[0-9]+.[0-9]+' ] + +jobs: + publish: + if: ${{ github.repository_owner == 'dart-lang' }} + uses: dart-lang/ecosystem/.github/workflows/publish.yaml@main + permissions: + id-token: write # Required for authentication using OIDC + pull-requests: write # Required for writing the pull request note diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml index c21c3dd..b82cea5 100644 --- a/.github/workflows/test-package.yml +++ b/.github/workflows/test-package.yml @@ -22,8 +22,8 @@ jobs: matrix: sdk: [dev] steps: - - uses: actions/checkout@8e5e7e5ab8b370d6c329ec480221332ada57f0ab - - uses: dart-lang/setup-dart@d6a63dab3335f427404425de0fbfed4686d93c4f + - uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 + - uses: dart-lang/setup-dart@f0ead981b4d9a35b37f30d36160575d60931ec30 with: sdk: ${{ matrix.sdk }} - id: install @@ -47,10 +47,10 @@ jobs: matrix: # Add macos-latest and/or windows-latest if relevant for this package. os: [ubuntu-latest] - sdk: [2.17.0, dev] + sdk: [3.2, dev] steps: - - uses: actions/checkout@8e5e7e5ab8b370d6c329ec480221332ada57f0ab - - uses: dart-lang/setup-dart@d6a63dab3335f427404425de0fbfed4686d93c4f + - uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 + - uses: dart-lang/setup-dart@f0ead981b4d9a35b37f30d36160575d60931ec30 with: sdk: ${{ matrix.sdk }} - id: install diff --git a/CHANGELOG.md b/CHANGELOG.md index d0a7646..8248505 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,22 @@ -## 2.0.2-dev +## 2.3.1-wip +- Require Dart 3.2 +- Add topics to `pubspec.yaml` + +## 2.3.0 + +- Only send updates on frames and pings being received when there are listeners, as to not fill up memory. + +## 2.2.0 + +- Transform headers to lowercase. +- Expose pings to connection to enable the KEEPALIVE feature for gRPC. + +## 2.1.0 + +- Require Dart `3.0.0` - Require Dart `2.17.0`. +- Send `WINDOW_UPDATE` frames for the connection to account for data being sent on closed streams until the `RST_STREAM` has been processed. ## 2.0.1 diff --git a/README.md b/README.md index e776da2..36a8aaa 100644 --- a/README.md +++ b/README.md @@ -15,10 +15,10 @@ import 'dart:io'; import 'package:http2/http2.dart'; -main() async { - var uri = Uri.parse('https://www.google.com/'); +Future main() async { + final uri = Uri.parse('https://www.google.com/'); - var transport = new ClientTransportConnection.viaSocket( + final transport = ClientTransportConnection.viaSocket( await SecureSocket.connect( uri.host, uri.port, @@ -26,12 +26,12 @@ main() async { ), ); - var stream = transport.makeRequest( + final stream = transport.makeRequest( [ - new Header.ascii(':method', 'GET'), - new Header.ascii(':path', uri.path), - new Header.ascii(':scheme', uri.scheme), - new Header.ascii(':authority', uri.host), + Header.ascii(':method', 'GET'), + Header.ascii(':path', uri.path), + Header.ascii(':scheme', uri.scheme), + Header.ascii(':authority', uri.host), ], endStream: true, ); @@ -39,8 +39,8 @@ main() async { await for (var message in stream.incomingMessages) { if (message is HeadersStreamMessage) { for (var header in message.headers) { - var name = utf8.decode(header.name); - var value = utf8.decode(header.value); + final name = utf8.decode(header.name); + final value = utf8.decode(header.value); print('Header: $name: $value'); } } else if (message is DataStreamMessage) { diff --git a/analysis_options.yaml b/analysis_options.yaml index 8b5373d..9f9fe93 100644 --- a/analysis_options.yaml +++ b/analysis_options.yaml @@ -1,15 +1,9 @@ -include: package:lints/recommended.yaml +# https://dart.dev/tools/analysis#the-analysis-options-file +include: package:dart_flutter_team_lints/analysis_options.yaml analyzer: language: strict-casts: true errors: - unused_element: error - unused_import: error - unused_local_variable: error - dead_code: error - -linter: - rules: # Disabled as there are several dozen violations. - constant_identifier_names: false + constant_identifier_names: ignore diff --git a/example/display_headers.dart b/example/display_headers.dart index 9cefdab..42dbf22 100644 --- a/example/display_headers.dart +++ b/example/display_headers.dart @@ -1,3 +1,7 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + import 'dart:async'; import 'dart:convert'; import 'dart:io'; diff --git a/lib/multiprotocol_server.dart b/lib/multiprotocol_server.dart index 0b1aeeb..54fe6c0 100644 --- a/lib/multiprotocol_server.dart +++ b/lib/multiprotocol_server.dart @@ -45,7 +45,7 @@ class MultiProtocolHttpServer { /// /// See also [startServing]. static Future bind( - address, int port, SecurityContext context, + Object? address, int port, SecurityContext context, {http2.ServerSettings? settings}) async { context.setAlpnProtocols(['h2', 'h2-14', 'http/1.1'], true); var secureServer = await SecureServerSocket.bind(address, port, context); diff --git a/lib/src/async_utils/async_utils.dart b/lib/src/async_utils/async_utils.dart index 6148328..22a73e9 100644 --- a/lib/src/async_utils/async_utils.dart +++ b/lib/src/async_utils/async_utils.dart @@ -60,15 +60,9 @@ class BufferedSink { bufferIndicator.markBuffered(); _controller - ..onListen = () { - bufferIndicator.markUnBuffered(); - } - ..onPause = () { - bufferIndicator.markBuffered(); - } - ..onResume = () { - bufferIndicator.markUnBuffered(); - } + ..onListen = bufferIndicator.markUnBuffered + ..onPause = bufferIndicator.markBuffered + ..onResume = bufferIndicator.markUnBuffered ..onCancel = () { // TODO: We may want to propagate cancel events as errors. // Currently `_doneFuture` will just complete normally if the sink diff --git a/lib/src/connection.dart b/lib/src/connection.dart index e27b90e..4e52e57 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -99,6 +99,10 @@ abstract class Connection { final Completer _onInitialPeerSettingsReceived = Completer(); + final StreamController _pingReceived = StreamController(); + + final StreamController _frameReceived = StreamController(); + /// Future which completes when the first SETTINGS frame is received from /// the peer. Future get onInitialPeerSettingsReceived => @@ -179,12 +183,12 @@ abstract class Connection { // Setup handlers. _settingsHandler = SettingsHandler(_hpackContext.encoder, _frameWriter, acknowledgedSettings, peerSettings); - _pingHandler = PingHandler(_frameWriter); + _pingHandler = PingHandler(_frameWriter, _pingReceived); var settings = _decodeSettings(settingsObject); // Do the initial settings handshake (possibly with pushes disabled). - _settingsHandler.changeSettings(settings).catchError((error) { + _settingsHandler.changeSettings(settings).catchError((Object error) { // TODO: The [error] can contain sensitive information we now expose via // a [Goaway] frame. We should somehow ensure we're only sending useful // but non-sensitive information. @@ -267,15 +271,15 @@ abstract class Connection { } /// Pings the remote peer (can e.g. be used for measuring latency). - Future ping() { + Future ping() { return _pingHandler.ping().catchError((e, s) { - return Future.error( + return Future.error( TransportException('The connection has been terminated.')); }, test: (e) => e is TerminatedException); } /// Finishes this connection. - Future finish() { + Future finish() { _finishing(active: true); // TODO: There is probably more we need to wait for. @@ -284,8 +288,8 @@ abstract class Connection { } /// Terminates this connection forcefully. - Future terminate() { - return _terminate(ErrorCode.NO_ERROR); + Future terminate([int? errorCode]) { + return _terminate(errorCode ?? ErrorCode.NO_ERROR); } void _activeStateHandler(bool isActive) => @@ -340,6 +344,9 @@ abstract class Connection { frame.decodedHeaders = _hpackContext.decoder.decode(frame.headerBlockFragment); } + if (_frameReceived.hasListener) { + _frameReceived.add(null); + } // Handle the frame as either a connection or a stream frame. if (frame.header.streamId == 0) { @@ -434,16 +441,15 @@ abstract class Connection { _settingsHandler.terminate(exception); return Future.wait([cancelFuture, closeFuture]) - .catchError((_) => const []); + .catchError((_) => const []); } - return Future.value(); + return Future.value(); } } class ClientConnection extends Connection implements ClientTransportConnection { - ClientConnection._(Stream> incoming, StreamSink> outgoing, - Settings settings) - : super(incoming, outgoing, settings, isClientConnection: true); + ClientConnection._(super.incoming, super.outgoing, super.settings) + : super(isClientConnection: true); factory ClientConnection(Stream> incoming, StreamSink> outgoing, ClientSettings clientSettings) { @@ -473,12 +479,17 @@ class ClientConnection extends Connection implements ClientTransportConnection { } return hStream; } + + @override + Stream get onPingReceived => _pingReceived.stream; + + @override + Stream get onFrameReceived => _frameReceived.stream; } class ServerConnection extends Connection implements ServerTransportConnection { - ServerConnection._(Stream> incoming, StreamSink> outgoing, - Settings settings) - : super(incoming, outgoing, settings, isClientConnection: false); + ServerConnection._(super.incoming, super.outgoing, super.settings) + : super(isClientConnection: false); factory ServerConnection(Stream> incoming, StreamSink> outgoing, ServerSettings serverSettings) { @@ -489,4 +500,10 @@ class ServerConnection extends Connection implements ServerTransportConnection { @override Stream get incomingStreams => _streams.incomingStreams.cast(); + + @override + Stream get onPingReceived => _pingReceived.stream; + + @override + Stream get onFrameReceived => _frameReceived.stream; } diff --git a/lib/src/connection_preface.dart b/lib/src/connection_preface.dart index c2bf652..2f0b98c 100644 --- a/lib/src/connection_preface.dart +++ b/lib/src/connection_preface.dart @@ -96,15 +96,14 @@ Stream> readConnectionPreface(Stream> incoming) { } result.onListen = () { - subscription = incoming.listen(onData, - onError: (Object e, StackTrace s) => result.addError(e, s), - onDone: () { - if (!connectionPrefaceRead) { - terminate('EOS before connection preface could be read.'); - } else { - result.close(); - } - }); + subscription = + incoming.listen(onData, onError: result.addError, onDone: () { + if (!connectionPrefaceRead) { + terminate('EOS before connection preface could be read.'); + } else { + result.close(); + } + }); result ..onPause = subscription.pause ..onResume = subscription.resume diff --git a/lib/src/error_handler.dart b/lib/src/error_handler.dart index d599dc2..a8e1920 100644 --- a/lib/src/error_handler.dart +++ b/lib/src/error_handler.dart @@ -7,11 +7,11 @@ import 'dart:async'; import 'sync_errors.dart'; /// Used by classes which may be terminated from the outside. -class TerminatableMixin { +mixin TerminatableMixin { bool _terminated = false; /// Terminates this stream message queue. Further operations on it will fail. - void terminate([error]) { + void terminate([Object? error]) { if (!wasTerminated) { _terminated = true; onTerminated(error); @@ -40,7 +40,7 @@ class TerminatableMixin { } /// Used by classes which may be cancelled. -class CancellableMixin { +mixin CancellableMixin { bool _cancelled = false; final _cancelCompleter = Completer.sync(); @@ -58,9 +58,9 @@ class CancellableMixin { } /// Used by classes which may be closed. -class ClosableMixin { +mixin ClosableMixin { bool _closing = false; - final Completer _completer = Completer(); + final Completer _completer = Completer(); Future get done => _completer.future; @@ -91,13 +91,13 @@ class ClosableMixin { return f(); } - void closeWithValue([value]) { + void closeWithValue([Object? value]) { if (!wasClosed) { _completer.complete(value); } } - void closeWithError(error) { + void closeWithError(Object? error) { if (!wasClosed) { _completer.complete(error); } diff --git a/lib/src/flowcontrol/connection_queues.dart b/lib/src/flowcontrol/connection_queues.dart index b624787..f1a499a 100644 --- a/lib/src/flowcontrol/connection_queues.dart +++ b/lib/src/flowcontrol/connection_queues.dart @@ -10,11 +10,9 @@ import 'dart:async'; import 'dart:collection'; import '../../transport.dart'; - import '../byte_utils.dart'; import '../error_handler.dart'; import '../frames/frames.dart'; - import 'queue_messages.dart'; import 'stream_queues.dart'; import 'window_handler.dart'; @@ -66,7 +64,7 @@ class ConnectionMessageQueueOut extends Object } @override - void onTerminated(error) { + void onTerminated(Object? error) { _messages.clear(); closeWithError(error); } @@ -182,7 +180,7 @@ class ConnectionMessageQueueIn extends Object final IncomingWindowHandler _windowUpdateHandler; /// Catches any protocol errors and acts upon them. - final Function _catchProtocolErrors; + final void Function(void Function()) _catchProtocolErrors; /// A mapping from stream-id to the corresponding stream-specific /// [StreamMessageQueueIn]. @@ -200,7 +198,7 @@ class ConnectionMessageQueueIn extends Object this._windowUpdateHandler, this._catchProtocolErrors); @override - void onTerminated(error) { + void onTerminated(Object? error) { // NOTE: The higher level will be shutdown first, so all streams // should have been removed at this point. assert(_stream2messageQueue.isEmpty); @@ -260,7 +258,7 @@ class ConnectionMessageQueueIn extends Object /// If a [DataFrame] will be ignored, this method will take the minimal /// action necessary. void processIgnoredDataFrame(DataFrame frame) { - _windowUpdateHandler.gotData(frame.bytes.length); + _windowUpdateHandler.dataProcessed(frame.bytes.length); } /// Processes an incoming [HeadersFrame] which is addressed to a specific diff --git a/lib/src/flowcontrol/stream_queues.dart b/lib/src/flowcontrol/stream_queues.dart index ff9711c..de7950a 100644 --- a/lib/src/flowcontrol/stream_queues.dart +++ b/lib/src/flowcontrol/stream_queues.dart @@ -84,7 +84,7 @@ class StreamMessageQueueOut extends Object } @override - void onTerminated(error) { + void onTerminated(Object? error) { _messages.clear(); closeWithError(error); } @@ -183,11 +183,7 @@ class StreamMessageQueueIn extends Object _tryUpdateBufferIndicator(); } } - ..onPause = () { - _tryUpdateBufferIndicator(); - // TODO: Would we ever want to decrease the window size in this - // situation? - } + ..onPause = _tryUpdateBufferIndicator ..onResume = () { if (!wasClosed && !wasTerminated) { _tryDispatch(); diff --git a/lib/src/flowcontrol/window_handler.dart b/lib/src/flowcontrol/window_handler.dart index 60d1f38..0f82b98 100644 --- a/lib/src/flowcontrol/window_handler.dart +++ b/lib/src/flowcontrol/window_handler.dart @@ -58,12 +58,12 @@ abstract class AbstractOutgoingWindowHandler { /// Handles the connection window for outgoing data frames. class OutgoingConnectionWindowHandler extends AbstractOutgoingWindowHandler { - OutgoingConnectionWindowHandler(Window window) : super(window); + OutgoingConnectionWindowHandler(super.window); } /// Handles the window for outgoing messages to the peer. class OutgoingStreamWindowHandler extends AbstractOutgoingWindowHandler { - OutgoingStreamWindowHandler(Window window) : super(window); + OutgoingStreamWindowHandler(super.window); /// Update the peer window by adding [difference] to it. /// diff --git a/lib/src/frames/frame_reader.dart b/lib/src/frames/frame_reader.dart index 2d93f01..7b69261 100644 --- a/lib/src/frames/frame_reader.dart +++ b/lib/src/frames/frame_reader.dart @@ -2,7 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -part of http2.src.frames; +part of 'frames.dart'; /// Used for converting a `Stream>` to a `Stream`. class FrameReader { diff --git a/lib/src/frames/frame_types.dart b/lib/src/frames/frame_types.dart index abab588..33c9e4a 100644 --- a/lib/src/frames/frame_types.dart +++ b/lib/src/frames/frame_types.dart @@ -2,7 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -part of http2.src.frames; +part of 'frames.dart'; const int FRAME_HEADER_SIZE = 9; @@ -67,7 +67,7 @@ class DataFrame extends Frame { final List bytes; - DataFrame(FrameHeader header, this.padLength, this.bytes) : super(header); + DataFrame(super.header, this.padLength, this.bytes); bool get hasEndStreamFlag => _isFlagSet(header.flags, FLAG_END_STREAM); bool get hasPaddedFlag => _isFlagSet(header.flags, FLAG_PADDED); @@ -99,9 +99,14 @@ class HeadersFrame extends Frame { final int? weight; final List headerBlockFragment; - HeadersFrame(FrameHeader header, this.padLength, this.exclusiveDependency, - this.streamDependency, this.weight, this.headerBlockFragment) - : super(header); + HeadersFrame( + super.header, + this.padLength, + this.exclusiveDependency, + this.streamDependency, + this.weight, + this.headerBlockFragment, + ); /// This will be set from the outside after decoding. late List
decodedHeaders; @@ -148,9 +153,12 @@ class PriorityFrame extends Frame { final int streamDependency; final int weight; - PriorityFrame(FrameHeader header, this.exclusiveDependency, - this.streamDependency, this.weight) - : super(header); + PriorityFrame( + super.header, + this.exclusiveDependency, + this.streamDependency, + this.weight, + ); @override Map toJson() => super.toJson() @@ -166,7 +174,7 @@ class RstStreamFrame extends Frame { final int errorCode; - RstStreamFrame(FrameHeader header, this.errorCode) : super(header); + RstStreamFrame(super.header, this.errorCode); @override Map toJson() => super.toJson() @@ -199,7 +207,7 @@ class SettingsFrame extends Frame { final List settings; - SettingsFrame(FrameHeader header, this.settings) : super(header); + SettingsFrame(super.header, this.settings); bool get hasAckFlag => _isFlagSet(header.flags, FLAG_ACK); @@ -225,9 +233,12 @@ class PushPromiseFrame extends Frame { /// This will be set from the outside after decoding. late List
decodedHeaders; - PushPromiseFrame(FrameHeader header, this.padLength, this.promisedStreamId, - this.headerBlockFragment) - : super(header); + PushPromiseFrame( + super.header, + this.padLength, + this.promisedStreamId, + this.headerBlockFragment, + ); bool get hasEndHeadersFlag => _isFlagSet(header.flags, FLAG_END_HEADERS); bool get hasPaddedFlag => _isFlagSet(header.flags, FLAG_PADDED); @@ -267,7 +278,7 @@ class PingFrame extends Frame { final int opaqueData; - PingFrame(FrameHeader header, this.opaqueData) : super(header); + PingFrame(super.header, this.opaqueData); bool get hasAckFlag => _isFlagSet(header.flags, FLAG_ACK); @@ -283,9 +294,7 @@ class GoawayFrame extends Frame { final int errorCode; final List debugData; - GoawayFrame( - FrameHeader header, this.lastStreamId, this.errorCode, this.debugData) - : super(header); + GoawayFrame(super.header, this.lastStreamId, this.errorCode, this.debugData); @override Map toJson() => super.toJson() @@ -301,8 +310,7 @@ class WindowUpdateFrame extends Frame { final int windowSizeIncrement; - WindowUpdateFrame(FrameHeader header, this.windowSizeIncrement) - : super(header); + WindowUpdateFrame(super.header, this.windowSizeIncrement); @override Map toJson() => super.toJson() @@ -316,8 +324,7 @@ class ContinuationFrame extends Frame { final List headerBlockFragment; - ContinuationFrame(FrameHeader header, this.headerBlockFragment) - : super(header); + ContinuationFrame(super.header, this.headerBlockFragment); bool get hasEndHeadersFlag => _isFlagSet(header.flags, FLAG_END_HEADERS); @@ -331,7 +338,7 @@ class ContinuationFrame extends Frame { class UnknownFrame extends Frame { final List data; - UnknownFrame(FrameHeader header, this.data) : super(header); + UnknownFrame(super.header, this.data); @override Map toJson() => super.toJson() diff --git a/lib/src/frames/frame_utils.dart b/lib/src/frames/frame_utils.dart index 73a3173..11c30a5 100644 --- a/lib/src/frames/frame_utils.dart +++ b/lib/src/frames/frame_utils.dart @@ -2,6 +2,6 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -part of http2.src.frames; +part of 'frames.dart'; bool _isFlagSet(int value, int flag) => value & flag == flag; diff --git a/lib/src/frames/frame_writer.dart b/lib/src/frames/frame_writer.dart index 211aa54..50caad7 100644 --- a/lib/src/frames/frame_writer.dart +++ b/lib/src/frames/frame_writer.dart @@ -2,7 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -part of http2.src.frames; +part of 'frames.dart'; // TODO: No support for writing padded information. // TODO: No support for stream priorities. diff --git a/lib/src/hpack/hpack.dart b/lib/src/hpack/hpack.dart index e59cd37..ab5d9c8 100644 --- a/lib/src/hpack/hpack.dart +++ b/lib/src/hpack/hpack.dart @@ -11,7 +11,6 @@ import 'dart:convert' show ascii; import 'dart:typed_data'; import '../byte_utils.dart'; - import 'huffman.dart'; import 'huffman_table.dart'; @@ -49,7 +48,10 @@ class Header { Header(this.name, this.value, {this.neverIndexed = false}); factory Header.ascii(String name, String value) { - return Header(ascii.encode(name), ascii.encode(value)); + // Specs: `However, header field names MUST be converted to lowercase prior + // to their encoding in HTTP/2. A request or response containing uppercase + // header field names MUST be treated as malformed (Section 8.1.2.6).` + return Header(ascii.encode(name.toLowerCase()), ascii.encode(value)); } } @@ -154,6 +156,7 @@ class HPackDecoder { } } return headers; + // ignore: avoid_catching_errors } on RangeError catch (e) { throw HPackDecodingException('$e'); } on HuffmanDecodingException catch (e) { diff --git a/lib/src/ping/ping_handler.dart b/lib/src/ping/ping_handler.dart index 20ce487..f9be1f9 100644 --- a/lib/src/ping/ping_handler.dart +++ b/lib/src/ping/ping_handler.dart @@ -16,16 +16,21 @@ import '../sync_errors.dart'; class PingHandler extends Object with TerminatableMixin { final FrameWriter _frameWriter; final Map _remainingPings = {}; + final Sink? pingReceived; + final bool Function() isListeningToPings; int _nextId = 1; - PingHandler(this._frameWriter); + PingHandler(this._frameWriter, StreamController pingStream) + : pingReceived = pingStream.sink, + isListeningToPings = (() => pingStream.hasListener); @override void onTerminated(Object? error) { - var values = _remainingPings.values.toList(); + final remainingPings = _remainingPings.values.toList(); _remainingPings.clear(); - for (var value in values) { - value.completeError(error ?? 'Unspecified error'); + for (final ping in remainingPings) { + ping.completeError( + error ?? 'Remaining ping completed with unspecified error'); } } @@ -36,6 +41,9 @@ class PingHandler extends Object with TerminatableMixin { } if (!frame.hasAckFlag) { + if (isListeningToPings()) { + pingReceived?.add(frame.opaqueData); + } _frameWriter.writePingFrame(frame.opaqueData, ack: true); } else { var c = _remainingPings.remove(frame.opaqueData); @@ -53,7 +61,7 @@ class PingHandler extends Object with TerminatableMixin { Future ping() { return ensureNotTerminatedAsync(() { - var c = Completer(); + var c = Completer(); var id = _nextId++; _remainingPings[id] = c; _frameWriter.writePingFrame(id); diff --git a/lib/src/settings/settings.dart b/lib/src/settings/settings.dart index 948658d..291c668 100644 --- a/lib/src/settings/settings.dart +++ b/lib/src/settings/settings.dart @@ -163,7 +163,7 @@ class SettingsHandler extends Object with TerminatableMixin { return ensureNotTerminatedAsync(() { // TODO: Have a timeout: When ACK doesn't get back in a reasonable time // frame we should quit with ErrorCode.SETTINGS_TIMEOUT. - var completer = Completer(); + var completer = Completer(); _toBeAcknowledgedSettings.add(changes); _toBeAcknowledgedCompleters.add(completer); _frameWriter.writeSettingsFrame(changes); diff --git a/lib/src/streams/stream_handler.dart b/lib/src/streams/stream_handler.dart index f32a1e5..92a228d 100644 --- a/lib/src/streams/stream_handler.dart +++ b/lib/src/streams/stream_handler.dart @@ -283,7 +283,7 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { 'not in "idle" state.'); } - var sameDirection = (nextStreamId + remoteStreamId) % 2 == 0; + var sameDirection = (nextStreamId + remoteStreamId).isEven; assert(!sameDirection); lastRemoteStreamId = remoteStreamId; @@ -343,7 +343,7 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { // NOTE: We are not interested whether the streams were normally finished // or abnormally terminated. Therefore we use 'catchError((_) {})'! var streamDone = [streamQueueIn.done, streamQueueOut.done]; - Future.wait(streamDone).catchError((_) => const []).whenComplete(() { + Future.wait(streamDone).catchError((_) => const []).whenComplete(() { _cleanupClosedStream(stream); }); @@ -593,11 +593,26 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { } else if (frame is PushPromiseFrame) { throw ProtocolException('Cannot push on a non-existent stream ' '(stream ${frame.header.streamId} does not exist)'); + } else if (frame is DataFrame) { + // http/2 spec: + // However, after sending the RST_STREAM, the sending endpoint + // MUST be prepared to receive and process additional frames sent + // on the stream that might have been sent by the peer prior to + // the arrival of the RST_STREAM. + // and: + // A receiver that receives a flow-controlled frame MUST always + // account for its contribution against the connection + // flow-control window, unless the receiver treats this as a + // connection error (Section 5.4.1). This is necessary even if the + // frame is in error. The sender counts the frame toward the + // flow-control window, but if the receiver does not, the + // flow-control window at the sender and receiver can become + // different. + incomingQueue.processIgnoredDataFrame(frame); + // Still respond with an error, as the stream is closed. + throw _throwStreamClosedException(frame.header.streamId); } else { - throw StreamClosedException( - frame.header.streamId, - 'No open stream found and was not a headers frame opening a ' - 'new stream.'); + throw _throwStreamClosedException(frame.header.streamId); } } else { if (frame is HeadersFrame) { @@ -868,4 +883,11 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { var id = stream.id; return (isServer && id.isEven) || (!isServer && id.isOdd); } + + static Exception _throwStreamClosedException(int streamId) => + StreamClosedException( + streamId, + 'No open stream found and was not a headers frame opening a ' + 'new stream.', + ); } diff --git a/lib/src/sync_errors.dart b/lib/src/sync_errors.dart index 2423e6a..3d11616 100644 --- a/lib/src/sync_errors.dart +++ b/lib/src/sync_errors.dart @@ -45,8 +45,7 @@ class StreamException implements Exception { } class StreamClosedException extends StreamException { - StreamClosedException(int streamId, [String message = '']) - : super(streamId, message); + StreamClosedException(super.streamId, [super.message = '']); @override String toString() => 'StreamClosedException(stream id: $streamId): $_message'; diff --git a/lib/transport.dart b/lib/transport.dart index 6423f93..87a10f6 100644 --- a/lib/transport.dart +++ b/lib/transport.dart @@ -8,6 +8,7 @@ import 'dart:io'; import 'src/connection.dart'; import 'src/hpack/hpack.dart' show Header; +export 'src/frames/frames.dart' show ErrorCode; export 'src/hpack/hpack.dart' show Header; typedef ActiveStateHandler = void Function(bool isActive); @@ -27,10 +28,7 @@ abstract class Settings { /// Settings for a [TransportConnection] a server can make. class ServerSettings extends Settings { - const ServerSettings({int? concurrentStreamLimit, int? streamWindowSize}) - : super( - concurrentStreamLimit: concurrentStreamLimit, - streamWindowSize: streamWindowSize); + const ServerSettings({super.concurrentStreamLimit, super.streamWindowSize}); } /// Settings for a [TransportConnection] a client can make. @@ -39,12 +37,9 @@ class ClientSettings extends Settings { final bool allowServerPushes; const ClientSettings( - {int? concurrentStreamLimit, - int? streamWindowSize, - this.allowServerPushes = false}) - : super( - concurrentStreamLimit: concurrentStreamLimit, - streamWindowSize: streamWindowSize); + {super.concurrentStreamLimit, + super.streamWindowSize, + this.allowServerPushes = false}); } /// Represents a HTTP/2 connection. @@ -64,13 +59,21 @@ abstract class TransportConnection { /// the peer. Future get onInitialPeerSettingsReceived; + /// Stream which emits an event with the ping id every time a ping is received + /// on this connection. + Stream get onPingReceived; + + /// Stream which emits an event every time a ping is received on this + /// connection. + Stream get onFrameReceived; + /// Finish this connection. /// /// No new streams will be accepted or can be created. Future finish(); /// Terminates this connection forcefully. - Future terminate(); + Future terminate([int? errorCode]); } abstract class ClientTransportConnection extends TransportConnection { @@ -185,8 +188,7 @@ abstract class StreamMessage { class DataStreamMessage extends StreamMessage { final List bytes; - DataStreamMessage(this.bytes, {bool? endStream}) - : super(endStream: endStream); + DataStreamMessage(this.bytes, {super.endStream}); @override String toString() => 'DataStreamMessage(${bytes.length} bytes)'; @@ -196,8 +198,7 @@ class DataStreamMessage extends StreamMessage { class HeadersStreamMessage extends StreamMessage { final List
headers; - HeadersStreamMessage(this.headers, {bool? endStream}) - : super(endStream: endStream); + HeadersStreamMessage(this.headers, {super.endStream}); @override String toString() => 'HeadersStreamMessage(${headers.length} headers)'; diff --git a/manual_test/out_of_stream_ids_test.dart b/manual_test/out_of_stream_ids_test.dart index d653793..acfbbd9 100644 --- a/manual_test/out_of_stream_ids_test.dart +++ b/manual_test/out_of_stream_ids_test.dart @@ -11,6 +11,8 @@ /// /// without this patch this test will run for a _long_ time. /// --------------------------------------------------------------------------- +library; + import 'dart:async'; import 'package:http2/src/streams/stream_handler.dart'; @@ -46,7 +48,7 @@ void main() { expect(() => client.makeRequest(headers), throwsA(const TypeMatcher())); - await Future.delayed(const Duration(seconds: 1)); + await Future.delayed(const Duration(seconds: 1)); await client.finish(); } diff --git a/pubspec.yaml b/pubspec.yaml index eb9187a..ac0e139 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,13 +1,18 @@ name: http2 -version: 2.0.2-dev +version: 2.3.1-wip description: A HTTP/2 implementation in Dart. repository: https://github.com/dart-lang/http2 +topics: + - http + - network + - protocols + environment: - sdk: '>=2.17.0 <3.0.0' + sdk: ^3.2.0 dev_dependencies: build_runner: ^2.3.0 - lints: ^2.0.0 + dart_flutter_team_lints: ^2.0.0 mockito: ^5.3.2 test: ^1.21.4 diff --git a/test/client_test.dart b/test/client_test.dart index 3d0424c..726707f 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -24,13 +24,13 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var settingsDone = Completer(); + var settingsDone = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); settingsDone.complete(); @@ -67,14 +67,14 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - final settingsDone = Completer(); + final settingsDone = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); settingsDone.complete(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); // Make sure we get the graceful shutdown message. expect( @@ -89,7 +89,7 @@ void main() { Future clientFun() async { await settingsDone.future; await client.onInitialPeerSettingsReceived - .timeout(Duration(milliseconds: 20)); // Should complete + .timeout(const Duration(milliseconds: 20)); // Should complete expect(client.isOpen, true); @@ -109,11 +109,11 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - final goawayReceived = Completer(); + final goawayReceived = Completer(); Future serverFun() async { serverWriter.writePingFrame(42); - expect(await nextFrame() is SettingsFrame, true); - expect(await nextFrame() is GoawayFrame, true); + expect(await nextFrame(), isA()); + expect(await nextFrame(), isA()); goawayReceived.complete(); expect(await serverReader.moveNext(), false); } @@ -123,7 +123,7 @@ void main() { expect( client.onInitialPeerSettingsReceived - .timeout(Duration(seconds: 1)), + .timeout(const Duration(seconds: 1)), throwsA(isA())); // We wait until the server received the error (it's actually later @@ -153,11 +153,11 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var goawayReceived = Completer(); + var goawayReceived = Completer(); Future serverFun() async { serverWriter.writePingFrame(42); - expect(await nextFrame() is SettingsFrame, true); - expect(await nextFrame() is GoawayFrame, true); + expect(await nextFrame(), isA()); + expect(await nextFrame(), isA()); goawayReceived.complete(); expect(await serverReader.moveNext(), false); } @@ -191,10 +191,10 @@ void main() { StreamIterator serverReader, Future Function() nextFrame) async { Future serverFun() async { - expect(await nextFrame() is SettingsFrame, true); - expect(await nextFrame() is HeadersFrame, true); + expect(await nextFrame(), isA()); + expect(await nextFrame(), isA()); serverWriter.writePingFrame(42); - expect(await nextFrame() is GoawayFrame, true); + expect(await nextFrame(), isA()); expect(await serverReader.moveNext(), false); } @@ -220,25 +220,31 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var handshakeCompleter = Completer(); + var handshakeCompleter = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); handshakeCompleter.complete(); var headers = await nextFrame() as HeadersFrame; - var finFrame = await nextFrame() as DataFrame; - expect(finFrame.hasEndStreamFlag, true); + expect( + await nextFrame(), + isA().having( + (p0) => p0.hasEndStreamFlag, 'Last data frame', true)); // Write a data frame for a non-existent stream. var invalidStreamId = headers.header.streamId + 2; serverWriter.writeDataFrame(invalidStreamId, [42]); // Make sure the client sends a [RstStreamFrame] frame. + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Connection update', 0)); expect( await nextFrame(), isA() @@ -252,7 +258,7 @@ void main() { endStream: true); // Wait for the client finish. - expect(await nextFrame() is GoawayFrame, true); + expect(await nextFrame(), isA()); expect(await serverReader.moveNext(), false); await serverWriter.close(); } @@ -275,27 +281,58 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var handshakeCompleter = Completer(); + var handshakeCompleter = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); handshakeCompleter.complete(); var headers = await nextFrame() as HeadersFrame; - var finFrame = await nextFrame() as DataFrame; - expect(finFrame.hasEndStreamFlag, true); + expect( + await nextFrame(), + isA().having( + (p0) => p0.hasEndStreamFlag, 'Last data frame', true)); var streamId = headers.header.streamId; // Write a data frame for a non-existent stream. - serverWriter.writeDataFrame(streamId, [42], endStream: true); + var data1 = [42, 42]; + serverWriter.writeDataFrame(streamId, data1, endStream: true); // Write more data on the closed stream. - serverWriter.writeDataFrame(streamId, [42]); + var data2 = [42]; + serverWriter.writeDataFrame(streamId, data2); + + // NOTE: The order of the window update frame / rst frame just + // happens to be like that ATM. + + // The two WindowUpdateFrames for the data1 DataFrame. + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Stream update', 1) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', + data1.length)); + + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Connection update', 0) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', + data1.length)); + + // The [WindowUpdateFrame] for the frame on the closed stream, which + // should still update the connection. + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Connection update', 0) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', + data2.length)); // Make sure we get a [RstStreamFrame] frame. expect( @@ -307,7 +344,7 @@ void main() { (f) => f.header.streamId, 'header.streamId', streamId)); // Wait for the client finish. - expect(await nextFrame() is GoawayFrame, true); + expect(await nextFrame(), isA()); expect(await serverReader.moveNext(), false); await serverWriter.close(); } @@ -319,7 +356,11 @@ void main() { await stream.outgoingMessages.close(); var messages = await stream.incomingMessages.toList(); expect(messages, hasLength(1)); - expect((messages[0] as DataStreamMessage).bytes, [42]); + expect( + messages[0], + isA() + .having((p0) => p0.bytes, 'Same as `data1` above', [42, 42]), + ); await client.finish(); } @@ -332,22 +373,23 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var handshakeCompleter = Completer(); - var cancelDone = Completer(); - var endDone = Completer(); + var handshakeCompleter = Completer(); + var cancelDone = Completer(); + var endDone = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); handshakeCompleter.complete(); var headers = await nextFrame() as HeadersFrame; - var finFrame = await nextFrame() as DataFrame; - expect(finFrame.hasEndStreamFlag, true); - + expect( + await nextFrame(), + isA().having( + (p0) => p0.hasEndStreamFlag, 'Last data frame', true)); var streamId = headers.header.streamId; // Write a data frame. @@ -355,6 +397,26 @@ void main() { await cancelDone.future; serverWriter.writeDataFrame(streamId, [43]); + // NOTE: The order of the window update frame / rst frame just + // happens to be like that ATM. + + // Await stream/connection window update frame. + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Stream update', 1) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', 1)); + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Connection update', 0) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', 1)); + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Connection update', 0) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', 1)); + // Make sure we get a [RstStreamFrame] frame. expect( await nextFrame(), @@ -368,7 +430,7 @@ void main() { endDone.complete(); // Wait for the client finish. - expect(await nextFrame() is GoawayFrame, true); + expect(await nextFrame(), isA()); expect(await serverReader.moveNext(), false); await serverWriter.close(); } @@ -381,7 +443,12 @@ void main() { // first will cancel the stream var message = await stream.incomingMessages.first; - expect((message as DataStreamMessage).bytes, [42]); + expect( + message, + isA() + .having((p0) => p0.bytes, 'Same sent data above', [42]), + ); + cancelDone.complete(); await endDone.future; @@ -396,16 +463,16 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var handshakeCompleter = Completer(); - var cancelDone = Completer(); - var endDone = Completer(); - var clientDone = Completer(); + var handshakeCompleter = Completer(); + var cancelDone = Completer(); + var endDone = Completer(); + var clientDone = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); handshakeCompleter.complete(); @@ -420,9 +487,32 @@ void main() { serverWriter.writeRstStreamFrame(streamId, ErrorCode.STREAM_CLOSED); endDone.complete(); + // NOTE: The order of the window update frame / rst frame just + // happens to be like that ATM. + + // Await stream/connection window update frame. + + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Stream update', 1) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', 1)); + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Connection update', 0) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', 1)); + expect( + await nextFrame(), + isA() + .having((p0) => p0.header.streamId, 'Connection update', 0) + .having((p0) => p0.windowSizeIncrement, 'Windowsize', 1)); + await clientDone.future; - var finFrame = await nextFrame() as DataFrame; - expect(finFrame.hasEndStreamFlag, true); + expect( + await nextFrame(), + isA().having( + (p0) => p0.hasEndStreamFlag, 'Last data frame', true)); // Wait for the client finish. expect(await serverReader.moveNext(), false); @@ -436,7 +526,12 @@ void main() { // first will cancel the stream var message = await stream.incomingMessages.first; - expect((message as DataStreamMessage).bytes, [42]); + expect( + message, + isA() + .having((p0) => p0.bytes, 'Same sent data above', [42]), + ); + cancelDone.complete(); await endDone.future; @@ -455,19 +550,21 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var handshakeCompleter = Completer(); + var handshakeCompleter = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); handshakeCompleter.complete(); var headers = await nextFrame() as HeadersFrame; - var finFrame = await nextFrame() as DataFrame; - expect(finFrame.hasEndStreamFlag, true); + expect( + await nextFrame(), + isA().having( + (p0) => p0.hasEndStreamFlag, 'Last data frame', true)); var streamId = headers.header.streamId; @@ -495,8 +592,13 @@ void main() { await stream.outgoingMessages.close(); var messages = await stream.incomingMessages.toList(); expect(messages, hasLength(1)); - expect((messages[0] as HeadersStreamMessage).headers.first, - isHeader('a', 'b')); + + expect( + messages[0], + isA().having((p0) => p0.headers.first, + 'Same sent headers above', isHeader('a', 'b')), + ); + await client.finish(); } @@ -508,13 +610,13 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var handshakeCompleter = Completer(); + var handshakeCompleter = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); handshakeCompleter.complete(); @@ -559,13 +661,13 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var handshakeCompleter = Completer(); + var handshakeCompleter = Completer(); Future serverFun() async { serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); handshakeCompleter.complete(); @@ -600,7 +702,7 @@ void main() { expectAsync1((StreamMessage msg) {}, count: 0), onError: expectAsync1((Object error) {})); sub.pause(); - await Future.delayed(const Duration(milliseconds: 40)); + await Future.delayed(const Duration(milliseconds: 40)); sub.resume(); await client.finish(); @@ -615,16 +717,16 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var settingsDone = Completer(); - var headersDone = Completer(); + var settingsDone = Completer(); + var headersDone = Completer(); Future serverFun() async { var decoder = HPackDecoder(); serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); settingsDone.complete(); @@ -638,12 +740,16 @@ void main() { headersDone.complete(); // Make sure we got the stream reset. - var frame2 = await nextFrame() as RstStreamFrame; - expect(frame2.errorCode, ErrorCode.CANCEL); + expect( + await nextFrame(), + isA().having( + (p0) => p0.errorCode, 'Stream reset', ErrorCode.CANCEL)); // Make sure we get the graceful shutdown message. - var frame3 = await nextFrame() as GoawayFrame; - expect(frame3.errorCode, ErrorCode.NO_ERROR); + expect( + await nextFrame(), + isA().having( + (p0) => p0.errorCode, 'Stream reset', ErrorCode.NO_ERROR)); // Make sure the client ended the connection. expect(await serverReader.moveNext(), false); @@ -675,15 +781,15 @@ void main() { FrameWriter serverWriter, StreamIterator serverReader, Future Function() nextFrame) async { - var settingsDone = Completer(); + var settingsDone = Completer(); Future serverFun() async { var decoder = HPackDecoder(); serverWriter.writeSettingsFrame([]); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); serverWriter.writeSettingsAckFrame(); - expect(await nextFrame() is SettingsFrame, true); + expect(await nextFrame(), isA()); settingsDone.complete(); diff --git a/test/multiprotocol_server_test.dart b/test/multiprotocol_server_test.dart index 997f0da..736e7da 100644 --- a/test/multiprotocol_server_test.dart +++ b/test/multiprotocol_server_test.dart @@ -74,7 +74,7 @@ Future makeHttp11Request( Future handleHttp11Request(HttpRequest request, int i) async { expect(request.uri.path, '/abc$i'); - await request.drain(); + await request.drain(); request.response.write('answer$i'); await request.response.close(); } diff --git a/test/server_test.dart b/test/server_test.dart index dffc845..1af51c9 100644 --- a/test/server_test.dart +++ b/test/server_test.dart @@ -92,6 +92,8 @@ void main() { clientWriter.writeDataFrame(3, [1, 2, 3]); // Make sure the client gets a [RstStreamFrame] frame. + var frame = await nextFrame(); + expect(frame is WindowUpdateFrame, true); expect( await nextFrame(), isA() diff --git a/test/src/flowcontrol/connection_queues_test.dart b/test/src/flowcontrol/connection_queues_test.dart index a1d2755..1cde81b 100644 --- a/test/src/flowcontrol/connection_queues_test.dart +++ b/test/src/flowcontrol/connection_queues_test.dart @@ -155,7 +155,7 @@ void main() { var header = FrameHeader(0, 0, 0, STREAM_ID); queue.processIgnoredDataFrame(DataFrame(header, 0, bytes)); expect(queue.pendingMessages, 0); - verify(windowMock.gotData(bytes.length)).called(1); + verify(windowMock.dataProcessed(bytes.length)).called(1); verifyNoMoreInteractions(windowMock); }); }); diff --git a/test/src/flowcontrol/mocks.mocks.dart b/test/src/flowcontrol/mocks.mocks.dart index b15faae..0f09888 100644 --- a/test/src/flowcontrol/mocks.mocks.dart +++ b/test/src/flowcontrol/mocks.mocks.dart @@ -1,7 +1,9 @@ -// Mocks generated by Mockito 5.3.2 from annotations +// Mocks generated by Mockito 5.4.1 from annotations // in http2/test/src/flowcontrol/mocks.dart. // Do not manually edit this file. +// @dart=2.19 + // ignore_for_file: no_leading_underscores_for_library_prefixes import 'dart:async' as _i5; diff --git a/test/src/hpack/hpack_test.dart b/test/src/hpack/hpack_test.dart index 4d3bf53..aadf6cb 100644 --- a/test/src/hpack/hpack_test.dart +++ b/test/src/hpack/hpack_test.dart @@ -609,8 +609,8 @@ void main() { test('update-dynamic-table-size-too-high', () { var context = HPackContext(); // Sets dynamic table to 4096 - expect( - context.decoder.decode(TestHelper.newInteger(0x20, 5, 4096)), []); + expect(context.decoder.decode(TestHelper.newInteger(0x20, 5, 4096)), + []); }); test('dynamic table entry', () { @@ -809,7 +809,7 @@ class _HeaderMatcher extends Matcher { Description describe(Description description) => description.add('Header'); @override - bool matches(item, Map matchState) { + bool matches(Object? item, Map matchState) { return item is Header && _compareLists(item.name, header.name) && _compareLists(item.value, header.value); diff --git a/test/src/ping/ping_handler_test.dart b/test/src/ping/ping_handler_test.dart index f466f45..df02420 100644 --- a/test/src/ping/ping_handler_test.dart +++ b/test/src/ping/ping_handler_test.dart @@ -15,7 +15,7 @@ void main() { group('ping-handler', () { test('successful-ping', () async { var writer = FrameWriterMock(); - var pingHandler = PingHandler(writer); + var pingHandler = instantiateHandler(writer); var p1 = pingHandler.ping(); var p2 = pingHandler.ping(); @@ -37,7 +37,7 @@ void main() { test('successful-ack-to-remote-ping', () async { var writer = FrameWriterMock(); - var pingHandler = PingHandler(writer); + var pingHandler = instantiateHandler(writer); var header = FrameHeader(8, FrameType.PING, 0, 0); pingHandler.processPingFrame(PingFrame(header, 1)); @@ -53,7 +53,7 @@ void main() { test('ping-unknown-opaque-data', () async { var writer = FrameWriterMock(); - var pingHandler = PingHandler(writer); + var pingHandler = instantiateHandler(writer); var future = pingHandler.ping(); verify(writer.writePingFrame(1)).called(1); @@ -73,7 +73,7 @@ void main() { test('terminate-ping-handler', () async { var writer = FrameWriterMock(); - var pingHandler = PingHandler(writer); + var pingHandler = instantiateHandler(writer); pingHandler.terminate('hello world'); expect( @@ -86,14 +86,40 @@ void main() { test('ping-non-zero-stream-id', () async { var writer = FrameWriterMock(); - var pingHandler = PingHandler(writer); + var pingHandler = instantiateHandler(writer); var header = FrameHeader(8, FrameType.PING, PingFrame.FLAG_ACK, 1); expect(() => pingHandler.processPingFrame(PingFrame(header, 1)), throwsA(isProtocolException)); verifyZeroInteractions(writer); }); + + test('receiving-ping-calls-stream', () async { + final pings = []; + + final writer = FrameWriterMock(); + final pingStream = StreamController()..stream.listen(pings.add); + + PingHandler(writer, pingStream) + ..processPingFrame(PingFrame( + FrameHeader(8, FrameType.PING, 0, 0), + 1, + )) + ..processPingFrame(PingFrame( + FrameHeader(8, FrameType.PING, 0, 0), + 2, + )); + + await pingStream.close(); + + expect(pings, [1, 2]); + }); }); } +PingHandler instantiateHandler(FrameWriterMock writer) { + var controller = StreamController(); + return PingHandler(writer, controller); +} + class FrameWriterMock extends Mock implements FrameWriter {} diff --git a/test/src/streams/simple_flow_test.dart b/test/src/streams/simple_flow_test.dart index 7a29468..25226d1 100644 --- a/test/src/streams/simple_flow_test.dart +++ b/test/src/streams/simple_flow_test.dart @@ -30,7 +30,7 @@ void main() { }); } - var serverReceivedAllBytes = Completer(); + var serverReceivedAllBytes = Completer(); void Function(StreamMessage) messageTestFun(String type) { var expectHeader = true; @@ -83,7 +83,7 @@ void main() { sStream.incomingMessages .listen(messageTestFun('server'), onDone: expectAsync0(() {})); sStream.sendHeaders(expectedHeaders, endStream: true); - expect(await serverReceivedAllBytes.future, completes); + await serverReceivedAllBytes.future; })); TransportStream cStream = client.makeRequest(expectedHeaders); diff --git a/test/src/streams/simple_push_test.dart b/test/src/streams/simple_push_test.dart index e91f95b..d12dbb5 100644 --- a/test/src/streams/simple_push_test.dart +++ b/test/src/streams/simple_push_test.dart @@ -34,7 +34,7 @@ void main() { }); } - var serverReceivedAllBytes = Completer(); + var serverReceivedAllBytes = Completer(); Future readData(StreamIterator iterator) async { var all = []; @@ -66,7 +66,7 @@ void main() { unawaited(sStream.incomingMessages.drain()); sStream.sendHeaders(expectedHeaders, endStream: true); - expect(await serverReceivedAllBytes.future, completes); + await serverReceivedAllBytes.future; })); var cStream = client.makeRequest(expectedHeaders, endStream: true); @@ -84,7 +84,7 @@ void main() { var msg = await readData(iterator); expect(msg, 'pushing "hello world" :)'); })); - }, settings: ClientSettings(allowServerPushes: true)); + }, settings: const ClientSettings(allowServerPushes: true)); }); }); } diff --git a/test/src/streams/streams_test.dart b/test/src/streams/streams_test.dart index 18109fc..6bfa49c 100644 --- a/test/src/streams/streams_test.dart +++ b/test/src/streams/streams_test.dart @@ -67,7 +67,7 @@ void main() { server.incomingStreams .listen(expectAsync1((TransportStream sStream) async { var isFirst = true; - var receivedChunks = []; + var receivedChunks = >[]; sStream.incomingMessages.listen( expectAsync1((StreamMessage msg) { if (isFirst) { diff --git a/test/transport_test.dart b/test/transport_test.dart index bd5140a..5cbdb37 100644 --- a/test/transport_test.dart +++ b/test/transport_test.dart @@ -67,8 +67,8 @@ void main() { // The default is unlimited, which is why we have to wait for the server // setting to arrive on the client. // At the moment, delaying by 2 microtask cycles is enough. - await Future.value(); - await Future.value(); + await Future.value(); + await Future.value(); final streams = []; for (var i = 0; i < concurrentStreamLimit; ++i) { @@ -93,7 +93,7 @@ void main() { await Future.wait([clientFun(), serverFun()]); }, serverSettings: - ServerSettings(concurrentStreamLimit: concurrentStreamLimit)); + const ServerSettings(concurrentStreamLimit: concurrentStreamLimit)); transportTest('disabled-push', (ClientTransportConnection client, ServerTransportConnection server) async { @@ -188,7 +188,7 @@ void main() { await client.terminate(); await serverFuture; }, - clientSettings: ClientSettings( + clientSettings: const ClientSettings( concurrentStreamLimit: kDefaultStreamLimit, allowServerPushes: true)); @@ -216,7 +216,7 @@ void main() { transportTest('client-terminates-stream', (ClientTransportConnection client, ServerTransportConnection server) async { - var readyForError = Completer(); + var readyForError = Completer(); Future serverFun() async { await for (ServerTransportStream stream in server.incomingStreams) { @@ -268,7 +268,7 @@ void main() { transportTest('client-terminates-stream-after-half-close', (ClientTransportConnection client, ServerTransportConnection server) async { - var readyForError = Completer(); + var readyForError = Completer(); Future serverFun() async { await for (ServerTransportStream stream in server.incomingStreams) { @@ -304,7 +304,7 @@ void main() { transportTest('server-terminates-stream-after-half-close', (ClientTransportConnection client, ServerTransportConnection server) async { - var readyForError = Completer(); + var readyForError = Completer(); Future serverFun() async { await for (ServerTransportStream stream in server.incomingStreams) { @@ -375,19 +375,19 @@ void main() { // This extra await is needed to allow the idle handler to run before // verifying the idleCount, because the stream cleanup runs // asynchronously after the stream is closed. - await Future.value(); + await Future.value(); expect(activeCount, 1); expect(idleCount, 1); var stream = client.makeRequest([]); await stream.outgoingMessages.close(); await stream.incomingMessages.toList(); - await Future.value(); + await Future.value(); stream = client.makeRequest([]); await stream.outgoingMessages.close(); await stream.incomingMessages.toList(); - await Future.value(); + await Future.value(); await client.finish(); expect(activeCount, 3); @@ -409,7 +409,7 @@ void main() { lessThan(kChunkSize * kNumberOfMessages)); var serverSentBytes = 0; - var flowcontrolWindowFull = Completer(); + var flowcontrolWindowFull = Completer(); Future serverFun() async { await for (ServerTransportStream stream in server.incomingStreams) { @@ -437,9 +437,7 @@ void main() { } controller - ..onListen = () { - addData(); - } + ..onListen = addData ..onPause = expectAsync0(() { // Assert that we're now at the place (since the granularity // of adding is [kChunkSize], it could be that we added @@ -449,9 +447,7 @@ void main() { lessThan(expectedStreamFlowcontrolWindow)); flowcontrolWindowFull.complete(); }) - ..onResume = () { - addData(); - } + ..onResume = addData ..onCancel = () {}; await stream.outgoingMessages.addStream(controller.stream); @@ -510,6 +506,11 @@ void main() { }, clientSettings: ClientSettings(streamWindowSize: 8096), ); + transportTest('fast-sender-receiver-paused--10kb-window-size', + (ClientTransportConnection client, + ServerTransportConnection server) async { + await testWindowSize(client, server, 8096); + }, clientSettings: const ClientSettings(streamWindowSize: 8096)); }); }); }