diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index c27d9bd5..b3a1c9f0 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -171,46 +171,44 @@ class SupabaseStreamBuilder extends Stream { _channel! .onPostgresChanges( - event: PostgresChangeEvent.insert, + event: PostgresChangeEvent.all, schema: _schema, table: _table, filter: realtimeFilter, callback: (payload) { - final newRecord = payload.newRecord; - _streamData.add(newRecord); - _addStream(); - }) - .onPostgresChanges( - event: PostgresChangeEvent.update, - schema: _schema, - table: _table, - filter: realtimeFilter, - callback: (payload) { - final updatedIndex = _streamData.indexWhere( - (element) => _isTargetRecord(record: element, payload: payload), - ); - - final updatedRecord = payload.newRecord; - if (updatedIndex >= 0) { - _streamData[updatedIndex] = updatedRecord; - } else { - _streamData.add(updatedRecord); - } - _addStream(); - }) - .onPostgresChanges( - event: PostgresChangeEvent.delete, - schema: _schema, - table: _table, - filter: realtimeFilter, - callback: (payload) { - final deletedIndex = _streamData.indexWhere( - (element) => _isTargetRecord(record: element, payload: payload), - ); - if (deletedIndex >= 0) { - /// Delete the data from in memory cache if it was found - _streamData.removeAt(deletedIndex); - _addStream(); + switch (payload.eventType) { + case PostgresChangeEvent.insert: + final newRecord = payload.newRecord; + _streamData.add(newRecord); + _addStream(); + break; + case PostgresChangeEvent.update: + final updatedIndex = _streamData.indexWhere( + (element) => + _isTargetRecord(record: element, payload: payload), + ); + + final updatedRecord = payload.newRecord; + if (updatedIndex >= 0) { + _streamData[updatedIndex] = updatedRecord; + } else { + _streamData.add(updatedRecord); + } + _addStream(); + break; + case PostgresChangeEvent.delete: + final deletedIndex = _streamData.indexWhere( + (element) => + _isTargetRecord(record: element, payload: payload), + ); + if (deletedIndex >= 0) { + /// Delete the data from in memory cache if it was found + _streamData.removeAt(deletedIndex); + _addStream(); + } + break; + default: + break; } }) .subscribe((status, [error]) { diff --git a/packages/supabase/test/mock_test.dart b/packages/supabase/test/mock_test.dart index 6c458c5f..f3f4f53e 100644 --- a/packages/supabase/test/mock_test.dart +++ b/packages/supabase/test/mock_test.dart @@ -136,25 +136,11 @@ void main() { 'postgres_changes': [ { 'id': 77086988, - 'event': 'INSERT', + 'event': '*', 'schema': 'public', 'table': 'todos', if (realtimeFilter != null) 'filter': realtimeFilter, }, - { - 'id': 25993878, - 'event': 'UPDATE', - 'schema': 'public', - 'table': 'todos', - if (realtimeFilter != null) 'filter': realtimeFilter, - }, - { - 'id': 48673474, - 'event': 'DELETE', - 'schema': 'public', - 'table': 'todos', - if (realtimeFilter != null) 'filter': realtimeFilter, - } ] }, 'status': 'ok' @@ -208,7 +194,7 @@ void main() { 'ref': null, 'event': 'postgres_changes', 'payload': { - 'ids': [25993878], + 'ids': [77086988], 'data': { 'columns': [ {'name': 'id', 'type': 'int4', 'type_modifier': 4294967295}, @@ -257,7 +243,7 @@ void main() { 'type': 'DELETE', if (realtimeFilter != null) 'filter': realtimeFilter, }, - 'ids': [48673474] + 'ids': [77086988] }, }); webSocket!.add(deleteString); @@ -271,7 +257,7 @@ void main() { 'ref': null, 'event': 'postgres_changes', 'payload': { - 'ids': [25993878], + 'ids': [77086988], 'data': { 'columns': [ {'name': 'id', 'type': 'int4', 'type_modifier': 4294967295}, @@ -321,7 +307,7 @@ void main() { 'type': 'DELETE', if (realtimeFilter != null) 'filter': realtimeFilter, }, - 'ids': [48673474] + 'ids': [77086988] }, }); webSocket!.add(ignoredDeleteString);