From 17bf76ffcdff57885e2b2654a208438f69333da1 Mon Sep 17 00:00:00 2001
From: dshukertjr <dshukertjr@gmail.com>
Date: Tue, 10 Dec 2024 17:25:26 +0900
Subject: [PATCH 1/7] fix: prevent sending expired tokens

---
 .../lib/src/realtime_client.dart              | 40 +++++++-
 packages/realtime_client/pubspec.yaml         |  1 +
 .../realtime_client/test/socket_test.dart     | 91 ++++++++++++++++++-
 3 files changed, 124 insertions(+), 8 deletions(-)

diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart
index f5e6f5fa..67b5de66 100644
--- a/packages/realtime_client/lib/src/realtime_client.dart
+++ b/packages/realtime_client/lib/src/realtime_client.dart
@@ -54,6 +54,7 @@ class RealtimeCloseEvent {
 }
 
 class RealtimeClient {
+  // This is named `accessTokenValue` in supabase-js
   String? accessToken;
   List<RealtimeChannel> channels = [];
   final String endPoint;
@@ -89,6 +90,8 @@ class RealtimeClient {
   };
   int longpollerTimeout = 20000;
   SocketStates? connState;
+  // This is called `accessToken` in realtime-js
+  Future<String> Function()? customAccessToken;
 
   /// Initializes the Socket
   ///
@@ -403,15 +406,42 @@ class RealtimeClient {
   /// Sets the JWT access token used for channel subscription authorization and Realtime RLS.
   ///
   /// `token` A JWT strings.
-  void setAuth(String? token) {
-    accessToken = token;
+  Future<void> setAuth(String? token) async {
+    final tokenToSend =
+        token ?? (await customAccessToken?.call()) ?? accessToken;
+
+    if (tokenToSend != null) {
+      Map<String, dynamic>? parsed;
+      try {
+        final decoded =
+            utf8.decode(base64Url.decode(tokenToSend.split('.')[1]));
+        parsed = json.decode(decoded);
+      } catch (e) {
+        // ignore parsing errors
+      }
+      if (parsed != null && parsed['exp'] != null) {
+        final now = (DateTime.now().millisecondsSinceEpoch / 1000).floor();
+        final valid = now - parsed['exp'] < 0;
+        if (!valid) {
+          log(
+            'auth',
+            'InvalidJWTToken: Invalid value for JWT claim "exp" with value ${parsed['exp']}',
+            null,
+            Level.FINE,
+          );
+          throw 'InvalidJWTToken: Invalid value for JWT claim "exp" with value ${parsed['exp']}';
+        }
+      }
+    }
+
+    accessToken = tokenToSend;
 
     for (final channel in channels) {
-      if (token != null) {
-        channel.updateJoinPayload({'access_token': token});
+      if (tokenToSend != null) {
+        channel.updateJoinPayload({'access_token': tokenToSend});
       }
       if (channel.joinedOnce && channel.isJoined) {
-        channel.push(ChannelEvents.accessToken, {'access_token': token});
+        channel.push(ChannelEvents.accessToken, {'access_token': tokenToSend});
       }
     }
   }
diff --git a/packages/realtime_client/pubspec.yaml b/packages/realtime_client/pubspec.yaml
index 8c7ef8cd..29b924f2 100644
--- a/packages/realtime_client/pubspec.yaml
+++ b/packages/realtime_client/pubspec.yaml
@@ -19,3 +19,4 @@ dev_dependencies:
   lints: ^3.0.0
   mocktail: ^1.0.0
   test: ^1.16.5
+  crypto: ^3.0.6
diff --git a/packages/realtime_client/test/socket_test.dart b/packages/realtime_client/test/socket_test.dart
index 79fe1306..746c0e86 100644
--- a/packages/realtime_client/test/socket_test.dart
+++ b/packages/realtime_client/test/socket_test.dart
@@ -1,6 +1,7 @@
 import 'dart:convert';
 import 'dart:io';
 
+import 'package:crypto/crypto.dart';
 import 'package:mocktail/mocktail.dart';
 import 'package:realtime_client/realtime_client.dart';
 import 'package:realtime_client/src/constants.dart';
@@ -16,6 +17,31 @@ typedef WebSocketChannelClosure = WebSocketChannel Function(
   Map<String, String> headers,
 );
 
+/// Generate a JWT token for testing purposes
+///
+/// [exp] in seconds since Epoch
+String generateJwt([int? exp]) {
+  final header = {'alg': 'HS256', 'typ': 'JWT'};
+
+  final now = DateTime.now();
+  final expiry = exp ??
+      (now.add(Duration(hours: 1)).millisecondsSinceEpoch / 1000).floor();
+
+  final payload = {'exp': expiry};
+
+  final key = 'your-256-bit-secret';
+
+  final encodedHeader = base64Url.encode(utf8.encode(json.encode(header)));
+  final encodedPayload = base64Url.encode(utf8.encode(json.encode(payload)));
+
+  final signatureInput = '$encodedHeader.$encodedPayload';
+  final hmac = Hmac(sha256, utf8.encode(key));
+  final digest = hmac.convert(utf8.encode(signatureInput));
+  final signature = base64Url.encode(digest.bytes);
+
+  return '$encodedHeader.$encodedPayload.$signature';
+}
+
 void main() {
   const int int64MaxValue = 9223372036854775807;
 
@@ -427,8 +453,9 @@ void main() {
   });
 
   group('setAuth', () {
-    final updateJoinPayload = {'access_token': 'token123'};
-    final pushPayload = {'access_token': 'token123'};
+    final token = generateJwt();
+    final updateJoinPayload = {'access_token': token};
+    final pushPayload = {'access_token': token};
 
     test(
         "sets access token, updates channels' join payload, and pushes token to channels",
@@ -457,7 +484,9 @@ void main() {
       final channel1 = mockedSocket.channel(tTopic1);
       final channel2 = mockedSocket.channel(tTopic2);
 
-      mockedSocket.setAuth('token123');
+      mockedSocket.setAuth(token);
+
+      expect(mockedSocket.accessToken, token);
 
       verify(() => channel1.updateJoinPayload(updateJoinPayload)).called(1);
       verify(() => channel2.updateJoinPayload(updateJoinPayload)).called(1);
@@ -466,6 +495,62 @@ void main() {
       verify(() => channel2.push(ChannelEvents.accessToken, pushPayload))
           .called(1);
     });
+
+    test(
+        "sets access token, updates channels' join payload, and pushes token to channels if is not a jwt",
+        () {
+      final mockedChannel1 = MockChannel();
+      final mockedChannel2 = MockChannel();
+      final mockedChannel3 = MockChannel();
+
+      when(() => mockedChannel1.joinedOnce).thenReturn(true);
+      when(() => mockedChannel1.isJoined).thenReturn(true);
+      when(() => mockedChannel1.push(ChannelEvents.accessToken, any()))
+          .thenReturn(MockPush());
+
+      when(() => mockedChannel2.joinedOnce).thenReturn(false);
+      when(() => mockedChannel2.isJoined).thenReturn(false);
+      when(() => mockedChannel2.push(ChannelEvents.accessToken, any()))
+          .thenReturn(MockPush());
+
+      when(() => mockedChannel3.joinedOnce).thenReturn(true);
+      when(() => mockedChannel3.isJoined).thenReturn(true);
+      when(() => mockedChannel3.push(ChannelEvents.accessToken, any()))
+          .thenReturn(MockPush());
+
+      const tTopic1 = 'test-topic1';
+      const tTopic2 = 'test-topic2';
+      const tTopic3 = 'test-topic3';
+
+      final mockedSocket = SocketWithMockedChannel(socketEndpoint);
+      mockedSocket.mockedChannelLooker.addAll(<String, RealtimeChannel>{
+        tTopic1: mockedChannel1,
+        tTopic2: mockedChannel2,
+        tTopic3: mockedChannel3,
+      });
+
+      final channel1 = mockedSocket.channel(tTopic1);
+      final channel2 = mockedSocket.channel(tTopic2);
+      final channel3 = mockedSocket.channel(tTopic3);
+
+      const token = 'sb-key';
+      final pushPayload = {'access_token': token};
+      final updateJoinPayload = {'access_token': token};
+
+      mockedSocket.setAuth(token);
+
+      expect(mockedSocket.accessToken, token);
+
+      verify(() => channel1.updateJoinPayload(updateJoinPayload)).called(1);
+      verify(() => channel2.updateJoinPayload(updateJoinPayload)).called(1);
+      verify(() => channel3.updateJoinPayload(updateJoinPayload)).called(1);
+
+      verify(() => channel1.push(ChannelEvents.accessToken, pushPayload))
+          .called(1);
+      verifyNever(() => channel2.push(ChannelEvents.accessToken, pushPayload));
+      verify(() => channel3.push(ChannelEvents.accessToken, pushPayload))
+          .called(1);
+    });
   });
 
   group('sendHeartbeat', () {

From 8045727e640ded44453b2fe16532ab6820751670 Mon Sep 17 00:00:00 2001
From: dshukertjr <dshukertjr@gmail.com>
Date: Tue, 10 Dec 2024 18:09:44 +0900
Subject: [PATCH 2/7] widen the constraint for crypto dev dependencies on
 realtime

---
 packages/realtime_client/pubspec.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/realtime_client/pubspec.yaml b/packages/realtime_client/pubspec.yaml
index 29b924f2..6ef77e14 100644
--- a/packages/realtime_client/pubspec.yaml
+++ b/packages/realtime_client/pubspec.yaml
@@ -19,4 +19,4 @@ dev_dependencies:
   lints: ^3.0.0
   mocktail: ^1.0.0
   test: ^1.16.5
-  crypto: ^3.0.6
+  crypto: ^3.0.0

From 7e660c3cac6ab40923131aba6804c3a9921d7b9d Mon Sep 17 00:00:00 2001
From: dshukertjr <dshukertjr@gmail.com>
Date: Thu, 12 Dec 2024 14:08:59 +0900
Subject: [PATCH 3/7] fix: properly handle exception on supabase-client for
 realtime set auth

---
 .../realtime_client/lib/src/realtime_client.dart     |  3 ++-
 packages/supabase/lib/src/supabase_client.dart       | 12 +++++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart
index 67b5de66..ce4904ad 100644
--- a/packages/realtime_client/lib/src/realtime_client.dart
+++ b/packages/realtime_client/lib/src/realtime_client.dart
@@ -429,7 +429,8 @@ class RealtimeClient {
             null,
             Level.FINE,
           );
-          throw 'InvalidJWTToken: Invalid value for JWT claim "exp" with value ${parsed['exp']}';
+          throw FormatException(
+              'InvalidJWTToken: Invalid value for JWT claim "exp" with value ${parsed['exp']}');
         }
       }
     }
diff --git a/packages/supabase/lib/src/supabase_client.dart b/packages/supabase/lib/src/supabase_client.dart
index 4f500b8b..fe592ff0 100644
--- a/packages/supabase/lib/src/supabase_client.dart
+++ b/packages/supabase/lib/src/supabase_client.dart
@@ -350,7 +350,17 @@ class SupabaseClient {
     // ignore: invalid_use_of_internal_member
     _authStateSubscription = auth.onAuthStateChangeSync.listen(
       (data) {
-        _handleTokenChanged(data.event, data.session?.accessToken);
+        try {
+          _handleTokenChanged(data.event, data.session?.accessToken);
+        } on FormatException catch (e) {
+          if (e.message.contains('InvalidJWTToken')) {
+            // The exception is thrown by RealtimeClient when the token is
+            // expired for example on app launch after the app has been closed
+            // for a while.
+          } else {
+            rethrow;
+          }
+        }
       },
       onError: (error, stack) {},
     );

From 9b323b98a8ea6a2447fe0c24e23480253b2a736c Mon Sep 17 00:00:00 2001
From: dshukertjr <dshukertjr@gmail.com>
Date: Thu, 12 Dec 2024 14:21:37 +0900
Subject: [PATCH 4/7] fix: handle realtime token exception on SupabaseClient

---
 .../supabase/lib/src/supabase_client.dart     | 30 +++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/packages/supabase/lib/src/supabase_client.dart b/packages/supabase/lib/src/supabase_client.dart
index fe592ff0..9c32fcb3 100644
--- a/packages/supabase/lib/src/supabase_client.dart
+++ b/packages/supabase/lib/src/supabase_client.dart
@@ -349,32 +349,32 @@ class SupabaseClient {
   void _listenForAuthEvents() {
     // ignore: invalid_use_of_internal_member
     _authStateSubscription = auth.onAuthStateChangeSync.listen(
-      (data) {
-        try {
-          _handleTokenChanged(data.event, data.session?.accessToken);
-        } on FormatException catch (e) {
-          if (e.message.contains('InvalidJWTToken')) {
-            // The exception is thrown by RealtimeClient when the token is
-            // expired for example on app launch after the app has been closed
-            // for a while.
-          } else {
-            rethrow;
-          }
-        }
+      (data) async {
+        await _handleTokenChanged(data.event, data.session?.accessToken);
       },
       onError: (error, stack) {},
     );
   }
 
-  void _handleTokenChanged(AuthChangeEvent event, String? token) {
+  Future<void> _handleTokenChanged(AuthChangeEvent event, String? token) async {
     if (event == AuthChangeEvent.initialSession ||
         event == AuthChangeEvent.tokenRefreshed ||
         event == AuthChangeEvent.signedIn) {
-      realtime.setAuth(token);
+      try {
+        await realtime.setAuth(token);
+      } on FormatException catch (e) {
+        if (e.message.contains('InvalidJWTToken')) {
+          // The exception is thrown by RealtimeClient when the token is
+          // expired for example on app launch after the app has been closed
+          // for a while.
+        } else {
+          rethrow;
+        }
+      }
     } else if (event == AuthChangeEvent.signedOut) {
       // Token is removed
 
-      realtime.setAuth(_supabaseKey);
+      await realtime.setAuth(_supabaseKey);
     }
   }
 }

From 4205d8d098ce48d929ca855095cfdf6ae39d4d39 Mon Sep 17 00:00:00 2001
From: dshukertjr <dshukertjr@gmail.com>
Date: Sun, 15 Dec 2024 23:13:43 +0900
Subject: [PATCH 5/7] await all setAuth calls

---
 .../lib/src/realtime_channel.dart              |  6 ++++--
 .../lib/src/realtime_client.dart               |  6 +++---
 packages/realtime_client/test/socket_test.dart | 18 +++++++++---------
 3 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart
index 7c37d800..02c7aa86 100644
--- a/packages/realtime_client/lib/src/realtime_channel.dart
+++ b/packages/realtime_client/lib/src/realtime_channel.dart
@@ -150,9 +150,11 @@ class RealtimeChannel {
 
       joinPush.receive(
         'ok',
-        (response) {
+        (response) async {
           final serverPostgresFilters = response['postgres_changes'];
-          if (socket.accessToken != null) socket.setAuth(socket.accessToken);
+          if (socket.accessToken != null) {
+            await socket.setAuth(socket.accessToken);
+          }
 
           if (serverPostgresFilters == null) {
             if (callback != null) {
diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart
index ce4904ad..269b046a 100644
--- a/packages/realtime_client/lib/src/realtime_client.dart
+++ b/packages/realtime_client/lib/src/realtime_client.dart
@@ -467,7 +467,7 @@ class RealtimeClient {
     if (heartbeatTimer != null) heartbeatTimer!.cancel();
     heartbeatTimer = Timer.periodic(
       Duration(milliseconds: heartbeatIntervalMs),
-      (Timer t) => sendHeartbeat(),
+      (Timer t) async => await sendHeartbeat(),
     );
     for (final callback in stateChangeCallbacks['open']!) {
       callback();
@@ -533,7 +533,7 @@ class RealtimeClient {
   }
 
   @internal
-  void sendHeartbeat() {
+  Future<void> sendHeartbeat() async {
     if (!isConnected) {
       return;
     }
@@ -555,6 +555,6 @@ class RealtimeClient {
       payload: {},
       ref: pendingHeartbeatRef!,
     ));
-    setAuth(accessToken);
+    await setAuth(accessToken);
   }
 }
diff --git a/packages/realtime_client/test/socket_test.dart b/packages/realtime_client/test/socket_test.dart
index 746c0e86..55c50914 100644
--- a/packages/realtime_client/test/socket_test.dart
+++ b/packages/realtime_client/test/socket_test.dart
@@ -200,7 +200,7 @@ void main() {
       await Future.delayed(const Duration(milliseconds: 200));
       expect(opens, 1);
 
-      socket.sendHeartbeat();
+      await socket.sendHeartbeat();
       // need to wait for event to trigger
       await Future.delayed(const Duration(seconds: 1));
       expect(lastMsg['event'], 'heartbeat');
@@ -459,7 +459,7 @@ void main() {
 
     test(
         "sets access token, updates channels' join payload, and pushes token to channels",
-        () {
+        () async {
       final mockedChannel1 = MockChannel();
       when(() => mockedChannel1.joinedOnce).thenReturn(true);
       when(() => mockedChannel1.isJoined).thenReturn(true);
@@ -484,7 +484,7 @@ void main() {
       final channel1 = mockedSocket.channel(tTopic1);
       final channel2 = mockedSocket.channel(tTopic2);
 
-      mockedSocket.setAuth(token);
+      await mockedSocket.setAuth(token);
 
       expect(mockedSocket.accessToken, token);
 
@@ -498,7 +498,7 @@ void main() {
 
     test(
         "sets access token, updates channels' join payload, and pushes token to channels if is not a jwt",
-        () {
+        () async {
       final mockedChannel1 = MockChannel();
       final mockedChannel2 = MockChannel();
       final mockedChannel3 = MockChannel();
@@ -537,7 +537,7 @@ void main() {
       final pushPayload = {'access_token': token};
       final updateJoinPayload = {'access_token': token};
 
-      mockedSocket.setAuth(token);
+      await mockedSocket.setAuth(token);
 
       expect(mockedSocket.accessToken, token);
 
@@ -581,18 +581,18 @@ void main() {
 
     //! Unimplemented Test: closes socket when heartbeat is not ack'd within heartbeat window
 
-    test('pushes heartbeat data when connected', () {
+    test('pushes heartbeat data when connected', () async {
       mockedSocket.connState = SocketStates.open;
 
-      mockedSocket.sendHeartbeat();
+      await mockedSocket.sendHeartbeat();
 
       verify(() => mockedSink.add(captureAny(that: equals(data)))).called(1);
     });
 
-    test('no ops when not connected', () {
+    test('no ops when not connected', () async {
       mockedSocket.connState = SocketStates.connecting;
 
-      mockedSocket.sendHeartbeat();
+      await mockedSocket.sendHeartbeat();
       verifyNever(() => mockedSink.add(any()));
     });
   });

From 54ae4af5e9635f60e3da4309bfaec268b316481f Mon Sep 17 00:00:00 2001
From: dshukertjr <dshukertjr@gmail.com>
Date: Sun, 15 Dec 2024 23:21:42 +0900
Subject: [PATCH 6/7] pass custom access token as params

---
 packages/realtime_client/lib/src/realtime_client.dart | 1 +
 packages/supabase/lib/src/supabase_client.dart        | 1 +
 2 files changed, 2 insertions(+)

diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart
index 269b046a..18307f5f 100644
--- a/packages/realtime_client/lib/src/realtime_client.dart
+++ b/packages/realtime_client/lib/src/realtime_client.dart
@@ -132,6 +132,7 @@ class RealtimeClient {
     this.longpollerTimeout = 20000,
     RealtimeLogLevel? logLevel,
     this.httpClient,
+    this.customAccessToken,
   })  : endPoint = Uri.parse('$endPoint/${Transports.websocket}')
             .replace(
               queryParameters:
diff --git a/packages/supabase/lib/src/supabase_client.dart b/packages/supabase/lib/src/supabase_client.dart
index 9c32fcb3..4a7d52de 100644
--- a/packages/supabase/lib/src/supabase_client.dart
+++ b/packages/supabase/lib/src/supabase_client.dart
@@ -332,6 +332,7 @@ class SupabaseClient {
       logLevel: options.logLevel,
       httpClient: _authHttpClient,
       timeout: options.timeout ?? RealtimeConstants.defaultTimeout,
+      customAccessToken: accessToken,
     );
   }
 

From eefb5bbf6ec56000f6dbe85ec6ab38b6e309145b Mon Sep 17 00:00:00 2001
From: dshukertjr <dshukertjr@gmail.com>
Date: Tue, 17 Dec 2024 00:53:21 +0900
Subject: [PATCH 7/7] properly parse JWT within realtime client

---
 packages/realtime_client/lib/src/realtime_client.dart | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart
index 18307f5f..3ecfb612 100644
--- a/packages/realtime_client/lib/src/realtime_client.dart
+++ b/packages/realtime_client/lib/src/realtime_client.dart
@@ -415,8 +415,8 @@ class RealtimeClient {
       Map<String, dynamic>? parsed;
       try {
         final decoded =
-            utf8.decode(base64Url.decode(tokenToSend.split('.')[1]));
-        parsed = json.decode(decoded);
+            base64.decode(base64.normalize(tokenToSend.split('.')[1]));
+        parsed = json.decode(utf8.decode(decoded));
       } catch (e) {
         // ignore parsing errors
       }