diff --git a/p8e-api/src/main/kotlin/io/provenance/engine/config/Properties.kt b/p8e-api/src/main/kotlin/io/provenance/engine/config/Properties.kt index d9dbb6c..c901c9e 100644 --- a/p8e-api/src/main/kotlin/io/provenance/engine/config/Properties.kt +++ b/p8e-api/src/main/kotlin/io/provenance/engine/config/Properties.kt @@ -54,7 +54,9 @@ class ChaincodeProperties { @NotNull var mainNet: Boolean = false @NotNull var emptyIterationBackoffMS: Int = 1_000 @NotNull var txBatchSize: Int = 25 - @NotNull var specTxBatchSize: Int = 10 + @NotNull var contractSpecTxBatchSize: Int = 10 + @NotNull var scopeSpecTxBatchSize: Int = 10 + @NotNull var contractSpecTxTimeoutS = 60 @NotNull var gasMultiplier: Double = 1.0 @NotNull var maxGasMultiplierPerDay: Int = 1000 @NotNull var blockHeightTimeoutInterval: Int = 20 diff --git a/p8e-api/src/main/kotlin/io/provenance/engine/service/ChaincodeInvokeService.kt b/p8e-api/src/main/kotlin/io/provenance/engine/service/ChaincodeInvokeService.kt index c96acae..372f62e 100644 --- a/p8e-api/src/main/kotlin/io/provenance/engine/service/ChaincodeInvokeService.kt +++ b/p8e-api/src/main/kotlin/io/provenance/engine/service/ChaincodeInvokeService.kt @@ -401,7 +401,8 @@ class ChaincodeInvokeService( scopeSpecIdToContractSpecHashes: Map>, contractSpecs: List, ) { - log.info("received a set of contract specs: ${contractSpecs.size} and scope specs: ${scopeSpecs.size}") + val logPrefix = "[addContractSpecs]" + log.info("$logPrefix received a set of contract specs: ${contractSpecs.size} and scope specs: ${scopeSpecs.size}") val owners = listOf(accountProvider.bech32Address()) @@ -431,26 +432,74 @@ class ChaincodeInvokeService( .addAllSigners(owners) .build() } - contractSpecTx.plus(scopeSpecTx).chunked(chaincodeProperties.specTxBatchSize).forEach { messages -> - log.info("sending batch of ${messages.size} contract spec messages") + val contractSpecHashes = contractSpecTx.chunked(chaincodeProperties.contractSpecTxBatchSize).map { messages -> + log.info("$logPrefix sending batch of ${messages.size} contract spec messages") val txBody = messages.toTxBody(provenanceGrpc.getLatestBlock().block.header.height + chaincodeProperties.blockHeightTimeoutInterval) synchronized(provenanceGrpc) { - batchTx(txBody, applyMultiplier = false).also { + batchTx(txBody, applyMultiplier = false).let { if (it.txResponse.code != 0) { throw Exception("Error adding contract spec: ${it.txResponse.rawLog}") } - log.info("contract spec batch made it to mempool with txhash = ${it.txResponse.txhash}") + log.info("$logPrefix contract spec batch made it to mempool with txhash = ${it.txResponse.txhash}") + + it.txResponse.txhash + } + } + } + + log.info("$logPrefix waiting for ${contractSpecHashes.size} contract spec batches to complete") + + // wait for all contract specs to be written + if (!contractSpecHashes.waitForAllTxsToCompleteSuccessfully(OffsetDateTime.now().plusSeconds( + chaincodeProperties.contractSpecTxTimeoutS.toLong() + ))) { + throw Exception("Timeout waiting for all contract spec txs to complete successfully") + } + + log.info("$logPrefix all ${contractSpecHashes.size} contract spec batches completed successfully") + + scopeSpecTx.chunked(chaincodeProperties.scopeSpecTxBatchSize).forEach { messages -> + log.info("$logPrefix sending batch of ${messages.size} scope spec messages") + val txBody = messages.toTxBody(provenanceGrpc.getLatestBlock().block.header.height + chaincodeProperties.blockHeightTimeoutInterval) + + synchronized(provenanceGrpc) { + batchTx(txBody, applyMultiplier = false).also { + if (it.txResponse.code != 0) { + throw Exception("Error adding scope spec: ${it.txResponse.rawLog}") + } + + log.info("$logPrefix scope spec batch made it to mempool with txhash = ${it.txResponse.txhash}") } } } } catch(e: Throwable) { - log.warn("failed to add contract spec: ${e.message}") + log.warn("$logPrefix failed to add contract spec: ${e.message}") throw e } } + fun List.waitForAllTxsToCompleteSuccessfully(deadline: OffsetDateTime): Boolean { + var remaining = this + while (deadline.isAfter(OffsetDateTime.now())) { + remaining = remaining.filterNot { + val txResponse = provenanceGrpc.getTx(it) + + if (txResponse.code > 0) { + throw Exception("Error adding contract spec while waiting for completion (code ${txResponse.code}): ${txResponse.rawLog}") + } + + txResponse.height > 0 && txResponse.code == 0 // filtering out transactions that have completed successfully + } + + if (remaining.isEmpty()) { + return true + } + } + return false + } + fun batchTx(body: TxBody, applyMultiplier: Boolean = true): BroadcastTxResponse { val accountNumber = accountInfo.accountNumber val sequenceNumber = getAndIncrementSequenceNumber() diff --git a/p8e-api/src/main/resources/application-container.properties b/p8e-api/src/main/resources/application-container.properties index c80b483..63de058 100644 --- a/p8e-api/src/main/resources/application-container.properties +++ b/p8e-api/src/main/resources/application-container.properties @@ -44,7 +44,9 @@ chaincode.chainId=${CHAINCODE_CHAIN_ID} chaincode.mnemonic=${CHAINCODE_MNEMONIC} chaincode.emptyIterationBackoffMS=${CHAINCODE_EMPTY_ITERATION_BACKOFF:750} chaincode.txBatchSize=${CHAINCODE_TX_BATCH_SIZE:25} -chaincode.specTxBatchSize=${CHAINCODE_SPEC_TX_BATCH_SIZE:10} +chaincode.contractSpecTxBatchSize=${CHAINCODE_CONTRACT_SPEC_TX_BATCH_SIZE:10} +chaincode.scopeSpecTxBatchSize=${CHAINCODE_SCOPE_SPEC_TX_BATCH_SIZE:10} +chaincode.contractSpecTxTimeoutS=${CHAINCODE_CONTRACT_SPEC_TX_TIMEOUT_S:60} chaincode.gasMultiplier=${CHAINCODE_GAS_MULTIPLIER:1.0} chaincode.maxGasMultiplierPerDay=${CHAINCODE_MAX_GAS_MULTIPLIER_PER_DAY:1000} chaincode.blockHeightTimeoutInterval=${CHAINCODE_BLOCK_HEIGHT_TIMEOUT_INTERVAL:20} diff --git a/p8e-api/src/main/resources/application-local.properties b/p8e-api/src/main/resources/application-local.properties index ea3c348..c7677f1 100644 --- a/p8e-api/src/main/resources/application-local.properties +++ b/p8e-api/src/main/resources/application-local.properties @@ -39,7 +39,9 @@ chaincode.chainId=chain-local chaincode.mnemonic=${CHAINCODE_MNEMONIC} chaincode.emptyIterationBackoffMS=750 chaincode.txBatchSize=25 -chaincode.specTxBatchSize=25 +chaincode.contractSpecTxBatchSize=10 +chaincode.scopeSpecTxBatchSize=10 +chaincode.contractSpecTxTimeoutS=60 # Elasticsearch elasticsearch.host=localhost