Skip to content

Commit

Permalink
fix(realtime_client): Consolidate realtime subscription for stream (#…
Browse files Browse the repository at this point in the history
…1096)

* fix: Consolidate realtime subscription for stream

* fix: adjust the mock data
  • Loading branch information
dshukertjr authored Dec 11, 2024
1 parent ccfcbf5 commit 4e35115
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 55 deletions.
70 changes: 34 additions & 36 deletions packages/supabase/lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -171,46 +171,44 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {

_channel!
.onPostgresChanges(
event: PostgresChangeEvent.insert,
event: PostgresChangeEvent.all,
schema: _schema,
table: _table,
filter: realtimeFilter,
callback: (payload) {
final newRecord = payload.newRecord;
_streamData.add(newRecord);
_addStream();
})
.onPostgresChanges(
event: PostgresChangeEvent.update,
schema: _schema,
table: _table,
filter: realtimeFilter,
callback: (payload) {
final updatedIndex = _streamData.indexWhere(
(element) => _isTargetRecord(record: element, payload: payload),
);

final updatedRecord = payload.newRecord;
if (updatedIndex >= 0) {
_streamData[updatedIndex] = updatedRecord;
} else {
_streamData.add(updatedRecord);
}
_addStream();
})
.onPostgresChanges(
event: PostgresChangeEvent.delete,
schema: _schema,
table: _table,
filter: realtimeFilter,
callback: (payload) {
final deletedIndex = _streamData.indexWhere(
(element) => _isTargetRecord(record: element, payload: payload),
);
if (deletedIndex >= 0) {
/// Delete the data from in memory cache if it was found
_streamData.removeAt(deletedIndex);
_addStream();
switch (payload.eventType) {
case PostgresChangeEvent.insert:
final newRecord = payload.newRecord;
_streamData.add(newRecord);
_addStream();
break;
case PostgresChangeEvent.update:
final updatedIndex = _streamData.indexWhere(
(element) =>
_isTargetRecord(record: element, payload: payload),
);

final updatedRecord = payload.newRecord;
if (updatedIndex >= 0) {
_streamData[updatedIndex] = updatedRecord;
} else {
_streamData.add(updatedRecord);
}
_addStream();
break;
case PostgresChangeEvent.delete:
final deletedIndex = _streamData.indexWhere(
(element) =>
_isTargetRecord(record: element, payload: payload),
);
if (deletedIndex >= 0) {
/// Delete the data from in memory cache if it was found
_streamData.removeAt(deletedIndex);
_addStream();
}
break;
default:
break;
}
})
.subscribe((status, [error]) {
Expand Down
24 changes: 5 additions & 19 deletions packages/supabase/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -136,25 +136,11 @@ void main() {
'postgres_changes': [
{
'id': 77086988,
'event': 'INSERT',
'event': '*',
'schema': 'public',
'table': 'todos',
if (realtimeFilter != null) 'filter': realtimeFilter,
},
{
'id': 25993878,
'event': 'UPDATE',
'schema': 'public',
'table': 'todos',
if (realtimeFilter != null) 'filter': realtimeFilter,
},
{
'id': 48673474,
'event': 'DELETE',
'schema': 'public',
'table': 'todos',
if (realtimeFilter != null) 'filter': realtimeFilter,
}
]
},
'status': 'ok'
Expand Down Expand Up @@ -208,7 +194,7 @@ void main() {
'ref': null,
'event': 'postgres_changes',
'payload': {
'ids': [25993878],
'ids': [77086988],
'data': {
'columns': [
{'name': 'id', 'type': 'int4', 'type_modifier': 4294967295},
Expand Down Expand Up @@ -257,7 +243,7 @@ void main() {
'type': 'DELETE',
if (realtimeFilter != null) 'filter': realtimeFilter,
},
'ids': [48673474]
'ids': [77086988]
},
});
webSocket!.add(deleteString);
Expand All @@ -271,7 +257,7 @@ void main() {
'ref': null,
'event': 'postgres_changes',
'payload': {
'ids': [25993878],
'ids': [77086988],
'data': {
'columns': [
{'name': 'id', 'type': 'int4', 'type_modifier': 4294967295},
Expand Down Expand Up @@ -321,7 +307,7 @@ void main() {
'type': 'DELETE',
if (realtimeFilter != null) 'filter': realtimeFilter,
},
'ids': [48673474]
'ids': [77086988]
},
});
webSocket!.add(ignoredDeleteString);
Expand Down

0 comments on commit 4e35115

Please sign in to comment.