Skip to content

Commit

Permalink
intermittent syncing fixed by recursively calling fetchMessageEvents …
Browse files Browse the repository at this point in the history
…until the most recent message batch is reached
  • Loading branch information
ereio committed Oct 28, 2020
1 parent bac2147 commit c7f18f6
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 68 deletions.
36 changes: 36 additions & 0 deletions assets/cheatsheet.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,40 @@
*
*/
ThunkAction<AppState> createUser({enableErrors = false}) {}
```

```dart
// needed to test the recursive messaging 'catch-up'
if (true) {
printError('[fromMessageEvents] *** ${this.name} *** ');
print('[limited] now ${limited}, before ${this.limited}');
print('[lastHash] now ${lastHash}, before ${this.lastHash}');
print('[prevHash] now ${prevHash}');
}
```

```dart
// original initStore function without much regar
// for action types. Ideally, it would have none.
Future<Store> initStore() async {
// Configure redux persist instance
final persistor = Persistor<AppState>(
storage: MemoryStorage(),
serializer: CacheSerializer(),
throttleDuration: Duration(milliseconds: 4500),
shouldSave: (Store<AppState> store, dynamic action) {
switch (action.runtimeType) {
case SetSyncing:
case SetSynced:
// debugPrint('[Redux Persist] cache skip');
return false;
default:
// debugPrint('[Redux Persist] caching');
return true;
}
},
);
```
19 changes: 0 additions & 19 deletions lib/store/index.dart
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,6 @@ AppState appReducer(AppState state, action) => AppState(
* this is why the "storage: MemoryStore()" property is set and
* the Hive Serializer has been impliemented
*/
// Future<Store> initStore() async {
// // Configure redux persist instance
// final persistor = Persistor<AppState>(
// storage: MemoryStorage(),
// serializer: CacheSerializer(),
// throttleDuration: Duration(milliseconds: 4500),
// shouldSave: (Store<AppState> store, dynamic action) {
// switch (action.runtimeType) {
// case SetSyncing:
// case SetSynced:
// // debugPrint('[Redux Persist] cache skip');
// return false;
// default:
// // debugPrint('[Redux Persist] caching');
// return true;
// }
// },
// );

Future<Store> initStore() async {
// Configure redux persist instance
final persistor = Persistor<AppState>(
Expand Down
19 changes: 7 additions & 12 deletions lib/store/rooms/actions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ class AddArchive {
* Helper action that will determine how to update a room
* from data formatted like a sync request
*/
ThunkAction<AppState> syncRooms(
Map roomData,
) {
ThunkAction<AppState> syncRooms(Map roomData) {
return (Store<AppState> store) async {
// init new store containers
final rooms = store.state.roomStore.rooms ?? Map<String, Room>();
Expand Down Expand Up @@ -171,20 +169,17 @@ ThunkAction<AppState> syncRooms(

// and is not already at the end of the last known batch
// the end would be room.prevHash == room.lastHash
final roomUpdated = store.state.roomStore.rooms[room.id];

// fetch previous messages since last /sync (a gap)
// determined by the fromSync function of room
final roomUpdated = store.state.roomStore.rooms[room.id];
if (roomUpdated != null && room.limited) {
debugPrint('[syncRooms] fetchMessageEvents called due to limited');
store.dispatch(
fetchMessageEvents(
room: room,
from: room.lastHash,
),
);
store.dispatch(fetchMessageEvents(
room: room,
from: room.prevHash,
));
}

// update room
store.dispatch(SetRoom(room: room));
});
};
Expand Down
12 changes: 0 additions & 12 deletions lib/store/rooms/events/actions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ import 'package:redux/redux.dart';
import 'package:redux_thunk/redux_thunk.dart';

// Project imports:
import 'package:syphon/global/algos.dart';
import 'package:syphon/global/libs/matrix/encryption.dart';
import 'package:syphon/global/libs/matrix/index.dart';
import 'package:syphon/store/alerts/actions.dart';
import 'package:syphon/store/crypto/actions.dart';
import 'package:syphon/store/crypto/events/actions.dart';
import 'package:syphon/store/crypto/keys/model.dart';
import 'package:syphon/store/index.dart';
import 'package:syphon/store/rooms/actions.dart';
import 'package:syphon/store/rooms/events/model.dart';
Expand Down Expand Up @@ -61,9 +58,6 @@ ThunkAction<AppState> fetchMessageEvents({
try {
store.dispatch(UpdateRoom(id: room.id, syncing: true));

// debugPrint('[to] $to');
// debugPrint('[from] $from');

final messagesJson = await compute(MatrixApi.fetchMessageEventsMapped, {
"protocol": protocol,
"homeserver": store.state.authStore.user.homeserver,
Expand All @@ -83,12 +77,6 @@ ThunkAction<AppState> fetchMessageEvents({
// The messages themselves
final List<dynamic> messages = messagesJson['chunk'] ?? [];

// TODO: remove after 0.1.3 is merged
// messages.forEach((element) {
// printJson(element);
// });
debugPrint('[OLDEST] ${oldest}');

// reuse the logic for syncing
await store.dispatch(
syncRooms({
Expand Down
48 changes: 23 additions & 25 deletions lib/store/rooms/room/model.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ class Room {
@HiveField(17)
final Message draft;

// TODO: removed until state timeline work can be done
// @HiveField(19)
// TODO: removed until state timeline work can be done
// final List<Event> state;

@HiveField(20)
Expand Down Expand Up @@ -326,9 +325,9 @@ class Room {
accountEvents,
)
.fromStateEvents(
events: stateEvents,
invite: invite,
limited: limited,
events: stateEvents,
currentUser: currentUser,
)
.fromMessageEvents(
Expand Down Expand Up @@ -389,10 +388,8 @@ class Room {
bool direct = this.direct ?? false;
int lastUpdate = this.lastUpdate;
int namePriority = this.namePriority != 4 ? this.namePriority : 4;

Map<String, User> users = this.users ?? Map<String, User>();

// room state event filter
try {
events.forEach((event) {
final timestamp = event.timestamp ?? 0;
Expand Down Expand Up @@ -541,19 +538,27 @@ class Room {
lastUpdate = messagesNew[0].timestamp;
}

// Check to see if the new messages contain those existing in cache
if (messagesNew.isNotEmpty &&
messagesExisting.isNotEmpty &&
this.limited) {
final messageLatest = messagesExisting.firstWhere(
(msg) => msg.id == messagesNew[0].id,
orElse: () => null,
);
// limited indicates need to fetch additional data for room timelines
if (this.limited) {
// Check to see if the new messages contain those existing in cache
if (messagesNew.isNotEmpty && messagesExisting.isNotEmpty) {
final messageLatest = messagesExisting.firstWhere(
(msg) => msg.id == messagesNew[0].id,
orElse: () => null,
);
// Set limited to false if they now exist
limited = messageLatest != null;
}

// Set limited (used to recursively sync) to false if
// - new messages contains old ones
// - it's the first full /sync (lastHash == null)
limited = messageLatest != null || this.lastHash == null ? false : null;
// Set limited to false false if
// - the oldest hash (lastHash) is non-existant
// - the previous hash (most recent) is non-existant
// - the oldest hash equals the previously fetched hash
if (this.lastHash == null ||
this.prevHash == null ||
this.lastHash == this.prevHash) {
limited = false;
}
}

// Combine current and existing messages on unique ids
Expand All @@ -572,14 +577,7 @@ class Room {
// Filter to find startTime and endTime
final messagesAll = List<Message>.from(messagesMap.values);

// TODO: remove after 0.1.5 :( - message catchup works
if (true) {
// print('[fromMessageEvents] *** ${this.name} *** ');
// print('[limited] now ${limited}, before ${this.limited}');
// print('[lastHash] now ${lastHash}, before ${this.lastHash}');
// print('[prevHash] now ${prevHash}');
}

// Save values to room
return this.copyWith(
outbox: outbox,
messages: messagesAll,
Expand Down
1 change: 1 addition & 0 deletions lib/views/home/chat/index.dart
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ class _Props extends Equatable {
onLoadMoreMessages: () {
final room = store.state.roomStore.rooms[roomId] ?? Room();

// fetch messages beyond the oldest known message - lastHash
store.dispatch(fetchMessageEvents(
room: room,
from: room.lastHash,
Expand Down
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ description: a privacy focused matrix client
# $ emulator -list-avds
# $ emulator -avd Pixel_3a_API_29
# $ adb shell && run-as org.tether.tether # cache inspection
# $ adb logcat ActivityManager:I flutter:I *:S

# desktop support options
# $ flutter config --enable-macos-desktop
Expand Down

0 comments on commit c7f18f6

Please sign in to comment.