Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release/os/5.1' into vkolomeyko/…
Browse files Browse the repository at this point in the history
…e2e-timeout-investigation
  • Loading branch information
vkolomeyko committed Oct 26, 2023
2 parents 8f4a8f5 + 1de54e6 commit 9738094
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ interface TokenCache : Iterable<CachedToken> {
* @param stateRefs The list of tokens to be removed
*/
fun removeAll(stateRefs: Set<String>)

/**
* Empties the cache
*/
fun removeAll()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ class TokenCacheImpl: TokenCache {
stateRefs.forEach { cachedTokens.remove(it) }
}

override fun removeAll() {
cachedTokens.clear()
}

override fun iterator(): Iterator<CachedToken> {
return cachedTokens.values.iterator()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ class TokenCacheEventProcessorFactoryImpl constructor(
private val clock: Clock
) : TokenCacheEventProcessorFactory {

private val tokenPoolCache = TokenPoolCacheImpl()

override fun create(): StateAndEventProcessor<TokenPoolCacheKey, TokenPoolCacheState, TokenPoolCacheEvent> {
val recordFactory = RecordFactoryImpl(externalEventResponseFactory)
val tokenFilterStrategy = SimpleTokenFilterStrategy()
val sqlQueryProvider = SqlQueryProviderTokens()
val utxoTokenRepository = UtxoTokenRepositoryImpl(sqlQueryProvider)
val tokenPoolCache = TokenPoolCacheImpl()
val tokenSelectionMetrics = TokenSelectionMetricsImpl(clock)
val availableTokenService = AvailableTokenServiceImpl(
virtualNodeInfoService,
Expand Down Expand Up @@ -89,7 +90,7 @@ class TokenCacheEventProcessorFactoryImpl constructor(
stateManager: StateManager,
processor: StateAndEventProcessor<TokenPoolCacheKey, TokenPoolCacheState, TokenPoolCacheEvent>
): TokenSelectionDelegatedProcessor {
val claimStateStoreFactory = ClaimStateStoreFactoryImpl(stateManager, serialization, clock)
val claimStateStoreFactory = ClaimStateStoreFactoryImpl(stateManager, serialization, tokenPoolCache, clock)
val tokenSelectionMetrics = TokenSelectionMetricsImpl(UTCClock())
return TokenSelectionDelegatedProcessorImpl(
eventConverter,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package net.corda.ledger.utxo.token.cache.services

import net.corda.ledger.utxo.token.cache.entities.TokenPoolCache
import net.corda.ledger.utxo.token.cache.entities.TokenPoolKey
import net.corda.libs.statemanager.api.StateManager
import net.corda.utilities.time.Clock

class ClaimStateStoreFactoryImpl(
private val stateManager: StateManager,
private val serialization: TokenPoolCacheStateSerialization,
private val tokenPoolCache: TokenPoolCache,
private val clock: Clock
) : ClaimStateStoreFactory {

override fun create(key: TokenPoolKey, storedPoolClaimState: StoredPoolClaimState): ClaimStateStore {
return PerformanceClaimStateStoreImpl(key, storedPoolClaimState, serialization, stateManager, clock)
return PerformanceClaimStateStoreImpl(
key,
storedPoolClaimState,
serialization,
stateManager,
tokenPoolCache,
clock
)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.corda.ledger.utxo.token.cache.services

import net.corda.data.ledger.utxo.token.selection.state.TokenPoolCacheState
import net.corda.ledger.utxo.token.cache.entities.TokenPoolCache
import net.corda.ledger.utxo.token.cache.entities.TokenPoolKey
import net.corda.libs.statemanager.api.State
import net.corda.libs.statemanager.api.StateManager
Expand All @@ -10,12 +11,13 @@ import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

@Suppress("UNUSED")
@Suppress("LongParameterList")
class PerformanceClaimStateStoreImpl(
private val key: TokenPoolKey,
storedPoolClaimState: StoredPoolClaimState,
private val serialization: TokenPoolCacheStateSerialization,
private val stateManager: StateManager,
tokenPoolCache: TokenPoolCache,
private val clock: Clock
) : ClaimStateStore {

Expand All @@ -28,7 +30,7 @@ class PerformanceClaimStateStoreImpl(
ThreadPoolExecutor.DiscardPolicy()
)
private val requestQueue = LinkedBlockingQueue<QueuedRequestItem>()

private val tokenCache = tokenPoolCache.get(key)
private var currentState = storedPoolClaimState

private data class QueuedRequestItem(
Expand Down Expand Up @@ -79,6 +81,11 @@ class PerformanceClaimStateStoreImpl(
key,
serialization.deserialize(mismatchedState.value)
)

// When fail to save the state we have to assume the available token cache could be invalid
// and therefore clear it to force a refresh from the DB on the next request.
tokenCache.removeAll()

unexceptionalRequests.abort()
} else {
currentState = currentState.copy(dbVersion = currentState.dbVersion + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ class TokenCacheImplTest {
assertThat(target.toList()).containsOnly(cachedToken2)
}

@Test
fun `remove all`() {
val target = TokenCacheImpl()
val cachedToken1 = mock<CachedToken>().apply { whenever(stateRef).thenReturn("s1") }
val cachedToken2 = mock<CachedToken>().apply { whenever(stateRef).thenReturn("s2") }
val cachedToken3 = mock<CachedToken>().apply { whenever(stateRef).thenReturn("s3") }
target.add(listOf(cachedToken1, cachedToken2, cachedToken3))

assertThat(target.toList()).containsOnly(cachedToken1, cachedToken2, cachedToken3)
target.removeAll()
assertThat(target.toList()).isEmpty()
}

@Test
fun `iterating the cache`() {
val target = TokenCacheImpl()
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package net.corda.ledger.utxo.token.cache.impl.services

import net.corda.data.ledger.utxo.token.selection.data.TokenClaim
import net.corda.data.ledger.utxo.token.selection.state.TokenPoolCacheState
import net.corda.ledger.utxo.token.cache.entities.TokenCache
import net.corda.ledger.utxo.token.cache.entities.TokenPoolCache
import net.corda.ledger.utxo.token.cache.impl.POOL_KEY
import net.corda.ledger.utxo.token.cache.impl.TOKEN_POOL_CACHE_STATE
import net.corda.ledger.utxo.token.cache.services.StoredPoolClaimState
Expand All @@ -16,7 +18,10 @@ import net.corda.schema.registry.impl.AvroSchemaRegistryImpl
import net.corda.utilities.time.Clock
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.mockito.kotlin.atLeast
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import java.time.Instant
import java.util.Timer
Expand All @@ -30,6 +35,8 @@ class PerformanceClaimStateStoreImplTest {
TokenPoolCacheStateSerializationImpl(CordaAvroSerializationFactoryImpl(AvroSchemaRegistryImpl()))
private val now = Instant.ofEpochMilli(1)
private val clock = mock<Clock>().apply { whenever(instant()).thenReturn(now) }
private val tokenCache = mock<TokenCache>()
private val tokenPoolCache = mock<TokenPoolCache>().apply { whenever(get(POOL_KEY)).thenReturn(tokenCache) }
private val baseState = State(
POOL_KEY.toString(),
serialization.serialize(TOKEN_POOL_CACHE_STATE),
Expand All @@ -54,16 +61,20 @@ class PerformanceClaimStateStoreImplTest {
this.create(listOf(baseState))
}

val initialStoredPoolClaimStateA = StoredPoolClaimState( 0, POOL_KEY, TokenPoolCacheState.newBuilder()
.setPoolKey(POOL_KEY.toAvro())
.setAvailableTokens(listOf())
.setTokenClaims(listOf())
.build())
val initialStoredPoolClaimStateB = StoredPoolClaimState( 0, POOL_KEY, TokenPoolCacheState.newBuilder()
.setPoolKey(POOL_KEY.toAvro())
.setAvailableTokens(listOf())
.setTokenClaims(listOf())
.build())
val initialStoredPoolClaimStateA = StoredPoolClaimState(
0, POOL_KEY, TokenPoolCacheState.newBuilder()
.setPoolKey(POOL_KEY.toAvro())
.setAvailableTokens(listOf())
.setTokenClaims(listOf())
.build()
)
val initialStoredPoolClaimStateB = StoredPoolClaimState(
0, POOL_KEY, TokenPoolCacheState.newBuilder()
.setPoolKey(POOL_KEY.toAvro())
.setAvailableTokens(listOf())
.setTokenClaims(listOf())
.build()
)
val instanceA = createTarget(initialStoredPoolClaimStateA, slowStateManager)
val instanceB = createTarget(initialStoredPoolClaimStateB, slowStateManager)

Expand All @@ -88,7 +99,7 @@ class PerformanceClaimStateStoreImplTest {
isComplete = instanceA.enqueueRequest { poolState ->
if (poolState.tokenClaims.any { it.claimId == newAClaim.claimId }) {
println("unexpected claim ${newAClaim.claimId}")
}else{
} else {
println("${newAClaim.claimId}")
}
poolState.tokenClaims = poolState.tokenClaims + newAClaim
Expand All @@ -114,7 +125,7 @@ class PerformanceClaimStateStoreImplTest {
isComplete = instanceB.enqueueRequest { poolState ->
if (poolState.tokenClaims.any { it.claimId == newBClaim.claimId }) {
println("unexpected claim ${newBClaim.claimId}")
}else{
} else {
println("${newBClaim.claimId}")
}
poolState.tokenClaims = poolState.tokenClaims + newBClaim
Expand Down Expand Up @@ -146,6 +157,9 @@ class PerformanceClaimStateStoreImplTest {
val pool = serialization.deserialize(endState.value)
assertThat(pool.tokenClaims.map { it.claimId }).containsOnlyOnceElementsOf(allClaimIds)

// We expect the available tokens cache to be cleared for each concurrency failure
verify(tokenCache, atLeast(1)).removeAll()

println("Update Call Count: ${slowStateManager.updateCallCount}")
println("Update Fail Count: ${slowStateManager.updateFailCount}")
println("Instance A Failures: $instanceAFailCount")
Expand All @@ -160,8 +174,11 @@ class PerformanceClaimStateStoreImplTest {
.build()
}

private fun createTarget(storedPoolClaimState: StoredPoolClaimState, sm: StateManager): PerformanceClaimStateStoreImpl {
return PerformanceClaimStateStoreImpl(POOL_KEY, storedPoolClaimState, serialization, sm, clock)
private fun createTarget(
storedPoolClaimState: StoredPoolClaimState,
sm: StateManager
): PerformanceClaimStateStoreImpl {
return PerformanceClaimStateStoreImpl(POOL_KEY, storedPoolClaimState, serialization, sm, tokenPoolCache, clock)
}

class StateManagerSimulator(private val updateSleepTime: Long = 0) : StateManager {
Expand Down
Loading

0 comments on commit 9738094

Please sign in to comment.