Skip to content

Commit

Permalink
feat: process incoming messages in-band (#41)
Browse files Browse the repository at this point in the history
# What kind of change does this PR introduce?
 
Process awala messages in-band

# What is the current behavior?
 
Now there is only message parsing is performed in-band. Other work with message content (for example, saving data to a database) is performed after acknowledging of a message.

# What is the new behavior (if this is a feature change)?
All work with message content is performed before message.ack() call.
  • Loading branch information
migulyaev authored Sep 11, 2023
1 parent 1c1e624 commit 3c06aa8
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import tech.relaycorp.letro.account.model.Account
import javax.inject.Inject
Expand All @@ -21,7 +22,7 @@ class AccountRepositoryImpl @Inject constructor(

private val databaseScope: CoroutineScope = CoroutineScope(Dispatchers.IO)
private val _allAccounts = MutableSharedFlow<List<Account>>()
private val _currentAccount = MutableSharedFlow<Account?>()
private val _currentAccount = MutableStateFlow<Account?>(null)
override val currentAccount: Flow<Account?>
get() = _currentAccount

Expand Down
24 changes: 7 additions & 17 deletions app/src/main/java/tech/relaycorp/letro/awala/AwalaManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
Expand All @@ -24,19 +21,17 @@ import tech.relaycorp.awaladroid.endpoint.PublicThirdPartyEndpoint
import tech.relaycorp.awaladroid.endpoint.ThirdPartyEndpoint
import tech.relaycorp.awaladroid.messaging.OutgoingMessage
import tech.relaycorp.letro.R
import tech.relaycorp.letro.awala.message.AwalaIncomingMessage
import tech.relaycorp.letro.awala.message.AwalaOutgoingMessage
import tech.relaycorp.letro.awala.message.MessageRecipient
import tech.relaycorp.letro.awala.message.MessageType
import tech.relaycorp.letro.awala.parser.AwalaMessageParser
import tech.relaycorp.letro.awala.processor.AwalaMessageProcessor
import tech.relaycorp.letro.ui.navigation.Route
import tech.relaycorp.letro.utils.awala.loadNonNullFirstPartyEndpoint
import tech.relaycorp.letro.utils.awala.loadNonNullThirdPartyEndpoint
import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.Inject

interface AwalaManager {
val incomingMessages: Flow<AwalaIncomingMessage<*>>
suspend fun sendMessage(
outgoingMessage: AwalaOutgoingMessage,
recipient: MessageRecipient,
Expand All @@ -48,7 +43,7 @@ interface AwalaManager {
class AwalaManagerImpl @Inject constructor(
private val awalaRepository: AwalaRepository,
@ApplicationContext private val context: Context,
private val parser: AwalaMessageParser,
private val processor: AwalaMessageProcessor,
) : AwalaManager {

private val awalaScope = CoroutineScope(Dispatchers.IO)
Expand All @@ -59,10 +54,6 @@ class AwalaManagerImpl @Inject constructor(
@OptIn(DelicateCoroutinesApi::class)
private val messageReceivingThreadContext = newSingleThreadContext("AwalaManagerMessageReceiverThread")

private val _incomingMessages = Channel<AwalaIncomingMessage<*>>()
override val incomingMessages: Flow<AwalaIncomingMessage<*>>
get() = _incomingMessages.receiveAsFlow()

private var isAwalaSetUp = AtomicBoolean(false)
private var awalaSetupJob: Job? = null

Expand Down Expand Up @@ -167,10 +158,9 @@ class AwalaManagerImpl @Inject constructor(

Log.i(TAG, "start receiving messages...")
GatewayClient.receiveMessages().collect { message ->
val type = MessageType.from(message.type)
val parsedMessage = parser.parse(type, message.content)
.also { Log.i(TAG, "Receive message: ($it)") }
_incomingMessages.send(parsedMessage)
Log.i(TAG, "Receive message: ${message.type}: ($message)")
processor.process(message)
Log.i(TAG, "Message processed")
message.ack()
}
}
Expand Down Expand Up @@ -262,8 +252,8 @@ class AwalaManagerImpl @Inject constructor(
}
}

private companion object {
private const val TAG = "AwalaManager"
companion object {
const val TAG = "AwalaManager"
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
package tech.relaycorp.letro.awala.parser

import tech.relaycorp.letro.awala.message.AwalaIncomingMessage
import tech.relaycorp.letro.awala.message.MessageType

interface AwalaMessageParser {
fun parse(type: MessageType, content: ByteArray): AwalaIncomingMessage<*>
}

class AwalaMessageParserImpl constructor(
private val parsers: Map<MessageType, AwalaMessageParser>,
) : AwalaMessageParser {
override fun parse(type: MessageType, content: ByteArray): AwalaIncomingMessage<*> {
return parsers[type]?.parse(type, content) ?: throw IllegalStateException("No parser for messageType: $type")
}
fun parse(content: ByteArray): AwalaIncomingMessage<*>
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ interface UnknownMessageParser : AwalaMessageParser

class UnknownMessageParserImpl @Inject constructor() : UnknownMessageParser {

override fun parse(type: MessageType, content: ByteArray): AwalaIncomingMessage<*> {
override fun parse(content: ByteArray): AwalaIncomingMessage<*> {
return UnknownIncomingMessage(
content = content.decodeToString(),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package tech.relaycorp.letro.awala.processor

import tech.relaycorp.awaladroid.messaging.IncomingMessage
import tech.relaycorp.letro.awala.message.MessageType

interface AwalaMessageProcessor {
suspend fun process(message: IncomingMessage)
}

class AwalaMessageProcessorImpl constructor(
private val processors: Map<MessageType, AwalaMessageProcessor>,
) : AwalaMessageProcessor {

override suspend fun process(message: IncomingMessage) {
val type = MessageType.from(message.type)
processors[type]!!.process(message)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package tech.relaycorp.letro.awala.processor

import android.util.Log
import tech.relaycorp.awaladroid.messaging.IncomingMessage
import tech.relaycorp.letro.awala.AwalaManagerImpl
import javax.inject.Inject

class UnknownMessageProcessor @Inject constructor() : AwalaMessageProcessor {

override suspend fun process(message: IncomingMessage) {
Log.w(AwalaManagerImpl.TAG, "Unknown message processor for type: ${message.type}")
}
}
23 changes: 12 additions & 11 deletions app/src/main/java/tech/relaycorp/letro/di/AwalaModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,28 @@ import tech.relaycorp.letro.awala.AwalaManagerImpl
import tech.relaycorp.letro.awala.AwalaRepository
import tech.relaycorp.letro.awala.AwalaRepositoryImpl
import tech.relaycorp.letro.awala.message.MessageType
import tech.relaycorp.letro.awala.parser.AwalaMessageParser
import tech.relaycorp.letro.awala.parser.AwalaMessageParserImpl
import tech.relaycorp.letro.awala.parser.UnknownMessageParser
import tech.relaycorp.letro.awala.parser.UnknownMessageParserImpl
import tech.relaycorp.letro.onboarding.registration.parser.RegistrationMessageParser
import tech.relaycorp.letro.awala.processor.AwalaMessageProcessor
import tech.relaycorp.letro.awala.processor.AwalaMessageProcessorImpl
import tech.relaycorp.letro.awala.processor.UnknownMessageProcessor
import tech.relaycorp.letro.onboarding.registration.processor.RegistrationMessageProcessor
import javax.inject.Singleton

@Module
@InstallIn(SingletonComponent::class)
object AwalaModule {

@Provides
fun provideMessageParser(
registrationParser: RegistrationMessageParser,
unknownMessageParser: UnknownMessageParser,
): AwalaMessageParser {
val parsers = mapOf(
MessageType.AccountCreationCompleted to registrationParser,
MessageType.Unknown to unknownMessageParser,
fun provideMessageProcessor(
registrationMessageProcessor: RegistrationMessageProcessor,
unknownMessageProcessor: UnknownMessageProcessor,
): AwalaMessageProcessor {
val processors = mapOf(
MessageType.AccountCreationCompleted to registrationMessageProcessor,
MessageType.Unknown to unknownMessageProcessor,
)
return AwalaMessageParserImpl(parsers)
return AwalaMessageProcessorImpl(processors)
}

@Module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import tech.relaycorp.letro.onboarding.registration.RegistrationRepository
import tech.relaycorp.letro.onboarding.registration.RegistrationRepositoryImpl
import tech.relaycorp.letro.onboarding.registration.parser.RegistrationMessageParser
import tech.relaycorp.letro.onboarding.registration.parser.RegistrationMessageParserImpl
import tech.relaycorp.letro.onboarding.registration.processor.RegistrationMessageProcessor
import tech.relaycorp.letro.onboarding.registration.processor.RegistrationMessageProcessorImpl
import javax.inject.Singleton

@Module
Expand Down Expand Up @@ -39,4 +41,9 @@ interface RegistrationModuleSingleton {
fun bindRegistrationMessageParser(
impl: RegistrationMessageParserImpl,
): RegistrationMessageParser

@Binds
fun bindRegistrationMessageProcessor(
impl: RegistrationMessageProcessorImpl,
): RegistrationMessageProcessor
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package tech.relaycorp.letro.onboarding.registration

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.launch
import tech.relaycorp.letro.account.storage.AccountRepository
import tech.relaycorp.letro.awala.AwalaManager
import tech.relaycorp.letro.awala.message.AwalaOutgoingMessage
import tech.relaycorp.letro.awala.message.MessageRecipient
import tech.relaycorp.letro.awala.message.MessageType
import tech.relaycorp.letro.onboarding.registration.dto.RegistrationResponseIncomingMessage
import javax.inject.Inject

interface RegistrationRepository {
Expand All @@ -23,16 +21,6 @@ class RegistrationRepositoryImpl @Inject constructor(

private val scope = CoroutineScope(Dispatchers.IO)

init {
scope.launch {
awalaManager.incomingMessages
.filterIsInstance(RegistrationResponseIncomingMessage::class)
.collect {
accountRepository.updateAccountId(it.content.requestedVeraId, it.content.assignedVeraId)
}
}
}

override fun createNewAccount(id: String) {
scope.launch {
accountRepository.createAccount(id)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package tech.relaycorp.letro.onboarding.registration.parser

import tech.relaycorp.letro.awala.message.AwalaIncomingMessage
import tech.relaycorp.letro.awala.message.MessageType
import tech.relaycorp.letro.awala.parser.AwalaMessageParser
import tech.relaycorp.letro.onboarding.registration.dto.RegistrationResponse
import tech.relaycorp.letro.onboarding.registration.dto.RegistrationResponseIncomingMessage
Expand All @@ -12,7 +10,7 @@ interface RegistrationMessageParser : AwalaMessageParser

class RegistrationMessageParserImpl @Inject constructor() : RegistrationMessageParser {

override fun parse(type: MessageType, content: ByteArray): AwalaIncomingMessage<*> {
override fun parse(content: ByteArray): RegistrationResponseIncomingMessage {
val veraIds = content.toString(Charset.defaultCharset()).split(",")
val response = RegistrationResponse(
requestedVeraId = veraIds[0],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package tech.relaycorp.letro.onboarding.registration.processor

import tech.relaycorp.awaladroid.messaging.IncomingMessage
import tech.relaycorp.letro.account.storage.AccountRepository
import tech.relaycorp.letro.awala.processor.AwalaMessageProcessor
import tech.relaycorp.letro.onboarding.registration.dto.RegistrationResponseIncomingMessage
import tech.relaycorp.letro.onboarding.registration.parser.RegistrationMessageParser
import javax.inject.Inject

interface RegistrationMessageProcessor : AwalaMessageProcessor

class RegistrationMessageProcessorImpl @Inject constructor(
private val parser: RegistrationMessageParser,
private val accountRepository: AccountRepository,
) : RegistrationMessageProcessor {

override suspend fun process(message: IncomingMessage) {
val response = parser.parse(message.content) as RegistrationResponseIncomingMessage
accountRepository.updateAccountId(response.content.requestedVeraId, response.content.assignedVeraId)
}
}

0 comments on commit 3c06aa8

Please sign in to comment.