Skip to content

Commit

Permalink
feat: support consuming external events for debugging purposes (WPB-1…
Browse files Browse the repository at this point in the history
…0756) (#2971)

* feat: support consuming external events for debugging purposes

* fix: move event parsing into EventRepository to avoid accessing network layer from a use case

---------

Co-authored-by: Mohamad Jaara <[email protected]>
  • Loading branch information
typfel and MohamadJaara authored Sep 5, 2024
1 parent 2f11cb1 commit c91a5fe
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,22 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.isActive
import kotlinx.serialization.json.Json
import kotlin.coroutines.coroutineContext

interface EventRepository {

suspend fun pendingEvents(): Flow<Either<CoreFailure, EventEnvelope>>
suspend fun liveEvents(): Either<CoreFailure, Flow<WebSocketEvent<EventEnvelope>>>
suspend fun updateLastProcessedEventId(eventId: String): Either<StorageFailure, Unit>

/**
* Parse events from an external JSON payload
*
* @return List of [EventEnvelope]
*/
fun parseExternalEvents(data: String): List<EventEnvelope>

/**
* Retrieves the last processed event ID from the storage.
*
Expand Down Expand Up @@ -145,6 +154,13 @@ class EventDataSource(
}
}

override fun parseExternalEvents(data: String): List<EventEnvelope> {
val notificationResponse = Json.decodeFromString<NotificationResponse>(data)
return notificationResponse.notifications.flatMap {
eventMapper.fromDTO(it, isLive = false)
}
}

override suspend fun lastProcessedEventId(): Either<StorageFailure, String> = wrapStorageRequest {
metadataDAO.valueByKey(LAST_PROCESSED_EVENT_ID_KEY)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,7 @@ class UserSessionScope internal constructor(
userRepository,
userId,
assetRepository,
eventRepository,
syncManager,
slowSyncRepository,
messageSendingScheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.wire.kalium.logic.data.client.remote.ClientRemoteRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.LegalHoldStatusMapperImpl
import com.wire.kalium.logic.data.conversation.MLSConversationRepository
import com.wire.kalium.logic.data.event.EventRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.message.ProtoContentMapper
Expand Down Expand Up @@ -78,6 +79,7 @@ class DebugScope internal constructor(
private val userRepository: UserRepository,
private val userId: UserId,
private val assetRepository: AssetRepository,
private val eventRepository: EventRepository,
private val syncManager: SyncManager,
private val slowSyncRepository: SlowSyncRepository,
private val messageSendingScheduler: MessageSendingScheduler,
Expand Down Expand Up @@ -116,6 +118,12 @@ class DebugScope internal constructor(
eventProcessor = eventProcessor
)

val synchronizeExternalData: SynchronizeExternalDataUseCase
get() = SynchronizeExternalDataUseCaseImpl(
eventRepository = eventRepository,
eventProcessor = eventProcessor
)

private val messageSendFailureHandler: MessageSendFailureHandler
get() = MessageSendFailureHandlerImpl(
userRepository,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.debug

import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.event.EventRepository
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.sync.incremental.EventProcessor

fun interface SynchronizeExternalDataUseCase {

/**
* Consume event data coming from an external source.
*
* @param data NotificationResponse serialized to JSON
* @return an [SynchronizeExternalDataResult] containing a [CoreFailure] in case anything goes wrong
*/
suspend operator fun invoke(
data: String,
): SynchronizeExternalDataResult

}

sealed class SynchronizeExternalDataResult {
data object Success : SynchronizeExternalDataResult()
data class Failure(val coreFailure: CoreFailure) : SynchronizeExternalDataResult()
}

internal class SynchronizeExternalDataUseCaseImpl(
val eventRepository: EventRepository,
val eventProcessor: EventProcessor
) : SynchronizeExternalDataUseCase {

override suspend operator fun invoke(
data: String,
): SynchronizeExternalDataResult {
return eventRepository.parseExternalEvents(data).foldToEitherWhileRight(Unit) { event, _ ->
eventProcessor.processEvent(event)
}.fold({
return@fold SynchronizeExternalDataResult.Failure(it)
}, {
return@fold SynchronizeExternalDataResult.Success
})
}
}

0 comments on commit c91a5fe

Please sign in to comment.