Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Only send updates on frames and pings being received when there are listeners #137

Merged
merged 7 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.3.0

- Only send updates on frames and pings being received when there are listeners, as to not fill up memory.

## 2.2.0

- Transform headers to lowercase.
Expand Down
12 changes: 7 additions & 5 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ abstract class Connection {

final StreamController<int> _pingReceived = StreamController<int>();

final StreamController<void> _receivedFrame = StreamController<void>();
final StreamController<void> _frameReceived = StreamController<void>();

/// Future which completes when the first SETTINGS frame is received from
/// the peer.
Expand Down Expand Up @@ -183,7 +183,7 @@ abstract class Connection {
// Setup handlers.
_settingsHandler = SettingsHandler(_hpackContext.encoder, _frameWriter,
acknowledgedSettings, peerSettings);
_pingHandler = PingHandler(_frameWriter, _pingReceived.sink);
_pingHandler = PingHandler(_frameWriter, _pingReceived);

var settings = _decodeSettings(settingsObject);

Expand Down Expand Up @@ -344,7 +344,9 @@ abstract class Connection {
frame.decodedHeaders =
_hpackContext.decoder.decode(frame.headerBlockFragment);
}
_receivedFrame.add(null);
if (_frameReceived.hasListener) {
_frameReceived.add(null);
}

// Handle the frame as either a connection or a stream frame.
if (frame.header.streamId == 0) {
Expand Down Expand Up @@ -483,7 +485,7 @@ class ClientConnection extends Connection implements ClientTransportConnection {
Stream<int> get onPingReceived => _pingReceived.stream;

@override
Stream<void> get onFrameReceived => _receivedFrame.stream;
Stream<void> get onFrameReceived => _frameReceived.stream;
}

class ServerConnection extends Connection implements ServerTransportConnection {
Expand All @@ -505,5 +507,5 @@ class ServerConnection extends Connection implements ServerTransportConnection {
Stream<int> get onPingReceived => _pingReceived.stream;

@override
Stream<void> get onFrameReceived => _receivedFrame.stream;
Stream<void> get onFrameReceived => _frameReceived.stream;
}
16 changes: 11 additions & 5 deletions lib/src/ping/ping_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ class PingHandler extends Object with TerminatableMixin {
final FrameWriter _frameWriter;
final Map<int, Completer> _remainingPings = {};
final Sink<int>? pingReceived;
final bool Function() isListeningToPings;
int _nextId = 1;

PingHandler(this._frameWriter, [this.pingReceived]);
PingHandler(this._frameWriter, StreamController<int> pingStream)
: pingReceived = pingStream.sink,
isListeningToPings = (() => pingStream.hasListener);

@override
void onTerminated(Object? error) {
var values = _remainingPings.values.toList();
final remainingPings = _remainingPings.values.toList();
_remainingPings.clear();
for (var value in values) {
value.completeError(error ?? 'Unspecified error');
for (final ping in remainingPings) {
ping.completeError(
error ?? 'Remaining ping completed with unspecified error');
}
}

Expand All @@ -37,7 +41,9 @@ class PingHandler extends Object with TerminatableMixin {
}

if (!frame.hasAckFlag) {
pingReceived?.add(frame.opaqueData);
if (isListeningToPings()) {
pingReceived?.add(frame.opaqueData);
}
_frameWriter.writePingFrame(frame.opaqueData, ack: true);
} else {
var c = _remainingPings.remove(frame.opaqueData);
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: http2
version: 2.2.0
version: 2.3.0
description: A HTTP/2 implementation in Dart.
repository: https://github.com/dart-lang/http2

Expand Down
41 changes: 28 additions & 13 deletions test/src/ping/ping_handler_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void main() {
group('ping-handler', () {
test('successful-ping', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

var p1 = pingHandler.ping();
var p2 = pingHandler.ping();
Expand All @@ -37,7 +37,7 @@ void main() {

test('successful-ack-to-remote-ping', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

var header = FrameHeader(8, FrameType.PING, 0, 0);
pingHandler.processPingFrame(PingFrame(header, 1));
Expand All @@ -53,7 +53,7 @@ void main() {

test('ping-unknown-opaque-data', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

var future = pingHandler.ping();
verify(writer.writePingFrame(1)).called(1);
Expand All @@ -73,7 +73,7 @@ void main() {

test('terminate-ping-handler', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

pingHandler.terminate('hello world');
expect(
Expand All @@ -86,7 +86,7 @@ void main() {

test('ping-non-zero-stream-id', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

var header = FrameHeader(8, FrameType.PING, PingFrame.FLAG_ACK, 1);
expect(() => pingHandler.processPingFrame(PingFrame(header, 1)),
Expand All @@ -95,17 +95,32 @@ void main() {
});

test('receiving-ping-calls-stream', () async {
var writer = FrameWriterMock();
var streamController = StreamController<int>();
var pingHandler = PingHandler(writer, streamController.sink);
final pings = <int>[];

var header = FrameHeader(8, FrameType.PING, 0, 0);
pingHandler.processPingFrame(PingFrame(header, 1));
var header2 = FrameHeader(8, FrameType.PING, 0, 0);
pingHandler.processPingFrame(PingFrame(header2, 2));
await expectLater(streamController.stream, emitsInOrder([1, 2]));
final writer = FrameWriterMock();
final pingStream = StreamController<int>()
..stream.listen((event) => pings.add(event));

PingHandler(writer, pingStream)
..processPingFrame(PingFrame(
FrameHeader(8, FrameType.PING, 0, 0),
1,
))
..processPingFrame(PingFrame(
FrameHeader(8, FrameType.PING, 0, 0),
2,
));

await pingStream.close();

expect(pings, [1, 2]);
});
});
}

PingHandler instantiateHandler(FrameWriterMock writer) {
StreamController<int> controller = StreamController();
return PingHandler(writer, controller);
}

class FrameWriterMock extends Mock implements FrameWriter {}