Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Better stream and access token management #1019

Merged
merged 17 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/realtime_client/lib/realtime_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export 'src/constants.dart' show RealtimeConstants, RealtimeLogLevel;
export 'src/constants.dart'
show RealtimeConstants, RealtimeLogLevel, SocketStates;
export 'src/realtime_channel.dart';
export 'src/realtime_client.dart';
export 'src/realtime_presence.dart';
Expand Down
4 changes: 2 additions & 2 deletions packages/realtime_client/lib/src/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ enum SocketStates {
/// Connection is live and connected
open,

/// Socket is closing.
closing,
Vinzent03 marked this conversation as resolved.
Show resolved Hide resolved
/// Socket is closing by the user
disconnecting,

/// Socket being close not by the user. Realtime should attempt to reconnect.
closed,
Expand Down
23 changes: 20 additions & 3 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class RealtimeChannel {
socket.remove(this);
});

_onError((String? reason) {
_onError((reason) {
if (isLeaving || isClosed) {
return;
}
Expand Down Expand Up @@ -260,9 +260,9 @@ class RealtimeChannel {
}

/// Registers a callback that will be executed when the channel encounteres an error.
void _onError(void Function(String?) callback) {
void _onError(Function callback) {
onEvents(ChannelEvents.error.eventName(), ChannelFilter(),
(reason, [ref]) => callback(reason?.toString()));
(reason, [ref]) => callback(reason));
}

/// Sets up a listener on your Supabase database.
Expand Down Expand Up @@ -646,6 +646,23 @@ class RealtimeChannel {
joinPush.resend(timeout ?? _timeout);
}

/// Resends [joinPush] to tell the server we join this channel again and marks
/// the channel as [ChannelStates.joining].
///
/// Usually [rejoin] only happens when the channel timeouts or errors out.
/// When manually disconnecting, the channel is still marked as
/// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will
/// unsubscribe itself, which causes issues when trying to rejoin. This method
/// therefore doesn't call [RealtimeClient.leaveOpenTopic].
@internal
void forceRejoin([Duration? timeout]) {
if (isLeaving) {
return;
}
_state = ChannelStates.joining;
joinPush.resend(timeout ?? _timeout);
}

void trigger(String type, [dynamic payload, String? ref]) {
final typeLower = type.toLowerCase();

Expand Down
76 changes: 53 additions & 23 deletions packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class RealtimeCloseEvent {
required this.code,
required this.reason,
});

@override
String toString() {
return 'RealtimeCloseEvent(code: $code, reason: $reason)';
}
}

class RealtimeClient {
Expand Down Expand Up @@ -134,17 +139,17 @@ class RealtimeClient {
(String payload, Function(dynamic result) callback) =>
callback(json.decode(payload));
reconnectTimer = RetryTimer(
() {
disconnect();
connect();
() async {
await disconnect();
await connect();
},
this.reconnectAfterMs,
);
}

/// Connects the socket.
@internal
void connect() async {
Future<void> connect() async {
if (conn != null) {
return;
}
Expand All @@ -153,8 +158,20 @@ class RealtimeClient {
connState = SocketStates.connecting;
conn = transport(endPointURL, headers);

// handle connection errors
conn!.ready.catchError(_onConnError);
try {
await conn!.ready;
} catch (error) {
// Don't schedule a reconnect and emit error if connection has been
// closed by the user or [disconnect] waits for the connection to be
// ready before closing it.
if (connState != SocketStates.disconnected &&
connState != SocketStates.disconnecting) {
connState = SocketStates.closed;
_onConnError(error);
reconnectTimer.scheduleTimeout();
}
return;
}

connState = SocketStates.open;

Expand All @@ -166,7 +183,8 @@ class RealtimeClient {
onError: _onConnError,
onDone: () {
// communication has been closed
if (connState != SocketStates.disconnected) {
if (connState != SocketStates.disconnected &&
connState != SocketStates.disconnecting) {
connState = SocketStates.closed;
}
_onConnClose();
Expand All @@ -179,20 +197,32 @@ class RealtimeClient {
}

/// Disconnects the socket with status [code] and [reason] for the disconnect
void disconnect({int? code, String? reason}) {
Future<void> disconnect({int? code, String? reason}) async {
final conn = this.conn;
if (conn != null) {
connState = SocketStates.disconnected;
if (code != null) {
conn.sink.close(code, reason ?? '');
} else {
conn.sink.close();
final oldState = connState;
connState = SocketStates.disconnecting;

// Connection cannot be closed while it's still connecting. Wait for connection to
// be ready and then close it.
if (oldState == SocketStates.connecting) {
await conn.ready.catchError((_) {});
}

if (oldState == SocketStates.open ||
oldState == SocketStates.connecting) {
if (code != null) {
await conn.sink.close(code, reason ?? '');
} else {
await conn.sink.close();
}
connState = SocketStates.disconnected;
reconnectTimer.reset();
}
this.conn = null;

// remove open handles
if (heartbeatTimer != null) heartbeatTimer?.cancel();
reconnectTimer.reset();
}
}

Expand Down Expand Up @@ -251,8 +281,8 @@ class RealtimeClient {
return 'connecting';
case SocketStates.open:
return 'open';
case SocketStates.closing:
return 'closing';
case SocketStates.disconnecting:
return 'disconnecting';
case SocketStates.disconnected:
return 'disconnected';
case SocketStates.closed:
Expand All @@ -262,7 +292,7 @@ class RealtimeClient {
}

/// Retuns `true` is the connection is open.
bool get isConnected => connectionState == 'open';
bool get isConnected => connState == SocketStates.open;

/// Removes a subscription from the socket.
@internal
Expand Down Expand Up @@ -353,15 +383,15 @@ class RealtimeClient {

for (final channel in channels) {
if (token != null) {
channel.updateJoinPayload({'user_token': token});
Vinzent03 marked this conversation as resolved.
Show resolved Hide resolved
channel.updateJoinPayload({'access_token': token});
}
if (channel.joinedOnce && channel.isJoined) {
channel.push(ChannelEvents.accessToken, {'access_token': token});
}
}
}

/// Unsubscribe from channels with the specified topic.
/// Unsubscribe from joined or joining channels with the specified topic.
@internal
void leaveOpenTopic(String topic) {
final dupChannel = channels.firstWhereOrNull(
Expand Down Expand Up @@ -399,7 +429,7 @@ class RealtimeClient {
/// SocketStates.disconnected: by user with socket.disconnect()
/// SocketStates.closed: NOT by user, should try to reconnect
if (connState == SocketStates.closed) {
_triggerChanError();
_triggerChanError(event);
reconnectTimer.scheduleTimeout();
}
if (heartbeatTimer != null) heartbeatTimer!.cancel();
Expand All @@ -410,15 +440,15 @@ class RealtimeClient {

void _onConnError(dynamic error) {
log('transport', error.toString());
_triggerChanError();
_triggerChanError(error);
for (final callback in stateChangeCallbacks['error']!) {
callback(error);
}
}

void _triggerChanError() {
void _triggerChanError([dynamic error]) {
for (final channel in channels) {
channel.trigger(ChannelEvents.error.eventName());
channel.trigger(ChannelEvents.error.eventName(), error);
}
}

Expand Down
8 changes: 5 additions & 3 deletions packages/realtime_client/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ void main() {
final subscribeCallback =
expectAsync2((RealtimeSubscribeStatus event, error) {
if (event == RealtimeSubscribeStatus.channelError) {
expect(error, isNull);
expect(error, isA<RealtimeCloseEvent>());
error as RealtimeCloseEvent;
expect(error.reason, "heartbeat timeout");
} else {
expect(event, RealtimeSubscribeStatus.closed);
}
Expand All @@ -285,8 +287,8 @@ void main() {

channel.subscribe(subscribeCallback);

await client.conn!.sink
.close(Constants.wsCloseNormal, "heartbeat timeout");
await Future.delayed(Duration(milliseconds: 200));
await webSocket?.close(Constants.wsCloseNormal, "heartbeat timeout");
});
});

Expand Down
12 changes: 8 additions & 4 deletions packages/realtime_client/test/socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void main() {
});

socket.connect();
await Future.delayed(const Duration(milliseconds: 200));
expect(opens, 1);

socket.sendHeartbeat();
Expand Down Expand Up @@ -214,8 +215,8 @@ void main() {
});

test('removes existing connection', () async {
socket.connect();
socket.disconnect();
await socket.connect();
await socket.disconnect();

expect(socket.conn, null);
});
Expand All @@ -229,7 +230,7 @@ void main() {
expect(closes, 1);
});

test('calls connection close callback', () {
test('calls connection close callback', () async {
final mockedSocketChannel = MockIOWebSocketChannel();
final mockedSocket = RealtimeClient(
socketEndpoint,
Expand All @@ -247,7 +248,10 @@ void main() {
const tReason = 'reason';

mockedSocket.connect();
mockedSocket.connState = SocketStates.open;
await Future.delayed(const Duration(milliseconds: 200));
mockedSocket.disconnect(code: tCode, reason: tReason);
await Future.delayed(const Duration(milliseconds: 200));

verify(
() => mockedSink.close(
Expand Down Expand Up @@ -423,7 +427,7 @@ void main() {
});

group('setAuth', () {
final updateJoinPayload = {'user_token': 'token123'};
final updateJoinPayload = {'access_token': 'token123'};
final pushPayload = {'access_token': 'token123'};

test(
Expand Down
9 changes: 7 additions & 2 deletions packages/supabase/lib/src/supabase_query_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
url: Uri.parse(url),
);

/// Returns real-time data from your table as a `Stream`.
/// Combines the current state of your table from PostgREST with changes from the realtime server to return real-time data from your table as a [Stream].
///
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/extensions/postgres-changes#replication-setup).
///
/// Pass the list of primary key column names to [primaryKey], which will be used to updating and deleting the proper records internally as the library receives real-time updates.
/// Pass the list of primary key column names to [primaryKey], which will be used to update and delete the proper records internally as the stream receives real-time updates.
///
/// It handles the lifecycle of the realtime connection and automatically refetches data from PostgREST when needed.
///
/// Make sure to provide `onError` and `onDone` callbacks to [Stream.listen] to handle errors and completion of the stream.
/// The stream gets closed when the realtime connection is closed.
///
/// ```dart
/// supabase.from('chats').stream(primaryKey: ['id']).listen(_onChatsReceived);
Expand Down
Loading
Loading