Skip to content

Commit

Permalink
Create an adapter for package:web_socket (dart-lang/web_socket_chan…
Browse files Browse the repository at this point in the history
  • Loading branch information
brianquinlan authored Apr 8, 2024
1 parent ccfa26f commit 01817d2
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 62 deletions.
11 changes: 8 additions & 3 deletions pkgs/web_socket_channel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
## 3.0.0-wip

- Provide an adapter around `package:web_socket` `WebSocket`s and make it the
default implementation for `WebSocketChannel.connect`.

## 2.4.5

* use secure random number generator for frame masking.
- use secure random number generator for frame masking.

## 2.4.4

* Require Dart `^3.3`
* Require `package:web` `^0.5.0`.
- Require Dart `^3.3`
- Require `package:web` `^0.5.0`.

## 2.4.3

Expand Down
15 changes: 0 additions & 15 deletions pkgs/web_socket_channel/lib/src/_connect_api.dart

This file was deleted.

16 changes: 0 additions & 16 deletions pkgs/web_socket_channel/lib/src/_connect_html.dart

This file was deleted.

15 changes: 0 additions & 15 deletions pkgs/web_socket_channel/lib/src/_connect_io.dart

This file was deleted.

6 changes: 2 additions & 4 deletions pkgs/web_socket_channel/lib/src/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import 'package:async/async.dart';
import 'package:crypto/crypto.dart';
import 'package:stream_channel/stream_channel.dart';

import '_connect_api.dart'
if (dart.library.io) '_connect_io.dart'
if (dart.library.js_interop) '_connect_html.dart' as platform;
import '../web_socket_adapter_web_socket_channel.dart';
import 'copy/web_socket_impl.dart';
import 'exception.dart';

Expand Down Expand Up @@ -141,7 +139,7 @@ class WebSocketChannel extends StreamChannelMixin {
/// If there are errors creating the connection the [ready] future will
/// complete with an error.
factory WebSocketChannel.connect(Uri uri, {Iterable<String>? protocols}) =>
platform.connect(uri, protocols: protocols);
WebSocketAdapterWebSocketChannel.connect(uri, protocols: protocols);
}

/// The sink exposed by a [WebSocketChannel].
Expand Down
136 changes: 136 additions & 0 deletions pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket/web_socket.dart';

import 'src/channel.dart';
import 'src/exception.dart';

/// A [WebSocketChannel] implemented using [WebSocket].
class WebSocketAdapterWebSocketChannel extends StreamChannelMixin
implements WebSocketChannel {
@override
String? get protocol => _protocol;
String? _protocol;

@override
int? get closeCode => _closeCode;
int? _closeCode;

@override
String? get closeReason => _closeReason;
String? _closeReason;

/// The close code set by the local user.
///
/// To ensure proper ordering, this is stored until we get a done event on
/// [_controller.local.stream].
int? _localCloseCode;

/// The close reason set by the local user.
///
/// To ensure proper ordering, this is stored until we get a done event on
/// [_controller.local.stream].
String? _localCloseReason;

/// Completer for [ready].
final _readyCompleter = Completer<void>();

@override
Future<void> get ready => _readyCompleter.future;

@override
Stream get stream => _controller.foreign.stream;

final _controller =
StreamChannelController<Object?>(sync: true, allowForeignErrors: false);

@override
late final WebSocketSink sink = _WebSocketSink(this);

/// Creates a new WebSocket connection.
///
/// If provided, the [protocols] argument indicates that subprotocols that
/// the peer is able to select. See
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
///
/// After construction, the [WebSocketAdapterWebSocketChannel] may not be
/// connected to the peer. The [ready] future will complete after the channel
/// is connected. If there are errors creating the connection the [ready]
/// future will complete with an error.
factory WebSocketAdapterWebSocketChannel.connect(Uri url,
{Iterable<String>? protocols}) =>
WebSocketAdapterWebSocketChannel._(
WebSocket.connect(url, protocols: protocols));

// Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket].
factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) =>
WebSocketAdapterWebSocketChannel._(Future.value(webSocket));

WebSocketAdapterWebSocketChannel._(Future<WebSocket> webSocketFuture) {
webSocketFuture.then((webSocket) {
var remoteClosed = false;
webSocket.events.listen((event) {
switch (event) {
case TextDataReceived(text: final text):
_controller.local.sink.add(text);
case BinaryDataReceived(data: final data):
_controller.local.sink.add(data);
case CloseReceived(code: final code, reason: final reason):
remoteClosed = true;
_closeCode = code;
_closeReason = reason;
_controller.local.sink.close();
}
});
_controller.local.stream.listen((obj) {
try {
switch (obj) {
case final String s:
webSocket.sendText(s);
case final Uint8List b:
webSocket.sendBytes(b);
case final List<int> b:
webSocket.sendBytes(Uint8List.fromList(b));
default:
throw UnsupportedError('Cannot send ${obj.runtimeType}');
}
} on WebSocketConnectionClosed catch (_) {
// There is nowhere to surface this error; `_controller.local.sink`
// has already been closed.
}
}, onDone: () {
if (!remoteClosed) {
webSocket.close(_localCloseCode, _localCloseReason);
}
});
_protocol = webSocket.protocol;
_readyCompleter.complete();
}, onError: (Object e) {
_readyCompleter.completeError(WebSocketChannelException.from(e));
});
}
}

/// A [WebSocketSink] that tracks the close code and reason passed to [close].
class _WebSocketSink extends DelegatingStreamSink implements WebSocketSink {
/// The channel to which this sink belongs.
final WebSocketAdapterWebSocketChannel _channel;

_WebSocketSink(WebSocketAdapterWebSocketChannel channel)
: _channel = channel,
super(channel._controller.foreign.sink);

@override
Future close([int? closeCode, String? closeReason]) {
_channel._localCloseCode = closeCode;
_channel._localCloseReason = closeReason;
return super.close();
}
}
9 changes: 8 additions & 1 deletion pkgs/web_socket_channel/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: web_socket_channel
version: 2.4.5
version: 3.0.0-wip
description: >-
StreamChannel wrappers for WebSockets. Provides a cross-platform
WebSocketChannel API, a cross-platform implementation of that API that
Expand All @@ -14,7 +14,14 @@ dependencies:
crypto: ^3.0.0
stream_channel: ^2.1.0
web: ^0.5.0
web_socket: ^0.1.0

dev_dependencies:
dart_flutter_team_lints: ^2.0.0
test: ^1.16.0

# Remove this when versions of `package:test` and `shelf_web_socket` that support
# channel_web_socket 3.0 are released.
dependency_overrides:
shelf_web_socket: 1.0.4
test: 1.25.2
34 changes: 34 additions & 0 deletions pkgs/web_socket_channel/test/echo_server_vm.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';

import 'package:stream_channel/stream_channel.dart';

Future<void> hybridMain(StreamChannel<Object?> channel) async {
late HttpServer server;

server = (await HttpServer.bind('localhost', 0))
..transform(WebSocketTransformer())
.listen((WebSocket webSocket) => webSocket.listen((data) {
if (data == 'close') {
webSocket.close(3001, 'you asked me to');
} else {
webSocket.add(data);
}
}));

channel.sink.add(server.port);
await channel
.stream.first; // Any writes indicates that the server should exit.
unawaited(server.close());
}

/// Starts an WebSocket server that echos the payload of the request.
Future<StreamChannel<Object?>> startServer() async {
final controller = StreamChannelController<Object?>(sync: true);
unawaited(hybridMain(controller.foreign));
return controller.local;
}
35 changes: 35 additions & 0 deletions pkgs/web_socket_channel/test/echo_server_web.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';

/// Starts an WebSocket server that echos the payload of the request.
/// Copied from `echo_server_vm.dart`.
Future<StreamChannel<Object?>> startServer() async => spawnHybridCode(r'''
import 'dart:async';
import 'dart:io';
import 'package:stream_channel/stream_channel.dart';
/// Starts an WebSocket server that echos the payload of the request.
Future<void> hybridMain(StreamChannel<Object?> channel) async {
late HttpServer server;
server = (await HttpServer.bind('localhost', 0))
..transform(WebSocketTransformer())
.listen((WebSocket webSocket) => webSocket.listen((data) {
if (data == 'close') {
webSocket.close(3001, 'you asked me to');
} else {
webSocket.add(data);
}
}));
channel.sink.add(server.port);
await channel
.stream.first; // Any writes indicates that the server should exit.
unawaited(server.close());
}
''');
16 changes: 8 additions & 8 deletions pkgs/web_socket_channel/test/io_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void main() {
channel.stream.listen((request) {
expect(request, equals('ping'));
channel.sink.add('pong');
channel.sink.close(5678, 'raisin');
channel.sink.close(3678, 'raisin');
});
});

Expand All @@ -45,7 +45,7 @@ void main() {
}
n++;
}, onDone: expectAsync0(() {
expect(channel.closeCode, equals(5678));
expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}));
});
Expand All @@ -70,7 +70,7 @@ void main() {
channel.stream.listen(
expectAsync1((message) {
expect(message, equals('pong'));
channel.sink.close(5678, 'raisin');
channel.sink.close(3678, 'raisin');
}, count: 1),
onDone: expectAsync0(() {}));
});
Expand All @@ -97,7 +97,7 @@ void main() {
channel.stream.listen(
expectAsync1((message) {
expect(message, equals('pong'));
channel.sink.close(5678, 'raisin');
channel.sink.close(3678, 'raisin');
}, count: 1),
onDone: expectAsync0(() {}));
});
Expand All @@ -109,7 +109,7 @@ void main() {
expect(() async {
final channel = IOWebSocketChannel(webSocket);
await channel.stream.drain<void>();
expect(channel.closeCode, equals(5678));
expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}(), completes);
});
Expand All @@ -118,7 +118,7 @@ void main() {

expect(channel.ready, completes);

await channel.sink.close(5678, 'raisin');
await channel.sink.close(3678, 'raisin');
});

test('.connect wraps a connection error in WebSocketChannelException',
Expand Down Expand Up @@ -192,7 +192,7 @@ void main() {
expect(() async {
final channel = IOWebSocketChannel(webSocket);
await channel.stream.drain<void>();
expect(channel.closeCode, equals(5678));
expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}(), completes);
});
Expand All @@ -202,7 +202,7 @@ void main() {
connectTimeout: const Duration(milliseconds: 1000),
);
expect(channel.ready, completes);
await channel.sink.close(5678, 'raisin');
await channel.sink.close(3678, 'raisin');
});

test('.respects timeout parameter when trying to connect', () async {
Expand Down
Loading

0 comments on commit 01817d2

Please sign in to comment.