Skip to content

Commit

Permalink
Fixed real time updates not working after internet drops.
Browse files Browse the repository at this point in the history
  • Loading branch information
Nyckoka committed May 19, 2024
1 parent 901c523 commit 767d324
Showing 1 changed file with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package pt.ulisboa.ist.pharmacist.service.real_time_updates

import android.util.Log
import com.google.gson.Gson
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.callbackFlow
Expand All @@ -27,6 +30,18 @@ class RealTimeUpdatesService(
private val updateSubscribeFlow = MutableSharedFlow<Array<RealTimeUpdateSubscription>>()
private val updatePublishFlow = MutableSharedFlow<RealTimeUpdatePublishingDto>()

private val currentSubscriptions = mutableListOf<RealTimeUpdateSubscription>()

private val subscribeScope = CoroutineScope(Dispatchers.Default)

init {
subscribeScope.launch {
updateSubscribeFlow.collect { subscriptions ->
currentSubscriptions.addAll(subscriptions)
}
}
}

/**
* Starts the real time updates service, opening the web socket and listening for
* log in and log out events to determine if a new web socket should open.
Expand All @@ -38,12 +53,23 @@ class RealTimeUpdatesService(
Log.d(TAG, "Already logged in. Starting WebSocket listener flow")
getWebSocketListenerFlow().collect(updatePublishFlow::emit)
}
while (sessionManager.isLoggedIn()) {
Log.d(TAG, "User is still logged in. Starting new WebSocket listener flow")
getWebSocketListenerFlow().collect(updatePublishFlow::emit)
delay(2000)
}

Log.d(TAG, "Listening to logInFlow")
sessionManager.logInFlow.collect { loggedIn ->
if (loggedIn && sessionManager.isLoggedIn()) {
if (loggedIn) {
Log.d(TAG, "Became logged in (logInFlow). Starting WebSocket listener flow")
getWebSocketListenerFlow().collect(updatePublishFlow::emit)
}
while (sessionManager.isLoggedIn()) {
Log.d(TAG, "User is still logged in. Starting new WebSocket listener flow")
getWebSocketListenerFlow().collect(updatePublishFlow::emit)
delay(2000)
}
}
}
}
Expand All @@ -53,6 +79,11 @@ class RealTimeUpdatesService(
override fun onOpen(webSocket: WebSocket, response: Response) {
super.onOpen(webSocket, response)
Log.d(TAG, "WebSocket opened")

Log.d(TAG, "Sending subscriptions to WebSocket")
val json = Gson().toJson(currentSubscriptions.map { it.toDto() }.toTypedArray())
val sent = webSocket.send(json)
Log.d(TAG, "Successfully sent subscriptions to WebSocket: $sent")
}

override fun onMessage(webSocket: WebSocket, text: String) {
Expand Down

0 comments on commit 767d324

Please sign in to comment.