-
Notifications
You must be signed in to change notification settings - Fork 362
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a WebSocket implementation to package:cupertino_http (#1153)
- Loading branch information
1 parent
5dfea72
commit cfbc191
Showing
10 changed files
with
313 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
22 changes: 22 additions & 0 deletions
22
pkgs/cupertino_http/example/integration_test/web_socket_conformance_test.dart
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file | ||
// for details. All rights reserved. Use of this source code is governed by a | ||
// BSD-style license that can be found in the LICENSE file. | ||
|
||
import 'package:cupertino_http/cupertino_http.dart'; | ||
import 'package:test/test.dart'; | ||
import 'package:web_socket_conformance_tests/web_socket_conformance_tests.dart'; | ||
|
||
void main() { | ||
testAll(CupertinoWebSocket.connect); | ||
|
||
group('defaultSessionConfiguration', () { | ||
testAll( | ||
CupertinoWebSocket.connect, | ||
); | ||
}); | ||
group('fromSessionConfiguration', () { | ||
final config = URLSessionConfiguration.ephemeralSessionConfiguration(); | ||
testAll((uri, {protocols}) => | ||
CupertinoWebSocket.connect(uri, protocols: protocols, config: config)); | ||
}); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file | ||
// for details. All rights reserved. Use of this source code is governed by a | ||
// BSD-style license that can be found in the LICENSE file. | ||
|
||
import 'dart:async'; | ||
import 'dart:convert'; | ||
import 'dart:typed_data'; | ||
|
||
import 'package:web_socket/web_socket.dart'; | ||
|
||
import 'cupertino_api.dart'; | ||
|
||
/// An error occurred while connecting to the peer. | ||
class ConnectionException extends WebSocketException { | ||
final Error error; | ||
|
||
ConnectionException(super.message, this.error); | ||
|
||
@override | ||
String toString() => 'CupertinoErrorWebSocketException: $message $error'; | ||
} | ||
|
||
/// A [WebSocket] implemented using the | ||
/// [NSURLSessionWebSocketTask API](https://developer.apple.com/documentation/foundation/nsurlsessionwebsockettask). | ||
class CupertinoWebSocket implements WebSocket { | ||
/// Create a new WebSocket connection using the | ||
/// [NSURLSessionWebSocketTask API](https://developer.apple.com/documentation/foundation/nsurlsessionwebsockettask). | ||
/// | ||
/// The URL supplied in [url] must use the scheme ws or wss. | ||
/// | ||
/// If provided, the [protocols] argument indicates that subprotocols that | ||
/// the peer is able to select. See | ||
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9). | ||
static Future<CupertinoWebSocket> connect(Uri url, | ||
{Iterable<String>? protocols, URLSessionConfiguration? config}) async { | ||
if (!url.isScheme('ws') && !url.isScheme('wss')) { | ||
throw ArgumentError.value( | ||
url, 'url', 'only ws: and wss: schemes are supported'); | ||
} | ||
|
||
final readyCompleter = Completer<CupertinoWebSocket>(); | ||
late CupertinoWebSocket webSocket; | ||
|
||
final session = URLSession.sessionWithConfiguration( | ||
config ?? URLSessionConfiguration.defaultSessionConfiguration(), | ||
// In a successful flow, the callbacks will be made in this order: | ||
// onWebSocketTaskOpened(...) // Good connect. | ||
// <receive/send messages to the peer> | ||
// onWebSocketTaskClosed(...) // Optional: peer sent Close frame. | ||
// onComplete(..., error=null) // Disconnected. | ||
// | ||
// In a failure to connect to the peer, the flow will be: | ||
// onComplete(session, task, error=error): | ||
// | ||
// `onComplete` can also be called at any point if the peer is | ||
// disconnected without Close frames being exchanged. | ||
onWebSocketTaskOpened: (session, task, protocol) { | ||
webSocket = CupertinoWebSocket._(task, protocol ?? ''); | ||
readyCompleter.complete(webSocket); | ||
}, onWebSocketTaskClosed: (session, task, closeCode, reason) { | ||
assert(readyCompleter.isCompleted); | ||
webSocket._connectionClosed(closeCode, reason); | ||
}, onComplete: (session, task, error) { | ||
if (!readyCompleter.isCompleted) { | ||
// `onWebSocketTaskOpened should have been called and completed | ||
// `readyCompleter`. So either there was a error creating the connection | ||
// or a logic error. | ||
if (error == null) { | ||
throw AssertionError( | ||
'expected an error or "onWebSocketTaskOpened" to be called ' | ||
'first'); | ||
} | ||
readyCompleter.completeError( | ||
ConnectionException('connection ended unexpectedly', error)); | ||
} else { | ||
// There are three possibilities here: | ||
// 1. the peer sent a close Frame, `onWebSocketTaskClosed` was already | ||
// called and `_connectionClosed` is a no-op. | ||
// 2. we sent a close Frame (through `close()`) and `_connectionClosed` | ||
// is a no-op. | ||
// 3. an error occured (e.g. network failure) and `_connectionClosed` | ||
// will signal that and close `event`. | ||
webSocket._connectionClosed( | ||
1006, Data.fromList('abnormal close'.codeUnits)); | ||
} | ||
}); | ||
|
||
session.webSocketTaskWithURL(url, protocols: protocols).resume(); | ||
return readyCompleter.future; | ||
} | ||
|
||
final URLSessionWebSocketTask _task; | ||
final String _protocol; | ||
final _events = StreamController<WebSocketEvent>(); | ||
|
||
CupertinoWebSocket._(this._task, this._protocol) { | ||
_scheduleReceive(); | ||
} | ||
|
||
/// Handle an incoming message from the peer and schedule receiving the next | ||
/// message. | ||
void _handleMessage(URLSessionWebSocketMessage value) { | ||
late WebSocketEvent event; | ||
switch (value.type) { | ||
case URLSessionWebSocketMessageType.urlSessionWebSocketMessageTypeString: | ||
event = TextDataReceived(value.string!); | ||
break; | ||
case URLSessionWebSocketMessageType.urlSessionWebSocketMessageTypeData: | ||
event = BinaryDataReceived(value.data!.bytes); | ||
break; | ||
} | ||
_events.add(event); | ||
_scheduleReceive(); | ||
} | ||
|
||
void _scheduleReceive() { | ||
unawaited(_task | ||
.receiveMessage() | ||
.then(_handleMessage, onError: _closeConnectionWithError)); | ||
} | ||
|
||
/// Close the WebSocket connection due to an error and send the | ||
/// [CloseReceived] event. | ||
void _closeConnectionWithError(Object e) { | ||
if (e is Error) { | ||
if (e.domain == 'NSPOSIXErrorDomain' && e.code == 57) { | ||
// Socket is not connected. | ||
// onWebSocketTaskClosed/onComplete will be invoked and may indicate a | ||
// close code. | ||
return; | ||
} | ||
var (int code, String? reason) = switch ([e.domain, e.code]) { | ||
['NSPOSIXErrorDomain', 100] => (1002, e.localizedDescription), | ||
_ => (1006, e.localizedDescription) | ||
}; | ||
_task.cancel(); | ||
_connectionClosed( | ||
code, reason == null ? null : Data.fromList(reason.codeUnits)); | ||
} else { | ||
throw StateError('unexpected error: $e'); | ||
} | ||
} | ||
|
||
void _connectionClosed(int? closeCode, Data? reason) { | ||
if (!_events.isClosed) { | ||
final closeReason = reason == null ? '' : utf8.decode(reason.bytes); | ||
|
||
_events | ||
..add(CloseReceived(closeCode, closeReason)) | ||
..close(); | ||
} | ||
} | ||
|
||
@override | ||
void sendBytes(Uint8List b) { | ||
if (_events.isClosed) { | ||
throw StateError('WebSocket is closed'); | ||
} | ||
_task | ||
.sendMessage(URLSessionWebSocketMessage.fromData(Data.fromList(b))) | ||
.then((_) => _, onError: _closeConnectionWithError); | ||
} | ||
|
||
@override | ||
void sendText(String s) { | ||
if (_events.isClosed) { | ||
throw StateError('WebSocket is closed'); | ||
} | ||
_task | ||
.sendMessage(URLSessionWebSocketMessage.fromString(s)) | ||
.then((_) => _, onError: _closeConnectionWithError); | ||
} | ||
|
||
@override | ||
Future<void> close([int? code, String? reason]) async { | ||
if (_events.isClosed) { | ||
throw StateError('WebSocket is closed'); | ||
} | ||
|
||
if (code != null) { | ||
RangeError.checkValueInInterval(code, 3000, 4999, 'code'); | ||
} | ||
if (reason != null && utf8.encode(reason).length > 123) { | ||
throw ArgumentError.value(reason, 'reason', | ||
'reason must be <= 123 bytes long when encoded as UTF-8'); | ||
} | ||
|
||
if (!_events.isClosed) { | ||
unawaited(_events.close()); | ||
if (code != null) { | ||
reason = reason ?? ''; | ||
_task.cancelWithCloseCode(code, Data.fromList(utf8.encode(reason))); | ||
} else { | ||
_task.cancel(); | ||
} | ||
} | ||
} | ||
|
||
@override | ||
Stream<WebSocketEvent> get events => _events.stream; | ||
|
||
@override | ||
String get protocol => _protocol; | ||
} |
Oops, something went wrong.