Skip to content

Commit

Permalink
CORE-18532: Retry Transient Errors in Flow Engine (#5337)
Browse files Browse the repository at this point in the history
Instead of automatically publishing the original event back to the
"flow.event" topic whenever a transient exception occurs while executing
the flow pipeline, automatically retry the failed operation using an
exponential backoff retry mechanism, and permanently fail the flow if
the configured time limit or maximum number of attempts is reached.

- Create internal retry utility to manage retries in an automated
  fashion with pluggable backoff strategy (constant, linear and
  exponential provided out of the box).
- Replace existing utilities in the crypto components and tests with
  the new one and delete unused classes.
- Update FlowEventProcessor to automatically retry transient exceptions
  through the new retry utility using an exponential backoff with a
  growth factor of 250ms.
- Remove unnecessary code and update tests to accommodate the new
  internal retry.
  • Loading branch information
jujoramos authored Jan 4, 2024
1 parent ace5649 commit bca719e
Show file tree
Hide file tree
Showing 28 changed files with 642 additions and 586 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import net.corda.crypto.core.KeyAlreadyExistsException
import net.corda.crypto.core.KeyOrderBy
import net.corda.crypto.core.SecureHashImpl
import net.corda.crypto.core.ShortHash
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.crypto.impl.toMap
import net.corda.crypto.impl.toSignatureSpec
Expand Down Expand Up @@ -66,10 +65,7 @@ class CryptoOpsBusProcessor(
}
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun onNext(request: RpcOpsRequest, respFuture: CompletableFuture<RpcOpsResponse>) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package net.corda.crypto.service.impl.bus

import net.corda.crypto.config.impl.RetryingConfig
import net.corda.crypto.core.CryptoService
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.crypto.softhsm.TenantInfoService
import net.corda.data.crypto.wire.CryptoNoContentValue
Expand All @@ -29,10 +28,7 @@ class HSMRegistrationBusProcessor(
private val logger: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun onNext(request: HSMRegistrationRequest, respFuture: CompletableFuture<HSMRegistrationResponse>) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import net.corda.crypto.core.CryptoService
import net.corda.crypto.core.SecureHashImpl
import net.corda.crypto.core.ShortHash
import net.corda.crypto.core.publicKeyIdFromBytes
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.crypto.impl.toMap
import net.corda.crypto.impl.toSignatureSpec
Expand Down Expand Up @@ -51,10 +50,7 @@ class CryptoFlowOpsProcessor(
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun process(request: FlowOpsRequest): FlowEvent {
logger.trace { "Processing request: ${request::class.java.name}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package net.corda.crypto.service.impl.rpc
import net.corda.crypto.config.impl.RetryingConfig
import net.corda.crypto.core.CryptoService
import net.corda.crypto.core.CryptoTenants
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.data.ExceptionEnvelope
import net.corda.data.crypto.wire.ops.encryption.request.DecryptRpcCommand
import net.corda.data.crypto.wire.ops.encryption.response.CryptoDecryptionResult
import net.corda.data.crypto.wire.ops.encryption.response.EncryptionOpsError
import net.corda.data.crypto.wire.ops.encryption.response.DecryptionOpsResponse
import net.corda.data.crypto.wire.ops.encryption.response.EncryptionOpsError
import net.corda.messaging.api.processor.SyncRPCProcessor
import net.corda.utilities.trace
import org.slf4j.LoggerFactory
Expand All @@ -23,13 +22,9 @@ class SessionDecryptionProcessor(
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)

override val requestClass = DecryptRpcCommand::class.java
override val responseClass = DecryptionOpsResponse::class.java
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun process(request: DecryptRpcCommand): DecryptionOpsResponse {
logger.trace { "Processing request: ${request::class.java.name}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package net.corda.crypto.service.impl.rpc
import net.corda.crypto.config.impl.RetryingConfig
import net.corda.crypto.core.CryptoService
import net.corda.crypto.core.CryptoTenants
import net.corda.crypto.impl.retrying.BackoffStrategy
import net.corda.crypto.impl.retrying.CryptoRetryingExecutor
import net.corda.data.ExceptionEnvelope
import net.corda.data.crypto.wire.ops.encryption.request.EncryptRpcCommand
Expand All @@ -23,13 +22,9 @@ class SessionEncryptionProcessor(
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val executor = CryptoRetryingExecutor(
logger,
BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills)
)

override val requestClass = EncryptRpcCommand::class.java
override val responseClass = EncryptionOpsResponse::class.java
private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills)

override fun process(request: EncryptRpcCommand): EncryptionOpsResponse {
logger.trace { "Processing request: ${request::class.java.name}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class FlowServiceTestContext @Activate constructor(

private val testConfig = mutableMapOf<String, Any>(
FlowConfig.EXTERNAL_EVENT_MAX_RETRIES to 2,
FlowConfig.PROCESSING_MAX_RETRY_ATTEMPTS to 5,
FlowConfig.EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW to 500000,
FlowConfig.SESSION_TIMEOUT_WINDOW to 500000,
FlowConfig.SESSION_FLOW_CLEANUP_TIME to 30000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package net.corda.flow.testing.tests
import net.corda.data.flow.output.FlowStates
import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER
import net.corda.flow.testing.context.BOB_FLOW_KEY_MAPPER
import net.corda.flow.testing.context.CHARLIE_FLOW_KEY_MAPPER
import net.corda.flow.testing.context.FlowServiceTestBase
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -53,39 +52,6 @@ class FlowFinishedAcceptanceTest : FlowServiceTestBase() {
}
}

@Test
fun `A flow finishing when previously in a retry state publishes a completed flow status and schedules flow cleanup`() {
// Trigger a retry state
`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, CHARLIE_HOLDING_IDENTITY, CPI1, "flow start data")
}

then {
expectOutputForFlow(FLOW_ID1) {
checkpointHasRetry(1)
}
}

// Now add the missing vnode and trigger a flow completion
given {
virtualNode(CPI1, CHARLIE_HOLDING_IDENTITY)
membershipGroupFor(CHARLIE_HOLDING_IDENTITY)
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, CHARLIE_HOLDING_IDENTITY, CPI1, "flow start data")
.completedSuccessfullyWith(DONE)
}

then {
expectOutputForFlow(FLOW_ID1) {
nullStateRecord()
flowStatus(FlowStates.COMPLETED, result = DONE)
scheduleFlowMapperCleanupEvents(CHARLIE_FLOW_KEY_MAPPER)
}
}
}

@Test
fun `A flow finishing with FlowFinished removes fiber from fiber cache`() {
`when` {
Expand Down Expand Up @@ -118,4 +84,4 @@ class FlowFinishedAcceptanceTest : FlowServiceTestBase() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode
import org.osgi.test.junit5.service.ServiceExtension
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

@ExtendWith(ServiceExtension::class)
@Execution(ExecutionMode.SAME_THREAD)
class StartFlowTest : FlowServiceTestBase() {

@Test
fun `RPC Start Flow - Flow starts and updates stats to running`() {

fun `Flow starts and runs to completion`() {
given {
virtualNode(CPI1, BOB_HOLDING_IDENTITY)
cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM)
Expand All @@ -41,14 +42,12 @@ class StartFlowTest : FlowServiceTestBase() {
}
}


/**
* When a virtual node has an INACTIVE StartFlowOperationalStatus, it should throw a FlowMarkedForKillException and
* have a Killed status.
*/
@Test
fun `Flow is marked as killed if startFlowOperationalStatus of vNode is INACTIVE`() {

given {
virtualNode(CPI1, CHARLIE_HOLDING_IDENTITY, flowStartOperationalStatus = OperationalStatus.INACTIVE)
cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM)
Expand All @@ -67,98 +66,82 @@ class StartFlowTest : FlowServiceTestBase() {
flowStatus(
state = FlowStates.KILLED,
flowTerminatedReason = "flowStartOperationalStatus is INACTIVE, new flows cannot be started for " +
"virtual node with shortHash ${CHARLIE_HOLDING_IDENTITY.toCorda().shortHash}"
"virtual node with shortHash ${CHARLIE_HOLDING_IDENTITY.toCorda().shortHash}"
)
}
}
}

/**
* When a flow event fails with a transient exception then the flow will be put into a retry
* state. In this case the flow engine will publish the problematic event back to itself to be processed at some
* later time. This could then fail again, triggering the same loop.
* When a flow event fails with an internal transient exception, the flow engine will automatically retry the failed
* operation without publishing any extra messages to Kafka. Instead, it will try to re-process the event at a later
* time using exponential backoff. This could then fail again, triggering the same loop until a threshold is reached.
*
* Scenario 1 - Fails multiple times but completes before the retry limit
* Scenario 2 - Fails multiple times and hits the retry limit failing the flow to the DLQ
* Scenario 1 - Fails multiple times, transient error is fixed before retry limit => flow completes.
* Scenario 2 - Fails multiple times, transient error persists, hits the retry limit => flow fails and the event is marked for the DLQ.
*/
@Test
fun `RPC Start Flow - Retry scenario 1 - Fail then succeeds`() {
fun `Retry scenario 1 - Transient exception is fixed within the retry limit and flow succeeds`() {
given {
cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM)
sandboxCpk(CPK1_CHECKSUM)
membershipGroupFor(BOB_HOLDING_IDENTITY)
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
}

then {
expectOutputForFlow(FLOW_ID1) {
checkpointHasRetry(1)
flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1)
val latch = CountDownLatch(1)
val startFlow = thread {
latch.countDown()
`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
.suspendsWith(FlowIORequest.InitialCheckpoint)
.completedSuccessfullyWith("hello")
}
}

// Now fix the issue and expect the wake-up event to
// retry the start flow successfully
given {
virtualNode(CPI1, BOB_HOLDING_IDENTITY)
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
.completedSuccessfullyWith("hello")
}
latch.await()
Thread.sleep(1000)
testContext.virtualNode(CPI1, BOB_HOLDING_IDENTITY)
startFlow.join()

then {
expectOutputForFlow(FLOW_ID1) {
checkpointDoesNotHaveRetry()
nullStateRecord()
flowStatus(FlowStates.COMPLETED, result = "hello")
flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1)
}
}
}

@Test
fun `RPC Start Flow - Retry scenario 2 - Hit the retry limit and fail the flow`() {
fun `Retry scenario 2 - Transient exception is not fixed within the retry limit and flow permanently fails`() {
given {
cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM)
sandboxCpk(CPK1_CHECKSUM)
membershipGroupFor(BOB_HOLDING_IDENTITY)
flowConfiguration(FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION, 0)
flowConfiguration(FlowConfig.PROCESSING_MAX_RETRY_ATTEMPTS, 1)
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
}

then {
expectOutputForFlow(FLOW_ID1) {
checkpointHasRetry(1)
flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1)
}
}

`when` {
startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data")
}

then {
expectOutputForFlow(FLOW_ID1) {
nullStateRecord()
markedForDlq()
noFlowEvents()
flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1)
//we can't return a status record after the change to checkpoint initialization
// we can't return a status record after the change to checkpoint initialization
// Story to deal with change in status records -> CORE-10571: Re-design how status record is published
/* flowStatus(
/*
flowStatus(
state = FlowStates.FAILED,
errorType = FlowProcessingExceptionTypes.FLOW_FAILED,
errorMessage = "Execution failed with \"Failed to find the virtual node info for holder " +
"'HoldingIdentity(x500Name=${BOB_HOLDING_IDENTITY.x500Name}, groupId=${BOB_HOLDING_IDENTITY.groupId})'\" " +
"after 1 retry attempts."
)*/
"'HoldingIdentity(x500Name=${BOB_HOLDING_IDENTITY.x500Name}, groupId=${BOB_HOLDING_IDENTITY.groupId})'\" " +
"after 1 retry attempts."
)
*/
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import net.corda.flow.pipeline.exceptions.FlowEventException
import net.corda.flow.pipeline.exceptions.FlowFatalException
import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException
import net.corda.flow.pipeline.exceptions.FlowPlatformException
import net.corda.flow.pipeline.exceptions.FlowTransientException
import net.corda.libs.configuration.SmartConfig

/**
Expand Down Expand Up @@ -34,18 +33,6 @@ interface FlowEventExceptionProcessor {
*/
fun process(throwable: Throwable, context: FlowEventContext<*>): FlowEventContext<*>

/**
* Processes a [FlowTransientException] and provides the pipeline response.
*
* Used to handle event retries.
*
* @param exception The [FlowTransientException] thrown during processing
* @param context The [FlowEventContext] at the point of failure.
*
* @return The updated context.
*/
fun process(exception: FlowTransientException, context: FlowEventContext<*>): FlowEventContext<*>

/**
* Processes a [FlowFatalException] and provides the pipeline response.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package net.corda.flow.pipeline.exceptions
import net.corda.v5.base.exceptions.CordaRuntimeException

/**
* The [FlowTransientException] is thrown for a recoverable error, this exception will cause the event processing
* to be retried
* The [FlowTransientException] should only be thrown for transient errors that affect the flow engine and any of its
* dependent services; it will cause the pipeline execution to be automatically retried at source.
*/
class FlowTransientException(override val message: String, cause: Throwable? = null) :
CordaRuntimeException(message, cause)

Loading

0 comments on commit bca719e

Please sign in to comment.