diff --git a/lib/http.dart b/lib/http.dart index 1ea751eab1..efc6cbd9ff 100644 --- a/lib/http.dart +++ b/lib/http.dart @@ -3,6 +3,7 @@ // BSD-style license that can be found in the LICENSE file. /// A composable, [Future]-based library for making HTTP requests. +import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; @@ -27,23 +28,35 @@ export 'src/streamed_response.dart'; /// Sends an HTTP HEAD request with the given headers to the given URL. /// +/// If [timeout] is not null the request will be aborted if it takes longer than +/// the given duration to complete, and the returned future will complete as an +/// error with a [TimeoutException]. +/// /// This automatically initializes a new [Client] and closes that client once /// the request is complete. If you're planning on making multiple requests to /// the same server, you should use a single [Client] for all of those requests. /// /// For more fine-grained control over the request, use [Request] instead. -Future head(Uri url, {Map? headers}) => - _withClient((client) => client.head(url, headers: headers)); +Future head(Uri url, + {Map? headers, Duration? timeout}) => + _withClient( + (client) => client.head(url, headers: headers, timeout: timeout)); /// Sends an HTTP GET request with the given headers to the given URL. /// +/// If [timeout] is not null the request will be aborted if it takes longer than +/// the given duration to complete, and the returned future will complete as an +/// error with a [TimeoutException]. +/// /// This automatically initializes a new [Client] and closes that client once /// the request is complete. If you're planning on making multiple requests to /// the same server, you should use a single [Client] for all of those requests. /// /// For more fine-grained control over the request, use [Request] instead. -Future get(Uri url, {Map? headers}) => - _withClient((client) => client.get(url, headers: headers)); +Future get(Uri url, + {Map? headers, Duration? timeout}) => + _withClient( + (client) => client.get(url, headers: headers, timeout: timeout)); /// Sends an HTTP POST request with the given headers and body to the given URL. /// @@ -61,12 +74,19 @@ Future get(Uri url, {Map? headers}) => /// /// [encoding] defaults to [utf8]. /// +/// If [timeout] is not null the request will be aborted if it takes longer than +/// the given duration to complete, and the returned future will complete as an +/// error with a [TimeoutException]. +/// /// For more fine-grained control over the request, use [Request] or /// [StreamedRequest] instead. Future post(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _withClient((client) => - client.post(url, headers: headers, body: body, encoding: encoding)); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}) => + _withClient((client) => client.post(url, + headers: headers, body: body, encoding: encoding, timeout: timeout)); /// Sends an HTTP PUT request with the given headers and body to the given URL. /// @@ -84,12 +104,19 @@ Future post(Uri url, /// /// [encoding] defaults to [utf8]. /// +/// If [timeout] is not null the request will be aborted if it takes longer than +/// the given duration to complete, and the returned future will complete as an +/// error with a [TimeoutException]. +/// /// For more fine-grained control over the request, use [Request] or /// [StreamedRequest] instead. Future put(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _withClient((client) => - client.put(url, headers: headers, body: body, encoding: encoding)); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}) => + _withClient((client) => client.put(url, + headers: headers, body: body, encoding: encoding, timeout: timeout)); /// Sends an HTTP PATCH request with the given headers and body to the given /// URL. @@ -108,24 +135,38 @@ Future put(Uri url, /// /// [encoding] defaults to [utf8]. /// +/// If [timeout] is not null the request will be aborted if it takes longer than +/// the given duration to complete, and the returned future will complete as an +/// error with a [TimeoutException]. +/// /// For more fine-grained control over the request, use [Request] or /// [StreamedRequest] instead. Future patch(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _withClient((client) => - client.patch(url, headers: headers, body: body, encoding: encoding)); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}) => + _withClient((client) => client.patch(url, + headers: headers, body: body, encoding: encoding, timeout: timeout)); /// Sends an HTTP DELETE request with the given headers to the given URL. /// +/// If [timeout] is not null the request will be aborted if it takes longer than +/// the given duration to complete, and the returned future will complete as an +/// error with a [TimeoutException]. +/// /// This automatically initializes a new [Client] and closes that client once /// the request is complete. If you're planning on making multiple requests to /// the same server, you should use a single [Client] for all of those requests. /// /// For more fine-grained control over the request, use [Request] instead. Future delete(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _withClient((client) => - client.delete(url, headers: headers, body: body, encoding: encoding)); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}) => + _withClient((client) => client.delete(url, + headers: headers, body: body, encoding: encoding, timeout: timeout)); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a [String]. @@ -133,14 +174,20 @@ Future delete(Uri url, /// The Future will emit a [ClientException] if the response doesn't have a /// success status code. /// +/// If [timeout] is not null the request will be aborted if it takes longer than +/// the given duration to complete, and the returned future will complete as an +/// error with a [TimeoutException]. +/// /// This automatically initializes a new [Client] and closes that client once /// the request is complete. If you're planning on making multiple requests to /// the same server, you should use a single [Client] for all of those requests. /// /// For more fine-grained control over the request and response, use [Request] /// instead. -Future read(Uri url, {Map? headers}) => - _withClient((client) => client.read(url, headers: headers)); +Future read(Uri url, + {Map? headers, Duration? timeout}) => + _withClient( + (client) => client.read(url, headers: headers, timeout: timeout)); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a list of @@ -149,14 +196,20 @@ Future read(Uri url, {Map? headers}) => /// The Future will emit a [ClientException] if the response doesn't have a /// success status code. /// +/// If [timeout] is not null the request will be aborted if it takes longer than +/// the given duration to complete, and the returned future will complete as an +/// error with a [TimeoutException]. +/// /// This automatically initializes a new [Client] and closes that client once /// the request is complete. If you're planning on making multiple requests to /// the same server, you should use a single [Client] for all of those requests. /// /// For more fine-grained control over the request and response, use [Request] /// instead. -Future readBytes(Uri url, {Map? headers}) => - _withClient((client) => client.readBytes(url, headers: headers)); +Future readBytes(Uri url, + {Map? headers, Duration? timeout}) => + _withClient( + (client) => client.readBytes(url, headers: headers, timeout: timeout)); Future _withClient(Future Function(Client) fn) async { var client = Client(); diff --git a/lib/src/base_client.dart b/lib/src/base_client.dart index efb065f70e..8e1445a1f7 100644 --- a/lib/src/base_client.dart +++ b/lib/src/base_client.dart @@ -19,43 +19,59 @@ import 'streamed_response.dart'; /// maybe [close], and then they get various convenience methods for free. abstract class BaseClient implements Client { @override - Future head(Uri url, {Map? headers}) => - _sendUnstreamed('HEAD', url, headers); + Future head(Uri url, + {Map? headers, Duration? timeout}) => + _sendUnstreamed('HEAD', url, headers, null, null, timeout); @override - Future get(Uri url, {Map? headers}) => - _sendUnstreamed('GET', url, headers); + Future get(Uri url, + {Map? headers, Duration? timeout}) => + _sendUnstreamed('GET', url, headers, null, null, timeout); @override Future post(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('POST', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}) => + _sendUnstreamed('POST', url, headers, body, encoding, timeout); @override Future put(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('PUT', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}) => + _sendUnstreamed('PUT', url, headers, body, encoding, timeout); @override Future patch(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('PATCH', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}) => + _sendUnstreamed('PATCH', url, headers, body, encoding, timeout); @override Future delete(Uri url, - {Map? headers, Object? body, Encoding? encoding}) => - _sendUnstreamed('DELETE', url, headers, body, encoding); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}) => + _sendUnstreamed('DELETE', url, headers, body, encoding, timeout); @override - Future read(Uri url, {Map? headers}) async { - final response = await get(url, headers: headers); + Future read(Uri url, + {Map? headers, Duration? timeout}) async { + final response = await get(url, headers: headers, timeout: timeout); _checkResponseSuccess(url, response); return response.body; } @override - Future readBytes(Uri url, {Map? headers}) async { - final response = await get(url, headers: headers); + Future readBytes(Uri url, + {Map? headers, Duration? timeout}) async { + final response = await get(url, headers: headers, timeout: timeout); _checkResponseSuccess(url, response); return response.bodyBytes; } @@ -68,12 +84,17 @@ abstract class BaseClient implements Client { /// later point, or it could already be closed when it's returned. Any /// internal HTTP errors should be wrapped as [ClientException]s. @override - Future send(BaseRequest request); + Future send(BaseRequest request, + {Duration? contentTimeout}); /// Sends a non-streaming [Request] and returns a non-streaming [Response]. Future _sendUnstreamed( - String method, Uri url, Map? headers, - [body, Encoding? encoding]) async { + String method, + Uri url, + Map? headers, + dynamic body, + Encoding? encoding, + Duration? timeout) async { var request = Request(method, url); if (headers != null) request.headers.addAll(headers); @@ -90,7 +111,7 @@ abstract class BaseClient implements Client { } } - return Response.fromStream(await send(request)); + return Response.fromStream(await send(request, contentTimeout: timeout)); } /// Throws an error if [response] is not successful. diff --git a/lib/src/base_request.dart b/lib/src/base_request.dart index 6380cb0bc9..156e20d85e 100644 --- a/lib/src/base_request.dart +++ b/lib/src/base_request.dart @@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:collection'; import 'package:meta/meta.dart'; @@ -117,11 +118,18 @@ abstract class BaseRequest { /// the request is complete. If you're planning on making multiple requests to /// the same server, you should use a single [Client] for all of those /// requests. - Future send() async { + /// + /// If [contentTimeout] is not null the request will be aborted if it takes + /// longer than the given duration to receive the entire response. If the + /// timeout occurs before any reply is received from the server the returned + /// future will as an error with a [TimeoutException]. If the timout occurs + /// after the reply has been started but before the entire body has been read + /// the response stream will emit a [TimeoutException] and close. + Future send({Duration? contentTimeout}) async { var client = Client(); try { - var response = await client.send(this); + var response = await client.send(this, contentTimeout: contentTimeout); var stream = onDone(response.stream, client.close); return StreamedResponse(ByteStream(stream), response.statusCode, contentLength: response.contentLength, diff --git a/lib/src/browser_client.dart b/lib/src/browser_client.dart index d1eb35c1f5..123b51ec76 100644 --- a/lib/src/browser_client.dart +++ b/lib/src/browser_client.dart @@ -41,43 +41,65 @@ class BrowserClient extends BaseClient { /// Sends an HTTP request and asynchronously returns the response. @override - Future send(BaseRequest request) async { + Future send(BaseRequest request, + {Duration? contentTimeout}) { + final completer = Completer(); + _send(request, contentTimeout, completer); + return completer.future; + } + + Future _send(BaseRequest request, Duration? timeout, + Completer completer) async { + Timer? timer; + HttpRequest? toAbort; + if (timeout != null) { + timer = Timer(timeout, () { + toAbort?.abort(); + if (!completer.isCompleted) { + completer.completeError(TimeoutException('Request aborted', timeout)); + } + }); + } var bytes = await request.finalize().toBytes(); - var xhr = HttpRequest(); + var xhr = toAbort = HttpRequest(); _xhrs.add(xhr); + unawaited(completer.future + .whenComplete(() { + _xhrs.remove(xhr); + }) + .then((_) {}) + .catchError((_) {})); xhr ..open(request.method, '${request.url}', async: true) ..responseType = 'arraybuffer' ..withCredentials = withCredentials; request.headers.forEach(xhr.setRequestHeader); - var completer = Completer(); - unawaited(xhr.onLoad.first.then((_) { var body = (xhr.response as ByteBuffer).asUint8List(); - completer.complete(StreamedResponse( - ByteStream.fromBytes(body), xhr.status!, - contentLength: body.length, - request: request, - headers: xhr.responseHeaders, - reasonPhrase: xhr.statusText)); + timer?.cancel(); + if (!completer.isCompleted) { + completer.complete(StreamedResponse( + ByteStream.fromBytes(body), xhr.status!, + contentLength: body.length, + request: request, + headers: xhr.responseHeaders, + reasonPhrase: xhr.statusText)); + } })); unawaited(xhr.onError.first.then((_) { // Unfortunately, the underlying XMLHttpRequest API doesn't expose any // specific information about the error itself. - completer.completeError( - ClientException('XMLHttpRequest error.', request.url), - StackTrace.current); + timer?.cancel(); + if (!completer.isCompleted) { + completer.completeError( + ClientException('XMLHttpRequest error.', request.url), + StackTrace.current); + } })); xhr.send(bytes); - - try { - return await completer.future; - } finally { - _xhrs.remove(xhr); - } } /// Closes the client. diff --git a/lib/src/client.dart b/lib/src/client.dart index 12695e7123..ba162dbb2f 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; @@ -33,13 +34,23 @@ abstract class Client { /// Sends an HTTP HEAD request with the given headers to the given URL. /// + /// If [timeout] is not null the request will be aborted if it takes longer + /// than the given duration to complete, and the returned future will complete + /// as an error with a [TimeoutException]. + /// /// For more fine-grained control over the request, use [send] instead. - Future head(Uri url, {Map? headers}); + Future head(Uri url, + {Map? headers, Duration? timeout}); /// Sends an HTTP GET request with the given headers to the given URL. /// + /// If [timeout] is not null the request will be aborted if it takes longer + /// than the given duration to complete, and the returned future will complete + /// as an error with a [TimeoutException]. + /// /// For more fine-grained control over the request, use [send] instead. - Future get(Uri url, {Map? headers}); + Future get(Uri url, + {Map? headers, Duration? timeout}); /// Sends an HTTP POST request with the given headers and body to the given /// URL. @@ -58,9 +69,16 @@ abstract class Client { /// /// [encoding] defaults to [utf8]. /// + /// If [timeout] is not null the request will be aborted if it takes longer + /// than the given duration to complete, and the returned future will complete + /// as an error with a [TimeoutException]. + /// /// For more fine-grained control over the request, use [send] instead. Future post(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}); /// Sends an HTTP PUT request with the given headers and body to the given /// URL. @@ -79,9 +97,16 @@ abstract class Client { /// /// [encoding] defaults to [utf8]. /// + /// If [timeout] is not null the request will be aborted if it takes longer + /// than the given duration to complete, and the returned future will complete + /// as an error with a [TimeoutException]. + /// /// For more fine-grained control over the request, use [send] instead. Future put(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}); /// Sends an HTTP PATCH request with the given headers and body to the given /// URL. @@ -100,15 +125,29 @@ abstract class Client { /// /// [encoding] defaults to [utf8]. /// + /// If [timeout] is not null the request will be aborted if it takes longer + /// than the given duration to complete, and the returned future will complete + /// as an error with a [TimeoutException]. + /// /// For more fine-grained control over the request, use [send] instead. Future patch(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}); /// Sends an HTTP DELETE request with the given headers to the given URL. /// + /// If [timeout] is not null the request will be aborted if it takes longer + /// than the given duration to complete, and the returned future will complete + /// as an error with a [TimeoutException]. + /// /// For more fine-grained control over the request, use [send] instead. Future delete(Uri url, - {Map? headers, Object? body, Encoding? encoding}); + {Map? headers, + Object? body, + Encoding? encoding, + Duration? timeout}); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a String. @@ -116,9 +155,14 @@ abstract class Client { /// The Future will emit a [ClientException] if the response doesn't have a /// success status code. /// + /// If [timeout] is not null the request will be aborted if it takes longer + /// than the given duration to complete, and the returned future will complete + /// as an error with a [TimeoutException]. + /// /// For more fine-grained control over the request and response, use [send] or /// [get] instead. - Future read(Uri url, {Map? headers}); + Future read(Uri url, + {Map? headers, Duration? timeout}); /// Sends an HTTP GET request with the given headers to the given URL and /// returns a Future that completes to the body of the response as a list of @@ -127,12 +171,25 @@ abstract class Client { /// The Future will emit a [ClientException] if the response doesn't have a /// success status code. /// + /// If [timeout] is not null the request will be aborted if it takes longer + /// than the given duration to complete, and the returned future will complete + /// as an error with a [TimeoutException]. + /// /// For more fine-grained control over the request and response, use [send] or /// [get] instead. - Future readBytes(Uri url, {Map? headers}); + Future readBytes(Uri url, + {Map? headers, Duration? timeout}); /// Sends an HTTP request and asynchronously returns the response. - Future send(BaseRequest request); + /// + /// If [contentTimeout] is not null the request will be aborted if it takes + /// longer than the given duration to receive the entire response. If the + /// timeout occurs before any reply is received from the server the returned + /// future will as an error with a [TimeoutException]. If the timout occurs + /// after the reply has been started but before the entire body has been read + /// the response stream will emit a [TimeoutException] and close. + Future send(BaseRequest request, {Duration? + contentTimeout}); /// Closes the client and cleans up any resources associated with it. /// diff --git a/lib/src/io_client.dart b/lib/src/io_client.dart index 5ee66db0f8..590b831e88 100644 --- a/lib/src/io_client.dart +++ b/lib/src/io_client.dart @@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:io'; import 'base_client.dart'; @@ -23,42 +24,95 @@ class IOClient extends BaseClient { /// Sends an HTTP request and asynchronously returns the response. @override - Future send(BaseRequest request) async { + Future send(BaseRequest request, + {Duration? contentTimeout}) { + final completer = Completer(); + _send(request, contentTimeout, completer); + return completer.future; + } + + Future _send(BaseRequest request, Duration? contentTimeout, + Completer completer) async { var stream = request.finalize(); + Timer? timer; + void Function() onTimeout; + if (contentTimeout != null) { + onTimeout = () { + if (!completer.isCompleted) { + completer.completeError( + TimeoutException('Request aborted', contentTimeout)); + } + }; + timer = Timer(contentTimeout, () { + onTimeout(); + }); + } try { var ioRequest = (await _inner!.openUrl(request.method, request.url)) ..followRedirects = request.followRedirects ..maxRedirects = request.maxRedirects ..contentLength = (request.contentLength ?? -1) ..persistentConnection = request.persistentConnection; + if (completer.isCompleted) return; request.headers.forEach((name, value) { ioRequest.headers.set(name, value); }); + if (contentTimeout != null) { + onTimeout = () { + ioRequest.abort(); + if (!completer.isCompleted) { + completer.completeError( + TimeoutException('Request aborted', contentTimeout)); + } + }; + } + var response = await stream.pipe(ioRequest) as HttpClientResponse; + if (completer.isCompleted) return; var headers = {}; response.headers.forEach((key, values) { headers[key] = values.join(','); }); + var wasTimedOut = false; + if (contentTimeout != null) { + onTimeout = () { + wasTimedOut = true; + response.detachSocket().then((socket) => socket.destroy()); + }; + } + var responseStream = response.handleError((error) { + final httpException = error as HttpException; + throw ClientException(httpException.message, httpException.uri); + }, test: (error) => error is HttpException).transform>( + StreamTransformer.fromHandlers(handleDone: (sink) { + timer?.cancel(); + if (wasTimedOut) { + sink.addError(TimeoutException('Request aborted', contentTimeout)); + } + sink.close(); + })); - return IOStreamedResponse( - response.handleError((error) { - final httpException = error as HttpException; - throw ClientException(httpException.message, httpException.uri); - }, test: (error) => error is HttpException), - response.statusCode, - contentLength: - response.contentLength == -1 ? null : response.contentLength, - request: request, - headers: headers, - isRedirect: response.isRedirect, - persistentConnection: response.persistentConnection, - reasonPhrase: response.reasonPhrase, - inner: response); + if (!completer.isCompleted) { + completer.complete(IOStreamedResponse( + responseStream, response.statusCode, + contentLength: + response.contentLength == -1 ? null : response.contentLength, + request: request, + headers: headers, + isRedirect: response.isRedirect, + persistentConnection: response.persistentConnection, + reasonPhrase: response.reasonPhrase, + inner: response)); + } } on HttpException catch (error) { - throw ClientException(error.message, error.uri); + if (completer.isCompleted) return; + completer.completeError(ClientException(error.message, error.uri)); + } catch (error, stackTrace) { + if (completer.isCompleted) return; + completer.completeError(error, stackTrace); } } diff --git a/lib/src/mock_client.dart b/lib/src/mock_client.dart index abfcde20e3..09312062c0 100644 --- a/lib/src/mock_client.dart +++ b/lib/src/mock_client.dart @@ -65,7 +65,8 @@ class MockClient extends BaseClient { }); @override - Future send(BaseRequest request) async { + Future send(BaseRequest request, + {Duration? contentTimeout}) async { var bodyStream = request.finalize(); return await _handler(request, bodyStream); }