Skip to content

Commit

Permalink
backport improved byte consolidation to SyncTransformer and Backgroun…
Browse files Browse the repository at this point in the history
…dTranformer (#2248)

Follow up to #2239:

The more efficient Stream<UintList> -> Uint8List conversion can be
backported to `SyncTransformer` & the extending `BackgroundTransformer`
  • Loading branch information
knaeckeKami authored Jun 16, 2024
1 parent 4d310a2 commit fa043d4
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 19 deletions.
22 changes: 6 additions & 16 deletions dio/lib/src/transformers/fused_transformer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import '../compute/compute.dart';
import '../headers.dart';
import '../options.dart';
import '../transformer.dart';
import 'util/consolidate_bytes.dart';

/// A [Transformer] that has a fast path for decoding UTF8-encoded JSON.
/// If the response is utf8-encoded JSON and no custom decoder is specified in the [RequestOptions], this transformer
Expand Down Expand Up @@ -49,7 +50,7 @@ class FusedTransformer extends Transformer {

// Return the finalized bytes if the response type is bytes.
if (responseType == ResponseType.bytes) {
return _consolidateStream(responseBody.stream);
return consolidateBytes(responseBody.stream);
}

final isJsonContent = Transformer.isJsonMimeType(
Expand All @@ -63,7 +64,7 @@ class FusedTransformer extends Transformer {
if (isJsonContent && customResponseDecoder == null) {
return _fastUtf8JsonDecode(responseBody);
}
final responseBytes = await _consolidateStream(responseBody.stream);
final responseBytes = await consolidateBytes(responseBody.stream);

// A custom response decoder overrides the default behavior
final String? decodedResponse;
Expand Down Expand Up @@ -119,7 +120,7 @@ class FusedTransformer extends Transformer {
// and count the bytes to determine if we should use an isolate
// otherwise we use the content length header
if (!hasContentLengthHeader) {
responseBytes = await _consolidateStream(responseBody.stream);
responseBytes = await consolidateBytes(responseBody.stream);
contentLength = responseBytes.length;
} else {
contentLength = int.parse(contentLengthHeader.first);
Expand All @@ -136,7 +137,7 @@ class FusedTransformer extends Transformer {
// we can't send the stream to the isolate, so we need to decode the response bytes first
return compute(
_decodeUtf8ToJson,
responseBytes ?? await _consolidateStream(responseBody.stream),
responseBytes ?? await consolidateBytes(responseBody.stream),
);
} else {
if (!hasContentLengthHeader || contentLength == 0) {
Expand All @@ -145,7 +146,7 @@ class FusedTransformer extends Transformer {
// but the body is empty, null is returned.
// _utf8JsonDecoder.bind(responseBody.stream) would throw if the body is empty.
// So we need to check if the body is empty and return null in that case
responseBytes ??= await _consolidateStream(responseBody.stream);
responseBytes ??= await consolidateBytes(responseBody.stream);
if (responseBytes.isEmpty) {
return null;
}
Expand All @@ -172,14 +173,3 @@ class FusedTransformer extends Transformer {
return _utf8JsonDecoder.convert(data);
}
}

/// Consolidates a stream of [Uint8List] into a single [Uint8List]
Future<Uint8List> _consolidateStream(Stream<Uint8List> stream) async {
final builder = BytesBuilder(copy: false);

await for (final chunk in stream) {
builder.add(chunk);
}

return builder.takeBytes();
}
5 changes: 2 additions & 3 deletions dio/lib/src/transformers/sync_transformer.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import '../adapter.dart';
import '../headers.dart';
import '../options.dart';
import '../transformer.dart';
import 'util/consolidate_bytes.dart';

@Deprecated('Use BackgroundTransformer instead')
typedef DefaultTransformer = SyncTransformer;
Expand Down Expand Up @@ -38,8 +38,7 @@ class SyncTransformer extends Transformer {
return responseBody;
}

final chunks = await responseBody.stream.toList();
final responseBytes = Uint8List.fromList(chunks.expand((c) => c).toList());
final responseBytes = await consolidateBytes(responseBody.stream);

// Return the finalized bytes if the response type is bytes.
if (responseType == ResponseType.bytes) {
Expand Down
12 changes: 12 additions & 0 deletions dio/lib/src/transformers/util/consolidate_bytes.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import 'dart:typed_data';

/// Consolidates a stream of [Uint8List] into a single [Uint8List]
Future<Uint8List> consolidateBytes(Stream<Uint8List> stream) async {
final builder = BytesBuilder(copy: false);

await for (final chunk in stream) {
builder.add(chunk);
}

return builder.takeBytes();
}
25 changes: 25 additions & 0 deletions dio/test/transformer_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:convert';
import 'dart:typed_data';

import 'package:dio/dio.dart';
import 'package:dio/src/transformers/util/consolidate_bytes.dart';
import 'package:test/test.dart';

void main() {
Expand Down Expand Up @@ -254,4 +255,28 @@ void main() {
expect(request, '{"foo":"bar"}');
});
});

group('consolidate bytes', () {
test('consolidates bytes from a stream', () async {
final stream = Stream.fromIterable([
Uint8List.fromList([1, 2, 3]),
Uint8List.fromList([4, 5, 6]),
Uint8List.fromList([7, 8, 9]),
]);
final bytes = await consolidateBytes(stream);
expect(bytes, Uint8List.fromList([1, 2, 3, 4, 5, 6, 7, 8, 9]));
});

test('handles empty stream', () async {
const stream = Stream<Uint8List>.empty();
final bytes = await consolidateBytes(stream);
expect(bytes, Uint8List(0));
});

test('handles empty lists', () async {
final stream = Stream.value(Uint8List(0));
final bytes = await consolidateBytes(stream);
expect(bytes, Uint8List(0));
});
});
}

0 comments on commit fa043d4

Please sign in to comment.