Skip to content

Commit

Permalink
wait for contract spec txs to complete before submitting scope spec t…
Browse files Browse the repository at this point in the history
…xs (#109)
  • Loading branch information
celloman authored Apr 27, 2022
1 parent 54dc7cb commit c9fa76e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class ChaincodeProperties {
@NotNull var mainNet: Boolean = false
@NotNull var emptyIterationBackoffMS: Int = 1_000
@NotNull var txBatchSize: Int = 25
@NotNull var specTxBatchSize: Int = 10
@NotNull var contractSpecTxBatchSize: Int = 10
@NotNull var scopeSpecTxBatchSize: Int = 10
@NotNull var contractSpecTxTimeoutS = 60
@NotNull var gasMultiplier: Double = 1.0
@NotNull var maxGasMultiplierPerDay: Int = 1000
@NotNull var blockHeightTimeoutInterval: Int = 20
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ class ChaincodeInvokeService(
scopeSpecIdToContractSpecHashes: Map<UUID, Collection<ByteString>>,
contractSpecs: List<ContractSpec>,
) {
log.info("received a set of contract specs: ${contractSpecs.size} and scope specs: ${scopeSpecs.size}")
val logPrefix = "[addContractSpecs]"
log.info("$logPrefix received a set of contract specs: ${contractSpecs.size} and scope specs: ${scopeSpecs.size}")

val owners = listOf(accountProvider.bech32Address())

Expand Down Expand Up @@ -431,26 +432,74 @@ class ChaincodeInvokeService(
.addAllSigners(owners)
.build()
}
contractSpecTx.plus(scopeSpecTx).chunked(chaincodeProperties.specTxBatchSize).forEach { messages ->
log.info("sending batch of ${messages.size} contract spec messages")
val contractSpecHashes = contractSpecTx.chunked(chaincodeProperties.contractSpecTxBatchSize).map { messages ->
log.info("$logPrefix sending batch of ${messages.size} contract spec messages")
val txBody = messages.toTxBody(provenanceGrpc.getLatestBlock().block.header.height + chaincodeProperties.blockHeightTimeoutInterval)

synchronized(provenanceGrpc) {
batchTx(txBody, applyMultiplier = false).also {
batchTx(txBody, applyMultiplier = false).let {
if (it.txResponse.code != 0) {
throw Exception("Error adding contract spec: ${it.txResponse.rawLog}")
}

log.info("contract spec batch made it to mempool with txhash = ${it.txResponse.txhash}")
log.info("$logPrefix contract spec batch made it to mempool with txhash = ${it.txResponse.txhash}")

it.txResponse.txhash
}
}
}

log.info("$logPrefix waiting for ${contractSpecHashes.size} contract spec batches to complete")

// wait for all contract specs to be written
if (!contractSpecHashes.waitForAllTxsToCompleteSuccessfully(OffsetDateTime.now().plusSeconds(
chaincodeProperties.contractSpecTxTimeoutS.toLong()
))) {
throw Exception("Timeout waiting for all contract spec txs to complete successfully")
}

log.info("$logPrefix all ${contractSpecHashes.size} contract spec batches completed successfully")

scopeSpecTx.chunked(chaincodeProperties.scopeSpecTxBatchSize).forEach { messages ->
log.info("$logPrefix sending batch of ${messages.size} scope spec messages")
val txBody = messages.toTxBody(provenanceGrpc.getLatestBlock().block.header.height + chaincodeProperties.blockHeightTimeoutInterval)

synchronized(provenanceGrpc) {
batchTx(txBody, applyMultiplier = false).also {
if (it.txResponse.code != 0) {
throw Exception("Error adding scope spec: ${it.txResponse.rawLog}")
}

log.info("$logPrefix scope spec batch made it to mempool with txhash = ${it.txResponse.txhash}")
}
}
}
} catch(e: Throwable) {
log.warn("failed to add contract spec: ${e.message}")
log.warn("$logPrefix failed to add contract spec: ${e.message}")
throw e
}
}

fun List<String>.waitForAllTxsToCompleteSuccessfully(deadline: OffsetDateTime): Boolean {
var remaining = this
while (deadline.isAfter(OffsetDateTime.now())) {
remaining = remaining.filterNot {
val txResponse = provenanceGrpc.getTx(it)

if (txResponse.code > 0) {
throw Exception("Error adding contract spec while waiting for completion (code ${txResponse.code}): ${txResponse.rawLog}")
}

txResponse.height > 0 && txResponse.code == 0 // filtering out transactions that have completed successfully
}

if (remaining.isEmpty()) {
return true
}
}
return false
}

fun batchTx(body: TxBody, applyMultiplier: Boolean = true): BroadcastTxResponse {
val accountNumber = accountInfo.accountNumber
val sequenceNumber = getAndIncrementSequenceNumber()
Expand Down
4 changes: 3 additions & 1 deletion p8e-api/src/main/resources/application-container.properties
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ chaincode.chainId=${CHAINCODE_CHAIN_ID}
chaincode.mnemonic=${CHAINCODE_MNEMONIC}
chaincode.emptyIterationBackoffMS=${CHAINCODE_EMPTY_ITERATION_BACKOFF:750}
chaincode.txBatchSize=${CHAINCODE_TX_BATCH_SIZE:25}
chaincode.specTxBatchSize=${CHAINCODE_SPEC_TX_BATCH_SIZE:10}
chaincode.contractSpecTxBatchSize=${CHAINCODE_CONTRACT_SPEC_TX_BATCH_SIZE:10}
chaincode.scopeSpecTxBatchSize=${CHAINCODE_SCOPE_SPEC_TX_BATCH_SIZE:10}
chaincode.contractSpecTxTimeoutS=${CHAINCODE_CONTRACT_SPEC_TX_TIMEOUT_S:60}
chaincode.gasMultiplier=${CHAINCODE_GAS_MULTIPLIER:1.0}
chaincode.maxGasMultiplierPerDay=${CHAINCODE_MAX_GAS_MULTIPLIER_PER_DAY:1000}
chaincode.blockHeightTimeoutInterval=${CHAINCODE_BLOCK_HEIGHT_TIMEOUT_INTERVAL:20}
Expand Down
4 changes: 3 additions & 1 deletion p8e-api/src/main/resources/application-local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ chaincode.chainId=chain-local
chaincode.mnemonic=${CHAINCODE_MNEMONIC}
chaincode.emptyIterationBackoffMS=750
chaincode.txBatchSize=25
chaincode.specTxBatchSize=25
chaincode.contractSpecTxBatchSize=10
chaincode.scopeSpecTxBatchSize=10
chaincode.contractSpecTxTimeoutS=60

# Elasticsearch
elasticsearch.host=localhost
Expand Down

0 comments on commit c9fa76e

Please sign in to comment.