Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to modern Dart style and lints #30

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
build/
packages
# Remove the following pattern if you wish to check in your lock file
pubspec.lock
pubspec.lock.old

# Files created by dart2js
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.5.0 (2022-11-12)
- Updated to modern dart style and linting.
- API BREAK: EventSourceReadyState values are now camelCase.

## 0.4.0 (2022-03-23)

- Migrate to null-safety
Expand Down
1 change: 1 addition & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include: package:lints/recommended.yaml
7 changes: 4 additions & 3 deletions example/client_browser.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ main() async {
// approach. This will change once https://github.com/dart-lang/http/issues/1
// is fixed.

EventSource eventSource = await EventSource
.connect("http://example.org/events", client: new BrowserClient());
EventSource eventSource = await EventSource.connect(
"http://example.org/events",
client: BrowserClient());
// listen for events
eventSource.listen((Event event) {
print("New event:");
Expand All @@ -20,7 +21,7 @@ main() async {
String lastId = "iknowmylastid";
eventSource = await EventSource.connect(
"http://example.org/events",
client: new BrowserClient(),
client: BrowserClient(),
lastEventId: lastId,
);
// listen for events
Expand Down
58 changes: 31 additions & 27 deletions lib/eventsource.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ import "dart:async";
import "dart:convert";

import "package:http/http.dart" as http;
import "package:http/src/utils.dart" show encodingForCharset;
import "package:http_parser/http_parser.dart" show MediaType;

import "src/event.dart";
import "src/decoder.dart";

enum EventSourceReadyState {
CONNECTING,
OPEN,
CLOSED,
connecting,
open,
closed,
}

Encoding encodingForCharset(String? charset, [Encoding fallback = latin1]) {
if (charset == null) return fallback;
return Encoding.getByName(charset) ?? fallback;
}

class EventSourceSubscriptionException extends Event implements Exception {
Expand All @@ -38,23 +42,23 @@ class EventSource extends Stream<Event> {

EventSourceReadyState get readyState => _readyState;

Stream<Event> get onOpen => this.where((e) => e.event == "open");
Stream<Event> get onMessage => this.where((e) => e.event == "message");
Stream<Event> get onError => this.where((e) => e.event == "error");
Stream<Event> get onOpen => where((e) => e.event == "open");
Stream<Event> get onMessage => where((e) => e.event == "message");
Stream<Event> get onError => where((e) => e.event == "error");

// internal attributes

StreamController<Event> _streamController =
new StreamController<Event>.broadcast();
final StreamController<Event> _streamController =
StreamController<Event>.broadcast();

EventSourceReadyState _readyState = EventSourceReadyState.CLOSED;
EventSourceReadyState _readyState = EventSourceReadyState.closed;

http.Client client;
Duration _retryDelay = const Duration(milliseconds: 3000);
String? _lastEventId;
late EventSourceDecoder _decoder;
String _body;
String _method;
final String _body;
final String _method;

/// Create a new EventSource by connecting to the specified url.
static Future<EventSource> connect(url,
Expand All @@ -65,31 +69,31 @@ class EventSource extends Stream<Event> {
String? method}) async {
// parameter initialization
url = url is Uri ? url : Uri.parse(url);
client = client ?? new http.Client();
client = client ?? http.Client();
body = body ?? "";
method = method ?? "GET";
EventSource es = new EventSource._internal(
url, client, lastEventId, headers, body, method);
EventSource es =
EventSource._internal(url, client, lastEventId, headers, body, method);
await es._start();
return es;
}

EventSource._internal(this.url, this.client, this._lastEventId, this.headers,
this._body, this._method) {
_decoder = new EventSourceDecoder(retryIndicator: _updateRetryDelay);
_decoder = EventSourceDecoder(retryIndicator: _updateRetryDelay);
}

// proxy the listen call to the controller's listen call
@override
StreamSubscription<Event> listen(void onData(Event event)?,
{Function? onError, void onDone()?, bool? cancelOnError}) =>
StreamSubscription<Event> listen(void Function(Event event)? onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) =>
_streamController.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);

/// Attempt to start a new connection.
Future _start() async {
_readyState = EventSourceReadyState.CONNECTING;
var request = new http.Request(_method, url);
_readyState = EventSourceReadyState.connecting;
var request = http.Request(_method, url);
request.headers["Cache-Control"] = "no-cache";
request.headers["Accept"] = "text/event-stream";
if (_lastEventId?.isNotEmpty == true) {
Expand All @@ -105,26 +109,26 @@ class EventSource extends Stream<Event> {
// server returned an error
var bodyBytes = await response.stream.toBytes();
String body = _encodingForHeaders(response.headers).decode(bodyBytes);
throw new EventSourceSubscriptionException(response.statusCode, body);
throw EventSourceSubscriptionException(response.statusCode, body);
}
_readyState = EventSourceReadyState.OPEN;
_readyState = EventSourceReadyState.open;
// start streaming the data
response.stream.transform(_decoder).listen((Event event) {
_streamController.add(event);
_lastEventId = event.id;
},
cancelOnError: true,
onError: _retry,
onDone: () => _readyState = EventSourceReadyState.CLOSED);
onDone: () => _readyState = EventSourceReadyState.closed);
}

/// Retries until a new connection is established. Uses exponential backoff.
Future _retry(dynamic e) async {
_readyState = EventSourceReadyState.CONNECTING;
_readyState = EventSourceReadyState.connecting;
// try reopening with exponential backoff
Duration backoff = _retryDelay;
while (true) {
await new Future.delayed(backoff);
await Future.delayed(backoff);
try {
await _start();
break;
Expand All @@ -151,6 +155,6 @@ Encoding _encodingForHeaders(Map<String, String> headers) =>
/// Defaults to `application/octet-stream`.
MediaType _contentTypeForHeaders(Map<String, String> headers) {
var contentType = headers['content-type'];
if (contentType != null) return new MediaType.parse(contentType);
return new MediaType("application", "octet-stream");
if (contentType != null) return MediaType.parse(contentType);
return MediaType("application", "octet-stream");
}
7 changes: 3 additions & 4 deletions lib/io_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import "dart:io" as io;
import "package:sync/waitgroup.dart";

import "publisher.dart";
import "src/event.dart";
import "src/encoder.dart";

/// Create a handler to serve [io.HttpRequest] objects for the specified
/// channel.
/// This method can be passed to the [io.HttpServer.listen] method.
Function createIoHandler(EventSourcePublisher publisher,
{String channel: "", bool gzip: false}) {
{String channel = "", bool gzip = false}) {
void ioHandler(io.HttpRequest request) {
io.HttpResponse response = request.response;

Expand All @@ -30,13 +29,13 @@ Function createIoHandler(EventSourcePublisher publisher,
if (useGzip) response.headers.set("Content-Encoding", "gzip");
// a wait group to keep track of flushes in order not to close while
// flushing
WaitGroup flushes = new WaitGroup();
WaitGroup flushes = WaitGroup();
// flush the headers
flushes.add(1);
response.flush().then((_) => flushes.done());

// create encoder for this connection
var encodedSink = new EventSourceEncoder(compressed: useGzip)
var encodedSink = EventSourceEncoder(compressed: useGzip)
.startChunkedConversion(response);

// define the methods for pushing events and closing the connection
Expand Down
20 changes: 10 additions & 10 deletions lib/publisher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,31 @@ class EventSourcePublisher extends Sink<Event> {
/// If your Event's id properties are not incremental using
/// [Comparable.compare], set [comparableIds] to false.
EventSourcePublisher({
int cacheCapacity: 0,
bool comparableIds: false,
bool enableLogging: true,
int cacheCapacity = 0,
bool comparableIds = false,
bool enableLogging = true,
}) {
if (cacheCapacity > 0) {
_cache = new EventCache(cacheCapacity: cacheCapacity);
_cache = EventCache(cacheCapacity: cacheCapacity);
}
if (enableLogging) {
logger = new log.Logger("EventSourceServer");
logger = log.Logger("EventSourceServer");
}
}

Map<String, List<ProxySink>> _subsByChannel = {};
final Map<String, List<ProxySink>> _subsByChannel = {};

/// Creates a Sink for the specified channel.
/// The `add` and `remove` methods of this channel are equivalent to the
/// respective methods of this class with the specific channel passed along.
Sink<Event> channel(String channel) => new ProxySink(
Sink<Event> channel(String channel) => ProxySink(
onAdd: (e) => add(e, channels: [channel]),
onClose: () => close(channels: [channel]));

/// Add a publication to the specified channels.
/// By default, only adds to the default channel.
@override
void add(Event event, {Iterable<String> channels: const [""]}) {
void add(Event event, {Iterable<String> channels = const [""]}) {
for (String channel in channels) {
List<ProxySink>? subs = _subsByChannel[channel];
if (subs == null) {
Expand All @@ -70,7 +70,7 @@ class EventSourcePublisher extends Sink<Event> {
/// All the connections with the subscribers to this channels will be closed.
/// By default only closes the default channel.
@override
void close({Iterable<String> channels: const [""]}) {
void close({Iterable<String> channels = const [""]}) {
for (String channel in channels) {
List<ProxySink>? subs = _subsByChannel[channel];
if (subs == null) {
Expand All @@ -97,7 +97,7 @@ class EventSourcePublisher extends Sink<Event> {
}) {
_logFine("New subscriber on channel $channel.");
// create a sink for the subscription
ProxySink<Event> sub = new ProxySink(onAdd: onEvent, onClose: onClose);
ProxySink<Event> sub = ProxySink(onAdd: onEvent, onClose: onClose);
// save the subscription
_subsByChannel.putIfAbsent(channel, () => []).add(sub);
// replay past events
Expand Down
20 changes: 11 additions & 9 deletions lib/src/decoder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ class EventSourceDecoder implements StreamTransformer<List<int>, Event> {

EventSourceDecoder({this.retryIndicator});

@override
Stream<Event> bind(Stream<List<int>> stream) {
late StreamController<Event> controller;
controller = new StreamController(onListen: () {
controller = StreamController(onListen: () {
// the event we are currently building
Event currentEvent = new Event();
Event currentEvent = Event();
// the regexes we will use later
RegExp lineRegex = new RegExp(r"^([^:]*)(?::)?(?: )?(.*)?$");
RegExp removeEndingNewlineRegex = new RegExp(r"^((?:.|\n)*)\n$");
RegExp lineRegex = RegExp(r"^([^:]*)(?::)?(?: )?(.*)?$");
RegExp removeEndingNewlineRegex = RegExp(r"^((?:.|\n)*)\n$");
// This stream will receive chunks of data that is not necessarily a
// single event. So we build events on the fly and broadcast the event as
// soon as we encounter a double newline, then we start a new one.
stream
.transform(new Utf8Decoder())
.transform(new LineSplitter())
.transform(Utf8Decoder())
.transform(LineSplitter())
.listen((String line) {
if (line.isEmpty) {
// event is done
Expand All @@ -35,7 +36,7 @@ class EventSourceDecoder implements StreamTransformer<List<int>, Event> {
currentEvent.data = match?.group(1);
}
controller.add(currentEvent);
currentEvent = new Event();
currentEvent = Event();
return;
}
// match the line prefix and the value using the regex
Expand All @@ -51,20 +52,21 @@ class EventSourceDecoder implements StreamTransformer<List<int>, Event> {
currentEvent.event = value;
break;
case "data":
currentEvent.data = (currentEvent.data ?? "") + value + "\n";
currentEvent.data = "${currentEvent.data ?? ""}$value\n";
break;
case "id":
currentEvent.id = value;
break;
case "retry":
retryIndicator?.call(new Duration(milliseconds: int.parse(value)));
retryIndicator?.call(Duration(milliseconds: int.parse(value)));
break;
}
});
});
return controller.stream;
}

@override
StreamTransformer<RS, RT> cast<RS, RT>() =>
StreamTransformer.castFrom<List<int>, Event, RS, RT>(this);
}
12 changes: 6 additions & 6 deletions lib/src/encoder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import "proxy_sink.dart";
class EventSourceEncoder extends Converter<Event, List<int>> {
final bool compressed;

const EventSourceEncoder({bool this.compressed: false});
const EventSourceEncoder({this.compressed = false});

static Map<String, Function> _fields = {
static final Map<String, Function> _fields = {
"id: ": (e) => e.id,
"event: ": (e) => e.event,
"data: ": (e) => e.data,
};

@override
List<int> convert(Event event) {
String payload = convertToString(event);
List<int> convert(Event input) {
String payload = convertToString(input);
List<int> bytes = utf8.encode(payload);
if (compressed) {
bytes = gzip.encode(bytes);
Expand All @@ -36,7 +36,7 @@ class EventSourceEncoder extends Converter<Event, List<int>> {
}
// multi-line values need the field prefix on every line
value = value.replaceAll("\n", "\n$prefix");
payload += prefix + value + "\n";
payload += "$prefix$value\n";
}
payload += "\n";
return payload;
Expand All @@ -51,7 +51,7 @@ class EventSourceEncoder extends Converter<Event, List<int>> {
}
inputSink =
utf8.encoder.startChunkedConversion(inputSink as Sink<List<int>>);
return new ProxySink(
return ProxySink(
onAdd: (Event event) => inputSink.add(convertToString(event)),
onClose: () => inputSink.close());
}
Expand Down
4 changes: 2 additions & 2 deletions lib/src/event_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import "event.dart";
class EventCache {
final int? cacheCapacity;
final bool comparableIds;
Map<String, List<Event>> _caches = <String, List<Event>>{};
final Map<String, List<Event>> _caches = <String, List<Event>>{};

EventCache({this.cacheCapacity, this.comparableIds: true});
EventCache({this.cacheCapacity, this.comparableIds = true});

void replay(Sink<Event> sink, String lastEventId, [String channel = ""]) {
List<Event>? cache = _caches[channel];
Expand Down
4 changes: 2 additions & 2 deletions pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
name: eventsource
description: A client and server implementation of Server-Sent Events.
version: 0.4.0
author: Steven Roose <[email protected]>
version: 0.5.0
homepage: https://github.com/stevenroose/dart-eventsource

environment:
Expand All @@ -15,4 +14,5 @@ dependencies:
sync: ^0.3.0

dev_dependencies:
lints: ^2.0.1
test: ^1.17.11
Loading