From fa043d4e50decc90e6c66873a96ef999621631b7 Mon Sep 17 00:00:00 2001 From: Martin Kamleithner Date: Sun, 16 Jun 2024 18:16:28 +0100 Subject: [PATCH] backport improved byte consolidation to SyncTransformer and BackgroundTranformer (#2248) Follow up to #2239: The more efficient Stream -> Uint8List conversion can be backported to `SyncTransformer` & the extending `BackgroundTransformer` --- .../src/transformers/fused_transformer.dart | 22 +++++----------- .../src/transformers/sync_transformer.dart | 5 ++-- .../transformers/util/consolidate_bytes.dart | 12 +++++++++ dio/test/transformer_test.dart | 25 +++++++++++++++++++ 4 files changed, 45 insertions(+), 19 deletions(-) create mode 100644 dio/lib/src/transformers/util/consolidate_bytes.dart diff --git a/dio/lib/src/transformers/fused_transformer.dart b/dio/lib/src/transformers/fused_transformer.dart index b3ea6d7f0..1b77cf2df 100644 --- a/dio/lib/src/transformers/fused_transformer.dart +++ b/dio/lib/src/transformers/fused_transformer.dart @@ -6,6 +6,7 @@ import '../compute/compute.dart'; import '../headers.dart'; import '../options.dart'; import '../transformer.dart'; +import 'util/consolidate_bytes.dart'; /// A [Transformer] that has a fast path for decoding UTF8-encoded JSON. /// If the response is utf8-encoded JSON and no custom decoder is specified in the [RequestOptions], this transformer @@ -49,7 +50,7 @@ class FusedTransformer extends Transformer { // Return the finalized bytes if the response type is bytes. if (responseType == ResponseType.bytes) { - return _consolidateStream(responseBody.stream); + return consolidateBytes(responseBody.stream); } final isJsonContent = Transformer.isJsonMimeType( @@ -63,7 +64,7 @@ class FusedTransformer extends Transformer { if (isJsonContent && customResponseDecoder == null) { return _fastUtf8JsonDecode(responseBody); } - final responseBytes = await _consolidateStream(responseBody.stream); + final responseBytes = await consolidateBytes(responseBody.stream); // A custom response decoder overrides the default behavior final String? decodedResponse; @@ -119,7 +120,7 @@ class FusedTransformer extends Transformer { // and count the bytes to determine if we should use an isolate // otherwise we use the content length header if (!hasContentLengthHeader) { - responseBytes = await _consolidateStream(responseBody.stream); + responseBytes = await consolidateBytes(responseBody.stream); contentLength = responseBytes.length; } else { contentLength = int.parse(contentLengthHeader.first); @@ -136,7 +137,7 @@ class FusedTransformer extends Transformer { // we can't send the stream to the isolate, so we need to decode the response bytes first return compute( _decodeUtf8ToJson, - responseBytes ?? await _consolidateStream(responseBody.stream), + responseBytes ?? await consolidateBytes(responseBody.stream), ); } else { if (!hasContentLengthHeader || contentLength == 0) { @@ -145,7 +146,7 @@ class FusedTransformer extends Transformer { // but the body is empty, null is returned. // _utf8JsonDecoder.bind(responseBody.stream) would throw if the body is empty. // So we need to check if the body is empty and return null in that case - responseBytes ??= await _consolidateStream(responseBody.stream); + responseBytes ??= await consolidateBytes(responseBody.stream); if (responseBytes.isEmpty) { return null; } @@ -172,14 +173,3 @@ class FusedTransformer extends Transformer { return _utf8JsonDecoder.convert(data); } } - -/// Consolidates a stream of [Uint8List] into a single [Uint8List] -Future _consolidateStream(Stream stream) async { - final builder = BytesBuilder(copy: false); - - await for (final chunk in stream) { - builder.add(chunk); - } - - return builder.takeBytes(); -} diff --git a/dio/lib/src/transformers/sync_transformer.dart b/dio/lib/src/transformers/sync_transformer.dart index 12ce3585e..fa5dba578 100644 --- a/dio/lib/src/transformers/sync_transformer.dart +++ b/dio/lib/src/transformers/sync_transformer.dart @@ -1,11 +1,11 @@ import 'dart:async'; import 'dart:convert'; -import 'dart:typed_data'; import '../adapter.dart'; import '../headers.dart'; import '../options.dart'; import '../transformer.dart'; +import 'util/consolidate_bytes.dart'; @Deprecated('Use BackgroundTransformer instead') typedef DefaultTransformer = SyncTransformer; @@ -38,8 +38,7 @@ class SyncTransformer extends Transformer { return responseBody; } - final chunks = await responseBody.stream.toList(); - final responseBytes = Uint8List.fromList(chunks.expand((c) => c).toList()); + final responseBytes = await consolidateBytes(responseBody.stream); // Return the finalized bytes if the response type is bytes. if (responseType == ResponseType.bytes) { diff --git a/dio/lib/src/transformers/util/consolidate_bytes.dart b/dio/lib/src/transformers/util/consolidate_bytes.dart new file mode 100644 index 000000000..94a757ef8 --- /dev/null +++ b/dio/lib/src/transformers/util/consolidate_bytes.dart @@ -0,0 +1,12 @@ +import 'dart:typed_data'; + +/// Consolidates a stream of [Uint8List] into a single [Uint8List] +Future consolidateBytes(Stream stream) async { + final builder = BytesBuilder(copy: false); + + await for (final chunk in stream) { + builder.add(chunk); + } + + return builder.takeBytes(); +} diff --git a/dio/test/transformer_test.dart b/dio/test/transformer_test.dart index 5c8aa748c..842e3ea38 100644 --- a/dio/test/transformer_test.dart +++ b/dio/test/transformer_test.dart @@ -2,6 +2,7 @@ import 'dart:convert'; import 'dart:typed_data'; import 'package:dio/dio.dart'; +import 'package:dio/src/transformers/util/consolidate_bytes.dart'; import 'package:test/test.dart'; void main() { @@ -254,4 +255,28 @@ void main() { expect(request, '{"foo":"bar"}'); }); }); + + group('consolidate bytes', () { + test('consolidates bytes from a stream', () async { + final stream = Stream.fromIterable([ + Uint8List.fromList([1, 2, 3]), + Uint8List.fromList([4, 5, 6]), + Uint8List.fromList([7, 8, 9]), + ]); + final bytes = await consolidateBytes(stream); + expect(bytes, Uint8List.fromList([1, 2, 3, 4, 5, 6, 7, 8, 9])); + }); + + test('handles empty stream', () async { + const stream = Stream.empty(); + final bytes = await consolidateBytes(stream); + expect(bytes, Uint8List(0)); + }); + + test('handles empty lists', () async { + final stream = Stream.value(Uint8List(0)); + final bytes = await consolidateBytes(stream); + expect(bytes, Uint8List(0)); + }); + }); }