Skip to content

Commit

Permalink
Proposal: support for realtime multiple filters
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardocucia committed Mar 26, 2024
1 parent c59bdbb commit 2ace607
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 110 deletions.
12 changes: 7 additions & 5 deletions packages/realtime_client/example/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ Future<void> main() async {
final channel = socket.channel('realtime:public');
channel.onPostgresChanges(
event: PostgresChangeEvent.all,
filter: PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'column',
value: 'value',
),
filters: [
PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'column',
value: 'value',
)
],
callback: (payload) {},
);
channel.onPostgresChanges(
Expand Down
6 changes: 3 additions & 3 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class RealtimeChannel {
/// [table] is the table of the database on which to setup the listener.
/// The listener will return all changes from every listenable table if omitted.
///
/// [filter] can be used to further control which rows to listen to within the given [schema] and [table].
/// [filters] can be used to further control which rows to listen to within the given [schema] and [table].
///
/// ```dart
/// supabase.channel('my_channel').onPostgresChanges(
Expand All @@ -289,7 +289,7 @@ class RealtimeChannel {
required PostgresChangeEvent event,
String? schema,
String? table,
PostgresChangeFilter? filter,
List<PostgresChangeFilter> filters = const [],
required void Function(PostgresChangePayload payload) callback,
}) {
return onEvents(
Expand All @@ -298,7 +298,7 @@ class RealtimeChannel {
event: event.toRealtimeEvent(),
schema: schema,
table: table,
filter: filter?.toString(),
filter: filters.map((e) => e.toString()).join('&'),
),
(payload, [ref]) => callback(PostgresChangePayload.fromPayload(payload)),
);
Expand Down
30 changes: 21 additions & 9 deletions packages/realtime_client/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,13 @@ void main() {
event: PostgresChangeEvent.all,
schema: 'public',
table: 'todos',
filter: PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'id',
value: 2,
),
filters: [
PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'id',
value: 2,
)
],
callback: (payload) {
streamController.add(payload);
})
Expand Down Expand Up @@ -278,8 +280,13 @@ void main() {
event: PostgresChangeEvent.all,
schema: 'public',
table: 'todos',
filter: PostgresChangeFilter(
type: PostgresChangeFilterType.eq, column: 'id', value: 2),
filters: [
PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'id',
value: 2,
)
],
callback: (payload) {},
);

Expand Down Expand Up @@ -473,8 +480,13 @@ void main() {
event: PostgresChangeEvent.all,
schema: 'public',
table: 'todos',
filter: PostgresChangeFilter(
type: PostgresChangeFilterType.eq, column: 'id', value: 2),
filters: [
PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'id',
value: 2,
)
],
callback: (payload) {
streamController.add(payload);
})
Expand Down
77 changes: 42 additions & 35 deletions packages/supabase/lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class _Order {
required this.column,
required this.ascending,
});

final String column;
final bool ascending;
}
Expand Down Expand Up @@ -56,7 +57,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
SupabaseStreamEvent _streamData = [];

/// `eq` filter used for both postgrest and realtime
_StreamPostgrestFilter? _streamFilter;
final List<_StreamPostgrestFilter> _streamFilters = [];

/// Which column to order by and whether it's ascending
_Order? _orderBy;
Expand Down Expand Up @@ -137,15 +138,19 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
}

Future<void> _getStreamData() async {
final currentStreamFilter = _streamFilter;
final currentStreamFilter = _streamFilters;
_streamData = [];
PostgresChangeFilter? realtimeFilter;
if (currentStreamFilter != null) {
realtimeFilter = PostgresChangeFilter(
type: currentStreamFilter.type,
column: currentStreamFilter.column,
value: currentStreamFilter.value,
);
List<PostgresChangeFilter> realtimeFilter = [];
if (currentStreamFilter.isNotEmpty) {
realtimeFilter = currentStreamFilter
.map(
(e) => PostgresChangeFilter(
type: e.type,
column: e.column,
value: e.value,
),
)
.toList();
}

_channel = _realtimeClient.channel(_realtimeTopic);
Expand All @@ -155,7 +160,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
event: PostgresChangeEvent.insert,
schema: _schema,
table: _table,
filter: realtimeFilter,
filters: realtimeFilter,
callback: (payload) {
final newRecord = payload.newRecord;
_streamData.add(newRecord);
Expand All @@ -165,7 +170,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
event: PostgresChangeEvent.update,
schema: _schema,
table: _table,
filter: realtimeFilter,
filters: realtimeFilter,
callback: (payload) {
final updatedIndex = _streamData.indexWhere(
(element) => _isTargetRecord(record: element, payload: payload),
Expand All @@ -183,7 +188,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
event: PostgresChangeEvent.delete,
schema: _schema,
table: _table,
filter: realtimeFilter,
filters: realtimeFilter,
callback: (payload) {
final deletedIndex = _streamData.indexWhere(
(element) => _isTargetRecord(record: element, payload: payload),
Expand All @@ -201,29 +206,31 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
});

PostgrestFilterBuilder query = _queryBuilder.select();
if (_streamFilter != null) {
switch (_streamFilter!.type) {
case PostgresChangeFilterType.eq:
query = query.eq(_streamFilter!.column, _streamFilter!.value);
break;
case PostgresChangeFilterType.neq:
query = query.neq(_streamFilter!.column, _streamFilter!.value);
break;
case PostgresChangeFilterType.lt:
query = query.lt(_streamFilter!.column, _streamFilter!.value);
break;
case PostgresChangeFilterType.lte:
query = query.lte(_streamFilter!.column, _streamFilter!.value);
break;
case PostgresChangeFilterType.gt:
query = query.gt(_streamFilter!.column, _streamFilter!.value);
break;
case PostgresChangeFilterType.gte:
query = query.gte(_streamFilter!.column, _streamFilter!.value);
break;
case PostgresChangeFilterType.inFilter:
query = query.inFilter(_streamFilter!.column, _streamFilter!.value);
break;
if (_streamFilters.isNotEmpty) {
for (var filter in _streamFilters) {
switch (filter.type) {
case PostgresChangeFilterType.eq:
query = query.eq(filter.column, filter.value);
break;
case PostgresChangeFilterType.neq:
query = query.neq(filter.column, filter.value);
break;
case PostgresChangeFilterType.lt:
query = query.lt(filter.column, filter.value);
break;
case PostgresChangeFilterType.lte:
query = query.lte(filter.column, filter.value);
break;
case PostgresChangeFilterType.gt:
query = query.gt(filter.column, filter.value);
break;
case PostgresChangeFilterType.gte:
query = query.gte(filter.column, filter.value);
break;
case PostgresChangeFilterType.inFilter:
query = query.inFilter(filter.column, filter.value);
break;
}
}
}
PostgrestTransformBuilder? transformQuery;
Expand Down
42 changes: 21 additions & 21 deletions packages/supabase/lib/src/supabase_stream_filter_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
/// ```dart
/// supabase.from('users').stream(primaryKey: ['id']).eq('name', 'Supabase');
/// ```
SupabaseStreamBuilder eq(String column, Object value) {
_streamFilter = _StreamPostgrestFilter(
SupabaseStreamFilterBuilder eq(String column, Object value) {
_streamFilters.add(_StreamPostgrestFilter(
type: PostgresChangeFilterType.eq,
column: column,
value: value,
);
));
return this;
}

Expand All @@ -33,12 +33,12 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
/// ```dart
/// supabase.from('users').stream(primaryKey: ['id']).neq('name', 'Supabase');
/// ```
SupabaseStreamBuilder neq(String column, Object value) {
_streamFilter = _StreamPostgrestFilter(
SupabaseStreamFilterBuilder neq(String column, Object value) {
_streamFilters.add(_StreamPostgrestFilter(
type: PostgresChangeFilterType.neq,
column: column,
value: value,
);
));
return this;
}

Expand All @@ -49,12 +49,12 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
/// ```dart
/// supabase.from('users').stream(primaryKey: ['id']).lt('likes', 100);
/// ```
SupabaseStreamBuilder lt(String column, Object value) {
_streamFilter = _StreamPostgrestFilter(
SupabaseStreamFilterBuilder lt(String column, Object value) {
_streamFilters.add(_StreamPostgrestFilter(
type: PostgresChangeFilterType.lt,
column: column,
value: value,
);
));
return this;
}

Expand All @@ -65,12 +65,12 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
/// ```dart
/// supabase.from('users').stream(primaryKey: ['id']).lte('likes', 100);
/// ```
SupabaseStreamBuilder lte(String column, Object value) {
_streamFilter = _StreamPostgrestFilter(
SupabaseStreamFilterBuilder lte(String column, Object value) {
_streamFilters.add(_StreamPostgrestFilter(
type: PostgresChangeFilterType.lte,
column: column,
value: value,
);
));
return this;
}

Expand All @@ -81,12 +81,12 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
/// ```dart
/// supabase.from('users').stream(primaryKey: ['id']).gt('likes', '100');
/// ```
SupabaseStreamBuilder gt(String column, Object value) {
_streamFilter = _StreamPostgrestFilter(
SupabaseStreamFilterBuilder gt(String column, Object value) {
_streamFilters.add(_StreamPostgrestFilter(
type: PostgresChangeFilterType.gt,
column: column,
value: value,
);
));
return this;
}

Expand All @@ -97,12 +97,12 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
/// ```dart
/// supabase.from('users').stream(primaryKey: ['id']).gte('likes', 100);
/// ```
SupabaseStreamBuilder gte(String column, Object value) {
_streamFilter = _StreamPostgrestFilter(
SupabaseStreamFilterBuilder gte(String column, Object value) {
_streamFilters.add(_StreamPostgrestFilter(
type: PostgresChangeFilterType.gte,
column: column,
value: value,
);
));
return this;
}

Expand All @@ -113,12 +113,12 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
/// ```dart
/// supabase.from('users').stream(primaryKey: ['id']).inFilter('name', ['Andy', 'Amy', 'Terry']);
/// ```
SupabaseStreamBuilder inFilter(String column, List<Object> values) {
_streamFilter = _StreamPostgrestFilter(
SupabaseStreamFilterBuilder inFilter(String column, List<Object> values) {
_streamFilters.add(_StreamPostgrestFilter(
type: PostgresChangeFilterType.inFilter,
column: column,
value: values,
);
));
return this;
}
}
3 changes: 2 additions & 1 deletion packages/supabase/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ dependencies:
gotrue: ^2.5.1
http: '>=0.13.5 <2.0.0'
postgrest: ^2.1.1
realtime_client: ^2.0.1
realtime_client: #^2.0.1
path: ../realtime_client
storage_client: ^2.0.1
rxdart: ^0.27.5
yet_another_json_isolate: ^2.0.0
Expand Down
3 changes: 2 additions & 1 deletion packages/supabase_flutter/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ dependencies:
sdk: flutter
http: '>=0.13.4 <2.0.0'
meta: ^1.7.0
supabase: ^2.0.8
supabase: #^2.0.8
path: ../supabase
url_launcher: ^6.1.2
path_provider: ^2.0.0
shared_preferences: ^2.0.0
Expand Down
Loading

0 comments on commit 2ace607

Please sign in to comment.