diff --git a/packages/realtime_client/lib/realtime_client.dart b/packages/realtime_client/lib/realtime_client.dart index 660dd2f6..84f158c5 100644 --- a/packages/realtime_client/lib/realtime_client.dart +++ b/packages/realtime_client/lib/realtime_client.dart @@ -1,4 +1,5 @@ -export 'src/constants.dart' show RealtimeConstants, RealtimeLogLevel; +export 'src/constants.dart' + show RealtimeConstants, RealtimeLogLevel, SocketStates; export 'src/realtime_channel.dart'; export 'src/realtime_client.dart'; export 'src/realtime_presence.dart'; diff --git a/packages/realtime_client/lib/src/constants.dart b/packages/realtime_client/lib/src/constants.dart index 1d14f9d6..1f9e0b87 100644 --- a/packages/realtime_client/lib/src/constants.dart +++ b/packages/realtime_client/lib/src/constants.dart @@ -18,8 +18,8 @@ enum SocketStates { /// Connection is live and connected open, - /// Socket is closing. - closing, + /// Socket is closing by the user + disconnecting, /// Socket being close not by the user. Realtime should attempt to reconnect. closed, diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 42a2470c..7c37d800 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -70,7 +70,7 @@ class RealtimeChannel { socket.remove(this); }); - _onError((String? reason) { + _onError((reason) { if (isLeaving || isClosed) { return; } @@ -260,9 +260,9 @@ class RealtimeChannel { } /// Registers a callback that will be executed when the channel encounteres an error. - void _onError(void Function(String?) callback) { + void _onError(Function callback) { onEvents(ChannelEvents.error.eventName(), ChannelFilter(), - (reason, [ref]) => callback(reason?.toString())); + (reason, [ref]) => callback(reason)); } /// Sets up a listener on your Supabase database. @@ -646,6 +646,23 @@ class RealtimeChannel { joinPush.resend(timeout ?? _timeout); } + /// Resends [joinPush] to tell the server we join this channel again and marks + /// the channel as [ChannelStates.joining]. + /// + /// Usually [rejoin] only happens when the channel timeouts or errors out. + /// When manually disconnecting, the channel is still marked as + /// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will + /// unsubscribe itself, which causes issues when trying to rejoin. This method + /// therefore doesn't call [RealtimeClient.leaveOpenTopic]. + @internal + void forceRejoin([Duration? timeout]) { + if (isLeaving) { + return; + } + _state = ChannelStates.joining; + joinPush.resend(timeout ?? _timeout); + } + void trigger(String type, [dynamic payload, String? ref]) { final typeLower = type.toLowerCase(); diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart index 6873a2ca..fe7eefd3 100644 --- a/packages/realtime_client/lib/src/realtime_client.dart +++ b/packages/realtime_client/lib/src/realtime_client.dart @@ -45,6 +45,11 @@ class RealtimeCloseEvent { required this.code, required this.reason, }); + + @override + String toString() { + return 'RealtimeCloseEvent(code: $code, reason: $reason)'; + } } class RealtimeClient { @@ -134,9 +139,9 @@ class RealtimeClient { (String payload, Function(dynamic result) callback) => callback(json.decode(payload)); reconnectTimer = RetryTimer( - () { - disconnect(); - connect(); + () async { + await disconnect(); + await connect(); }, this.reconnectAfterMs, ); @@ -144,7 +149,7 @@ class RealtimeClient { /// Connects the socket. @internal - void connect() async { + Future connect() async { if (conn != null) { return; } @@ -153,8 +158,20 @@ class RealtimeClient { connState = SocketStates.connecting; conn = transport(endPointURL, headers); - // handle connection errors - conn!.ready.catchError(_onConnError); + try { + await conn!.ready; + } catch (error) { + // Don't schedule a reconnect and emit error if connection has been + // closed by the user or [disconnect] waits for the connection to be + // ready before closing it. + if (connState != SocketStates.disconnected && + connState != SocketStates.disconnecting) { + connState = SocketStates.closed; + _onConnError(error); + reconnectTimer.scheduleTimeout(); + } + return; + } connState = SocketStates.open; @@ -166,7 +183,8 @@ class RealtimeClient { onError: _onConnError, onDone: () { // communication has been closed - if (connState != SocketStates.disconnected) { + if (connState != SocketStates.disconnected && + connState != SocketStates.disconnecting) { connState = SocketStates.closed; } _onConnClose(); @@ -179,20 +197,32 @@ class RealtimeClient { } /// Disconnects the socket with status [code] and [reason] for the disconnect - void disconnect({int? code, String? reason}) { + Future disconnect({int? code, String? reason}) async { final conn = this.conn; if (conn != null) { - connState = SocketStates.disconnected; - if (code != null) { - conn.sink.close(code, reason ?? ''); - } else { - conn.sink.close(); + final oldState = connState; + connState = SocketStates.disconnecting; + + // Connection cannot be closed while it's still connecting. Wait for connection to + // be ready and then close it. + if (oldState == SocketStates.connecting) { + await conn.ready.catchError((_) {}); + } + + if (oldState == SocketStates.open || + oldState == SocketStates.connecting) { + if (code != null) { + await conn.sink.close(code, reason ?? ''); + } else { + await conn.sink.close(); + } + connState = SocketStates.disconnected; + reconnectTimer.reset(); } this.conn = null; // remove open handles if (heartbeatTimer != null) heartbeatTimer?.cancel(); - reconnectTimer.reset(); } } @@ -251,8 +281,8 @@ class RealtimeClient { return 'connecting'; case SocketStates.open: return 'open'; - case SocketStates.closing: - return 'closing'; + case SocketStates.disconnecting: + return 'disconnecting'; case SocketStates.disconnected: return 'disconnected'; case SocketStates.closed: @@ -262,7 +292,7 @@ class RealtimeClient { } /// Retuns `true` is the connection is open. - bool get isConnected => connectionState == 'open'; + bool get isConnected => connState == SocketStates.open; /// Removes a subscription from the socket. @internal @@ -353,7 +383,7 @@ class RealtimeClient { for (final channel in channels) { if (token != null) { - channel.updateJoinPayload({'user_token': token}); + channel.updateJoinPayload({'access_token': token}); } if (channel.joinedOnce && channel.isJoined) { channel.push(ChannelEvents.accessToken, {'access_token': token}); @@ -361,7 +391,7 @@ class RealtimeClient { } } - /// Unsubscribe from channels with the specified topic. + /// Unsubscribe from joined or joining channels with the specified topic. @internal void leaveOpenTopic(String topic) { final dupChannel = channels.firstWhereOrNull( @@ -399,7 +429,7 @@ class RealtimeClient { /// SocketStates.disconnected: by user with socket.disconnect() /// SocketStates.closed: NOT by user, should try to reconnect if (connState == SocketStates.closed) { - _triggerChanError(); + _triggerChanError(event); reconnectTimer.scheduleTimeout(); } if (heartbeatTimer != null) heartbeatTimer!.cancel(); @@ -410,15 +440,15 @@ class RealtimeClient { void _onConnError(dynamic error) { log('transport', error.toString()); - _triggerChanError(); + _triggerChanError(error); for (final callback in stateChangeCallbacks['error']!) { callback(error); } } - void _triggerChanError() { + void _triggerChanError([dynamic error]) { for (final channel in channels) { - channel.trigger(ChannelEvents.error.eventName()); + channel.trigger(ChannelEvents.error.eventName(), error); } } diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index ccd77089..89455fb7 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -268,7 +268,9 @@ void main() { final subscribeCallback = expectAsync2((RealtimeSubscribeStatus event, error) { if (event == RealtimeSubscribeStatus.channelError) { - expect(error, isNull); + expect(error, isA()); + error as RealtimeCloseEvent; + expect(error.reason, "heartbeat timeout"); } else { expect(event, RealtimeSubscribeStatus.closed); } @@ -285,8 +287,8 @@ void main() { channel.subscribe(subscribeCallback); - await client.conn!.sink - .close(Constants.wsCloseNormal, "heartbeat timeout"); + await Future.delayed(Duration(milliseconds: 200)); + await webSocket?.close(Constants.wsCloseNormal, "heartbeat timeout"); }); }); diff --git a/packages/realtime_client/test/socket_test.dart b/packages/realtime_client/test/socket_test.dart index a5463ab3..79fe1306 100644 --- a/packages/realtime_client/test/socket_test.dart +++ b/packages/realtime_client/test/socket_test.dart @@ -171,6 +171,7 @@ void main() { }); socket.connect(); + await Future.delayed(const Duration(milliseconds: 200)); expect(opens, 1); socket.sendHeartbeat(); @@ -214,8 +215,8 @@ void main() { }); test('removes existing connection', () async { - socket.connect(); - socket.disconnect(); + await socket.connect(); + await socket.disconnect(); expect(socket.conn, null); }); @@ -229,7 +230,7 @@ void main() { expect(closes, 1); }); - test('calls connection close callback', () { + test('calls connection close callback', () async { final mockedSocketChannel = MockIOWebSocketChannel(); final mockedSocket = RealtimeClient( socketEndpoint, @@ -247,7 +248,10 @@ void main() { const tReason = 'reason'; mockedSocket.connect(); + mockedSocket.connState = SocketStates.open; + await Future.delayed(const Duration(milliseconds: 200)); mockedSocket.disconnect(code: tCode, reason: tReason); + await Future.delayed(const Duration(milliseconds: 200)); verify( () => mockedSink.close( @@ -423,7 +427,7 @@ void main() { }); group('setAuth', () { - final updateJoinPayload = {'user_token': 'token123'}; + final updateJoinPayload = {'access_token': 'token123'}; final pushPayload = {'access_token': 'token123'}; test( diff --git a/packages/supabase/lib/src/supabase_query_builder.dart b/packages/supabase/lib/src/supabase_query_builder.dart index 1c9b4bcb..dd31713a 100644 --- a/packages/supabase/lib/src/supabase_query_builder.dart +++ b/packages/supabase/lib/src/supabase_query_builder.dart @@ -23,11 +23,16 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder { url: Uri.parse(url), ); - /// Returns real-time data from your table as a `Stream`. + /// Combines the current state of your table from PostgREST with changes from the realtime server to return real-time data from your table as a [Stream]. /// /// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/extensions/postgres-changes#replication-setup). /// - /// Pass the list of primary key column names to [primaryKey], which will be used to updating and deleting the proper records internally as the library receives real-time updates. + /// Pass the list of primary key column names to [primaryKey], which will be used to update and delete the proper records internally as the stream receives real-time updates. + /// + /// It handles the lifecycle of the realtime connection and automatically refetches data from PostgREST when needed. + /// + /// Make sure to provide `onError` and `onDone` callbacks to [Stream.listen] to handle errors and completion of the stream. + /// The stream gets closed when the realtime connection is closed. /// /// ```dart /// supabase.from('chats').stream(primaryKey: ['id']).listen(_onChatsReceived); diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index 0807c734..5e0aac6a 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -31,6 +31,18 @@ class _Order { final bool ascending; } +class RealtimeSubscribeException implements Exception { + RealtimeSubscribeException(this.status, [this.details]); + + final RealtimeSubscribeStatus status; + final Object? details; + + @override + String toString() { + return 'RealtimeSubscribeException(status: $status, details: $details)'; + } +} + typedef SupabaseStreamEvent = List>; class SupabaseStreamBuilder extends Stream { @@ -64,6 +76,9 @@ class SupabaseStreamBuilder extends Stream { /// Count of record to be returned int? _limit; + /// Flag that the stream has at least one time been subscribed to realtime + bool _wasSubscribed = false; + SupabaseStreamBuilder({ required PostgrestQueryBuilder queryBuilder, required String realtimeTopic, @@ -195,12 +210,31 @@ class SupabaseStreamBuilder extends Stream { } }) .subscribe((status, [error]) { - if (error != null) { - _addException(error); + switch (status) { + case RealtimeSubscribeStatus.subscribed: + // Reload all data after a reconnect from postgrest + // First data from postgrest gets loaded before the realtime connect + if (_wasSubscribed) { + _getPostgrestData(); + } + _wasSubscribed = true; + break; + case RealtimeSubscribeStatus.closed: + _streamController?.close(); + break; + case RealtimeSubscribeStatus.timedOut: + _addException(RealtimeSubscribeException(status, error)); + break; + case RealtimeSubscribeStatus.channelError: + _addException(RealtimeSubscribeException(status, error)); + break; } }); + _getPostgrestData(); + } - PostgrestFilterBuilder query = _queryBuilder.select(); + Future _getPostgrestData() async { + PostgrestFilterBuilder query = _queryBuilder.select(); if (_streamFilter != null) { switch (_streamFilter!.type) { case PostgresChangeFilterType.eq: @@ -226,7 +260,7 @@ class SupabaseStreamBuilder extends Stream { break; } } - PostgrestTransformBuilder? transformQuery; + PostgrestTransformBuilder? transformQuery; if (_orderBy != null) { transformQuery = query.order(_orderBy!.column, ascending: _orderBy!.ascending); @@ -237,11 +271,15 @@ class SupabaseStreamBuilder extends Stream { try { final data = await (transformQuery ?? query); - final rows = SupabaseStreamEvent.from(data as List); - _streamData.addAll(rows); + final rows = SupabaseStreamEvent.from(data); + _streamData = rows; _addStream(); } catch (error, stackTrace) { _addException(error, stackTrace); + // In case the postgrest call fails, there is no need to keep the + // realtime connection open + _channel?.unsubscribe(); + _streamController?.close(); } } diff --git a/packages/supabase_flutter/lib/src/supabase.dart b/packages/supabase_flutter/lib/src/supabase.dart index 7cb2930e..0608707c 100644 --- a/packages/supabase_flutter/lib/src/supabase.dart +++ b/packages/supabase_flutter/lib/src/supabase.dart @@ -107,7 +107,7 @@ class Supabase { accessToken: accessToken, ); _instance._debugEnable = debug ?? kDebugMode; - _instance.log('***** Supabase init completed $_instance'); + _instance.log('***** Supabase init completed *****'); _instance._supabaseAuth = SupabaseAuth(); await _instance._supabaseAuth.initialize(options: authOptions); diff --git a/packages/supabase_flutter/lib/src/supabase_auth.dart b/packages/supabase_flutter/lib/src/supabase_auth.dart index fa3d72e8..80cf6248 100644 --- a/packages/supabase_flutter/lib/src/supabase_auth.dart +++ b/packages/supabase_flutter/lib/src/supabase_auth.dart @@ -4,6 +4,7 @@ import 'dart:io' show Platform; import 'dart:math'; import 'package:app_links/app_links.dart'; +import 'package:async/async.dart'; import 'package:flutter/foundation.dart' show kIsWeb; import 'package:flutter/material.dart'; import 'package:flutter/services.dart'; @@ -29,6 +30,8 @@ class SupabaseAuth with WidgetsBindingObserver { StreamSubscription? _deeplinkSubscription; + CancelableOperation? _realtimeReconnectOperation; + final _appLinks = AppLinks(); /// - Obtains session from local storage and sets it as the current session @@ -113,17 +116,75 @@ class SupabaseAuth with WidgetsBindingObserver { void didChangeAppLifecycleState(AppLifecycleState state) { switch (state) { case AppLifecycleState.resumed: - if (_autoRefreshToken) { - Supabase.instance.client.auth.startAutoRefresh(); - } + onResumed(); case AppLifecycleState.detached: - case AppLifecycleState.inactive: case AppLifecycleState.paused: - Supabase.instance.client.auth.stopAutoRefresh(); + if (kIsWeb || Platform.isAndroid || Platform.isIOS) { + Supabase.instance.client.auth.stopAutoRefresh(); + _realtimeReconnectOperation?.cancel(); + Supabase.instance.client.realtime.disconnect(); + } default: } } + Future onResumed() async { + if (_autoRefreshToken) { + Supabase.instance.client.auth.startAutoRefresh(); + } + final realtime = Supabase.instance.client.realtime; + if (realtime.channels.isNotEmpty) { + if (realtime.connState == SocketStates.disconnecting) { + // If the socket is still disconnecting from e.g. + // [AppLifecycleState.paused] we should wait for it to finish before + // reconnecting. + + bool cancel = false; + final connectFuture = realtime.conn!.sink.done.then( + (_) async { + // Make this connect cancelable so that it does not connect if the + // disconnect took so long that the app is already in background + // again. + + if (!cancel) { + // ignore: invalid_use_of_internal_member + await realtime.connect(); + for (final channel in realtime.channels) { + // ignore: invalid_use_of_internal_member + if (channel.isJoined) { + channel.forceRejoin(); + } + } + } + }, + onError: (error) {}, + ); + _realtimeReconnectOperation = CancelableOperation.fromFuture( + connectFuture, + onCancel: () => cancel = true, + ); + } else if (!realtime.isConnected) { + // Reconnect if the socket is currently not connected. + // When coming from [AppLifecycleState.paused] this should be the case, + // but when coming from [AppLifecycleState.inactive] no disconnect + // happened and therefore connection should still be intanct and we + // should not reconnect. + + // ignore: invalid_use_of_internal_member + await realtime.connect(); + for (final channel in realtime.channels) { + // Only rejoin channels that think they are still joined and not + // which were manually unsubscribed by the user while in background + + // ignore: invalid_use_of_internal_member + if (channel.isJoined) { + channel.forceRejoin(); + } + } + } + } + } + void _onAuthStateChange(AuthChangeEvent event, Session? session) { Supabase.instance.log('**** onAuthStateChange: $event'); if (session != null) {