Skip to content

Commit

Permalink
fix: fix concurrency issues + issue with launching registration from …
Browse files Browse the repository at this point in the history
…view … (#40)

…model scope

# What kind of change does this PR introduce?
 
<!-- (Bug fix, feature, docs update, ...) -->

# What is the current behavior?
 
<!-- You can also link to an open issue here -->

# What is the new behavior (if this is a feature change)?

# Other information
  • Loading branch information
migulyaev authored Sep 11, 2023
1 parent dcd5454 commit 1c1e624
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 122 deletions.
8 changes: 7 additions & 1 deletion app/src/main/java/tech/relaycorp/letro/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package tech.relaycorp.letro

import android.app.Application
import dagger.hilt.android.HiltAndroidApp
import tech.relaycorp.letro.awala.AwalaManager
import javax.inject.Inject

@HiltAndroidApp
open class App : Application()
open class App : Application() {

@Inject
lateinit var awalaManager: AwalaManager
}
245 changes: 143 additions & 102 deletions app/src/main/java/tech/relaycorp/letro/awala/AwalaManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import android.util.Log
import androidx.annotation.RawRes
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import tech.relaycorp.awaladroid.Awala
import tech.relaycorp.awaladroid.GatewayBindingException
import tech.relaycorp.awaladroid.GatewayClient
Expand All @@ -28,6 +32,7 @@ import tech.relaycorp.letro.awala.parser.AwalaMessageParser
import tech.relaycorp.letro.ui.navigation.Route
import tech.relaycorp.letro.utils.awala.loadNonNullFirstPartyEndpoint
import tech.relaycorp.letro.utils.awala.loadNonNullThirdPartyEndpoint
import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.Inject

interface AwalaManager {
Expand All @@ -39,6 +44,7 @@ interface AwalaManager {
suspend fun isAwalaInstalled(currentScreen: Route): Boolean
}

@OptIn(ExperimentalCoroutinesApi::class)
class AwalaManagerImpl @Inject constructor(
private val awalaRepository: AwalaRepository,
@ApplicationContext private val context: Context,
Expand All @@ -47,178 +53,213 @@ class AwalaManagerImpl @Inject constructor(

private val awalaScope = CoroutineScope(Dispatchers.IO)

@OptIn(DelicateCoroutinesApi::class)
private val awalaThreadContext = newSingleThreadContext("AwalaManagerThread")

@OptIn(DelicateCoroutinesApi::class)
private val messageReceivingThreadContext = newSingleThreadContext("AwalaManagerMessageReceiverThread")

private val _incomingMessages = Channel<AwalaIncomingMessage<*>>()
override val incomingMessages: Flow<AwalaIncomingMessage<*>>
get() = _incomingMessages.receiveAsFlow()

@Volatile
private var isAwalaSetUp = false
private var isAwalaSetUp = AtomicBoolean(false)
private var awalaSetupJob: Job? = null

@Volatile
private var isAwalaInstalledOnDevice: Boolean? = null

@Volatile
private var isReceivingMessages = false

private var firstPartyEndpoint: FirstPartyEndpoint? = null

private var thirdPartyServerEndpoint: ThirdPartyEndpoint? = null

init {
Log.i(TAG, "initializing")
awalaSetupJob = awalaScope.launch {
Awala.setUp(context)
checkIfAwalaAppInstalled()
isAwalaSetUp = true
awalaSetupJob = null
withContext(awalaThreadContext) {
Awala.setUp(context)
checkIfAwalaAppInstalled()
isAwalaSetUp.compareAndSet(false, true)
awalaSetupJob = null
}
}
}

override suspend fun sendMessage(
outgoingMessage: AwalaOutgoingMessage,
recipient: MessageRecipient,
) {
val firstPartyEndpoint = loadFirstPartyEndpoint()
val thirdPartyEndpoint = loadThirdPartyEndpoint(recipient)
Log.i(TAG, "sendMessage() from ${firstPartyEndpoint.nodeId} to ${thirdPartyEndpoint.nodeId}: $outgoingMessage)")
GatewayClient.sendMessage(
OutgoingMessage.build(
type = outgoingMessage.type.value,
content = outgoingMessage.content,
senderEndpoint = firstPartyEndpoint,
recipientEndpoint = thirdPartyEndpoint,
),
)
withContext(awalaThreadContext) {
if (outgoingMessage.type != MessageType.AuthorizeReceivingFromServer && awalaSetupJob != null) {
Log.i(TAG, "Awala wasn't initialized while tried to send a message. Wait for completion... $outgoingMessage")
awalaSetupJob?.join()
Log.i(TAG, "Awala was initialized, proceed futher...")
}
val firstPartyEndpoint = loadFirstPartyEndpoint()
val thirdPartyEndpoint = loadThirdPartyEndpoint(recipient)
Log.i(TAG, "sendMessage() from ${firstPartyEndpoint.nodeId} to ${thirdPartyEndpoint.nodeId}: $outgoingMessage)")
GatewayClient.sendMessage(
OutgoingMessage.build(
type = outgoingMessage.type.value,
content = outgoingMessage.content,
senderEndpoint = firstPartyEndpoint,
recipientEndpoint = thirdPartyEndpoint,
),
)
}
}

override suspend fun isAwalaInstalled(currentScreen: Route): Boolean {
if (!isAwalaSetUp) {
awalaSetupJob?.join()
}
return if (currentScreen == Route.AwalaNotInstalled) {
checkIfAwalaAppInstalled()
} else {
isAwalaInstalledOnDevice ?: checkIfAwalaAppInstalled()
val isInstalled = withContext(awalaThreadContext) {
if (!isAwalaSetUp.get()) {
awalaSetupJob?.join()
}
if (currentScreen == Route.AwalaNotInstalled) {
checkIfAwalaAppInstalled()
} else {
isAwalaInstalledOnDevice ?: checkIfAwalaAppInstalled()
}
}
return isInstalled
}

private suspend fun loadFirstPartyEndpoint(): FirstPartyEndpoint {
val firstPartyEndpointNodeId = awalaRepository.getServerFirstPartyEndpointNodeId()
?: registerFirstPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register first party endpoint first!")
return firstPartyEndpoint ?: loadNonNullFirstPartyEndpoint(firstPartyEndpointNodeId)
return withContext(awalaThreadContext) {
val firstPartyEndpointNodeId = awalaRepository.getServerFirstPartyEndpointNodeId()
?: registerFirstPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register first party endpoint first!")
firstPartyEndpoint ?: loadNonNullFirstPartyEndpoint(firstPartyEndpointNodeId)
}
}

private suspend fun loadThirdPartyEndpoint(recipient: MessageRecipient): ThirdPartyEndpoint {
if (recipient is MessageRecipient.Server) {
thirdPartyServerEndpoint?.let {
return it
}
}
val thirdPartyEndpointNodeId = when (recipient) {
is MessageRecipient.Server -> {
recipient.nodeId
?: awalaRepository.getServerThirdPartyEndpointNodeId()
?: importServerThirdPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register third party endpoint first!")
return withContext(awalaThreadContext) {
if (recipient is MessageRecipient.Server) {
thirdPartyServerEndpoint?.let {
return@withContext it
}
}
is MessageRecipient.User -> {
Log.e(TAG, "Cannot find third-party endpoint ${recipient.nodeId}")
throw IllegalStateException("Cannot find third-party endpoint ${recipient.nodeId}")
val thirdPartyEndpointNodeId = when (recipient) {
is MessageRecipient.Server -> {
recipient.nodeId
?: awalaRepository.getServerThirdPartyEndpointNodeId()
?: importServerThirdPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register third party endpoint first!")
}

is MessageRecipient.User -> {
Log.e(TAG, "Cannot find third-party endpoint ${recipient.nodeId}")
throw IllegalStateException("Cannot find third-party endpoint ${recipient.nodeId}")
}
}
loadNonNullThirdPartyEndpoint(thirdPartyEndpointNodeId)
}
return loadNonNullThirdPartyEndpoint(thirdPartyEndpointNodeId)
}

private suspend fun startReceivingMessages() {
if (isReceivingMessages) {
return
}
awalaScope.launch(messageReceivingThreadContext) {
if (isReceivingMessages) {
return@launch
}
isReceivingMessages = true

awalaScope.launch {
Log.i(TAG, "start receiving messages...")
GatewayClient.receiveMessages().collect { message ->
val type = MessageType.from(message.type)
val parsedMessage = parser.parse(type, message.content).also { Log.i(TAG, "Receive message: ($it)") }
val parsedMessage = parser.parse(type, message.content)
.also { Log.i(TAG, "Receive message: ($it)") }
_incomingMessages.send(parsedMessage)
message.ack()
}
}

isReceivingMessages = true
}

private suspend fun configureAwala() {
registerFirstPartyEndpointIfNeeded()
importServerThirdPartyEndpointIfNeeded()
withContext(awalaThreadContext) {
registerFirstPartyEndpointIfNeeded()
importServerThirdPartyEndpointIfNeeded()
}
}

private suspend fun checkIfAwalaAppInstalled(): Boolean {
try {
GatewayClient.bind()
configureAwala()
} catch (exp: GatewayBindingException) {
this.isAwalaInstalledOnDevice = false
return false
return withContext(awalaThreadContext) {
try {
GatewayClient.bind()
configureAwala()
} catch (exp: GatewayBindingException) {
this@AwalaManagerImpl.isAwalaInstalledOnDevice = false
return@withContext false
}
this@AwalaManagerImpl.isAwalaInstalledOnDevice = true
true
}
this.isAwalaInstalledOnDevice = true
return true
}

private suspend fun registerFirstPartyEndpointIfNeeded(): FirstPartyEndpoint? {
if (awalaRepository.getServerFirstPartyEndpointNodeId() != null) {
return withContext(awalaThreadContext) {
if (awalaRepository.getServerFirstPartyEndpointNodeId() != null) {
startReceivingMessages()
return@withContext null
}
val firstPartyEndpoint = FirstPartyEndpoint.register()
awalaRepository.saveServerFirstPartyEndpointNodeId(firstPartyEndpoint.nodeId)
Log.i(TAG, "First party endpoint was registred ${firstPartyEndpoint.nodeId}")
startReceivingMessages()
return null
firstPartyEndpoint
}
val firstPartyEndpoint = FirstPartyEndpoint.register()
awalaRepository.saveServerFirstPartyEndpointNodeId(firstPartyEndpoint.nodeId)
startReceivingMessages()
return firstPartyEndpoint
}

private suspend fun importServerThirdPartyEndpointIfNeeded(): ThirdPartyEndpoint? {
if (awalaRepository.getServerThirdPartyEndpointNodeId() != null) {
return null
}
return withContext(awalaThreadContext) {
if (awalaRepository.getServerThirdPartyEndpointNodeId() != null) {
return@withContext null
}

val firstPartyEndpointNodeId = awalaRepository.getServerFirstPartyEndpointNodeId()
?: registerFirstPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register first party endpoint first!")

val thirdPartyEndpoint = importServerThirdPartyEndpoint(
connectionParams = R.raw.server_connection_params,
)

val firstPartyEndpoint = loadNonNullFirstPartyEndpoint(firstPartyEndpointNodeId)

// Create the Parcel Delivery Authorisation (PDA)
val auth = firstPartyEndpoint.authorizeIndefinitely(thirdPartyEndpoint)
sendMessage(
outgoingMessage = AwalaOutgoingMessage(
type = MessageType.AuthorizeReceivingFromServer,
content = auth,
),
recipient = MessageRecipient.Server(
nodeId = thirdPartyEndpoint.nodeId,
),
)
awalaRepository.saveServerThirdPartyEndpointNodeId(thirdPartyEndpoint.nodeId)
return thirdPartyEndpoint
val firstPartyEndpointNodeId = awalaRepository.getServerFirstPartyEndpointNodeId()
?: registerFirstPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register first party endpoint first!")

val thirdPartyEndpoint = importServerThirdPartyEndpoint(
connectionParams = R.raw.server_connection_params,
)
Log.i(TAG, "Server third party endpoint was imported ${thirdPartyEndpoint.nodeId}")

val firstPartyEndpoint = loadNonNullFirstPartyEndpoint(firstPartyEndpointNodeId)

// Create the Parcel Delivery Authorisation (PDA)
val auth = firstPartyEndpoint.authorizeIndefinitely(thirdPartyEndpoint)
sendMessage(
outgoingMessage = AwalaOutgoingMessage(
type = MessageType.AuthorizeReceivingFromServer,
content = auth,
),
recipient = MessageRecipient.Server(
nodeId = thirdPartyEndpoint.nodeId,
),
)
awalaRepository.saveServerThirdPartyEndpointNodeId(thirdPartyEndpoint.nodeId)
thirdPartyEndpoint
}
}

@Throws(InvalidConnectionParams::class)
private suspend fun importServerThirdPartyEndpoint(
@RawRes connectionParams: Int,
): PublicThirdPartyEndpoint {
val endpoint = try {
PublicThirdPartyEndpoint.import(
context.resources.openRawResource(connectionParams).use {
it.readBytes()
},
)
} catch (e: InvalidThirdPartyEndpoint) {
throw InvalidConnectionParams(e)
return withContext(awalaThreadContext) {
val endpoint = try {
PublicThirdPartyEndpoint.import(
context.resources.openRawResource(connectionParams).use {
it.readBytes()
},
)
} catch (e: InvalidThirdPartyEndpoint) {
throw InvalidConnectionParams(e)
}
endpoint
}
return endpoint
}

private companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import tech.relaycorp.letro.onboarding.registration.dto.RegistrationResponseInco
import javax.inject.Inject

interface RegistrationRepository {
suspend fun createNewAccount(id: String)
fun createNewAccount(id: String)
}

class RegistrationRepositoryImpl @Inject constructor(
Expand All @@ -33,15 +33,17 @@ class RegistrationRepositoryImpl @Inject constructor(
}
}

override suspend fun createNewAccount(id: String) {
accountRepository.createAccount(id)
awalaManager
.sendMessage(
outgoingMessage = AwalaOutgoingMessage(
type = MessageType.AccountCreationRequest,
content = id.toByteArray(),
),
recipient = MessageRecipient.Server(),
)
override fun createNewAccount(id: String) {
scope.launch {
accountRepository.createAccount(id)
awalaManager
.sendMessage(
outgoingMessage = AwalaOutgoingMessage(
type = MessageType.AccountCreationRequest,
content = id.toByteArray(),
),
recipient = MessageRecipient.Server(),
)
}
}
}
Loading

0 comments on commit 1c1e624

Please sign in to comment.