diff --git a/src/frontend/app/src/main/kotlin/pt/ulisboa/ist/pharmacist/service/real_time_updates/RealTimeUpdatesService.kt b/src/frontend/app/src/main/kotlin/pt/ulisboa/ist/pharmacist/service/real_time_updates/RealTimeUpdatesService.kt index 13505dc..9ffc498 100644 --- a/src/frontend/app/src/main/kotlin/pt/ulisboa/ist/pharmacist/service/real_time_updates/RealTimeUpdatesService.kt +++ b/src/frontend/app/src/main/kotlin/pt/ulisboa/ist/pharmacist/service/real_time_updates/RealTimeUpdatesService.kt @@ -2,7 +2,10 @@ package pt.ulisboa.ist.pharmacist.service.real_time_updates import android.util.Log import com.google.gson.Gson +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.callbackFlow @@ -27,6 +30,18 @@ class RealTimeUpdatesService( private val updateSubscribeFlow = MutableSharedFlow>() private val updatePublishFlow = MutableSharedFlow() + private val currentSubscriptions = mutableListOf() + + private val subscribeScope = CoroutineScope(Dispatchers.Default) + + init { + subscribeScope.launch { + updateSubscribeFlow.collect { subscriptions -> + currentSubscriptions.addAll(subscriptions) + } + } + } + /** * Starts the real time updates service, opening the web socket and listening for * log in and log out events to determine if a new web socket should open. @@ -38,12 +53,23 @@ class RealTimeUpdatesService( Log.d(TAG, "Already logged in. Starting WebSocket listener flow") getWebSocketListenerFlow().collect(updatePublishFlow::emit) } + while (sessionManager.isLoggedIn()) { + Log.d(TAG, "User is still logged in. Starting new WebSocket listener flow") + getWebSocketListenerFlow().collect(updatePublishFlow::emit) + delay(2000) + } + Log.d(TAG, "Listening to logInFlow") sessionManager.logInFlow.collect { loggedIn -> - if (loggedIn && sessionManager.isLoggedIn()) { + if (loggedIn) { Log.d(TAG, "Became logged in (logInFlow). Starting WebSocket listener flow") getWebSocketListenerFlow().collect(updatePublishFlow::emit) } + while (sessionManager.isLoggedIn()) { + Log.d(TAG, "User is still logged in. Starting new WebSocket listener flow") + getWebSocketListenerFlow().collect(updatePublishFlow::emit) + delay(2000) + } } } } @@ -53,6 +79,11 @@ class RealTimeUpdatesService( override fun onOpen(webSocket: WebSocket, response: Response) { super.onOpen(webSocket, response) Log.d(TAG, "WebSocket opened") + + Log.d(TAG, "Sending subscriptions to WebSocket") + val json = Gson().toJson(currentSubscriptions.map { it.toDto() }.toTypedArray()) + val sent = webSocket.send(json) + Log.d(TAG, "Successfully sent subscriptions to WebSocket: $sent") } override fun onMessage(webSocket: WebSocket, text: String) {