Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DA5-22: Distribution and validation of draft transactions #5295

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
86aca11
initial commit - adding the send and receive functions
julia-filipczak Dec 16, 2023
8031cf2
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Dec 16, 2023
3788bab
updated version
julia-filipczak Dec 18, 2023
97c0b73
added property to UtxoLedgerTransactionFactoryImpl
julia-filipczak Dec 18, 2023
af43b36
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Dec 18, 2023
ee6bc2c
adding unit tests
julia-filipczak Dec 18, 2023
620a86c
updating unit tests
julia-filipczak Dec 18, 2023
7510939
renaming
julia-filipczak Dec 19, 2023
6653c8d
addressing pr comments
julia-filipczak Dec 20, 2023
347292c
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Dec 20, 2023
39e4044
updating version
julia-filipczak Dec 20, 2023
4929516
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Dec 20, 2023
b0501dc
resolving detekt issues
julia-filipczak Dec 20, 2023
52af62d
Merge remote-tracking branch 'origin/juliaf/DA5-22-Distribution-and-V…
julia-filipczak Dec 20, 2023
dbdacde
addressing pr comments
julia-filipczak Dec 21, 2023
13e2687
addressing pr comments
julia-filipczak Dec 21, 2023
78f2e0c
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Dec 21, 2023
8e95633
updating version
julia-filipczak Dec 21, 2023
b12dc25
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Dec 21, 2023
67168ec
resolving detekt issue
julia-filipczak Dec 21, 2023
07f6cfe
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Dec 28, 2023
120f95b
updating version
julia-filipczak Dec 28, 2023
580eaaa
updating version
julia-filipczak Jan 8, 2024
0e2f70b
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Jan 8, 2024
2ac3392
Merge branch 'release/os/5.2' into juliaf/DA5-22-Distribution-and-Val…
julia-filipczak Jan 8, 2024
5887323
updating version
julia-filipczak Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import net.corda.ledger.utxo.flow.impl.flows.finality.UtxoFinalityFlow
import net.corda.ledger.utxo.flow.impl.flows.finality.UtxoReceiveFinalityFlow
import net.corda.ledger.utxo.flow.impl.flows.transactionbuilder.ReceiveAndUpdateTransactionBuilderFlow
import net.corda.ledger.utxo.flow.impl.flows.transactionbuilder.SendTransactionBuilderDiffFlow
import net.corda.ledger.utxo.flow.impl.flows.transactiontransmission.ReceiveLedgerTransactionFlow
import net.corda.ledger.utxo.flow.impl.flows.transactiontransmission.ReceiveTransactionFlow
import net.corda.ledger.utxo.flow.impl.flows.transactiontransmission.SendAsLedgerTransactionFlow
import net.corda.ledger.utxo.flow.impl.flows.transactiontransmission.SendTransactionFlow
import net.corda.ledger.utxo.flow.impl.persistence.UtxoLedgerPersistenceService
import net.corda.ledger.utxo.flow.impl.persistence.UtxoLedgerStateQueryService
Expand Down Expand Up @@ -279,6 +281,16 @@ class UtxoLedgerServiceImpl @Activate constructor(
flowEngine.subFlow(SendTransactionFlow(signedTransaction, sessions))
}

@Suspendable
override fun receiveLedgerTransaction(session: FlowSession): UtxoLedgerTransaction {
return flowEngine.subFlow(ReceiveLedgerTransactionFlow(session))
}

@Suspendable
override fun sendAsLedgerTransaction(signedTransaction: UtxoSignedTransaction, sessions: List<FlowSession>) {
flowEngine.subFlow(SendAsLedgerTransactionFlow(signedTransaction, sessions))
}

@Suspendable
override fun sendUpdatedTransactionBuilder(
transactionBuilder: UtxoTransactionBuilder,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package net.corda.ledger.utxo.flow.impl.flows.backchain

import net.corda.v5.crypto.SecureHash
import net.corda.v5.ledger.utxo.transaction.UtxoLedgerTransaction
import net.corda.v5.ledger.utxo.transaction.UtxoSignedTransaction

val UtxoSignedTransaction.dependencies: Set<SecureHash>
get() = this
.let { it.inputStateRefs.asSequence() + it.referenceStateRefs.asSequence() }
.map { it.transactionId }
.toSet()

val UtxoLedgerTransaction.dependencies: Set<SecureHash>
get() = this
.let { it.inputStateRefs.asSequence() + it.referenceStateRefs.asSequence() }
.map { it.transactionId }
.toSet()
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package net.corda.ledger.utxo.flow.impl.flows.transactiontransmission

import net.corda.ledger.common.data.transaction.WireTransaction
import net.corda.ledger.common.flow.flows.Payload
import net.corda.ledger.utxo.flow.impl.flows.backchain.InvalidBackchainException
import net.corda.ledger.utxo.flow.impl.flows.backchain.TransactionBackchainResolutionFlow
import net.corda.ledger.utxo.flow.impl.flows.backchain.dependencies
import net.corda.ledger.utxo.flow.impl.transaction.factory.UtxoLedgerTransactionFactory
import net.corda.sandbox.CordaSystemFlow
import net.corda.utilities.trace
import net.corda.v5.application.crypto.DigitalSignatureAndMetadata
import net.corda.v5.application.flows.CordaInject
import net.corda.v5.application.flows.FlowEngine
import net.corda.v5.application.flows.SubFlow
import net.corda.v5.application.messaging.FlowSession
import net.corda.v5.base.annotations.Suspendable
import net.corda.v5.ledger.utxo.transaction.UtxoLedgerTransaction
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@CordaSystemFlow
class ReceiveLedgerTransactionFlow(
private val session: FlowSession
) : SubFlow<UtxoLedgerTransaction> {

private companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
}

@CordaInject
lateinit var flowEngine: FlowEngine

@CordaInject
lateinit var utxoLedgerTransactionFactory: UtxoLedgerTransactionFactory

@Suspendable
override fun call(): UtxoLedgerTransaction {
val wireTransaction = session.receive(WireTransaction::class.java)
val ledgerTransaction = utxoLedgerTransactionFactory.create(wireTransaction)

val transactionDependencies = ledgerTransaction.dependencies
if (transactionDependencies.isNotEmpty()) {
try {
flowEngine.subFlow(TransactionBackchainResolutionFlow(transactionDependencies, session))
} catch (e: InvalidBackchainException) {
val message = "Invalid transaction: ${wireTransaction.id} found during back-chain resolution."
log.warn(message, e)
session.send(Payload.Failure<List<DigitalSignatureAndMetadata>>(message))
throw e
}
} else {
log.trace {
"Transaction with id ${wireTransaction.id} has no dependencies so backchain resolution will not be performed."
}
}

session.send(Payload.Success("Successfully received transaction."))

return ledgerTransaction
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package net.corda.ledger.utxo.flow.impl.flows.transactiontransmission

import net.corda.ledger.common.flow.flows.Payload
import net.corda.ledger.utxo.flow.impl.flows.backchain.TransactionBackchainSenderFlow
import net.corda.ledger.utxo.flow.impl.flows.backchain.dependencies
import net.corda.ledger.utxo.flow.impl.transaction.UtxoSignedTransactionInternal
import net.corda.sandbox.CordaSystemFlow
import net.corda.utilities.trace
import net.corda.v5.application.flows.CordaInject
import net.corda.v5.application.flows.FlowEngine
import net.corda.v5.application.flows.SubFlow
import net.corda.v5.application.messaging.FlowMessaging
import net.corda.v5.application.messaging.FlowSession
import net.corda.v5.base.annotations.Suspendable
import net.corda.v5.base.exceptions.CordaRuntimeException
import net.corda.v5.ledger.utxo.transaction.UtxoSignedTransaction
import org.slf4j.LoggerFactory

@CordaSystemFlow
class SendAsLedgerTransactionFlow(
private val transaction: UtxoSignedTransaction,
private val sessions: List<FlowSession>
) : SubFlow<Unit> {

private companion object {
val log = LoggerFactory.getLogger(this::class.java)
}

@CordaInject
lateinit var flowMessaging: FlowMessaging

@CordaInject
lateinit var flowEngine: FlowEngine

@Suspendable
override fun call() {
flowMessaging.sendAll((transaction as UtxoSignedTransactionInternal).wireTransaction, sessions.toSet())

sessions.forEach {
if (transaction.dependencies.isNotEmpty()) {
flowEngine.subFlow(TransactionBackchainSenderFlow(transaction.id, it))
} else {
log.trace {
"Transaction with id ${transaction.id} has no dependencies so backchain resolution will not be performed."
}
}

val sendingTransactionResult = it.receive(Payload::class.java)
if (sendingTransactionResult is Payload.Failure) {
throw CordaRuntimeException(
sendingTransactionResult.message
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import net.corda.ledger.utxo.data.transaction.WrappedUtxoWireTransaction
import net.corda.ledger.utxo.flow.impl.persistence.UtxoLedgerGroupParametersPersistenceService
import net.corda.ledger.utxo.flow.impl.persistence.UtxoLedgerStateQueryService
import net.corda.ledger.utxo.flow.impl.transaction.factory.UtxoLedgerTransactionFactory
import net.corda.sandbox.type.SandboxConstants.CORDA_SYSTEM_SERVICE
import net.corda.sandbox.type.UsedByFlow
import net.corda.v5.application.serialization.SerializationService
import net.corda.v5.base.annotations.Suspendable
Expand All @@ -24,7 +25,11 @@ import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.osgi.service.component.annotations.ServiceScope

@Component(service = [UtxoLedgerTransactionFactory::class, UsedByFlow::class], scope = ServiceScope.PROTOTYPE)
@Component(
service = [UtxoLedgerTransactionFactory::class, UsedByFlow::class],
property = [CORDA_SYSTEM_SERVICE],
scope = ServiceScope.PROTOTYPE
)
class UtxoLedgerTransactionFactoryImpl @Activate constructor(
@Reference(service = SerializationService::class)
private val serializationService: SerializationService,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package net.corda.ledger.utxo.flow.impl.flows.transactiontransmission

import net.corda.crypto.core.SecureHashImpl
import net.corda.ledger.common.data.transaction.WireTransaction
import net.corda.ledger.common.flow.flows.Payload
import net.corda.ledger.utxo.data.transaction.UtxoLedgerTransactionInternal
import net.corda.ledger.utxo.flow.impl.flows.backchain.TransactionBackchainResolutionFlow
import net.corda.ledger.utxo.flow.impl.flows.backchain.dependencies
import net.corda.ledger.utxo.flow.impl.transaction.factory.UtxoLedgerTransactionFactory
import net.corda.v5.application.flows.FlowEngine
import net.corda.v5.application.messaging.FlowSession
import net.corda.v5.ledger.utxo.ContractState
import net.corda.v5.ledger.utxo.StateRef
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.spy
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import java.security.PublicKey

class ReceiveLedgerTransactionFlowTest {
private companion object {
val TX_ID_1 = SecureHashImpl("SHA", byteArrayOf(2, 2, 2, 2))
val TX_ID_2 = SecureHashImpl("SHA", byteArrayOf(3, 3, 3, 3))

val TX_2_INPUT_DEPENDENCY_STATE_REF_1 = StateRef(TX_ID_2, 0)
val TX_3_INPUT_DEPENDENCY_STATE_REF_1 = StateRef(TX_ID_2, 0)
val TX_3_INPUT_DEPENDENCY_STATE_REF_2 = StateRef(TX_ID_2, 1)

val TX_3_INPUT_REFERENCE_DEPENDENCY_STATE_REF_1 = StateRef(TX_ID_2, 0)
val TX_3_INPUT_REFERENCE_DEPENDENCY_STATE_REF_2 = StateRef(TX_ID_2, 1)
}

private val flowEngine = mock<FlowEngine>()
private var utxoLedgerTransactionFactory = mock<UtxoLedgerTransactionFactory>()

private val sessionAlice = mock<FlowSession>()

private val transaction = mock<WireTransaction>()
private val ledgerTransaction = mock<UtxoLedgerTransactionInternal>()

private val publicKeyAlice = mock<PublicKey>().also { whenever(it.encoded).thenReturn(byteArrayOf(0x01)) }
private val publicKeyBob = mock<PublicKey>().also { whenever(it.encoded).thenReturn(byteArrayOf(0x02)) }
julia-filipczak marked this conversation as resolved.
Show resolved Hide resolved

@BeforeEach
fun beforeEach() {
whenever(transaction.id).thenReturn(TX_ID_1)
whenever(flowEngine.subFlow(any<TransactionBackchainResolutionFlow>())).thenReturn(Unit)
whenever(utxoLedgerTransactionFactory.create(transaction)).thenReturn(ledgerTransaction)
}

@Test
fun `receiving transaction with dependencies should call backchain resolution flow`() {
julia-filipczak marked this conversation as resolved.
Show resolved Hide resolved
whenever(sessionAlice.receive(WireTransaction::class.java)).thenReturn(transaction)
whenever(ledgerTransaction.inputStateRefs).thenReturn(
listOf(
TX_2_INPUT_DEPENDENCY_STATE_REF_1,
TX_3_INPUT_DEPENDENCY_STATE_REF_1,
TX_3_INPUT_DEPENDENCY_STATE_REF_2
)
)
whenever(ledgerTransaction.referenceStateRefs).thenReturn(
listOf(
TX_3_INPUT_REFERENCE_DEPENDENCY_STATE_REF_1,
TX_3_INPUT_REFERENCE_DEPENDENCY_STATE_REF_2
)
)

callReceiveTransactionFlow(sessionAlice)

verify(flowEngine).subFlow(TransactionBackchainResolutionFlow(ledgerTransaction.dependencies, sessionAlice))
verify(sessionAlice).send(Payload.Success("Successfully received transaction."))
}

private fun callReceiveTransactionFlow(session: FlowSession) {
val flow = spy(ReceiveLedgerTransactionFlow(session))
flow.utxoLedgerTransactionFactory = utxoLedgerTransactionFactory
flow.flowEngine = flowEngine
flow.call()
}

class TestState(private val participants: List<PublicKey>) : ContractState {
julia-filipczak marked this conversation as resolved.
Show resolved Hide resolved

override fun getParticipants(): List<PublicKey> {
return participants
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package net.corda.ledger.utxo.flow.impl.flows.transactiontransmission

import net.corda.crypto.core.SecureHashImpl
import net.corda.ledger.common.flow.flows.Payload
import net.corda.ledger.utxo.flow.impl.flows.backchain.TransactionBackchainSenderFlow
import net.corda.ledger.utxo.flow.impl.transaction.UtxoSignedTransactionInternal
import net.corda.v5.application.crypto.DigitalSignatureAndMetadata
import net.corda.v5.application.flows.FlowEngine
import net.corda.v5.application.messaging.FlowMessaging
import net.corda.v5.application.messaging.FlowSession
import net.corda.v5.base.exceptions.CordaRuntimeException
import net.corda.v5.ledger.utxo.transaction.UtxoSignedTransaction
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.spy
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever

class SendAsLedgerTransactionFlowTest {

private companion object {
val TX_ID = SecureHashImpl("SHA", byteArrayOf(1, 1, 1, 1))
}

private val flowEngine = mock<FlowEngine>()
private val flowMessaging = mock<FlowMessaging>()

private val sessionAlice = mock<FlowSession>()
private val sessionBob = mock<FlowSession>()
private val sessions = listOf(sessionAlice, sessionBob)

private val transaction = mock<UtxoSignedTransactionInternal>()
private val successMessage = "Successfully received transaction."

@BeforeEach
fun beforeEach() {
whenever(transaction.id).thenReturn(TX_ID)
whenever(flowEngine.subFlow(any<TransactionBackchainSenderFlow>())).thenReturn(Unit)
}

@Test
fun `does nothing when receiving payload successfully`() {
whenever(sessionAlice.receive(Payload::class.java)).thenReturn(
Payload.Success(successMessage)
)
whenever(sessionBob.receive(Payload::class.java)).thenReturn(
Payload.Success(successMessage)
)

callSendTransactionFlow(transaction, sessions)

verify(flowMessaging).sendAll(transaction.wireTransaction, sessions.toSet())
verify(sessionAlice).receive(Payload::class.java)
}

@Test
fun `sending transaction with dependencies should call backchain flow`() {
whenever(transaction.inputStateRefs).thenReturn(listOf(mock()))

whenever(sessionAlice.receive(Payload::class.java)).thenReturn(
Payload.Success(successMessage)
)
whenever(sessionBob.receive(Payload::class.java)).thenReturn(
Payload.Success(successMessage)
)

callSendTransactionFlow(transaction, sessions)

verify(flowEngine).subFlow(TransactionBackchainSenderFlow(TX_ID, sessionAlice))
}

@Test
fun `sending transaction with no dependencies should not call backchain flow`() {
whenever(sessionAlice.receive(Payload::class.java)).thenReturn(
Payload.Success(successMessage)
)
whenever(sessionBob.receive(Payload::class.java)).thenReturn(
Payload.Success(successMessage)
)

callSendTransactionFlow(transaction, sessions)

verify(flowEngine, never()).subFlow(TransactionBackchainSenderFlow(TX_ID, sessionAlice))
vlajos marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
fun `sending unverified transaction should throw exception`() {
julia-filipczak marked this conversation as resolved.
Show resolved Hide resolved
whenever(sessionAlice.receive(Payload::class.java)).thenReturn(
Payload.Failure<List<DigitalSignatureAndMetadata>>("fail")
)

assertThatThrownBy { callSendTransactionFlow(transaction, sessions) }
.isInstanceOf(CordaRuntimeException::class.java)
.hasMessageContaining("fail")
}

private fun callSendTransactionFlow(signedTransaction: UtxoSignedTransaction, sessions: List<FlowSession>) {
val flow = spy(SendAsLedgerTransactionFlow(signedTransaction, sessions))

flow.flowEngine = flowEngine
flow.flowMessaging = flowMessaging
flow.call()
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ commonsLangVersion = 3.12.0
commonsTextVersion = 1.10.0
# Corda API libs revision (change in 4th digit indicates a breaking change)
# Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy
cordaApiVersion=5.2.0.21-beta+
cordaApiVersion=5.2.0.22-alpha-1703092935848

disruptorVersion=3.4.4
felixConfigAdminVersion=1.9.26
Expand Down