Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix concurrency issues + issue with launching registration from view … #40

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion app/src/main/java/tech/relaycorp/letro/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package tech.relaycorp.letro

import android.app.Application
import dagger.hilt.android.HiltAndroidApp
import tech.relaycorp.letro.awala.AwalaManager
import javax.inject.Inject

@HiltAndroidApp
open class App : Application()
open class App : Application() {

@Inject
lateinit var awalaManager: AwalaManager
}
245 changes: 143 additions & 102 deletions app/src/main/java/tech/relaycorp/letro/awala/AwalaManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import android.util.Log
import androidx.annotation.RawRes
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import tech.relaycorp.awaladroid.Awala
import tech.relaycorp.awaladroid.GatewayBindingException
import tech.relaycorp.awaladroid.GatewayClient
Expand All @@ -28,6 +32,7 @@ import tech.relaycorp.letro.awala.parser.AwalaMessageParser
import tech.relaycorp.letro.ui.navigation.Route
import tech.relaycorp.letro.utils.awala.loadNonNullFirstPartyEndpoint
import tech.relaycorp.letro.utils.awala.loadNonNullThirdPartyEndpoint
import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.Inject

interface AwalaManager {
Expand All @@ -39,6 +44,7 @@ interface AwalaManager {
suspend fun isAwalaInstalled(currentScreen: Route): Boolean
}

@OptIn(ExperimentalCoroutinesApi::class)
class AwalaManagerImpl @Inject constructor(
private val awalaRepository: AwalaRepository,
@ApplicationContext private val context: Context,
Expand All @@ -47,178 +53,213 @@ class AwalaManagerImpl @Inject constructor(

private val awalaScope = CoroutineScope(Dispatchers.IO)

@OptIn(DelicateCoroutinesApi::class)
private val awalaThreadContext = newSingleThreadContext("AwalaManagerThread")

@OptIn(DelicateCoroutinesApi::class)
private val messageReceivingThreadContext = newSingleThreadContext("AwalaManagerMessageReceiverThread")

private val _incomingMessages = Channel<AwalaIncomingMessage<*>>()
override val incomingMessages: Flow<AwalaIncomingMessage<*>>
get() = _incomingMessages.receiveAsFlow()

@Volatile
private var isAwalaSetUp = false
private var isAwalaSetUp = AtomicBoolean(false)
private var awalaSetupJob: Job? = null

@Volatile
private var isAwalaInstalledOnDevice: Boolean? = null

@Volatile
private var isReceivingMessages = false

private var firstPartyEndpoint: FirstPartyEndpoint? = null

private var thirdPartyServerEndpoint: ThirdPartyEndpoint? = null

init {
Log.i(TAG, "initializing")
awalaSetupJob = awalaScope.launch {
Awala.setUp(context)
checkIfAwalaAppInstalled()
isAwalaSetUp = true
awalaSetupJob = null
withContext(awalaThreadContext) {
Awala.setUp(context)
checkIfAwalaAppInstalled()
isAwalaSetUp.compareAndSet(false, true)
awalaSetupJob = null
}
}
}

override suspend fun sendMessage(
outgoingMessage: AwalaOutgoingMessage,
recipient: MessageRecipient,
) {
val firstPartyEndpoint = loadFirstPartyEndpoint()
val thirdPartyEndpoint = loadThirdPartyEndpoint(recipient)
Log.i(TAG, "sendMessage() from ${firstPartyEndpoint.nodeId} to ${thirdPartyEndpoint.nodeId}: $outgoingMessage)")
GatewayClient.sendMessage(
OutgoingMessage.build(
type = outgoingMessage.type.value,
content = outgoingMessage.content,
senderEndpoint = firstPartyEndpoint,
recipientEndpoint = thirdPartyEndpoint,
),
)
withContext(awalaThreadContext) {
if (outgoingMessage.type != MessageType.AuthorizeReceivingFromServer && awalaSetupJob != null) {
Log.i(TAG, "Awala wasn't initialized while tried to send a message. Wait for completion... $outgoingMessage")
awalaSetupJob?.join()
Log.i(TAG, "Awala was initialized, proceed futher...")
}
val firstPartyEndpoint = loadFirstPartyEndpoint()
val thirdPartyEndpoint = loadThirdPartyEndpoint(recipient)
Log.i(TAG, "sendMessage() from ${firstPartyEndpoint.nodeId} to ${thirdPartyEndpoint.nodeId}: $outgoingMessage)")
GatewayClient.sendMessage(
OutgoingMessage.build(
type = outgoingMessage.type.value,
content = outgoingMessage.content,
senderEndpoint = firstPartyEndpoint,
recipientEndpoint = thirdPartyEndpoint,
),
)
}
}

override suspend fun isAwalaInstalled(currentScreen: Route): Boolean {
if (!isAwalaSetUp) {
awalaSetupJob?.join()
}
return if (currentScreen == Route.AwalaNotInstalled) {
checkIfAwalaAppInstalled()
} else {
isAwalaInstalledOnDevice ?: checkIfAwalaAppInstalled()
val isInstalled = withContext(awalaThreadContext) {
if (!isAwalaSetUp.get()) {
awalaSetupJob?.join()
}
if (currentScreen == Route.AwalaNotInstalled) {
checkIfAwalaAppInstalled()
} else {
isAwalaInstalledOnDevice ?: checkIfAwalaAppInstalled()
}
}
return isInstalled
}

private suspend fun loadFirstPartyEndpoint(): FirstPartyEndpoint {
val firstPartyEndpointNodeId = awalaRepository.getServerFirstPartyEndpointNodeId()
?: registerFirstPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register first party endpoint first!")
return firstPartyEndpoint ?: loadNonNullFirstPartyEndpoint(firstPartyEndpointNodeId)
return withContext(awalaThreadContext) {
val firstPartyEndpointNodeId = awalaRepository.getServerFirstPartyEndpointNodeId()
?: registerFirstPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register first party endpoint first!")
firstPartyEndpoint ?: loadNonNullFirstPartyEndpoint(firstPartyEndpointNodeId)
}
}

private suspend fun loadThirdPartyEndpoint(recipient: MessageRecipient): ThirdPartyEndpoint {
if (recipient is MessageRecipient.Server) {
thirdPartyServerEndpoint?.let {
return it
}
}
val thirdPartyEndpointNodeId = when (recipient) {
is MessageRecipient.Server -> {
recipient.nodeId
?: awalaRepository.getServerThirdPartyEndpointNodeId()
?: importServerThirdPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register third party endpoint first!")
return withContext(awalaThreadContext) {
if (recipient is MessageRecipient.Server) {
thirdPartyServerEndpoint?.let {
return@withContext it
}
}
is MessageRecipient.User -> {
Log.e(TAG, "Cannot find third-party endpoint ${recipient.nodeId}")
throw IllegalStateException("Cannot find third-party endpoint ${recipient.nodeId}")
val thirdPartyEndpointNodeId = when (recipient) {
is MessageRecipient.Server -> {
recipient.nodeId
?: awalaRepository.getServerThirdPartyEndpointNodeId()
?: importServerThirdPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register third party endpoint first!")
}

is MessageRecipient.User -> {
Log.e(TAG, "Cannot find third-party endpoint ${recipient.nodeId}")
throw IllegalStateException("Cannot find third-party endpoint ${recipient.nodeId}")
}
}
loadNonNullThirdPartyEndpoint(thirdPartyEndpointNodeId)
}
return loadNonNullThirdPartyEndpoint(thirdPartyEndpointNodeId)
}

private suspend fun startReceivingMessages() {
if (isReceivingMessages) {
return
}
awalaScope.launch(messageReceivingThreadContext) {
if (isReceivingMessages) {
return@launch
}
isReceivingMessages = true

awalaScope.launch {
Log.i(TAG, "start receiving messages...")
GatewayClient.receiveMessages().collect { message ->
val type = MessageType.from(message.type)
val parsedMessage = parser.parse(type, message.content).also { Log.i(TAG, "Receive message: ($it)") }
val parsedMessage = parser.parse(type, message.content)
.also { Log.i(TAG, "Receive message: ($it)") }
_incomingMessages.send(parsedMessage)
message.ack()
}
}

isReceivingMessages = true
}

private suspend fun configureAwala() {
registerFirstPartyEndpointIfNeeded()
importServerThirdPartyEndpointIfNeeded()
withContext(awalaThreadContext) {
registerFirstPartyEndpointIfNeeded()
importServerThirdPartyEndpointIfNeeded()
}
}

private suspend fun checkIfAwalaAppInstalled(): Boolean {
try {
GatewayClient.bind()
configureAwala()
} catch (exp: GatewayBindingException) {
this.isAwalaInstalledOnDevice = false
return false
return withContext(awalaThreadContext) {
try {
GatewayClient.bind()
configureAwala()
} catch (exp: GatewayBindingException) {
[email protected] = false
return@withContext false
}
[email protected] = true
true
}
this.isAwalaInstalledOnDevice = true
return true
}

private suspend fun registerFirstPartyEndpointIfNeeded(): FirstPartyEndpoint? {
if (awalaRepository.getServerFirstPartyEndpointNodeId() != null) {
return withContext(awalaThreadContext) {
if (awalaRepository.getServerFirstPartyEndpointNodeId() != null) {
startReceivingMessages()
return@withContext null
}
val firstPartyEndpoint = FirstPartyEndpoint.register()
awalaRepository.saveServerFirstPartyEndpointNodeId(firstPartyEndpoint.nodeId)
Log.i(TAG, "First party endpoint was registred ${firstPartyEndpoint.nodeId}")
startReceivingMessages()
return null
firstPartyEndpoint
}
val firstPartyEndpoint = FirstPartyEndpoint.register()
awalaRepository.saveServerFirstPartyEndpointNodeId(firstPartyEndpoint.nodeId)
startReceivingMessages()
return firstPartyEndpoint
}

private suspend fun importServerThirdPartyEndpointIfNeeded(): ThirdPartyEndpoint? {
if (awalaRepository.getServerThirdPartyEndpointNodeId() != null) {
return null
}
return withContext(awalaThreadContext) {
if (awalaRepository.getServerThirdPartyEndpointNodeId() != null) {
return@withContext null
}

val firstPartyEndpointNodeId = awalaRepository.getServerFirstPartyEndpointNodeId()
?: registerFirstPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register first party endpoint first!")

val thirdPartyEndpoint = importServerThirdPartyEndpoint(
connectionParams = R.raw.server_connection_params,
)

val firstPartyEndpoint = loadNonNullFirstPartyEndpoint(firstPartyEndpointNodeId)

// Create the Parcel Delivery Authorisation (PDA)
val auth = firstPartyEndpoint.authorizeIndefinitely(thirdPartyEndpoint)
sendMessage(
outgoingMessage = AwalaOutgoingMessage(
type = MessageType.AuthorizeReceivingFromServer,
content = auth,
),
recipient = MessageRecipient.Server(
nodeId = thirdPartyEndpoint.nodeId,
),
)
awalaRepository.saveServerThirdPartyEndpointNodeId(thirdPartyEndpoint.nodeId)
return thirdPartyEndpoint
val firstPartyEndpointNodeId = awalaRepository.getServerFirstPartyEndpointNodeId()
?: registerFirstPartyEndpointIfNeeded()?.nodeId
?: throw IllegalStateException("You should register first party endpoint first!")

val thirdPartyEndpoint = importServerThirdPartyEndpoint(
connectionParams = R.raw.server_connection_params,
)
Log.i(TAG, "Server third party endpoint was imported ${thirdPartyEndpoint.nodeId}")

val firstPartyEndpoint = loadNonNullFirstPartyEndpoint(firstPartyEndpointNodeId)

// Create the Parcel Delivery Authorisation (PDA)
val auth = firstPartyEndpoint.authorizeIndefinitely(thirdPartyEndpoint)
sendMessage(
outgoingMessage = AwalaOutgoingMessage(
type = MessageType.AuthorizeReceivingFromServer,
content = auth,
),
recipient = MessageRecipient.Server(
nodeId = thirdPartyEndpoint.nodeId,
),
)
awalaRepository.saveServerThirdPartyEndpointNodeId(thirdPartyEndpoint.nodeId)
thirdPartyEndpoint
}
}

@Throws(InvalidConnectionParams::class)
private suspend fun importServerThirdPartyEndpoint(
@RawRes connectionParams: Int,
): PublicThirdPartyEndpoint {
val endpoint = try {
PublicThirdPartyEndpoint.import(
context.resources.openRawResource(connectionParams).use {
it.readBytes()
},
)
} catch (e: InvalidThirdPartyEndpoint) {
throw InvalidConnectionParams(e)
return withContext(awalaThreadContext) {
val endpoint = try {
PublicThirdPartyEndpoint.import(
context.resources.openRawResource(connectionParams).use {
it.readBytes()
},
)
} catch (e: InvalidThirdPartyEndpoint) {
throw InvalidConnectionParams(e)
}
endpoint
}
return endpoint
}

private companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import tech.relaycorp.letro.onboarding.registration.dto.RegistrationResponseInco
import javax.inject.Inject

interface RegistrationRepository {
suspend fun createNewAccount(id: String)
fun createNewAccount(id: String)
}

class RegistrationRepositoryImpl @Inject constructor(
Expand All @@ -33,15 +33,17 @@ class RegistrationRepositoryImpl @Inject constructor(
}
}

override suspend fun createNewAccount(id: String) {
accountRepository.createAccount(id)
awalaManager
.sendMessage(
outgoingMessage = AwalaOutgoingMessage(
type = MessageType.AccountCreationRequest,
content = id.toByteArray(),
),
recipient = MessageRecipient.Server(),
)
override fun createNewAccount(id: String) {
scope.launch {
accountRepository.createAccount(id)
awalaManager
.sendMessage(
outgoingMessage = AwalaOutgoingMessage(
type = MessageType.AccountCreationRequest,
content = id.toByteArray(),
),
recipient = MessageRecipient.Server(),
)
}
}
}
Loading