Skip to content

Commit

Permalink
fix: Consolidate realtime subscription for stream
Browse files Browse the repository at this point in the history
  • Loading branch information
dshukertjr committed Dec 10, 2024
1 parent ccfcbf5 commit 668cfaa
Showing 1 changed file with 34 additions and 36 deletions.
70 changes: 34 additions & 36 deletions packages/supabase/lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -171,46 +171,44 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {

_channel!
.onPostgresChanges(
event: PostgresChangeEvent.insert,
event: PostgresChangeEvent.all,
schema: _schema,
table: _table,
filter: realtimeFilter,
callback: (payload) {
final newRecord = payload.newRecord;
_streamData.add(newRecord);
_addStream();
})
.onPostgresChanges(
event: PostgresChangeEvent.update,
schema: _schema,
table: _table,
filter: realtimeFilter,
callback: (payload) {
final updatedIndex = _streamData.indexWhere(
(element) => _isTargetRecord(record: element, payload: payload),
);

final updatedRecord = payload.newRecord;
if (updatedIndex >= 0) {
_streamData[updatedIndex] = updatedRecord;
} else {
_streamData.add(updatedRecord);
}
_addStream();
})
.onPostgresChanges(
event: PostgresChangeEvent.delete,
schema: _schema,
table: _table,
filter: realtimeFilter,
callback: (payload) {
final deletedIndex = _streamData.indexWhere(
(element) => _isTargetRecord(record: element, payload: payload),
);
if (deletedIndex >= 0) {
/// Delete the data from in memory cache if it was found
_streamData.removeAt(deletedIndex);
_addStream();
switch (payload.eventType) {
case PostgresChangeEvent.insert:
final newRecord = payload.newRecord;
_streamData.add(newRecord);
_addStream();
break;
case PostgresChangeEvent.update:
final updatedIndex = _streamData.indexWhere(
(element) =>
_isTargetRecord(record: element, payload: payload),
);

final updatedRecord = payload.newRecord;
if (updatedIndex >= 0) {
_streamData[updatedIndex] = updatedRecord;
} else {
_streamData.add(updatedRecord);
}
_addStream();
break;
case PostgresChangeEvent.delete:
final deletedIndex = _streamData.indexWhere(
(element) =>
_isTargetRecord(record: element, payload: payload),
);
if (deletedIndex >= 0) {
/// Delete the data from in memory cache if it was found
_streamData.removeAt(deletedIndex);
_addStream();
}
break;
default:
break;
}
})
.subscribe((status, [error]) {
Expand Down

0 comments on commit 668cfaa

Please sign in to comment.