Skip to content
This repository has been archived by the owner on Jan 6, 2025. It is now read-only.

Use Fetch API in SSE Client #66

Merged
merged 4 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 4.1.2-dev

- Send `fetch` requests instead of `XHR` requests.

## 4.1.1

- Apply `keepAlive` logic to `SocketException`s.
Expand Down
62 changes: 50 additions & 12 deletions lib/client/sse_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'dart:async';
import 'dart:convert';
import 'dart:html';

import 'package:js/js.dart';
import 'package:logging/logging.dart';
import 'package:pool/pool.dart';
import 'package:stream_channel/stream_channel.dart';
Expand Down Expand Up @@ -64,13 +65,7 @@ class SseClient extends StreamChannelMixin<String?> {
// By default the SSE client uses keep-alive.
// Allow for a retry to connect before giving up.
_errorTimer = Timer(const Duration(seconds: 5), () {
_incomingController.addError(error);
close();
if (!_onConnected.isCompleted) {
// This call must happen after the call to close() which checks
// whether the completer was completed earlier.
_onConnected.completeError(error);
}
_closeWithError(error);
});
}
});
Expand Down Expand Up @@ -103,6 +98,16 @@ class SseClient extends StreamChannelMixin<String?> {
_outgoingController.close();
}

void _closeWithError(Object error) {
_incomingController.addError(error);
close();
if (!_onConnected.isCompleted) {
// This call must happen after the call to close() which checks
// whether the completer was completed earlier.
_onConnected.completeError(error);
}
}

void _onIncomingControlMessage(Event message) {
var data = (message as MessageEvent).data;
if (data == 'close') {
Expand Down Expand Up @@ -133,12 +138,45 @@ class SseClient extends StreamChannelMixin<String?> {
_logger.warning('Invalid argument: $e');
}
try {
await HttpRequest.request('$_serverUrl&messageId=${++_lastMessageId}',
method: 'POST', sendData: encodedMessage, withCredentials: true);
} catch (e) {
_logger.severe('Failed to send $message:\n $e');
close();
final url = '$_serverUrl&messageId=${++_lastMessageId}';
await _fetch(
url,
_FetchOptions(
method: 'POST',
body: encodedMessage,
credentialsOptions:
_CredentialsOptions(credentials: 'include')));
} catch (error) {
final augmentedError = 'SSE client failed to send $message:\n $error';
_logger.severe(augmentedError);
_closeWithError(augmentedError);
}
});
}
}

// Custom implementation of Fetch API until Dart supports GET vs. POST,
// credentials, etc. See https://github.com/dart-lang/http/issues/595.
@JS('fetch')
external Object _nativeJsFetch(String resourceUrl, _FetchOptions options);

Future<dynamic> _fetch(String resourceUrl, _FetchOptions options) =>
promiseToFuture(_nativeJsFetch(resourceUrl, options));

@JS()
@anonymous
class _FetchOptions {
external factory _FetchOptions({
required String method, // e.g., 'GET', 'POST'
required _CredentialsOptions credentialsOptions,
required String? body,
});
}

@JS()
@anonymous
class _CredentialsOptions {
external factory _CredentialsOptions({
required String credentials, // e.g., 'omit', 'same-origin', 'include'
});
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: sse
version: 4.1.1
version: 4.1.2-dev
description: >-
Provides client and server functionality for setting up bi-directional
communication through Server Sent Events (SSE) and corresponding POST
Expand Down