From 8c325b9ca33d878a86d69c5048a8e6e18379663c Mon Sep 17 00:00:00 2001 From: Anikate De <40452578+Anikate-De@users.noreply.github.com> Date: Fri, 14 Jun 2024 05:25:21 +0530 Subject: [PATCH] pkgs/ok_http: Stream response bodies. (#1233) --- .../example/ok_http/AsyncInputStreamReader.kt | 63 ++++ .../example/integration_test/client_test.dart | 21 +- pkgs/ok_http/jnigen.yaml | 2 + pkgs/ok_http/lib/src/jni/bindings.dart | 349 ++++++++++++++++++ pkgs/ok_http/lib/src/ok_http_client.dart | 38 +- 5 files changed, 447 insertions(+), 26 deletions(-) create mode 100644 pkgs/ok_http/android/src/main/kotlin/com/example/ok_http/AsyncInputStreamReader.kt diff --git a/pkgs/ok_http/android/src/main/kotlin/com/example/ok_http/AsyncInputStreamReader.kt b/pkgs/ok_http/android/src/main/kotlin/com/example/ok_http/AsyncInputStreamReader.kt new file mode 100644 index 0000000000..b06b767a9d --- /dev/null +++ b/pkgs/ok_http/android/src/main/kotlin/com/example/ok_http/AsyncInputStreamReader.kt @@ -0,0 +1,63 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +package com.example.ok_http + +import java.io.IOException +import java.io.InputStream +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.Future + + +/** + * Callback interface utilized by the [AsyncInputStreamReader]. + */ +interface DataCallback { + fun onDataRead(data: ByteArray) + fun onFinished() + fun onError(e: IOException) +} + +/** + * Provides functions to read data from an InputStream asynchronously. + */ +class AsyncInputStreamReader { + private val executorService: ExecutorService = Executors.newSingleThreadExecutor() + + /** + * Reads data from an InputStream asynchronously using an executor service. + * + * @param inputStream The InputStream to read from + * @param callback The DataCallback to call when data is read, finished, or an error occurs + * + * @return Future<*> + */ + fun readAsync(inputStream: InputStream, callback: DataCallback): Future<*> { + return executorService.submit { + try { + val buffer = ByteArray(4096) + var bytesRead: Int + while (inputStream.read(buffer).also { bytesRead = it } != -1) { + val byteArray = buffer.copyOfRange(0, bytesRead) + callback.onDataRead(byteArray) + } + + } catch (e: IOException) { + callback.onError(e) + } finally { + try { + inputStream.close() + } catch (e: IOException) { + callback.onError(e) + } + callback.onFinished() + } + } + } + + fun shutdown() { + executorService.shutdown() + } +} diff --git a/pkgs/ok_http/example/integration_test/client_test.dart b/pkgs/ok_http/example/integration_test/client_test.dart index 83998f4f38..b167a8f329 100644 --- a/pkgs/ok_http/example/integration_test/client_test.dart +++ b/pkgs/ok_http/example/integration_test/client_test.dart @@ -15,18 +15,13 @@ void main() async { Future testConformance() async { group('ok_http client', () { - testRequestBody(OkHttpClient()); - testResponseBody(OkHttpClient(), canStreamResponseBody: false); - testRequestHeaders(OkHttpClient()); - testRequestMethods(OkHttpClient(), preservesMethodCase: true); - testResponseHeaders(OkHttpClient(), supportsFoldedHeaders: false); - testResponseStatusLine(OkHttpClient()); - testCompressedResponseBody(OkHttpClient()); - testRedirect(OkHttpClient()); - testServerErrors(OkHttpClient()); - testClose(OkHttpClient.new); - testIsolate(OkHttpClient.new); - testRequestCookies(OkHttpClient(), canSendCookieHeaders: true); - testResponseCookies(OkHttpClient(), canReceiveSetCookieHeaders: true); + testAll( + OkHttpClient.new, + canStreamRequestBody: false, + preservesMethodCase: true, + supportsFoldedHeaders: false, + canSendCookieHeaders: true, + canReceiveSetCookieHeaders: true, + ); }); } diff --git a/pkgs/ok_http/jnigen.yaml b/pkgs/ok_http/jnigen.yaml index d0ebd3c547..f4b43951ad 100644 --- a/pkgs/ok_http/jnigen.yaml +++ b/pkgs/ok_http/jnigen.yaml @@ -28,6 +28,8 @@ classes: - "okhttp3.Dispatcher" - "okhttp3.Cache" - "com.example.ok_http.RedirectInterceptor" + - "com.example.ok_http.AsyncInputStreamReader" + - "com.example.ok_http.DataCallback" # Exclude the deprecated methods listed below # They cause syntax errors during the `dart format` step of JNIGen. diff --git a/pkgs/ok_http/lib/src/jni/bindings.dart b/pkgs/ok_http/lib/src/jni/bindings.dart index 71116e07e7..8b76334e8c 100644 --- a/pkgs/ok_http/lib/src/jni/bindings.dart +++ b/pkgs/ok_http/lib/src/jni/bindings.dart @@ -9618,3 +9618,352 @@ final class $RedirectInterceptorType extends jni.JObjType { other is $RedirectInterceptorType; } } + +/// from: com.example.ok_http.AsyncInputStreamReader +class AsyncInputStreamReader extends jni.JObject { + @override + late final jni.JObjType $type = type; + + AsyncInputStreamReader.fromReference( + jni.JReference reference, + ) : super.fromReference(reference); + + static final _class = + jni.JClass.forName(r"com/example/ok_http/AsyncInputStreamReader"); + + /// The type which includes information such as the signature of this class. + static const type = $AsyncInputStreamReaderType(); + static final _id_new0 = _class.constructorId( + r"()V", + ); + + static final _new0 = ProtectedJniExtensions.lookup< + ffi.NativeFunction< + jni.JniResult Function( + ffi.Pointer, + jni.JMethodIDPtr, + )>>("globalEnv_NewObject") + .asFunction< + jni.JniResult Function( + ffi.Pointer, + jni.JMethodIDPtr, + )>(); + + /// from: public void () + /// The returned object must be released after use, by calling the [release] method. + factory AsyncInputStreamReader() { + return AsyncInputStreamReader.fromReference( + _new0(_class.reference.pointer, _id_new0 as jni.JMethodIDPtr) + .reference); + } + + static final _id_readAsync = _class.instanceMethodId( + r"readAsync", + r"(Ljava/io/InputStream;Lcom/example/ok_http/DataCallback;)Ljava/util/concurrent/Future;", + ); + + static final _readAsync = ProtectedJniExtensions.lookup< + ffi.NativeFunction< + jni.JniResult Function( + ffi.Pointer, + jni.JMethodIDPtr, + ffi.VarArgs< + ( + ffi.Pointer, + ffi.Pointer + )>)>>("globalEnv_CallObjectMethod") + .asFunction< + jni.JniResult Function(ffi.Pointer, jni.JMethodIDPtr, + ffi.Pointer, ffi.Pointer)>(); + + /// from: public final java.util.concurrent.Future readAsync(java.io.InputStream inputStream, com.example.ok_http.DataCallback dataCallback) + /// The returned object must be released after use, by calling the [release] method. + jni.JObject readAsync( + jni.JObject inputStream, + DataCallback dataCallback, + ) { + return _readAsync(reference.pointer, _id_readAsync as jni.JMethodIDPtr, + inputStream.reference.pointer, dataCallback.reference.pointer) + .object(const jni.JObjectType()); + } + + static final _id_shutdown = _class.instanceMethodId( + r"shutdown", + r"()V", + ); + + static final _shutdown = ProtectedJniExtensions.lookup< + ffi.NativeFunction< + jni.JThrowablePtr Function( + ffi.Pointer, + jni.JMethodIDPtr, + )>>("globalEnv_CallVoidMethod") + .asFunction< + jni.JThrowablePtr Function( + ffi.Pointer, + jni.JMethodIDPtr, + )>(); + + /// from: public final void shutdown() + void shutdown() { + _shutdown(reference.pointer, _id_shutdown as jni.JMethodIDPtr).check(); + } +} + +final class $AsyncInputStreamReaderType + extends jni.JObjType { + const $AsyncInputStreamReaderType(); + + @override + String get signature => r"Lcom/example/ok_http/AsyncInputStreamReader;"; + + @override + AsyncInputStreamReader fromReference(jni.JReference reference) => + AsyncInputStreamReader.fromReference(reference); + + @override + jni.JObjType get superType => const jni.JObjectType(); + + @override + final superCount = 1; + + @override + int get hashCode => ($AsyncInputStreamReaderType).hashCode; + + @override + bool operator ==(Object other) { + return other.runtimeType == ($AsyncInputStreamReaderType) && + other is $AsyncInputStreamReaderType; + } +} + +/// from: com.example.ok_http.DataCallback +class DataCallback extends jni.JObject { + @override + late final jni.JObjType $type = type; + + DataCallback.fromReference( + jni.JReference reference, + ) : super.fromReference(reference); + + static final _class = jni.JClass.forName(r"com/example/ok_http/DataCallback"); + + /// The type which includes information such as the signature of this class. + static const type = $DataCallbackType(); + static final _id_onDataRead = _class.instanceMethodId( + r"onDataRead", + r"([B)V", + ); + + static final _onDataRead = ProtectedJniExtensions.lookup< + ffi.NativeFunction< + jni.JThrowablePtr Function( + ffi.Pointer, + jni.JMethodIDPtr, + ffi.VarArgs<(ffi.Pointer,)>)>>( + "globalEnv_CallVoidMethod") + .asFunction< + jni.JThrowablePtr Function(ffi.Pointer, jni.JMethodIDPtr, + ffi.Pointer)>(); + + /// from: public abstract void onDataRead(byte[] bs) + void onDataRead( + jni.JArray bs, + ) { + _onDataRead(reference.pointer, _id_onDataRead as jni.JMethodIDPtr, + bs.reference.pointer) + .check(); + } + + static final _id_onFinished = _class.instanceMethodId( + r"onFinished", + r"()V", + ); + + static final _onFinished = ProtectedJniExtensions.lookup< + ffi.NativeFunction< + jni.JThrowablePtr Function( + ffi.Pointer, + jni.JMethodIDPtr, + )>>("globalEnv_CallVoidMethod") + .asFunction< + jni.JThrowablePtr Function( + ffi.Pointer, + jni.JMethodIDPtr, + )>(); + + /// from: public abstract void onFinished() + void onFinished() { + _onFinished(reference.pointer, _id_onFinished as jni.JMethodIDPtr).check(); + } + + static final _id_onError = _class.instanceMethodId( + r"onError", + r"(Ljava/io/IOException;)V", + ); + + static final _onError = ProtectedJniExtensions.lookup< + ffi.NativeFunction< + jni.JThrowablePtr Function( + ffi.Pointer, + jni.JMethodIDPtr, + ffi.VarArgs<(ffi.Pointer,)>)>>( + "globalEnv_CallVoidMethod") + .asFunction< + jni.JThrowablePtr Function(ffi.Pointer, jni.JMethodIDPtr, + ffi.Pointer)>(); + + /// from: public abstract void onError(java.io.IOException iOException) + void onError( + jni.JObject iOException, + ) { + _onError(reference.pointer, _id_onError as jni.JMethodIDPtr, + iOException.reference.pointer) + .check(); + } + + /// Maps a specific port to the implemented interface. + static final Map _$impls = {}; + ReceivePort? _$p; + + static jni.JObjectPtr _$invoke( + int port, + jni.JObjectPtr descriptor, + jni.JObjectPtr args, + ) { + return _$invokeMethod( + port, + $MethodInvocation.fromAddresses( + 0, + descriptor.address, + args.address, + ), + ); + } + + static final ffi.Pointer< + ffi.NativeFunction< + jni.JObjectPtr Function( + ffi.Uint64, jni.JObjectPtr, jni.JObjectPtr)>> + _$invokePointer = ffi.Pointer.fromFunction(_$invoke); + + static ffi.Pointer _$invokeMethod( + int $p, + $MethodInvocation $i, + ) { + try { + final $d = $i.methodDescriptor.toDartString(releaseOriginal: true); + final $a = $i.args; + if ($d == r"onDataRead([B)V") { + _$impls[$p]!.onDataRead( + $a[0].castTo(const jni.JArrayType(jni.jbyteType()), + releaseOriginal: true), + ); + return jni.nullptr; + } + if ($d == r"onFinished()V") { + _$impls[$p]!.onFinished(); + return jni.nullptr; + } + if ($d == r"onError(Ljava/io/IOException;)V") { + _$impls[$p]!.onError( + $a[0].castTo(const jni.JObjectType(), releaseOriginal: true), + ); + return jni.nullptr; + } + } catch (e) { + return ProtectedJniExtensions.newDartException(e.toString()); + } + return jni.nullptr; + } + + factory DataCallback.implement( + $DataCallbackImpl $impl, + ) { + final $p = ReceivePort(); + final $x = DataCallback.fromReference( + ProtectedJniExtensions.newPortProxy( + r"com.example.ok_http.DataCallback", + $p, + _$invokePointer, + ), + ).._$p = $p; + final $a = $p.sendPort.nativePort; + _$impls[$a] = $impl; + $p.listen(($m) { + if ($m == null) { + _$impls.remove($p.sendPort.nativePort); + $p.close(); + return; + } + final $i = $MethodInvocation.fromMessage($m as List); + final $r = _$invokeMethod($p.sendPort.nativePort, $i); + ProtectedJniExtensions.returnResult($i.result, $r); + }); + return $x; + } +} + +abstract interface class $DataCallbackImpl { + factory $DataCallbackImpl({ + required void Function(jni.JArray bs) onDataRead, + required void Function() onFinished, + required void Function(jni.JObject iOException) onError, + }) = _$DataCallbackImpl; + + void onDataRead(jni.JArray bs); + void onFinished(); + void onError(jni.JObject iOException); +} + +class _$DataCallbackImpl implements $DataCallbackImpl { + _$DataCallbackImpl({ + required void Function(jni.JArray bs) onDataRead, + required void Function() onFinished, + required void Function(jni.JObject iOException) onError, + }) : _onDataRead = onDataRead, + _onFinished = onFinished, + _onError = onError; + + final void Function(jni.JArray bs) _onDataRead; + final void Function() _onFinished; + final void Function(jni.JObject iOException) _onError; + + void onDataRead(jni.JArray bs) { + return _onDataRead(bs); + } + + void onFinished() { + return _onFinished(); + } + + void onError(jni.JObject iOException) { + return _onError(iOException); + } +} + +final class $DataCallbackType extends jni.JObjType { + const $DataCallbackType(); + + @override + String get signature => r"Lcom/example/ok_http/DataCallback;"; + + @override + DataCallback fromReference(jni.JReference reference) => + DataCallback.fromReference(reference); + + @override + jni.JObjType get superType => const jni.JObjectType(); + + @override + final superCount = 1; + + @override + int get hashCode => ($DataCallbackType).hashCode; + + @override + bool operator ==(Object other) { + return other.runtimeType == ($DataCallbackType) && + other is $DataCallbackType; + } +} diff --git a/pkgs/ok_http/lib/src/ok_http_client.dart b/pkgs/ok_http/lib/src/ok_http_client.dart index 3479a236e5..3ab3799d62 100644 --- a/pkgs/ok_http/lib/src/ok_http_client.dart +++ b/pkgs/ok_http/lib/src/ok_http_client.dart @@ -133,6 +133,9 @@ class OkHttpClient extends BaseClient { .newCall(reqBuilder.build()) .enqueue(bindings.Callback.implement(bindings.$CallbackImpl( onResponse: (bindings.Call call, bindings.Response response) { + var reader = bindings.AsyncInputStreamReader(); + var respBodyStreamController = StreamController>(); + var responseHeaders = {}; response.headers().toMultimap().forEach((key, value) { @@ -153,21 +156,30 @@ class OkHttpClient extends BaseClient { } } - // Exceptions while reading the response body such as - // `java.net.ProtocolException` & `java.net.SocketTimeoutException` - // crash the app if un-handled. - var responseBody = Uint8List.fromList([]); - try { - // Blocking call to read the response body. - responseBody = response.body().bytes().toUint8List(); - } catch (e) { - responseCompleter - .completeError(ClientException(e.toString(), request.url)); - return; - } + var responseBodyByteStream = response.body().byteStream(); + reader.readAsync( + responseBodyByteStream, + bindings.DataCallback.implement( + bindings.$DataCallbackImpl( + onDataRead: (JArray data) { + respBodyStreamController.sink.add(data.toUint8List()); + }, + onFinished: () async { + reader.shutdown(); + await respBodyStreamController.sink.close(); + }, + onError: (iOException) async { + respBodyStreamController.sink.addError( + ClientException(iOException.toString(), request.url)); + + reader.shutdown(); + await respBodyStreamController.sink.close(); + }, + ), + )); responseCompleter.complete(StreamedResponse( - Stream.value(responseBody), + respBodyStreamController.stream, response.code(), reasonPhrase: response.message().toDartString(releaseOriginal: true),