Skip to content

Commit

Permalink
CORE-17502: Add token selection HTTP RPC (#4910)
Browse files Browse the repository at this point in the history
CORE-17502 - Migrate the flow/token selection RPC from Kafka to the new HTTP RPC Model
CORE-17503 - Migrate the token selection state model from the Kafka state and event pattern to the new State Manage model
  • Loading branch information
owenstanford authored Oct 25, 2023
1 parent 4867017 commit 55af914
Show file tree
Hide file tree
Showing 49 changed files with 1,707 additions and 139 deletions.
2 changes: 1 addition & 1 deletion .ci/e2eTests/JenkinsfileCombinedWorker
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pipeline {
REST_TLS_PATH = "${env.WORKSPACE}/applications/workers/release/combined-worker/tls/rest/rest_worker.pfx"
VM_PARAMETERS = "-Dco.paralleluniverse.fibers.verifyInstrumentation=true"
LOG4J_PARAMETERS = "-Dlog4j.configurationFile=log4j2-console.xml"
PROGRAM_PARAMETERS = "--instance-id=0 -mbus.busType=DATABASE -spassphrase=password -ssalt=salt -ddatabase.user=u${postgresDb} -ddatabase.pass=password -ddatabase.jdbc.url=jdbc:postgresql://${postgresHost}:${postgresPort}/${postgresDb} -ddatabase.jdbc.directory=${JDBC_PATH} -rtls.keystore.path=${REST_TLS_PATH} -rtls.keystore.password=mySecretPassword --serviceEndpoint=endpoints.crypto=localhost:7004 --serviceEndpoint=endpoints.verification=localhost:7004 --serviceEndpoint=endpoints.uniqueness=localhost:7004 --serviceEndpoint=endpoints.persistence=localhost:7004"
PROGRAM_PARAMETERS = "--instance-id=0 -mbus.busType=DATABASE -spassphrase=password -ssalt=salt -ddatabase.user=u${postgresDb} -ddatabase.pass=password -ddatabase.jdbc.url=jdbc:postgresql://${postgresHost}:${postgresPort}/${postgresDb} -ddatabase.jdbc.directory=${JDBC_PATH} -rtls.keystore.path=${REST_TLS_PATH} -rtls.keystore.password=mySecretPassword --serviceEndpoint=endpoints.crypto=localhost:7004 --serviceEndpoint=endpoints.verification=localhost:7004 --serviceEndpoint=endpoints.uniqueness=localhost:7004 --serviceEndpoint=endpoints.persistence=localhost:7004 --serviceEndpoint=endpoints.tokenSelection=localhost:7004"
WORKING_DIRECTORY = "${env.WORKSPACE}"
}
steps {
Expand Down
2 changes: 1 addition & 1 deletion .run/Combined Worker Local.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<configuration default="false" name="Combined Worker Local" type="JarApplication">
<option name="JAR_PATH" value="$PROJECT_DIR$/applications/workers/release/combined-worker/build/bin/corda-combined-worker-5.1.0.0-SNAPSHOT.jar" />
<option name="VM_PARAMETERS" value="-Dco.paralleluniverse.fibers.verifyInstrumentation=true" />
<option name="PROGRAM_PARAMETERS" value="--instance-id=0 -mbus.busType=DATABASE -spassphrase=password -ssalt=salt -spassphrase=password -ssalt=salt -ddatabase.user=user -ddatabase.pass=password -ddatabase.jdbc.url=jdbc:postgresql://localhost:5432/cordacluster -ddatabase.jdbc.directory=$ProjectFileDir$/applications/workers/release/combined-worker/drivers -rtls.crt.path=$ProjectFileDir$/applications/workers/release/combined-worker/tls/rest/server.crt -rtls.key.path=$ProjectFileDir$/applications/workers/release/combined-worker/tls/rest/server.key -rtls.ca.crt.path=$ProjectFileDir$/applications/workers/release/combined-worker/tls/rest/ca-chain-bundle.crt --serviceEndpoint=endpoints.crypto=localhost:7004 --serviceEndpoint=endpoints.verification=localhost:7004 --serviceEndpoint=endpoints.uniqueness=localhost:7004 --serviceEndpoint=endpoints.persistence=localhost:7004" />
<option name="PROGRAM_PARAMETERS" value="--instance-id=0 -mbus.busType=DATABASE -spassphrase=password -ssalt=salt -spassphrase=password -ssalt=salt -ddatabase.user=user -ddatabase.pass=password -ddatabase.jdbc.url=jdbc:postgresql://localhost:5432/cordacluster -ddatabase.jdbc.directory=$ProjectFileDir$/applications/workers/release/combined-worker/drivers -rtls.crt.path=$ProjectFileDir$/applications/workers/release/combined-worker/tls/rest/server.crt -rtls.key.path=$ProjectFileDir$/applications/workers/release/combined-worker/tls/rest/server.key -rtls.ca.crt.path=$ProjectFileDir$/applications/workers/release/combined-worker/tls/rest/ca-chain-bundle.crt --serviceEndpoint=endpoints.crypto=localhost:7004 --serviceEndpoint=endpoints.verification=localhost:7004 --serviceEndpoint=endpoints.uniqueness=localhost:7004 --serviceEndpoint=endpoints.persistence=localhost:7004 --serviceEndpoint=endpoints.tokenSelection=localhost:7004" />
<option name="WORKING_DIRECTORY" value="$ProjectFileDir$" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package net.corda.applications.workers.smoketest.token.selection

import net.corda.applications.workers.smoketest.utils.PLATFORM_VERSION
import net.corda.data.KeyValuePairList
import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.event.external.ExternalEventContext
import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.data.ledger.utxo.token.selection.data.TokenAmount
import net.corda.data.ledger.utxo.token.selection.data.TokenClaimQuery
import net.corda.data.ledger.utxo.token.selection.data.TokenClaimQueryResult
import net.corda.data.ledger.utxo.token.selection.data.TokenForceClaimRelease
import net.corda.data.ledger.utxo.token.selection.event.TokenPoolCacheEvent
import net.corda.data.ledger.utxo.token.selection.key.TokenPoolCacheKey
import net.corda.messagebus.kafka.serialization.CordaAvroSerializationFactoryImpl
import net.corda.schema.registry.impl.AvroSchemaRegistryImpl
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.math.BigDecimal
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.ByteBuffer
import java.time.Duration
import java.time.Instant
import java.util.Timer
import java.util.UUID
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.scheduleAtFixedRate

/*
* This will be removed in future PRs, for now it's useful for testing different optimizations.
*/
class SimpleHttpRPCPerformanceTest {

private companion object {
const val HOLDING_ID = "A332E0C2F697"
const val TOKEN_ISSUER_HASH = "SHA-256:EC4F2DBB3B140095550C9AFBBB69B5D6FD9E814B9DA82FAD0B34E9FCBE56F1CB"
const val TOKEN_SYMBOL = "USD"
const val TOKEN_TYPE = "TestUtxoState"
const val TOKEN_NOTARY = "O=MyNotaryService, L=London, C=GB"
val TOKEN_AMOUNT = BigDecimal.TEN
}

private val httpClient: HttpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(30))
.build()
private val serializationFactory = CordaAvroSerializationFactoryImpl(
AvroSchemaRegistryImpl()
)

private val avroSerializer = serializationFactory.createAvroSerializer<TokenPoolCacheEvent> { }
private val avroFlowEventDeserializer =
serializationFactory.createAvroDeserializer({}, FlowEvent::class.java)
private val avroTokenClaimQueryResultDeserializer =
serializationFactory.createAvroDeserializer({}, TokenClaimQueryResult::class.java)

@Test
@org.junit.jupiter.api.Disabled
fun `60 second concurrent request test`() {
val concurrentCount = 30
val start = Instant.now().toEpochMilli()
val concurrentClaims = ConcurrentHashMap<String, CompletableFuture<String>>()
val concurrentReleases = ConcurrentHashMap<String, CompletableFuture<String>>()
var completedClaims = 0
var completedReleases = 0

var running = true

Timer("monitor").scheduleAtFixedRate(0, 500) {
val diff = Instant.now().toEpochMilli() - start

val claimRate = if (diff > 0) {
(completedClaims * 1000) / diff
} else {
0
}

val releaseRate = if (diff > 0) {
(completedClaims * 1000) / diff
} else {
0
}

println("${diff/1000} ${concurrentClaims.size} ${concurrentReleases.size} $completedClaims $completedReleases $claimRate $releaseRate ${claimRate+releaseRate}")
}

while (running) {
if (concurrentClaims.size < concurrentCount) {
val claimId = UUID.randomUUID().toString()
val claim = getClaim(claimId)
concurrentClaims[claimId] = claim
claim.thenApply { completedClaimId ->
completedClaims++
concurrentClaims.remove(completedClaimId)
val release = forceClaimRelease(completedClaimId)
concurrentReleases[completedClaimId] = release
release.thenApply { releasedClaimId ->
concurrentReleases.remove(releasedClaimId)
completedReleases++
}
}
}
running = Instant.now().toEpochMilli() < start + 60000
}

while (concurrentReleases.size > 0) {
Thread.sleep(10)
}
}

private fun forceClaimRelease(claimId: String): CompletableFuture<String> {
val url = "${System.getProperty("tokenSelectionWorkerUrl")}api/$PLATFORM_VERSION/token-selection"

val serializedPayload = avroSerializer.serialize(createReleasePayload(claimId))

val request = HttpRequest.newBuilder()
.uri(URI.create(url))
.headers("Content-Type", "application/octet-stream")
.POST(HttpRequest.BodyPublishers.ofByteArray(serializedPayload))
.build()

return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()).thenApply { response ->

assertThat(response.statusCode()).isEqualTo(200)
.withFailMessage("status code on response: ${response.statusCode()} url: $url")

val responseBody: ByteArray = response.body()
assertThat(responseBody.size).isEqualTo(0)
claimId
}
}

private fun getClaim(claimId: String): CompletableFuture<String> {
val url = "${System.getProperty("tokenSelectionWorkerUrl")}api/$PLATFORM_VERSION/token-selection"

val serializedPayload = avroSerializer.serialize(createClaimPayload(claimId))

val request = HttpRequest.newBuilder()
.uri(URI.create(url))
.headers("Content-Type", "application/octet-stream")
.POST(HttpRequest.BodyPublishers.ofByteArray(serializedPayload))
.build()

return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()).thenApply { response ->

assertThat(response.statusCode()).isEqualTo(200)
.withFailMessage("status code on response: ${response.statusCode()} url: $url")

val responseBody: ByteArray = response.body()
val responseFlowEvent = avroFlowEventDeserializer.deserialize(responseBody)
val responseExternalEvent = responseFlowEvent?.payload as ExternalEventResponse
val claimResponse =
avroTokenClaimQueryResultDeserializer.deserialize(responseExternalEvent.payload.array())!!
assertThat(claimResponse.claimedTokens.size).isGreaterThan(0)

claimResponse.claimId
}
}

private fun createClaimPayload(claimId: String): TokenPoolCacheEvent {
val poolKey = TokenPoolCacheKey.newBuilder()
.setShortHolderId(HOLDING_ID)
.setTokenType(TOKEN_TYPE)
.setIssuerHash(TOKEN_ISSUER_HASH)
.setNotaryX500Name(TOKEN_NOTARY)
.setSymbol(TOKEN_SYMBOL)
.build()

val externalEventContext = ExternalEventContext.newBuilder()
.setFlowId("f1")
.setRequestId(claimId)
.setContextProperties(KeyValuePairList(listOf()))
.build()

val payload = TokenClaimQuery.newBuilder()
.setRequestContext(externalEventContext)
.setPoolKey(poolKey)
.setTargetAmount(TOKEN_AMOUNT.toTokenAmount())
.build()

return TokenPoolCacheEvent.newBuilder()
.setPoolKey(poolKey)
.setPayload(payload)
.build()
}

private fun createReleasePayload(claimId: String): TokenPoolCacheEvent {
val poolKey = TokenPoolCacheKey.newBuilder()
.setShortHolderId(HOLDING_ID)
.setTokenType(TOKEN_TYPE)
.setIssuerHash(TOKEN_ISSUER_HASH)
.setNotaryX500Name(TOKEN_NOTARY)
.setSymbol(TOKEN_SYMBOL)
.build()

val payload = TokenForceClaimRelease.newBuilder()
.setPoolKey(poolKey)
.setClaimId(claimId)
.build()

return TokenPoolCacheEvent.newBuilder()
.setPoolKey(poolKey)
.setPayload(payload)
.build()
}

private fun BigDecimal.toTokenAmount() =
TokenAmount.newBuilder()
.setScale(this.scale())
.setUnscaledValue(ByteBuffer.wrap(this.unscaledValue().toByteArray()))
.build()
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import net.corda.libs.statemanager.api.StateManager
import net.corda.messaging.api.constants.WorkerRPCPaths.CRYPTO_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.LEDGER_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.PERSISTENCE_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.TOKEN_SELECTION_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.UNIQUENESS_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.VERIFICATION_PATH
import net.corda.messaging.api.mediator.MediatorMessage
Expand All @@ -33,9 +34,9 @@ import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC
import net.corda.schema.Schemas.Services.TOKEN_CACHE_EVENT
import net.corda.schema.configuration.BootConfig.CRYPTO_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.PERSISTENCE_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.TOKEN_SELECTION_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.VERIFICATION_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.ConfigKeys
Expand Down Expand Up @@ -125,7 +126,7 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
is FlowOpsRequest -> routeTo(rpcClient, rpcEndpoint(CRYPTO_WORKER_REST_ENDPOINT, CRYPTO_PATH))
is FlowStatus -> routeTo(messageBusClient, FLOW_STATUS_TOPIC)
is LedgerPersistenceRequest -> routeTo(rpcClient, rpcEndpoint(PERSISTENCE_WORKER_REST_ENDPOINT, LEDGER_PATH))
is TokenPoolCacheEvent -> routeTo(messageBusClient, TOKEN_CACHE_EVENT)
is TokenPoolCacheEvent -> routeTo(rpcClient, rpcEndpoint(TOKEN_SELECTION_WORKER_REST_ENDPOINT, TOKEN_SELECTION_PATH))
is TransactionVerificationRequest -> routeTo(rpcClient, rpcEndpoint(VERIFICATION_WORKER_REST_ENDPOINT, VERIFICATION_PATH))
is UniquenessCheckRequestAvro -> routeTo(rpcClient, rpcEndpoint(UNIQUENESS_WORKER_REST_ENDPOINT, UNIQUENESS_PATH))
is FlowEvent -> routeTo(messageBusClient, FLOW_EVENT_TOPIC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import net.corda.lifecycle.createCoordinator
import net.corda.messaging.api.mediator.MultiSourceEventMediator
import net.corda.schema.configuration.BootConfig.CRYPTO_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.PERSISTENCE_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.TOKEN_SELECTION_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.VERIFICATION_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.ConfigKeys
Expand Down Expand Up @@ -148,7 +149,8 @@ class FlowExecutorImpl constructor(
CRYPTO_WORKER_REST_ENDPOINT,
PERSISTENCE_WORKER_REST_ENDPOINT,
UNIQUENESS_WORKER_REST_ENDPOINT,
VERIFICATION_WORKER_REST_ENDPOINT
VERIFICATION_WORKER_REST_ENDPOINT,
TOKEN_SELECTION_WORKER_REST_ENDPOINT
).fold(this) { msgConfig: SmartConfig, endpoint: String ->
msgConfig.withValue(endpoint, ConfigValueFactory.fromAnyRef(bootConfig.getString(endpoint)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import net.corda.libs.platform.PlatformInfoProvider
import net.corda.messaging.api.constants.WorkerRPCPaths.CRYPTO_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.LEDGER_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.PERSISTENCE_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.TOKEN_SELECTION_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.UNIQUENESS_PATH
import net.corda.messaging.api.constants.WorkerRPCPaths.VERIFICATION_PATH
import net.corda.messaging.api.mediator.MediatorMessage
Expand All @@ -30,7 +31,6 @@ import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory
import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_MAPPER_EVENT_TOPIC
import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC
import net.corda.schema.Schemas.Services.TOKEN_CACHE_EVENT
import net.corda.schema.configuration.ConfigKeys
import net.corda.schema.configuration.FlowConfig
import org.assertj.core.api.Assertions.assertThat
Expand Down Expand Up @@ -75,7 +75,7 @@ class FlowEventMediatorFactoryImplTest {
)
}

private fun endpoint(suffix: String) : String {
private fun endpoint(suffix: String): String {
// As no config is supplied in these tests, the parameterized parts of the endpoint will be null.
return "http://null/api/null$suffix"
}
Expand Down Expand Up @@ -103,7 +103,9 @@ class FlowEventMediatorFactoryImplTest {
assertThat(router.getDestination(MediatorMessage(FlowStatus())).endpoint).isEqualTo(FLOW_STATUS_TOPIC)
assertThat(router.getDestination(MediatorMessage(LedgerPersistenceRequest())).endpoint)
.isEqualTo(endpoint(LEDGER_PATH))
assertThat(router.getDestination(MediatorMessage(TokenPoolCacheEvent())).endpoint).isEqualTo(TOKEN_CACHE_EVENT)
assertThat(router.getDestination(MediatorMessage(TokenPoolCacheEvent())).endpoint).isEqualTo(
endpoint(TOKEN_SELECTION_PATH)
)
assertThat(router.getDestination(MediatorMessage(TransactionVerificationRequest())).endpoint).isEqualTo(
endpoint(VERIFICATION_PATH)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,6 @@ class FlowExecutorImplTest {
.withEndpoint(BootConfig.PERSISTENCE_WORKER_REST_ENDPOINT, "TEST_PERSISTENCE_ENDPOINT")
.withEndpoint(BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT, "TEST_UNIQUENESS_ENDPOINT")
.withEndpoint(BootConfig.VERIFICATION_WORKER_REST_ENDPOINT, "TEST_VERIFICATION_ENDPOINT")
.withEndpoint(BootConfig.TOKEN_SELECTION_WORKER_REST_ENDPOINT, "TEST_TOKEN_SELECTION_ENDPOINT")
}
}
1 change: 1 addition & 0 deletions components/ledger/ledger-utxo-flow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {
implementation project(':libs:platform-info')
implementation project(':libs:sandbox')
implementation project(':libs:serialization:json-validator')
implementation project(':libs:serialization:serialization-avro')
implementation project(':libs:serialization:serialization-checkpoint-api')
implementation project(':libs:serialization:serialization-internal')
implementation project(':libs:utilities')
Expand Down
Loading

0 comments on commit 55af914

Please sign in to comment.