Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-18532: Retry Transient Errors in Flow Engine #5337

Merged
merged 10 commits into from
Jan 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import net.corda.crypto.core.KeyAlreadyExistsException
import net.corda.crypto.core.KeyOrderBy
import net.corda.crypto.core.SecureHashImpl
import net.corda.crypto.core.ShortHash
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.crypto.impl.toMap
import net.corda.crypto.impl.toSignatureSpec
Expand Down Expand Up @@ -66,10 +65,7 @@ class CryptoOpsBusProcessor(
}
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun onNext(request: RpcOpsRequest, respFuture: CompletableFuture<RpcOpsResponse>) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package net.corda.crypto.service.impl.bus

import net.corda.crypto.config.impl.RetryingConfig
import net.corda.crypto.core.CryptoService
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.crypto.softhsm.TenantInfoService
import net.corda.data.crypto.wire.CryptoNoContentValue
Expand All @@ -29,10 +28,7 @@ class HSMRegistrationBusProcessor(
private val logger: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun onNext(request: HSMRegistrationRequest, respFuture: CompletableFuture<HSMRegistrationResponse>) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import net.corda.crypto.core.CryptoService
import net.corda.crypto.core.SecureHashImpl
import net.corda.crypto.core.ShortHash
import net.corda.crypto.core.publicKeyIdFromBytes
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.crypto.impl.toMap
import net.corda.crypto.impl.toSignatureSpec
Expand Down Expand Up @@ -51,10 +50,7 @@ class CryptoFlowOpsProcessor(
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun process(request: FlowOpsRequest): FlowEvent {
logger.trace { "Processing request: ${request::class.java.name}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package net.corda.crypto.service.impl.rpc
import net.corda.crypto.config.impl.RetryingConfig
import net.corda.crypto.core.CryptoService
import net.corda.crypto.core.CryptoTenants
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.data.ExceptionEnvelope
import net.corda.data.crypto.wire.ops.encryption.request.DecryptRpcCommand
import net.corda.data.crypto.wire.ops.encryption.response.CryptoDecryptionResult
import net.corda.data.crypto.wire.ops.encryption.response.EncryptionOpsError
import net.corda.data.crypto.wire.ops.encryption.response.DecryptionOpsResponse
import net.corda.data.crypto.wire.ops.encryption.response.EncryptionOpsError
import net.corda.messaging.api.processor.SyncRPCProcessor
import net.corda.utilities.trace
import org.slf4j.LoggerFactory
Expand All @@ -23,13 +22,9 @@ class SessionDecryptionProcessor(
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)

override val requestClass = DecryptRpcCommand::class.java
override val responseClass = DecryptionOpsResponse::class.java
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun process(request: DecryptRpcCommand): DecryptionOpsResponse {
logger.trace { "Processing request: ${request::class.java.name}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package net.corda.crypto.service.impl.rpc
import net.corda.crypto.config.impl.RetryingConfig
import net.corda.crypto.core.CryptoService
import net.corda.crypto.core.CryptoTenants
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.data.ExceptionEnvelope
import net.corda.data.crypto.wire.ops.encryption.request.EncryptRpcCommand
Expand All @@ -23,13 +22,9 @@ class SessionEncryptionProcessor(
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)

override val requestClass = EncryptRpcCommand::class.java
override val responseClass = EncryptionOpsResponse::class.java
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun process(request: EncryptRpcCommand): EncryptionOpsResponse {
logger.trace { "Processing request: ${request::class.java.name}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class FlowServiceTestContext @Activate constructor(

private val testConfig = mutableMapOf<String, Any>(
FlowConfig.EXTERNAL_EVENT_MAX_RETRIES to 2,
FlowConfig.PROCESSING_MAX_RETRY_ATTEMPTS to 5,
FlowConfig.EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW to 500000,
FlowConfig.SESSION_TIMEOUT_WINDOW to 500000,
FlowConfig.SESSION_FLOW_CLEANUP_TIME to 30000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package net.corda.flow.testing.tests
import net.corda.data.flow.output.FlowStates
import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER
import net.corda.flow.testing.context.BOB_FLOW_KEY_MAPPER
import net.corda.flow.testing.context.CHARLIE_FLOW_KEY_MAPPER
import net.corda.flow.testing.context.FlowServiceTestBase
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -53,39 +52,6 @@ class FlowFinishedAcceptanceTest : FlowServiceTestBase() {
}
}

@Test
fun `A flow finishing when previously in a retry state publishes a completed flow status and schedules flow cleanup`() {
// Trigger a retry state
`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, CHARLIE_HOLDING_IDENTITY, CPI1, "flow start data")
}

then {
expectOutputForFlow(FLOW_ID1) {
checkpointHasRetry(1)
}
}

// Now add the missing vnode and trigger a flow completion
given {
virtualNode(CPI1, CHARLIE_HOLDING_IDENTITY)
membershipGroupFor(CHARLIE_HOLDING_IDENTITY)
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, CHARLIE_HOLDING_IDENTITY, CPI1, "flow start data")
.completedSuccessfullyWith(DONE)
}

then {
expectOutputForFlow(FLOW_ID1) {
nullStateRecord()
flowStatus(FlowStates.COMPLETED, result = DONE)
scheduleFlowMapperCleanupEvents(CHARLIE_FLOW_KEY_MAPPER)
}
}
}

@Test
fun `A flow finishing with FlowFinished removes fiber from fiber cache`() {
`when` {
Expand Down Expand Up @@ -118,4 +84,4 @@ class FlowFinishedAcceptanceTest : FlowServiceTestBase() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode
import org.osgi.test.junit5.service.ServiceExtension
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

@ExtendWith(ServiceExtension::class)
@Execution(ExecutionMode.SAME_THREAD)
class StartFlowTest : FlowServiceTestBase() {

@Test
fun `RPC Start Flow - Flow starts and updates stats to running`() {

fun `Flow starts and runs to completion`() {
given {
virtualNode(CPI1, BOB_HOLDING_IDENTITY)
cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM)
Expand All @@ -41,14 +42,12 @@ class StartFlowTest : FlowServiceTestBase() {
}
}


/**
* When a virtual node has an INACTIVE StartFlowOperationalStatus, it should throw a FlowMarkedForKillException and
* have a Killed status.
*/
@Test
fun `Flow is marked as killed if startFlowOperationalStatus of vNode is INACTIVE`() {

given {
virtualNode(CPI1, CHARLIE_HOLDING_IDENTITY, flowStartOperationalStatus = OperationalStatus.INACTIVE)
cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM)
Expand All @@ -67,98 +66,82 @@ class StartFlowTest : FlowServiceTestBase() {
flowStatus(
state = FlowStates.KILLED,
flowTerminatedReason = "flowStartOperationalStatus is INACTIVE, new flows cannot be started for " +
"virtual node with shortHash ${CHARLIE_HOLDING_IDENTITY.toCorda().shortHash}"
"virtual node with shortHash ${CHARLIE_HOLDING_IDENTITY.toCorda().shortHash}"
)
}
}
}

/**
* When a flow event fails with a transient exception then the flow will be put into a retry
* state. In this case the flow engine will publish the problematic event back to itself to be processed at some
* later time. This could then fail again, triggering the same loop.
* When a flow event fails with an internal transient exception, the flow engine will automatically retry the failed
* operation without publishing any extra messages to Kafka. Instead, it will try to re-process the event at a later
* time using exponential backoff. This could then fail again, triggering the same loop until a threshold is reached.
*
* Scenario 1 - Fails multiple times but completes before the retry limit
* Scenario 2 - Fails multiple times and hits the retry limit failing the flow to the DLQ
* Scenario 1 - Fails multiple times, transient error is fixed before retry limit => flow completes.
* Scenario 2 - Fails multiple times, transient error persists, hits the retry limit => flow fails and DLQ event.
*/
@Test
fun `RPC Start Flow - Retry scenario 1 - Fail then succeeds`() {
fun `Retry scenario 1 - Transient exception is fixed within the retry limit and flow succeeds`() {
given {
cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM)
sandboxCpk(CPK1_CHECKSUM)
membershipGroupFor(BOB_HOLDING_IDENTITY)
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
}

then {
expectOutputForFlow(FLOW_ID1) {
checkpointHasRetry(1)
flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1)
val latch = CountDownLatch(1)
val startFlow = thread {
latch.countDown()
`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
.suspendsWith(FlowIORequest.InitialCheckpoint)
.completedSuccessfullyWith("hello")
}
}

// Now fix the issue and expect the wake-up event to
// retry the start flow successfully
given {
virtualNode(CPI1, BOB_HOLDING_IDENTITY)
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
.completedSuccessfullyWith("hello")
}
latch.await()
Thread.sleep(1000)
testContext.virtualNode(CPI1, BOB_HOLDING_IDENTITY)
startFlow.join()

then {
expectOutputForFlow(FLOW_ID1) {
checkpointDoesNotHaveRetry()
nullStateRecord()
flowStatus(FlowStates.COMPLETED, result = "hello")
flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1)
}
}
}

@Test
fun `RPC Start Flow - Retry scenario 2 - Hit the retry limit and fail the flow`() {
fun `Retry scenario 2 - Transient exception is not fixed within the retry limit and flow permanently fails`() {
given {
cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM)
sandboxCpk(CPK1_CHECKSUM)
membershipGroupFor(BOB_HOLDING_IDENTITY)
flowConfiguration(FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION, 0)
flowConfiguration(FlowConfig.PROCESSING_MAX_RETRY_ATTEMPTS, 1)
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
}

then {
expectOutputForFlow(FLOW_ID1) {
checkpointHasRetry(1)
flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1)
}
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
}

then {
expectOutputForFlow(FLOW_ID1) {
nullStateRecord()
markedForDlq()
noFlowEvents()
flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1)
//we can't return a status record after the change to checkpoint initialization
// we can't return a status record after the change to checkpoint initialization
// Story to deal with change in status records -> CORE-10571: Re-design how status record is published
/* flowStatus(
/*
flowStatus(
state = FlowStates.FAILED,
errorType = FlowProcessingExceptionTypes.FLOW_FAILED,
errorMessage = "Execution failed with \"Failed to find the virtual node info for holder " +
"'HoldingIdentity(x500Name=${BOB_HOLDING_IDENTITY.x500Name}, groupId=${BOB_HOLDING_IDENTITY.groupId})'\" " +
"after 1 retry attempts."
)*/
"'HoldingIdentity(x500Name=${BOB_HOLDING_IDENTITY.x500Name}, groupId=${BOB_HOLDING_IDENTITY.groupId})'\" " +
"after 1 retry attempts."
)
*/
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import net.corda.flow.pipeline.exceptions.FlowEventException
import net.corda.flow.pipeline.exceptions.FlowFatalException
import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException
import net.corda.flow.pipeline.exceptions.FlowPlatformException
import net.corda.flow.pipeline.exceptions.FlowTransientException
import net.corda.libs.configuration.SmartConfig

/**
Expand Down Expand Up @@ -34,18 +33,6 @@ interface FlowEventExceptionProcessor {
*/
fun process(throwable: Throwable, context: FlowEventContext<*>): FlowEventContext<*>

/**
* Processes a [FlowTransientException] and provides the pipeline response.
*
* Used to handle event retries.
*
* @param exception The [FlowTransientException] thrown during processing
* @param context The [FlowEventContext] at the point of failure.
*
* @return The updated context.
*/
fun process(exception: FlowTransientException, context: FlowEventContext<*>): FlowEventContext<*>

/**
* Processes a [FlowFatalException] and provides the pipeline response.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package net.corda.flow.pipeline.exceptions
import net.corda.v5.base.exceptions.CordaRuntimeException

/**
* The [FlowTransientException] is thrown for a recoverable error, this exception will cause the event processing
* to be retried
* The [FlowTransientException] should only be thrown for transient errors that affect the flow engine and any of its
* dependant services, it will cause the pipeline execution to be automatically retried at source.
*/
class FlowTransientException(override val message: String, cause: Throwable? = null) :
CordaRuntimeException(message, cause)

Loading