diff --git a/pkgs/cronet_http/cronet_http.iml b/pkgs/cronet_http/cronet_http.iml deleted file mode 100644 index 39cce21274..0000000000 --- a/pkgs/cronet_http/cronet_http.iml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - - - - - - - - - - - - - - diff --git a/pkgs/cupertino_http/cupertino_http.iml b/pkgs/cupertino_http/cupertino_http.iml deleted file mode 100644 index 39cce21274..0000000000 --- a/pkgs/cupertino_http/cupertino_http.iml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - - - - - - - - - - - - - - diff --git a/pkgs/http/CHANGELOG.md b/pkgs/http/CHANGELOG.md index 6d39b104df..0729b259d8 100644 --- a/pkgs/http/CHANGELOG.md +++ b/pkgs/http/CHANGELOG.md @@ -1,3 +1,8 @@ +## 1.2.0 + +* Add a `RequestController` class which can be used to manage the lifecycle of an +HTTP request (e.g., for timeouts and request cancellation). + ## 1.1.1 * `BrowserClient` throws `ClientException` when the `'Content-Length'` header diff --git a/pkgs/http/lib/http.dart b/pkgs/http/lib/http.dart index 62004240c7..22f6d1791e 100644 --- a/pkgs/http/lib/http.dart +++ b/pkgs/http/lib/http.dart @@ -23,6 +23,7 @@ export 'src/exception.dart'; export 'src/multipart_file.dart'; export 'src/multipart_request.dart'; export 'src/request.dart'; +export 'src/request_controller.dart'; export 'src/response.dart'; export 'src/streamed_request.dart'; export 'src/streamed_response.dart'; diff --git a/pkgs/http/lib/retry.dart b/pkgs/http/lib/retry.dart index dedba9a9e7..3c07bcc695 100644 --- a/pkgs/http/lib/retry.dart +++ b/pkgs/http/lib/retry.dart @@ -15,6 +15,9 @@ import 'http.dart'; /// resending it. This can cause a lot of memory usage when sending a large /// [StreamedRequest]. final class RetryClient extends BaseClient { + @override + bool get supportsController => _inner.supportsController; + /// The wrapped client. final Client _inner; @@ -149,7 +152,7 @@ final class RetryClient extends BaseClient { } @override - void close() => _inner.close(); + void close({bool force = true}) => _inner.close(force: force); } bool _defaultWhen(BaseResponse response) => response.statusCode == 503; diff --git a/pkgs/http/lib/src/active_request_tracker.dart b/pkgs/http/lib/src/active_request_tracker.dart new file mode 100644 index 0000000000..16cf4f2888 --- /dev/null +++ b/pkgs/http/lib/src/active_request_tracker.dart @@ -0,0 +1,147 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +import 'base_request.dart'; +import 'exception.dart'; +import 'request_controller.dart'; + +/// Used internally to track a request's lifecycle, if a tracker exists for the +/// request (as specified in the [tracker] parameter). Otherwise returns the +/// original [future]. +/// +/// Used for more concise code in client implementations that track the +/// lifecycle. +/// +/// See [RequestController] for the public API. +@internal +Future maybeTrack( + final Future future, { + final ActiveRequestTracker? tracker, + final RequestLifecycleState? state, + void Function(Exception?)? onCancel, +}) { + if (tracker != null) { + return tracker.trackRequestState( + future, + state: state, + onCancel: onCancel, + ); + } else { + return future; + } +} + +/// Used internally to track a request's lifecycle. +/// See [RequestController] for the public API. +@internal +final class ActiveRequestTracker { + final BaseRequest request; + + /// Whether the [ActiveRequestTracker] is tracking a streaming request. + /// The response timeout can only be handled internally for non-streaming + /// requests. + /// This signals to any clients that want to buffer the response that they + /// should track the response timeout themselves. + final bool isStreaming; + + final List> _pendingRequestActions = []; + + ActiveRequestTracker( + this.request, { + required this.isStreaming, + Duration? timeout, + }) { + // If an overall timeout is specified, apply it to the completer for the + // request and cancel the request if it times out. + if (timeout != null) { + _inProgressCompleter.future.timeout(timeout, onTimeout: () { + _cancelWith(TimeoutException(null, timeout)); + }); + } + } + + RequestController get controller => request.controller!; + + final Completer _inProgressCompleter = Completer(); + + /// Whether the request is still in progress. + bool get inProgress => !_inProgressCompleter.isCompleted; + + Future trackRequestState( + final Future future, { + final RequestLifecycleState? state, + void Function(Exception?)? onCancel, + }) { + // If the request is not being processed, simply ignore any tracking. + if (!inProgress) { + return _inProgressCompleter.future.then((_) => future); + } + + // Create a completer to track the request (and allow it to be cancelled). + final pendingRequestAction = Completer(); + _pendingRequestActions.add(pendingRequestAction); + + // Return a future that tracks both; completing or error-ing with the + // result of the first one to complete. This means if + // [pendingRequestAction] is cancelled first, [future] will be discarded. + // If [future] completes first, [pendingRequestAction] will be discarded. + var cancellable = Future.any([pendingRequestAction.future, future]); + + // If a timeout is specified for this state, apply it to the cancellable + // future. + if (state != null && controller.hasTimeoutForLifecycleState(state)) { + cancellable = + cancellable.timeout(controller.timeoutForLifecycleState(state)!); + } + + cancellable + // If the cancellable future succeeds, and the state was the receiving + // state, mark the request as no longer in progress. + ..then((value) { + if (state == RequestLifecycleState.receiving) { + _inProgressCompleter.complete(); + } + + return value; + }) + // Handle timeouts by simply calling [onCancel] and rethrowing the + // timeout exception. + ..onError( + (error, stackTrace) { + onCancel?.call(error); + throw error; + }, + ) + // Similarly, handle cancellations by calling [onCancel] and rethrowing + // the cancellation exception. + ..onError( + (error, stackTrace) { + onCancel?.call(error); + throw error; + }, + ) + // When the cancellable future completes, remove the pending request from + // the list of pending requests. + ..whenComplete( + () => _pendingRequestActions.remove(pendingRequestAction), + ); + + return cancellable; + } + + /// Cancels the request by expiring all pending request actions. + /// + /// Does nothing if the request is not in progress. + void cancel([final String message = 'Request cancelled']) => + _cancelWith(CancelledException(message)); + + void _cancelWith(Exception exception) { + if (!inProgress) return; + _inProgressCompleter.completeError(exception); + + for (final pendingAction in _pendingRequestActions) { + pendingAction.completeError(exception); + } + } +} diff --git a/pkgs/http/lib/src/base_client.dart b/pkgs/http/lib/src/base_client.dart index 48a7f92fe9..d0ef298abd 100644 --- a/pkgs/http/lib/src/base_client.dart +++ b/pkgs/http/lib/src/base_client.dart @@ -10,6 +10,7 @@ import 'byte_stream.dart'; import 'client.dart'; import 'exception.dart'; import 'request.dart'; +import 'request_controller.dart'; import 'response.dart'; import 'streamed_response.dart'; @@ -19,43 +20,66 @@ import 'streamed_response.dart'; /// maybe [close], and then they get various convenience methods for free. abstract mixin class BaseClient implements Client { @override - Future head(Uri url, {Map? headers}) => - _sendUnstreamed('HEAD', url, headers); + bool get supportsController => false; @override - Future get(Uri url, {Map? headers}) => - _sendUnstreamed('GET', url, headers); + Future head(Uri url, + {Map? headers, RequestController? controller}) => + _sendUnstreamed('HEAD', url, headers, controller: controller); + + @override + Future get(Uri url, + {Map? headers, RequestController? controller}) => + _sendUnstreamed('GET', url, headers, controller: controller); @override Future post(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('POST', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + RequestController? controller}) => + _sendUnstreamed('POST', url, headers, + body: body, encoding: encoding, controller: controller); @override Future put(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('PUT', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + RequestController? controller}) => + _sendUnstreamed('PUT', url, headers, + body: body, encoding: encoding, controller: controller); @override Future patch(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('PATCH', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + RequestController? controller}) => + _sendUnstreamed('PATCH', url, headers, + body: body, encoding: encoding, controller: controller); @override Future delete(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('DELETE', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + RequestController? controller}) => + _sendUnstreamed('DELETE', url, headers, + body: body, encoding: encoding, controller: controller); @override - Future read(Uri url, {Map? headers}) async { - final response = await get(url, headers: headers); + Future read(Uri url, + {Map? headers, RequestController? controller}) async { + final response = await get(url, headers: headers, controller: controller); _checkResponseSuccess(url, response); return response.body; } @override - Future readBytes(Uri url, {Map? headers}) async { - final response = await get(url, headers: headers); + Future readBytes(Uri url, + {Map? headers, RequestController? controller}) async { + final response = await get(url, headers: headers, controller: controller); _checkResponseSuccess(url, response); return response.bodyBytes; } @@ -73,8 +97,8 @@ abstract mixin class BaseClient implements Client { /// Sends a non-streaming [Request] and returns a non-streaming [Response]. Future _sendUnstreamed( String method, Uri url, Map? headers, - [Object? body, Encoding? encoding]) async { - var request = Request(method, url); + {Object? body, Encoding? encoding, RequestController? controller}) async { + var request = Request(method, url, controller: controller); if (headers != null) request.headers.addAll(headers); if (encoding != null) request.encoding = encoding; @@ -104,5 +128,5 @@ abstract mixin class BaseClient implements Client { } @override - void close() {} + void close({bool force = true}) {} } diff --git a/pkgs/http/lib/src/base_request.dart b/pkgs/http/lib/src/base_request.dart index 70a78695aa..37253f83a8 100644 --- a/pkgs/http/lib/src/base_request.dart +++ b/pkgs/http/lib/src/base_request.dart @@ -11,6 +11,7 @@ import 'base_client.dart'; import 'base_response.dart'; import 'byte_stream.dart'; import 'client.dart'; +import 'request_controller.dart'; import 'streamed_response.dart'; import 'utils.dart'; @@ -21,6 +22,12 @@ import 'utils.dart'; /// over the request properties. However, usually it's easier to use convenience /// methods like [get] or [BaseClient.get]. abstract class BaseRequest { + /// The [RequestController] of the request. + /// + /// Used to manage the lifecycle of the request. If this is `null`, the + /// request is assumed to be unmanaged and it cannot be cancelled. + final RequestController? controller; + /// The HTTP method of the request. /// /// Most commonly "GET" or "POST", less commonly "HEAD", "PUT", or "DELETE". @@ -96,7 +103,7 @@ abstract class BaseRequest { return method; } - BaseRequest(String method, this.url) + BaseRequest(String method, this.url, {this.controller}) : method = _validateMethod(method), headers = LinkedHashMap( equals: (key1, key2) => key1.toLowerCase() == key2.toLowerCase(), diff --git a/pkgs/http/lib/src/browser_client.dart b/pkgs/http/lib/src/browser_client.dart index 9345be0ce1..45e260016a 100644 --- a/pkgs/http/lib/src/browser_client.dart +++ b/pkgs/http/lib/src/browser_client.dart @@ -10,6 +10,7 @@ import 'base_client.dart'; import 'base_request.dart'; import 'byte_stream.dart'; import 'exception.dart'; +import 'request_controller.dart'; import 'streamed_response.dart'; final _digitRegex = RegExp(r'^\d+$'); @@ -34,6 +35,9 @@ BaseClient createClient() { /// also unable to stream requests or responses; a request will only be sent and /// a response will only be returned once all the data is available. class BrowserClient extends BaseClient { + @override + final bool supportsController = true; + /// The currently active XHRs. /// /// These are aborted if the client is closed. @@ -54,8 +58,107 @@ class BrowserClient extends BaseClient { throw ClientException( 'HTTP request failed. Client is already closed.', request.url); } + var bytes = await request.finalize().toBytes(); + var xhr = HttpRequest(); + + // Life-cycle tracking is implemented using three completers and the + // onReadyStateChange event. The three completers are: + // + // - connectCompleter (completes when OPENED) (initiates sendingCompleter) + // - sendingCompleter (completes when HEADERS_RECEIVED) + // (initiates receivingCompleter) + // - receivingCompleter (completes when DONE) + // + // connectCompleter is initiated immediately and on completion initiates + // sendingCompleter, and so on. + // + // Note 'initiated' is not 'initialized' - initiated refers to a timeout + // being set on the completer, to ensure the step completes within the + // specified timeout. + final controller = request.controller; + + if (controller != null) { + if (controller.hasLifecycleTimeouts) { + // The browser client (which uses XHR) seems not to be able to work with + // partial (streamed) requests or responses, so the receive timeout is + // handled by the browser client itself. + final tracker = controller.track(request, isStreaming: false); + + // Returns a completer for the given state if a timeout is specified + // for it, otherwise returns null. + Completer? completer(RequestLifecycleState state) => + controller.hasTimeoutForLifecycleState(state) + ? Completer() + : null; + + final connectCompleter = completer(RequestLifecycleState.connecting); + final sendingCompleter = completer(RequestLifecycleState.sending); + final receivingCompleter = completer(RequestLifecycleState.receiving); + + // Simply abort the XHR if a timeout or cancellation occurs. + void handleCancel(_) => xhr.abort(); + + // If a connect timeout is specified, initiate the connectCompleter. + if (connectCompleter != null) { + unawaited(tracker.trackRequestState( + connectCompleter.future, + state: RequestLifecycleState.connecting, + onCancel: handleCancel, + )); + } + + xhr.onReadyStateChange.listen((_) { + // If the connection is at the OPENED stage and the + // connectCompleter has not yet been marked as completed, complete it. + if (xhr.readyState == HttpRequest.OPENED) { + if (connectCompleter != null) { + connectCompleter.complete(); + } + + // Initiate the sendingCompleter if there is a timeout specified for + // it. + if (sendingCompleter != null) { + unawaited(tracker.trackRequestState( + sendingCompleter.future, + state: RequestLifecycleState.sending, + onCancel: handleCancel, + )); + } + } + + // If the connection is at the HEADERS_RECEIVED stage and + // the sendingCompleter has not yet been marked as completed, + // complete it. + if (xhr.readyState == HttpRequest.HEADERS_RECEIVED) { + if (sendingCompleter != null) { + sendingCompleter.complete(); + } + + // Initiate the receivingCompleter if there is a timeout specified + // for it. + if (receivingCompleter != null) { + unawaited(tracker.trackRequestState( + receivingCompleter.future, + state: RequestLifecycleState.receiving, + onCancel: handleCancel, + )); + } + } + + // If the connection is at least at the DONE stage and the + // receivingCompleter has not yet been marked as completed, complete + // it. + if (xhr.readyState == HttpRequest.DONE) { + if (receivingCompleter != null) { + receivingCompleter.complete(); + } + } + }); + } + } + _xhrs.add(xhr); xhr ..open(request.method, '${request.url}', async: true) @@ -104,11 +207,16 @@ class BrowserClient extends BaseClient { /// /// This terminates all active requests. @override - void close() { + void close({bool force = true}) { _isClosed = true; - for (var xhr in _xhrs) { - xhr.abort(); + + // If the close is forced (default) then abort all pending requests. + if (force) { + for (var xhr in _xhrs) { + xhr.abort(); + } } + _xhrs.clear(); } } diff --git a/pkgs/http/lib/src/client.dart b/pkgs/http/lib/src/client.dart index 9bceb887f4..2755b8d0e9 100644 --- a/pkgs/http/lib/src/client.dart +++ b/pkgs/http/lib/src/client.dart @@ -15,6 +15,7 @@ import 'client_stub.dart' if (dart.library.html) 'browser_client.dart' if (dart.library.io) 'io_client.dart'; import 'exception.dart'; +import 'request_controller.dart'; import 'response.dart'; import 'streamed_response.dart'; @@ -34,6 +35,13 @@ import 'streamed_response.dart'; /// another instance of [Client] and add functionality on top of that. This /// allows all classes implementing [Client] to be mutually composable. abstract interface class Client { + /// Whether this client supports the [RequestController] API. + /// + /// If this is `true`, [send] will use the supplied [RequestController] and + /// will allow cancelling the request and specifying a timeout. Otherwise, + /// a specified [RequestController] will be ignored. + bool get supportsController => false; + /// Creates a new platform appropriate client. /// /// Creates an `IOClient` if `dart:io` is available and a `BrowserClient` if @@ -43,12 +51,14 @@ abstract interface class Client { /// Sends an HTTP HEAD request with the given headers to the given URL. /// /// For more fine-grained control over the request, use [send] instead. - Future head(Uri url, {Map? headers}); + Future head(Uri url, + {Map? headers, RequestController? controller}); /// Sends an HTTP GET request with the given headers to the given URL. /// /// For more fine-grained control over the request, use [send] instead. - Future get(Uri url, {Map? headers}); + Future get(Uri url, + {Map? headers, RequestController? controller}); /// Sends an HTTP POST request with the given headers and body to the given /// URL. @@ -71,7 +81,10 @@ abstract interface class Client { /// /// For more fine-grained control over the request, use [send] instead. Future post(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + Object? body, + Encoding? encoding, + RequestController? controller}); /// Sends an HTTP PUT request with the given headers and body to the given /// URL. @@ -92,7 +105,10 @@ abstract interface class Client { /// /// For more fine-grained control over the request, use [send] instead. Future put(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + Object? body, + Encoding? encoding, + RequestController? controller}); /// Sends an HTTP PATCH request with the given headers and body to the given /// URL. @@ -113,13 +129,19 @@ abstract interface class Client { /// /// For more fine-grained control over the request, use [send] instead. Future patch(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + Object? body, + Encoding? encoding, + RequestController? controller}); /// Sends an HTTP DELETE request with the given headers to the given URL. /// /// For more fine-grained control over the request, use [send] instead. Future delete(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + Object? body, + Encoding? encoding, + RequestController? controller}); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a String. @@ -129,7 +151,8 @@ abstract interface class Client { /// /// For more fine-grained control over the request and response, use [send] or /// [get] instead. - Future read(Uri url, {Map? headers}); + Future read(Uri url, + {Map? headers, RequestController? controller}); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a list of @@ -140,7 +163,8 @@ abstract interface class Client { /// /// For more fine-grained control over the request and response, use [send] or /// [get] instead. - Future readBytes(Uri url, {Map? headers}); + Future readBytes(Uri url, + {Map? headers, RequestController? controller}); /// Sends an HTTP request and asynchronously returns the response. Future send(BaseRequest request); @@ -153,7 +177,12 @@ abstract interface class Client { /// Once [close] is called, no other methods should be called. If [close] is /// called while other asynchronous methods are running, the behavior is /// undefined. - void close(); + /// + /// Optionally, [force] can be set to `false` to specify that the client + /// should wait for any pending or in-progress requests to complete before + /// closing, (otherwise, the client will immediately close, terminating all + /// connections immediately to avoid further resource consumption). + void close({bool force = true}); } /// The [Client] for the current [Zone], if one has been set. diff --git a/pkgs/http/lib/src/exception.dart b/pkgs/http/lib/src/exception.dart index 5d47155f30..ce9090629d 100644 --- a/pkgs/http/lib/src/exception.dart +++ b/pkgs/http/lib/src/exception.dart @@ -20,3 +20,19 @@ class ClientException implements Exception { } } } + +/// An exception caused by a request being cancelled programmatically. +// This is a separate exception to allow distinguishing between a request being +// cancelled and a request being aborted due to an error. +class CancelledException extends ClientException { + CancelledException(super.message, [super.uri]); + + @override + String toString() { + if (uri != null) { + return 'CancelledException: $message, uri=$uri'; + } else { + return 'CancelledException: $message'; + } + } +} diff --git a/pkgs/http/lib/src/io_client.dart b/pkgs/http/lib/src/io_client.dart index 247cc8cad6..123946cd9f 100644 --- a/pkgs/http/lib/src/io_client.dart +++ b/pkgs/http/lib/src/io_client.dart @@ -4,11 +4,13 @@ import 'dart:io'; +import 'active_request_tracker.dart'; import 'base_client.dart'; import 'base_request.dart'; import 'client.dart'; import 'exception.dart'; import 'io_streamed_response.dart'; +import 'request_controller.dart'; /// Create an [IOClient]. /// @@ -69,6 +71,9 @@ class _ClientSocketException extends ClientException /// } /// ``` class IOClient extends BaseClient { + @override + final bool supportsController = true; + /// The underlying `dart:io` HTTP client. HttpClient? _inner; @@ -85,17 +90,43 @@ class IOClient extends BaseClient { var stream = request.finalize(); try { - var ioRequest = (await _inner!.openUrl(request.method, request.url)) + final tracker = request.controller?.track(request, isStreaming: true); + + // Open a connection to the server. + // If the connection times out, this will simply throw a + // CancelledException and we needn't do anything else as there's no + // resulting HttpClientRequest to close. + var ioRequest = (await maybeTrack( + _inner!.openUrl(request.method, request.url), + tracker: tracker, + state: RequestLifecycleState.connecting, + )) ..followRedirects = request.followRedirects ..maxRedirects = request.maxRedirects ..contentLength = (request.contentLength ?? -1) ..persistentConnection = request.persistentConnection; + + // Pass-thru all request headers from the BaseRequest. request.headers.forEach((name, value) { ioRequest.headers.set(name, value); }); - var response = await stream.pipe(ioRequest) as HttpClientResponse; - + // Pipe the byte stream of request body data into the HttpClientRequest. + // This will send the request to the server and call .done() on the + // HttpClientRequest when the stream is done. At which point, the future + // will complete once the response is available. + // + // If the step or the request times out or is cancelled, this will abort + // with the corresponding error. + var response = await maybeTrack( + stream.pipe(ioRequest), + tracker: tracker, + state: RequestLifecycleState.sending, + onCancel: (error) => ioRequest.abort(error), + ) as HttpClientResponse; + + // Read all the response headers into a map, comma-concatenating any + // duplicate values for a single header name. var headers = {}; response.headers.forEach((key, values) { // TODO: Remove trimRight() when @@ -130,9 +161,9 @@ class IOClient extends BaseClient { /// Terminates all active connections. If a client remains unclosed, the Dart /// process may not terminate. @override - void close() { + void close({bool force = true}) { if (_inner != null) { - _inner!.close(force: true); + _inner!.close(force: force); _inner = null; } } diff --git a/pkgs/http/lib/src/io_streamed_response.dart b/pkgs/http/lib/src/io_streamed_response.dart index 95b818c2b3..3c10c28691 100644 --- a/pkgs/http/lib/src/io_streamed_response.dart +++ b/pkgs/http/lib/src/io_streamed_response.dart @@ -8,7 +8,7 @@ import 'streamed_response.dart'; /// An HTTP response where the response body is received asynchronously after /// the headers have been received. -class IOStreamedResponse extends StreamedResponse { +class IOStreamedResponse extends ClosableStreamedResponse { final HttpClientResponse? _inner; /// Creates a new streaming response. @@ -30,4 +30,10 @@ class IOStreamedResponse extends StreamedResponse { /// /// Will throw if `inner` was not set or `null` when `this` was created. Future detachSocket() async => _inner!.detachSocket(); + + @override + Future close() async { + // Detach the socket and then destroy it in both directions. + (await detachSocket()).destroy(); + } } diff --git a/pkgs/http/lib/src/mock_client.dart b/pkgs/http/lib/src/mock_client.dart index bf2df40ee7..3d1d8990c8 100644 --- a/pkgs/http/lib/src/mock_client.dart +++ b/pkgs/http/lib/src/mock_client.dart @@ -20,6 +20,9 @@ import 'streamed_response.dart'; /// are made through it so that you can mock a server without having to send /// real HTTP requests. class MockClient extends BaseClient { + @override + bool get supportsController => false; + /// The handler for receiving [StreamedRequest]s and sending /// [StreamedResponse]s. final MockClientStreamHandler _handler; diff --git a/pkgs/http/lib/src/multipart_request.dart b/pkgs/http/lib/src/multipart_request.dart index 79525421fb..a9ed75e6a3 100644 --- a/pkgs/http/lib/src/multipart_request.dart +++ b/pkgs/http/lib/src/multipart_request.dart @@ -45,7 +45,7 @@ class MultipartRequest extends BaseRequest { /// The list of files to upload for this request. final files = []; - MultipartRequest(super.method, super.url); + MultipartRequest(super.method, super.url, {super.controller}); /// The total length of the request body, in bytes. /// diff --git a/pkgs/http/lib/src/request.dart b/pkgs/http/lib/src/request.dart index c15e55169d..fa29b3b716 100644 --- a/pkgs/http/lib/src/request.dart +++ b/pkgs/http/lib/src/request.dart @@ -149,7 +149,7 @@ class Request extends BaseRequest { body = mapToQuery(fields, encoding: encoding); } - Request(super.method, super.url) + Request(super.method, super.url, {super.controller}) : _defaultEncoding = utf8, _bodyBytes = Uint8List(0); diff --git a/pkgs/http/lib/src/request_controller.dart b/pkgs/http/lib/src/request_controller.dart new file mode 100644 index 0000000000..8f2ddd74f8 --- /dev/null +++ b/pkgs/http/lib/src/request_controller.dart @@ -0,0 +1,179 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +import '../http.dart'; +import 'active_request_tracker.dart'; + +/// Represents the state of a request that is in progress. +enum RequestLifecycleState { + /// A connection is being opened to the server to send the request. + connecting, + + /// The request data is being sent to the server. + sending, + + /// The response is being received from the server. + receiving, +} + +/// Encapsulates timeouts for individual parts of a request's lifecycle. +final class PartialTimeouts { + /// The duration to wait for a connection to be successfully opened with a + /// server before aborting the request. + final Duration? connectTimeout; + + /// The duration to wait for the request data to be sent to the server before + /// aborting the request. + final Duration? sendTimeout; + + /// The duration to wait for a response to be received from the server + /// before aborting the request. + final Duration? receiveTimeout; + + /// Creates a new [PartialTimeouts]. + const PartialTimeouts({ + this.connectTimeout, + this.sendTimeout, + this.receiveTimeout, + }); + + /// Creates a new [PartialTimeouts] with all timeouts set to the + /// specified [timeout]. + const PartialTimeouts.all(Duration timeout) + : connectTimeout = timeout, + sendTimeout = timeout, + receiveTimeout = timeout; + + /// Returns true if a timeout is specified for the specified [state]. + /// Returns false otherwise. + bool hasForState(RequestLifecycleState state) => forState(state) != null; + + /// Returns the timeout for the specified [state] if there is one specified, + /// otherwise returns null. + Duration? forState(RequestLifecycleState state) { + switch (state) { + case RequestLifecycleState.connecting: + return connectTimeout; + case RequestLifecycleState.sending: + return sendTimeout; + case RequestLifecycleState.receiving: + return receiveTimeout; + } + } +} + +/// A [RequestController] manages the lifecycle of a request, or multiple +/// requests. +/// +/// Its primary purpose is to allow the proper cancellation of requests on +/// demand. This is useful for cases where a request is no longer needed, but +/// the response is still being awaited. Calling [cancel] on the controller +/// allows the library to clean up any resources associated with the request, +/// and to ensure that the response is never delivered. +/// +/// It is perfectly valid to register one controller per request if individual +/// control over each request is desired. If you intend to call [cancel] +/// on a controller, ensure the controller is only registered on requests that +/// are intended to be cancelled together. +class RequestController { + /// The timeout for the entire round trip of a request before it is aborted. + /// + /// If the request as a whole takes longer than this timeout, it will be + /// cancelled. + /// If this value is `null`, requests will never timeout. + /// + /// If a request times out, it will throw a [TimeoutException]. + final Duration? timeout; + + final PartialTimeouts? _lifecycleTimeouts; + + final List _activeRequests = []; + + /// Returns true if this controller has any timeouts specified. + bool get hasTimeouts => timeout != null || hasLifecycleTimeouts; + + /// Returns true if this controller has any timeouts specified for individual + /// parts of a request's lifecycle. + bool get hasLifecycleTimeouts => _lifecycleTimeouts != null; + + /// Returns true if a timeout is specified for the specified request + /// lifecycle [state]. + /// + /// This is true if the timeout has been specified in the + /// `partialTimeouts` parameter of the constructor for more granular control + /// over timeouts. + bool hasTimeoutForLifecycleState(RequestLifecycleState state) => + timeoutForLifecycleState(state) != null; + + /// Returns the timeout for the specified request lifecycle [state] if there + /// is one specified, otherwise returns null. + /// + /// This is the timeout specified in the `partialTimeouts` parameter of the + /// constructor for more granular control over timeouts. + /// + /// If no timeout is specified for the specified [state], this will return + /// null. + Duration? timeoutForLifecycleState(RequestLifecycleState state) => + _lifecycleTimeouts?.forState(state); + + /// Creates a new [RequestController]. + /// + /// Optionally, a default [timeout] may be specified for requests on this + /// controller. If no timeout is specified, requests will never timeout. + /// See [timeout] for more details. + /// + /// For more granular control over timeouts, [partialTimeouts] may be + /// specified in addition to, or instead of, [timeout]. These can be used + /// to control the timeout for individual parts of a request's lifecycle. + /// + /// For instance, you may wish to abort a request if it takes too long to + /// connect, but once it has connected, you may wish to allow the request + /// to take as long as it needs to send and receive data. + RequestController({this.timeout, PartialTimeouts? partialTimeouts}) + : _lifecycleTimeouts = partialTimeouts; + + /// Tracks a request with this controller. + /// + /// This method is called internally when a request is sent. It should not + /// be called directly. A [StateError] will be thrown if the request is not + /// bound to this controller. + /// + /// If the request is already being tracked by this controller, the + /// existing [ActiveRequestTracker] will be returned. + @internal + ActiveRequestTracker track(BaseRequest request, {required bool isStreaming}) { + if (request.controller != this) { + throw StateError('Request is not bound to this controller'); + } + + if (_activeRequests.any((r) => r.request == request)) { + return _activeRequests.firstWhere((r) => r.request == request); + } + + final activeRequest = ActiveRequestTracker( + request, + isStreaming: isStreaming, + timeout: timeout, + ); + _activeRequests.add(activeRequest); + return activeRequest; + } + + // Fetches a tracker for an existing request. + @internal + ActiveRequestTracker? getExistingTracker(BaseRequest request) => + _activeRequests.where((r) => r.request == request).singleOrNull; + + /// Cancels/aborts all pending requests on this controller. + /// This will cause all requests to throw a [CancelledException]. + /// + /// Optionally, a [message] may be specified to provide a reason for the + /// cancellation that will be included in the exception. If no message is + /// specified, the default message is "Request cancelled". + void cancel([final String message = 'Request cancelled']) { + for (final activeRequest in _activeRequests) { + activeRequest.cancel(message); + } + } +} diff --git a/pkgs/http/lib/src/response.dart b/pkgs/http/lib/src/response.dart index 1ba7c466cf..1929b7447a 100644 --- a/pkgs/http/lib/src/response.dart +++ b/pkgs/http/lib/src/response.dart @@ -7,8 +7,10 @@ import 'dart:typed_data'; import 'package:http_parser/http_parser.dart'; +import 'active_request_tracker.dart'; import 'base_request.dart'; import 'base_response.dart'; +import 'request_controller.dart'; import 'streamed_response.dart'; import 'utils.dart'; @@ -54,7 +56,32 @@ class Response extends BaseResponse { /// Creates a new HTTP response by waiting for the full body to become /// available from a [StreamedResponse]. static Future fromStream(StreamedResponse response) async { - final body = await response.stream.toBytes(); + // Future that collects the body bytes from the response stream. + final collectBodyBytes = response.stream.toBytes(); + + // We can automatically track a response timeout here, if there is one + // specified. + late final Uint8List body; + + if (response.request?.controller != null && + response.request!.controller! + .hasTimeoutForLifecycleState(RequestLifecycleState.receiving)) { + body = await maybeTrack( + collectBodyBytes.timeout(response.request!.controller! + .timeoutForLifecycleState(RequestLifecycleState.receiving)!), + tracker: + response.request!.controller!.getExistingTracker(response.request!), + state: RequestLifecycleState.receiving, + onCancel: (_) { + if (response is ClosableStreamedResponse) { + response.close(); + } + }, + ); + } else { + body = await collectBodyBytes; + } + return Response.bytes(body, response.statusCode, request: response.request, headers: response.headers, diff --git a/pkgs/http/lib/src/streamed_request.dart b/pkgs/http/lib/src/streamed_request.dart index d10386e263..46010a60d3 100644 --- a/pkgs/http/lib/src/streamed_request.dart +++ b/pkgs/http/lib/src/streamed_request.dart @@ -35,21 +35,21 @@ class StreamedRequest extends BaseRequest { /// buffered. /// /// Closing this signals the end of the request. - StreamSink> get sink => _controller.sink; + StreamSink> get sink => _streamController.sink; /// The controller for [sink], from which [BaseRequest] will read data for /// [finalize]. - final StreamController> _controller; + final StreamController> _streamController; /// Creates a new streaming request. - StreamedRequest(super.method, super.url) - : _controller = StreamController>(sync: true); + StreamedRequest(super.method, super.url, {super.controller}) + : _streamController = StreamController>(sync: true); /// Freezes all mutable fields and returns a single-subscription [ByteStream] /// that emits the data being written to [sink]. @override ByteStream finalize() { super.finalize(); - return ByteStream(_controller.stream); + return ByteStream(_streamController.stream); } } diff --git a/pkgs/http/lib/src/streamed_response.dart b/pkgs/http/lib/src/streamed_response.dart index 8cc0c76f75..f440173f8a 100644 --- a/pkgs/http/lib/src/streamed_response.dart +++ b/pkgs/http/lib/src/streamed_response.dart @@ -26,3 +26,19 @@ class StreamedResponse extends BaseResponse { super.reasonPhrase}) : stream = toByteStream(stream); } + +abstract class ClosableStreamedResponse extends StreamedResponse { + ClosableStreamedResponse(super.stream, super.statusCode, + {super.contentLength, + super.request, + super.headers, + super.isRedirect, + super.persistentConnection, + super.reasonPhrase}); + + /// Closes the response body. + /// + /// This should be called when the body is no longer needed in order to free + /// up underlying resources (e.g. sockets). + Future close(); +} diff --git a/pkgs/http/pubspec.yaml b/pkgs/http/pubspec.yaml index ec23e2d608..40dd6e30e3 100644 --- a/pkgs/http/pubspec.yaml +++ b/pkgs/http/pubspec.yaml @@ -1,5 +1,5 @@ name: http -version: 1.1.1-wip +version: 1.2.0 description: A composable, multi-platform, Future-based API for HTTP requests. repository: https://github.com/dart-lang/http/tree/master/pkgs/http