From f1b25be51d8e5ed1c534c45997fd62fe3654c747 Mon Sep 17 00:00:00 2001 From: Moritz Date: Tue, 6 Jun 2023 14:24:06 +0200 Subject: [PATCH] Increase Window Update size --- lib/src/flowcontrol/window.dart | 9 ++++- lib/src/flowcontrol/window_handler.dart | 10 +++-- test/client_test.dart | 39 ------------------- test/src/flowcontrol/window_handler_test.dart | 17 ++++---- test/transport_test.dart | 15 ++++--- 5 files changed, 33 insertions(+), 57 deletions(-) diff --git a/lib/src/flowcontrol/window.dart b/lib/src/flowcontrol/window.dart index 51b9016..8530ef2 100644 --- a/lib/src/flowcontrol/window.dart +++ b/lib/src/flowcontrol/window.dart @@ -11,13 +11,20 @@ class Window { /// streams is 65535). /// /// NOTE: This value can potentially become negative. + final int _initialSize; int _size; - Window({int initialSize = (1 << 16) - 1}) : _size = initialSize; + Window({int initialSize = (1 << 16) - 1}) + : _size = initialSize, + _initialSize = initialSize; /// The current size of the flow control window. int get size => _size; + bool get isTooSmall => size < _initialSize / 2; + + int get updateSize => _initialSize ~/ 2; + void modify(int difference) { _size += difference; } diff --git a/lib/src/flowcontrol/window_handler.dart b/lib/src/flowcontrol/window_handler.dart index 6f9a5a6..60d1f38 100644 --- a/lib/src/flowcontrol/window_handler.dart +++ b/lib/src/flowcontrol/window_handler.dart @@ -152,11 +152,15 @@ class IncomingWindowHandler { // - either stop sending window update frames // - or decreasing the window size void dataProcessed(int numberOfBytes) { - _localWindow.modify(numberOfBytes); - // TODO: This can be optimized by delaying the window update to // send one update with a bigger difference than multiple small update // frames. - _frameWriter.writeWindowUpdate(numberOfBytes, streamId: _streamId); + if (_localWindow.isTooSmall) { + _frameWriter.writeWindowUpdate( + _localWindow.updateSize, + streamId: _streamId, + ); + } + _localWindow.modify(_localWindow.updateSize); } } diff --git a/test/client_test.dart b/test/client_test.dart index d6a4df3..3d0424c 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -297,17 +297,6 @@ void main() { // Write more data on the closed stream. serverWriter.writeDataFrame(streamId, [42]); - // NOTE: The order of the window update frame / rst frame just - // happens to be like that ATM. - - // Await stream/connection window update frame. - var win = await nextFrame() as WindowUpdateFrame; - expect(win.header.streamId, 1); - expect(win.windowSizeIncrement, 1); - win = await nextFrame() as WindowUpdateFrame; - expect(win.header.streamId, 0); - expect(win.windowSizeIncrement, 1); - // Make sure we get a [RstStreamFrame] frame. expect( await nextFrame(), @@ -366,20 +355,6 @@ void main() { await cancelDone.future; serverWriter.writeDataFrame(streamId, [43]); - // NOTE: The order of the window update frame / rst frame just - // happens to be like that ATM. - - // Await stream/connection window update frame. - var win = await nextFrame() as WindowUpdateFrame; - expect(win.header.streamId, 1); - expect(win.windowSizeIncrement, 1); - win = await nextFrame() as WindowUpdateFrame; - expect(win.header.streamId, 0); - expect(win.windowSizeIncrement, 1); - win = await nextFrame() as WindowUpdateFrame; - expect(win.header.streamId, 0); - expect(win.windowSizeIncrement, 1); - // Make sure we get a [RstStreamFrame] frame. expect( await nextFrame(), @@ -445,20 +420,6 @@ void main() { serverWriter.writeRstStreamFrame(streamId, ErrorCode.STREAM_CLOSED); endDone.complete(); - // NOTE: The order of the window update frame / rst frame just - // happens to be like that ATM. - - // Await stream/connection window update frame. - var win = await nextFrame() as WindowUpdateFrame; - expect(win.header.streamId, 1); - expect(win.windowSizeIncrement, 1); - win = await nextFrame() as WindowUpdateFrame; - expect(win.header.streamId, 0); - expect(win.windowSizeIncrement, 1); - win = await nextFrame() as WindowUpdateFrame; - expect(win.header.streamId, 0); - expect(win.windowSizeIncrement, 1); - await clientDone.future; var finFrame = await nextFrame() as DataFrame; expect(finFrame.hasEndStreamFlag, true); diff --git a/test/src/flowcontrol/window_handler_test.dart b/test/src/flowcontrol/window_handler_test.dart index 017dadd..09d037f 100644 --- a/test/src/flowcontrol/window_handler_test.dart +++ b/test/src/flowcontrol/window_handler_test.dart @@ -102,7 +102,7 @@ void main() { const STREAM_ID = 99; var fw = FrameWriterMock(); - var window = Window(); + var window = Window(initialSize: 150); var initialSize = window.size; var handler = IncomingWindowHandler.stream(fw, window, STREAM_ID); @@ -112,17 +112,18 @@ void main() { // If the remote end sends us now 100 bytes, it reduces the local // incoming window by 100 bytes. Once we handled these bytes, it, // will send a [WindowUpdateFrame] to the remote peer to ACK it. - handler.gotData(100); - expect(handler.localWindowSize, initialSize - 100); - expect(window.size, initialSize - 100); + var numberOfBytes = 100; + handler.gotData(numberOfBytes); + expect(handler.localWindowSize, initialSize - numberOfBytes); + expect(window.size, initialSize - numberOfBytes); // The data might sit in a queue. Once the user drains enough data of // the queue, we will start ACKing the data and the window becomes // positive again. - handler.dataProcessed(100); - expect(handler.localWindowSize, initialSize); - expect(window.size, initialSize); - verify(fw.writeWindowUpdate(100, streamId: STREAM_ID)).called(1); + handler.dataProcessed(numberOfBytes); + expect(handler.localWindowSize, 125); + expect(window.size, 125); + verify(fw.writeWindowUpdate(75, streamId: STREAM_ID)).called(1); verifyNoMoreInteractions(fw); }); }); diff --git a/test/transport_test.dart b/test/transport_test.dart index e227ed2..bd5140a 100644 --- a/test/transport_test.dart +++ b/test/transport_test.dart @@ -400,7 +400,6 @@ void main() { group('flow-control', () { const kChunkSize = 1024; const kNumberOfMessages = 1000; - final headers = [Header.ascii('a', 'b')]; Future testWindowSize( ClientTransportConnection client, @@ -463,6 +462,7 @@ void main() { } Future clientFun() async { + final headers = [Header.ascii('a', 'b')]; var stream = client.makeRequest(headers, endStream: true); var gotHeadersFrame = false; @@ -502,11 +502,14 @@ void main() { await testWindowSize(client, server, Window().size); }); - transportTest('fast-sender-receiver-paused--10kb-window-size', - (ClientTransportConnection client, - ServerTransportConnection server) async { - await testWindowSize(client, server, 8096); - }, clientSettings: ClientSettings(streamWindowSize: 8096)); + transportTest( + 'fast-sender-receiver-paused--10kb-window-size', + (ClientTransportConnection client, + ServerTransportConnection server) async { + await testWindowSize(client, server, 8096); + }, + clientSettings: ClientSettings(streamWindowSize: 8096), + ); }); }); }