diff --git a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessor.kt b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessor.kt index b6cde38d331..b2535615154 100644 --- a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessor.kt +++ b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessor.kt @@ -10,7 +10,6 @@ import net.corda.crypto.core.KeyAlreadyExistsException import net.corda.crypto.core.KeyOrderBy import net.corda.crypto.core.SecureHashImpl import net.corda.crypto.core.ShortHash -import net.corda.crypto.impl.retrying.BackoffStrategy import net.corda.crypto.impl.retrying.CryptoRetryingExecutor import net.corda.crypto.impl.toMap import net.corda.crypto.impl.toSignatureSpec @@ -66,10 +65,7 @@ class CryptoOpsBusProcessor( } } - private val executor = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills) - ) + private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills) override fun onNext(request: RpcOpsRequest, respFuture: CompletableFuture) { try { diff --git a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessor.kt b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessor.kt index f7c50f5943f..23a35e063d2 100644 --- a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessor.kt +++ b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessor.kt @@ -2,7 +2,6 @@ package net.corda.crypto.service.impl.bus import net.corda.crypto.config.impl.RetryingConfig import net.corda.crypto.core.CryptoService -import net.corda.crypto.impl.retrying.BackoffStrategy import net.corda.crypto.impl.retrying.CryptoRetryingExecutor import net.corda.crypto.softhsm.TenantInfoService import net.corda.data.crypto.wire.CryptoNoContentValue @@ -29,10 +28,7 @@ class HSMRegistrationBusProcessor( private val logger: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - private val executor = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills) - ) + private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills) override fun onNext(request: HSMRegistrationRequest, respFuture: CompletableFuture) { try { diff --git a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/CryptoFlowOpsProcessor.kt b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/CryptoFlowOpsProcessor.kt index 4cdff4b608d..c80721ae05e 100644 --- a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/CryptoFlowOpsProcessor.kt +++ b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/CryptoFlowOpsProcessor.kt @@ -6,7 +6,6 @@ import net.corda.crypto.core.CryptoService import net.corda.crypto.core.SecureHashImpl import net.corda.crypto.core.ShortHash import net.corda.crypto.core.publicKeyIdFromBytes -import net.corda.crypto.impl.retrying.BackoffStrategy import net.corda.crypto.impl.retrying.CryptoRetryingExecutor import net.corda.crypto.impl.toMap import net.corda.crypto.impl.toSignatureSpec @@ -51,10 +50,7 @@ class CryptoFlowOpsProcessor( private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - private val executor = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills) - ) + private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills) override fun process(request: FlowOpsRequest): FlowEvent { logger.trace { "Processing request: ${request::class.java.name}" } diff --git a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/SessionDecryptionProcessor.kt b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/SessionDecryptionProcessor.kt index dd66e131e26..1a33a5de0ca 100644 --- a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/SessionDecryptionProcessor.kt +++ b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/SessionDecryptionProcessor.kt @@ -3,13 +3,12 @@ package net.corda.crypto.service.impl.rpc import net.corda.crypto.config.impl.RetryingConfig import net.corda.crypto.core.CryptoService import net.corda.crypto.core.CryptoTenants -import net.corda.crypto.impl.retrying.BackoffStrategy import net.corda.crypto.impl.retrying.CryptoRetryingExecutor import net.corda.data.ExceptionEnvelope import net.corda.data.crypto.wire.ops.encryption.request.DecryptRpcCommand import net.corda.data.crypto.wire.ops.encryption.response.CryptoDecryptionResult -import net.corda.data.crypto.wire.ops.encryption.response.EncryptionOpsError import net.corda.data.crypto.wire.ops.encryption.response.DecryptionOpsResponse +import net.corda.data.crypto.wire.ops.encryption.response.EncryptionOpsError import net.corda.messaging.api.processor.SyncRPCProcessor import net.corda.utilities.trace import org.slf4j.LoggerFactory @@ -23,13 +22,9 @@ class SessionDecryptionProcessor( private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - private val executor = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills) - ) - override val requestClass = DecryptRpcCommand::class.java override val responseClass = DecryptionOpsResponse::class.java + private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills) override fun process(request: DecryptRpcCommand): DecryptionOpsResponse { logger.trace { "Processing request: ${request::class.java.name}" } diff --git a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/SessionEncryptionProcessor.kt b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/SessionEncryptionProcessor.kt index be4841405ee..2482a452ca1 100644 --- a/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/SessionEncryptionProcessor.kt +++ b/components/crypto/crypto-service-impl/src/main/kotlin/net/corda/crypto/service/impl/rpc/SessionEncryptionProcessor.kt @@ -3,7 +3,6 @@ package net.corda.crypto.service.impl.rpc import net.corda.crypto.config.impl.RetryingConfig import net.corda.crypto.core.CryptoService import net.corda.crypto.core.CryptoTenants -import net.corda.crypto.impl.retrying.BackoffStrategy import net.corda.crypto.impl.retrying.CryptoRetryingExecutor import net.corda.data.ExceptionEnvelope import net.corda.data.crypto.wire.ops.encryption.request.EncryptRpcCommand @@ -23,13 +22,9 @@ class SessionEncryptionProcessor( private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - private val executor = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(config.maxAttempts, config.waitBetweenMills) - ) - override val requestClass = EncryptRpcCommand::class.java override val responseClass = EncryptionOpsResponse::class.java + private val executor = CryptoRetryingExecutor(logger, config.maxAttempts.toLong(), config.waitBetweenMills) override fun process(request: EncryptRpcCommand): EncryptionOpsResponse { logger.trace { "Processing request: ${request::class.java.name}" } diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/FlowServiceTestContext.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/FlowServiceTestContext.kt index 4a054bdf032..3d791d8d898 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/FlowServiceTestContext.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/context/FlowServiceTestContext.kt @@ -108,6 +108,7 @@ class FlowServiceTestContext @Activate constructor( private val testConfig = mutableMapOf( FlowConfig.EXTERNAL_EVENT_MAX_RETRIES to 2, + FlowConfig.PROCESSING_MAX_RETRY_ATTEMPTS to 5, FlowConfig.EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW to 500000, FlowConfig.SESSION_TIMEOUT_WINDOW to 500000, FlowConfig.SESSION_FLOW_CLEANUP_TIME to 30000, diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowFinishedAcceptanceTest.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowFinishedAcceptanceTest.kt index 7792278096d..7429ea2b691 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowFinishedAcceptanceTest.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowFinishedAcceptanceTest.kt @@ -3,7 +3,6 @@ package net.corda.flow.testing.tests import net.corda.data.flow.output.FlowStates import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER import net.corda.flow.testing.context.BOB_FLOW_KEY_MAPPER -import net.corda.flow.testing.context.CHARLIE_FLOW_KEY_MAPPER import net.corda.flow.testing.context.FlowServiceTestBase import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -53,39 +52,6 @@ class FlowFinishedAcceptanceTest : FlowServiceTestBase() { } } - @Test - fun `A flow finishing when previously in a retry state publishes a completed flow status and schedules flow cleanup`() { - // Trigger a retry state - `when` { - startFlowEventReceived(FLOW_ID1, REQUEST_ID1, CHARLIE_HOLDING_IDENTITY, CPI1, "flow start data") - } - - then { - expectOutputForFlow(FLOW_ID1) { - checkpointHasRetry(1) - } - } - - // Now add the missing vnode and trigger a flow completion - given { - virtualNode(CPI1, CHARLIE_HOLDING_IDENTITY) - membershipGroupFor(CHARLIE_HOLDING_IDENTITY) - } - - `when` { - startFlowEventReceived(FLOW_ID1, REQUEST_ID1, CHARLIE_HOLDING_IDENTITY, CPI1, "flow start data") - .completedSuccessfullyWith(DONE) - } - - then { - expectOutputForFlow(FLOW_ID1) { - nullStateRecord() - flowStatus(FlowStates.COMPLETED, result = DONE) - scheduleFlowMapperCleanupEvents(CHARLIE_FLOW_KEY_MAPPER) - } - } - } - @Test fun `A flow finishing with FlowFinished removes fiber from fiber cache`() { `when` { @@ -118,4 +84,4 @@ class FlowFinishedAcceptanceTest : FlowServiceTestBase() { } } } -} \ No newline at end of file +} diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/StartFlowTest.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/StartFlowTest.kt index 1cd31de1b00..cf8edd16970 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/StartFlowTest.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/StartFlowTest.kt @@ -11,14 +11,15 @@ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode import org.osgi.test.junit5.service.ServiceExtension +import java.util.concurrent.CountDownLatch +import kotlin.concurrent.thread @ExtendWith(ServiceExtension::class) @Execution(ExecutionMode.SAME_THREAD) class StartFlowTest : FlowServiceTestBase() { @Test - fun `RPC Start Flow - Flow starts and updates stats to running`() { - + fun `Flow starts and runs to completion`() { given { virtualNode(CPI1, BOB_HOLDING_IDENTITY) cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM) @@ -41,14 +42,12 @@ class StartFlowTest : FlowServiceTestBase() { } } - /** * When a virtual node has an INACTIVE StartFlowOperationalStatus, it should throw a FlowMarkedForKillException and * have a Killed status. */ @Test fun `Flow is marked as killed if startFlowOperationalStatus of vNode is INACTIVE`() { - given { virtualNode(CPI1, CHARLIE_HOLDING_IDENTITY, flowStartOperationalStatus = OperationalStatus.INACTIVE) cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM) @@ -67,53 +66,46 @@ class StartFlowTest : FlowServiceTestBase() { flowStatus( state = FlowStates.KILLED, flowTerminatedReason = "flowStartOperationalStatus is INACTIVE, new flows cannot be started for " + - "virtual node with shortHash ${CHARLIE_HOLDING_IDENTITY.toCorda().shortHash}" + "virtual node with shortHash ${CHARLIE_HOLDING_IDENTITY.toCorda().shortHash}" ) } } } /** - * When a flow event fails with a transient exception then the flow will be put into a retry - * state. In this case the flow engine will publish the problematic event back to itself to be processed at some - * later time. This could then fail again, triggering the same loop. + * When a flow event fails with an internal transient exception, the flow engine will automatically retry the failed + * operation without publishing any extra messages to Kafka. Instead, it will try to re-process the event at a later + * time using exponential backoff. This could then fail again, triggering the same loop until a threshold is reached. * - * Scenario 1 - Fails multiple times but completes before the retry limit - * Scenario 2 - Fails multiple times and hits the retry limit failing the flow to the DLQ + * Scenario 1 - Fails multiple times, transient error is fixed before retry limit => flow completes. + * Scenario 2 - Fails multiple times, transient error persists, hits the retry limit => flow fails and DLQ event. */ @Test - fun `RPC Start Flow - Retry scenario 1 - Fail then succeeds`() { + fun `Retry scenario 1 - Transient exception is fixed within the retry limit and flow succeeds`() { given { cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM) sandboxCpk(CPK1_CHECKSUM) membershipGroupFor(BOB_HOLDING_IDENTITY) } - `when` { - startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data") - } - - then { - expectOutputForFlow(FLOW_ID1) { - checkpointHasRetry(1) - flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1) + val latch = CountDownLatch(1) + val startFlow = thread { + latch.countDown() + `when` { + startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data") + .suspendsWith(FlowIORequest.InitialCheckpoint) + .completedSuccessfullyWith("hello") } } - // Now fix the issue and expect the wake-up event to - // retry the start flow successfully - given { - virtualNode(CPI1, BOB_HOLDING_IDENTITY) - } - - `when` { - startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data") - .completedSuccessfullyWith("hello") - } + latch.await() + Thread.sleep(1000) + testContext.virtualNode(CPI1, BOB_HOLDING_IDENTITY) + startFlow.join() then { expectOutputForFlow(FLOW_ID1) { - checkpointDoesNotHaveRetry() + nullStateRecord() flowStatus(FlowStates.COMPLETED, result = "hello") flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1) } @@ -121,44 +113,35 @@ class StartFlowTest : FlowServiceTestBase() { } @Test - fun `RPC Start Flow - Retry scenario 2 - Hit the retry limit and fail the flow`() { + fun `Retry scenario 2 - Transient exception is not fixed within the retry limit and flow permanently fails`() { given { cpkMetadata(CPI1, CPK1, CPK1_CHECKSUM) sandboxCpk(CPK1_CHECKSUM) membershipGroupFor(BOB_HOLDING_IDENTITY) - flowConfiguration(FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION, 0) + flowConfiguration(FlowConfig.PROCESSING_MAX_RETRY_ATTEMPTS, 1) } `when` { startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data") } - then { - expectOutputForFlow(FLOW_ID1) { - checkpointHasRetry(1) - flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1) - } - } - - `when` { - startFlowEventReceived(FLOW_ID1, REQUEST_ID1, BOB_HOLDING_IDENTITY, CPI1, "flow start data") - } - then { expectOutputForFlow(FLOW_ID1) { nullStateRecord() markedForDlq() noFlowEvents() flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1) - //we can't return a status record after the change to checkpoint initialization + // we can't return a status record after the change to checkpoint initialization // Story to deal with change in status records -> CORE-10571: Re-design how status record is published -/* flowStatus( + /* + flowStatus( state = FlowStates.FAILED, errorType = FlowProcessingExceptionTypes.FLOW_FAILED, errorMessage = "Execution failed with \"Failed to find the virtual node info for holder " + - "'HoldingIdentity(x500Name=${BOB_HOLDING_IDENTITY.x500Name}, groupId=${BOB_HOLDING_IDENTITY.groupId})'\" " + - "after 1 retry attempts." - )*/ + "'HoldingIdentity(x500Name=${BOB_HOLDING_IDENTITY.x500Name}, groupId=${BOB_HOLDING_IDENTITY.groupId})'\" " + + "after 1 retry attempts." + ) + */ } } } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/FlowEventExceptionProcessor.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/FlowEventExceptionProcessor.kt index a2bde9893ef..e340a74e049 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/FlowEventExceptionProcessor.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/FlowEventExceptionProcessor.kt @@ -6,7 +6,6 @@ import net.corda.flow.pipeline.exceptions.FlowEventException import net.corda.flow.pipeline.exceptions.FlowFatalException import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException import net.corda.flow.pipeline.exceptions.FlowPlatformException -import net.corda.flow.pipeline.exceptions.FlowTransientException import net.corda.libs.configuration.SmartConfig /** @@ -34,18 +33,6 @@ interface FlowEventExceptionProcessor { */ fun process(throwable: Throwable, context: FlowEventContext<*>): FlowEventContext<*> - /** - * Processes a [FlowTransientException] and provides the pipeline response. - * - * Used to handle event retries. - * - * @param exception The [FlowTransientException] thrown during processing - * @param context The [FlowEventContext] at the point of failure. - * - * @return The updated context. - */ - fun process(exception: FlowTransientException, context: FlowEventContext<*>): FlowEventContext<*> - /** * Processes a [FlowFatalException] and provides the pipeline response. * diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/exceptions/FlowTransientException.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/exceptions/FlowTransientException.kt index 897694bc60a..61ed7ff55ca 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/exceptions/FlowTransientException.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/exceptions/FlowTransientException.kt @@ -3,9 +3,8 @@ package net.corda.flow.pipeline.exceptions import net.corda.v5.base.exceptions.CordaRuntimeException /** - * The [FlowTransientException] is thrown for a recoverable error, this exception will cause the event processing - * to be retried + * The [FlowTransientException] should only be thrown for transient errors that affect the flow engine and any of its + * dependant services, it will cause the pipeline execution to be automatically retried at source. */ class FlowTransientException(override val message: String, cause: Throwable? = null) : CordaRuntimeException(message, cause) - diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt index 80c8fdc98c1..37dbd4efed4 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt @@ -11,7 +11,6 @@ import net.corda.flow.pipeline.exceptions.FlowFatalException import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException import net.corda.flow.pipeline.exceptions.FlowPlatformException import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.PLATFORM_ERROR -import net.corda.flow.pipeline.exceptions.FlowTransientException import net.corda.flow.pipeline.factory.FlowMessageFactory import net.corda.flow.pipeline.factory.FlowRecordFactory import net.corda.flow.pipeline.sessions.FlowSessionManager @@ -64,53 +63,6 @@ class FlowEventExceptionProcessorImpl @Activate constructor( ) } - override fun process( - exception: FlowTransientException, - context: FlowEventContext<*> - ): FlowEventContext<*> { - return withEscalation(context) { - val flowCheckpoint = context.checkpoint - - /** If the retry window has expired then we escalate this to a fatal exception and DLQ the flow */ - if (retryWindowExpired(flowCheckpoint.firstFailureTimestamp)) { - return@withEscalation process( - FlowFatalException( - "Execution failed with \"${exception.message}\" after " + - "${flowCheckpoint.currentRetryCount} retry attempts in a retry window of $maxRetryWindowDuration.", - exception - ), context - ) - } - - log.info("Flow ${context.checkpoint.flowId} encountered a transient problem and is retrying: ${exception.message}") - - val payload = context.inputEventPayload ?: return@withEscalation process( - FlowFatalException( - "Could not process a retry as the input event has no payload.", - exception - ), context - ) - - /** - * As we're still inside the retry window, republish the record that needs retrying here. If the system is - * under load, there may be some delay before it is retried. This is reasonable however, as the system may - * need to wait for the underlying transient problem to clear up. - */ - val records = createStatusRecord(context.checkpoint.flowId) { - flowMessageFactory.createFlowRetryingStatusMessage(context.checkpoint) - } + flowRecordFactory.createFlowEventRecord(context.checkpoint.flowId, payload) - - // Set up records before the rollback, just in case a transient exception happens after a flow is initialised - // but before the first checkpoint has been recorded. - flowCheckpoint.rollback() - flowCheckpoint.markForRetry(context.inputEvent, exception) - - removeCachedFlowFiber(flowCheckpoint) - - context.copy(outputRecords = context.outputRecords + records) - } - } - private fun retryWindowExpired(firstFailureTimestamp: Instant?): Boolean { return firstFailureTimestamp != null && Duration.between(firstFailureTimestamp, Instant.now()) >= maxRetryWindowDuration diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImpl.kt index f7caeab4e81..68f098ee0d3 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImpl.kt @@ -13,19 +13,25 @@ import net.corda.flow.pipeline.exceptions.FlowPlatformException import net.corda.flow.pipeline.exceptions.FlowTransientException import net.corda.flow.pipeline.factory.FlowEventPipelineFactory import net.corda.flow.pipeline.handlers.FlowPostProcessingHandler +import net.corda.flow.state.FlowCheckpoint import net.corda.libs.configuration.SmartConfig import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.processor.StateAndEventProcessor.State import net.corda.messaging.api.records.Record import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG +import net.corda.schema.configuration.FlowConfig import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSOR_TIMEOUT import net.corda.tracing.TraceContext import net.corda.tracing.traceStateAndEventExecution import net.corda.utilities.debug +import net.corda.utilities.retry.Exponential +import net.corda.utilities.retry.tryWithBackoff import net.corda.utilities.trace import net.corda.utilities.withMDC import net.corda.v5.base.exceptions.CordaRuntimeException +import org.slf4j.Logger import org.slf4j.LoggerFactory + @Suppress("LongParameterList") class FlowEventProcessorImpl( private val flowEventPipelineFactory: FlowEventPipelineFactory, @@ -37,7 +43,7 @@ class FlowEventProcessorImpl( ) : StateAndEventProcessor { private companion object { - val log = LoggerFactory.getLogger(this::class.java.enclosingClass) + val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } override val keyClass = String::class.java @@ -110,14 +116,24 @@ class FlowEventProcessorImpl( // thread after this period and so this timeout would never be reached and given a chance to return otherwise. val flowTimeout = (flowConfig.getLong(PROCESSOR_TIMEOUT) * 0.75).toLong() val result = try { - pipeline - .eventPreProcessing() - .virtualNodeFlowOperationalChecks() - .executeFlow(flowTimeout) - .globalPostProcessing() - .context - } catch (e: FlowTransientException) { - flowEventExceptionProcessor.process(e, pipeline.context) + tryWithBackoff( + logger = log, + maxRetries = flowConfig.getLong(FlowConfig.PROCESSING_MAX_RETRY_ATTEMPTS), + maxTimeMillis = flowConfig.getLong(FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION), + // Exponential backoff -> 500ms, 1s, 2s, 4s, 8s, etc. + backoffStrategy = Exponential(base = 2.0, growthFactor = 250L), + // Only FlowTransientException will be retried + shouldRetry = { _, _, t -> t is FlowTransientException }, + onRetryAttempt = { n, d, t -> logRetryAndRollbackCheckpoint(pipeline.context.checkpoint, n, d, t) }, + onRetryExhaustion = { r, e, t -> giveUpAndThrowFlowFatalException(r, e, t) }, + ) { + pipeline + .eventPreProcessing() + .virtualNodeFlowOperationalChecks() + .executeFlow(flowTimeout) + .globalPostProcessing() + .context + } } catch (e: FlowEventException) { flowEventExceptionProcessor.process(e, pipeline.context) } catch (e: FlowPlatformException) { @@ -147,4 +163,41 @@ class FlowEventProcessorImpl( result.copy(outputRecords = result.outputRecords + cleanupEvents) ) } + + /** + * Executed within the [tryWithBackoff] function whenever a retry attempt is about to be made. + * We simply log the attempt under INFO level and rollback the checkpoint. + * + * @param flowCheckpoint current flow checkpoint. + * @param retryNumber retry attempt number, starting in 1. + * @param delayMillis delay before the next retry attempt is made. + * @param throwable original exception thrown while executing retry attempt number [retryNumber]. + */ + private fun logRetryAndRollbackCheckpoint( + flowCheckpoint: FlowCheckpoint, + retryNumber: Int, delayMillis: Long, throwable: Throwable + ) { + log.info( + "Flow ${flowCheckpoint.flowId} encountered a transient error (attempt $retryNumber) and will retry " + + "after $delayMillis milliseconds: ${throwable.message}" + ) + flowCheckpoint.rollback() + } + + /** + * Executed within the [tryWithBackoff] function when all retry attempts has been exhausted. + * + * @param retryCount total amount of retry attempts. + * @param elapsedTime total amount of time spent retrying the transient exception. + * @param throwable original exception thrown while executing the last retry attempt. + */ + private fun giveUpAndThrowFlowFatalException( + retryCount: Int, elapsedTime: Long, throwable: Throwable + ): CordaRuntimeException { + return FlowFatalException( + "Execution failed with \"${throwable.message}\" after $retryCount retry attempts in a " + + "retry window of $elapsedTime.", + throwable + ) + } } diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/TestConstants.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/TestConstants.kt index 1063efb65c2..633cd0c6359 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/TestConstants.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/TestConstants.kt @@ -3,22 +3,25 @@ package net.corda.flow import com.typesafe.config.ConfigFactory import net.corda.data.identity.HoldingIdentity import net.corda.libs.configuration.SmartConfigFactory +import net.corda.schema.configuration.FlowConfig import net.corda.schema.configuration.MessagingConfig import net.corda.v5.base.types.MemberX500Name -val BOB_X500 = "CN=Bob, O=Bob Corp, L=LDN, C=GB" -val ALICE_X500 = "CN=Alice, O=Alice Corp, L=LDN, C=GB" +const val BOB_X500 = "CN=Bob, O=Bob Corp, L=LDN, C=GB" +const val ALICE_X500 = "CN=Alice, O=Alice Corp, L=LDN, C=GB" val BOB_X500_NAME = MemberX500Name.parse(BOB_X500) val ALICE_X500_NAME = MemberX500Name.parse(ALICE_X500) val BOB_X500_HOLDING_IDENTITY = HoldingIdentity(BOB_X500, "group1") val ALICE_X500_HOLDING_IDENTITY = HoldingIdentity(ALICE_X500, "group1") -val SESSION_ID_1 = "S1" -val FLOW_ID_1 = "F1" -val REQUEST_ID_1 ="R1" +const val SESSION_ID_1 = "S1" +const val FLOW_ID_1 = "F1" +const val REQUEST_ID_1 ="R1" val MINIMUM_SMART_CONFIG = SmartConfigFactory.createWithoutSecurityServices().create( ConfigFactory.parseMap( mapOf( + FlowConfig.PROCESSING_MAX_RETRY_ATTEMPTS to 5, + FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION to 10000, MessagingConfig.Subscription.PROCESSOR_TIMEOUT to 60000 ) ) diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt index 0653760d8c7..6c4b8ab4bbb 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt @@ -4,13 +4,9 @@ import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigValueFactory import net.corda.data.flow.FlowKey import net.corda.data.flow.FlowStartContext -import net.corda.data.flow.event.FlowEvent import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.data.flow.event.mapper.FlowMapperEvent -import net.corda.data.flow.output.FlowStatus import net.corda.data.flow.state.checkpoint.Checkpoint -import net.corda.data.flow.state.session.SessionState -import net.corda.data.flow.state.session.SessionStateType import net.corda.data.flow.state.waiting.WaitingFor import net.corda.flow.fiber.cache.FlowFiberCache import net.corda.flow.maintenance.CheckpointCleanupHandler @@ -41,7 +37,6 @@ import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import java.nio.ByteBuffer -import java.time.Instant class FlowEventExceptionProcessorImplTest { private val flowMessageFactory = mock() @@ -65,14 +60,6 @@ class FlowEventExceptionProcessorImplTest { private val checkpointCleanupHandler = mock() private val sessionIdOpen = "sesh-id" - private val sessionIdClosed = "sesh-id-closed" - private val flowActiveSessionState = SessionState().apply { - sessionId = sessionIdOpen - status = SessionStateType.CONFIRMED - hasScheduledCleanup = false - } - private val flowInactiveSessionState = - SessionState().apply { sessionId = sessionIdClosed; status = SessionStateType.CLOSED; hasScheduledCleanup = true } private val target = FlowEventExceptionProcessorImpl( flowMessageFactory, @@ -100,75 +87,6 @@ class FlowEventExceptionProcessorImplTest { assertThat(result.outputRecords).isEmpty() } - @Test - fun `flow transient exception sets retry state and publishes a status update`() { - val error = FlowTransientException("error") - val flowStatusUpdate = FlowStatus() - val key = FlowKey() - val flowStatusUpdateRecord = Record("", key, flowStatusUpdate) - val flowId = "f1" - val flowEventRecord = Record("", flowId, FlowEvent(flowId, ExternalEventResponse())) - whenever(flowCheckpoint.flowId).thenReturn(flowId) - whenever(flowCheckpoint.currentRetryCount).thenReturn(1) - whenever(flowCheckpoint.suspendCount).thenReturn(123) - whenever(flowMessageFactory.createFlowRetryingStatusMessage(flowCheckpoint)).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) - whenever(flowCheckpoint.doesExist).thenReturn(true) - whenever(flowCheckpoint.flowKey).thenReturn(key) - whenever(flowRecordFactory.createFlowEventRecord(flowId, ExternalEventResponse())).thenReturn(flowEventRecord) - - val result = target.process(error, context) - - verify(flowFiberCache).remove(key) - verify(result.checkpoint).rollback() - verify(result.checkpoint).markForRetry(context.inputEvent, error) - assertThat(result.outputRecords).containsOnly(flowStatusUpdateRecord, flowEventRecord) - } - - @Test - fun `flow transient exception when doesExist false does not remove from flow fiber cache`() { - val error = FlowTransientException("error") - val flowStatusUpdate = FlowStatus() - val flowStatusUpdateRecord = Record("", FlowKey(), flowStatusUpdate) - val flowId = "f1" - val flowEventRecord = Record("", flowId, FlowEvent(flowId, ExternalEventResponse())) - whenever(flowCheckpoint.flowId).thenReturn(flowId) - whenever(flowCheckpoint.currentRetryCount).thenReturn(1) - whenever(flowMessageFactory.createFlowRetryingStatusMessage(flowCheckpoint)).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) - whenever(flowRecordFactory.createFlowEventRecord(flowId, ExternalEventResponse())).thenReturn(flowEventRecord) - whenever(flowCheckpoint.doesExist).thenReturn(false) - - val result = target.process(error, context) - - verify(result.checkpoint).rollback() - verify(result.checkpoint).markForRetry(context.inputEvent, error) - assertThat(result.outputRecords).containsOnly(flowStatusUpdateRecord, flowEventRecord) - } - - @Test - fun `flow transient exception processed as fatal when retry window expired`() { - val error = FlowTransientException("mock error message") - val flowStatusUpdate = FlowStatus() - val flowStatusUpdateRecord = Record("", FlowKey(), flowStatusUpdate) - val retryCount = 2 - val now = Instant.now() - whenever(flowCheckpoint.currentRetryCount).thenReturn(retryCount) - whenever(flowCheckpoint.firstFailureTimestamp).thenReturn(now.minusMillis(2000)) - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - flowCheckpoint, - FlowProcessingExceptionTypes.FLOW_FAILED, - "Execution failed with \"${error.message}\" after $retryCount retry attempts in a retry window of PT1S.", - ) - ).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) - - target.process(error, context) - - verify(checkpointCleanupHandler).cleanupCheckpoint(eq(flowCheckpoint), any(), any()) - } - @Test fun `flow fatal exception marks flow for dlq and publishes status update`() { val flowId = "f1" @@ -226,23 +144,6 @@ class FlowEventExceptionProcessorImplTest { verify(result.checkpoint).setPendingPlatformError(FlowProcessingExceptionTypes.PLATFORM_ERROR, error.message) } - @Test - fun `failure to create a status message does not prevent transient failure handling from succeeding`() { - val error = FlowTransientException("error") - val flowId = "f1" - val flowEventRecord = Record("", flowId, FlowEvent(flowId, ExternalEventResponse())) - whenever(flowCheckpoint.flowId).thenReturn(flowId) - whenever(flowCheckpoint.currentRetryCount).thenReturn(1) - whenever(flowMessageFactory.createFlowRetryingStatusMessage(flowCheckpoint)).thenThrow(IllegalStateException()) - whenever(flowRecordFactory.createFlowEventRecord(flowId, ExternalEventResponse())).thenReturn(flowEventRecord) - - val result = target.process(error, context) - - verify(flowCheckpoint).rollback() - verify(flowCheckpoint).markForRetry(context.inputEvent, error) - assertThat(result.outputRecords).containsOnly(flowEventRecord) - } - @Test fun `throwable triggered during transient exception processing does not escape the processor`() { val throwable = RuntimeException() @@ -302,4 +203,4 @@ class FlowEventExceptionProcessorImplTest { assertThat(result.outputRecords).isEmpty() assertThat(result.sendToDlq).isTrue } -} \ No newline at end of file +} diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImplTest.kt index a86febaafe7..e2024dd87df 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventProcessorImplTest.kt @@ -21,6 +21,7 @@ import net.corda.flow.pipeline.FlowEventExceptionProcessor import net.corda.flow.pipeline.FlowEventPipeline import net.corda.flow.pipeline.FlowMDCService import net.corda.flow.pipeline.converters.FlowEventContextConverter +import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowEventException import net.corda.flow.pipeline.exceptions.FlowFatalException import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException @@ -50,7 +51,6 @@ import org.mockito.kotlin.whenever import java.time.Instant class FlowEventProcessorImplTest { - private val payload = ExternalEventResponse() private val aliceHoldingIdentity = HoldingIdentity("CN=Alice, O=Alice Corp, L=LDN, C=GB", "1") private val bobHoldingIdentity = HoldingIdentity("CN=Bob, O=Alice Corp, L=LDN, C=GB", "1") @@ -195,17 +195,30 @@ class FlowEventProcessorImplTest { } @Test - fun `Flow transient exception is handled`() { + fun `flow transient exception is processed as fatal when retry window expired`() { val error = FlowTransientException("") whenever(flowEventPipeline.eventPreProcessing()).thenThrow(error) - whenever(flowEventExceptionProcessor.process(error, flowEventPipeline.context)).thenReturn(errorContext) + whenever(flowEventExceptionProcessor.process(any(), any>())) + .thenReturn(errorContext) val response = processor.onNext(state, getFlowEventRecord(FlowEvent(flowKey, payload))) assertThat(response).isEqualTo(errorResponse) } + @Test + fun `flow transient exception is retried at source and no extra output records are generated`() { + val error = FlowTransientException("") + + whenever(flowEventPipeline.eventPreProcessing()).thenThrow(error).thenReturn(flowEventPipeline) + whenever(flowEventExceptionProcessor.process(error, flowEventPipeline.context)).thenReturn(errorContext) + + val response = processor.onNext(state, getFlowEventRecord(FlowEvent(flowKey, payload))) + + assertThat(response).isEqualTo(outputResponse) + } + @Test fun `Flow event exception is handled`() { val error = FlowEventException("") @@ -386,4 +399,4 @@ class FlowEventProcessorImplTest { private fun getFlowEventRecord(flowEvent: FlowEvent?): Record { return Record(FLOW_SESSION, flowKey, flowEvent) } -} \ No newline at end of file +} diff --git a/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/BackoffStrategy.kt b/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/BackoffStrategy.kt deleted file mode 100644 index c9f10b9327c..00000000000 --- a/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/BackoffStrategy.kt +++ /dev/null @@ -1,51 +0,0 @@ -package net.corda.crypto.impl.retrying - -/** - * Strategy to provide backoff delays when handling transient faults by retrying. - */ -fun interface BackoffStrategy { - companion object { - /** - * Creates backoff strategy. If the number of attempts is less than max attempts then - * the last values is repeated. If the backoff is empty then the time is set to zero. - */ - @JvmStatic - fun createBackoff(maxAttempts: Int, backoff: List): BackoffStrategy = when { - maxAttempts <= 1 -> Default(emptyArray()) - backoff.isEmpty() -> createBackoff(maxAttempts, listOf(0L)) - else -> Default( - Array(maxAttempts - 1) { - if (it < backoff.size) { - backoff[it] - } else { - backoff[backoff.size - 1] - } - } - ) - } - } - - /** - * Returns the next wait period in milliseconds for the given attempt. - * The return value of -1 would mean that there is no further attempts to retry. - * - * @param attempt - the current attempt which failed, starts at 1. - */ - fun getBackoff(attempt: Int): Long - - /** - * Default implementation of the [BackoffStrategy] - * - * @property backoff defines the wait times between each attempt, the number of max attempts is backoff size plus 1. - */ - class Default( - private val backoff: Array - ) : BackoffStrategy { - override fun getBackoff(attempt: Int): Long = - if (attempt < 1 || attempt > backoff.size) { - -1 - } else { - backoff[attempt - 1] - } - } -} diff --git a/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutor.kt b/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutor.kt index 499ae477760..170def89e8f 100644 --- a/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutor.kt +++ b/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutor.kt @@ -2,23 +2,36 @@ package net.corda.crypto.impl.retrying import net.corda.crypto.core.CryptoRetryException import net.corda.crypto.core.isRecoverable -import net.corda.utilities.debug +import net.corda.utilities.retry.BackoffStrategy +import net.corda.utilities.retry.FixedSequence +import net.corda.utilities.retry.tryWithBackoff import org.slf4j.Logger -import java.util.UUID /** * Basic block executor with the retry behaviour. */ open class CryptoRetryingExecutor( private val logger: Logger, - private val strategy: BackoffStrategy + private val maxAttempts: Long, + waitBetweenMills: List ) { - companion object { - const val CRYPTO_MAX_ATTEMPT_GUARD: Int = 10 - } + private val backoffStrategy: BackoffStrategy init { - logger.debug { "Using ${strategy::class.java.name} retry strategy." } + val delays: List = when { + maxAttempts <= 1 -> emptyList() + waitBetweenMills.isEmpty() -> List(maxAttempts.toInt()) { 0L } + else -> + List(maxAttempts.toInt() - 1) { + if (it < waitBetweenMills.size) { + waitBetweenMills[it] + } else { + waitBetweenMills[waitBetweenMills.size - 1] + } + } + } + + backoffStrategy = FixedSequence(delays) } /** @@ -29,50 +42,21 @@ open class CryptoRetryingExecutor( */ @Suppress("NestedBlockDepth") fun executeWithRetry(block: () -> R): R { - var attempt = 1 - var op = "" - while (true) { - try { - if (attempt > 1) { - logger.info("Retrying operation (op={},attempt={})", op, attempt) - } - val result = execute(block) - if (attempt > 1) { - logger.info("Retrying was successful (op={},attempt={})", op, attempt) - } - return result - } catch (e: Throwable) { - if (!e.isRecoverable()) { - // the exception is not recoverable, no point in retrying - logCompleteFailure(attempt, op) - // throws the original exception - throw e - } - val backoff = strategy.getBackoff(attempt) - if (backoff < 0 || attempt > CRYPTO_MAX_ATTEMPT_GUARD) { - // the strategy is exhausted, giving up - logCompleteFailure(attempt, op) - // throws the CryptoRetryException only because the original exception was recoverable - throw CryptoRetryException("Failed to execute on attempt=$attempt", e) - } else { - attempt++ - if(op.isEmpty()) { - op = UUID.randomUUID().toString() - } - logger.warn( - "Failed to execute, will retry after $backoff milliseconds (op=$op,attempt=$attempt)", - e - ) - // sleep for a little while and then retry - Thread.sleep(backoff) - } - } + return tryWithBackoff( + logger = logger, + maxRetries = maxAttempts, + maxTimeMillis = Long.MAX_VALUE, + backoffStrategy = backoffStrategy, + shouldRetry = { _, _, throwable -> throwable.isRecoverable() }, + onRetryAttempt = { attempt, delay, _ -> + logger.warn("Failed to execute, will retry after $delay milliseconds (attempt=$attempt)") + }, + onRetryExhaustion = { attempt, _, throwable -> + logger.warn("Failed to execute (attempt={})", attempt) + CryptoRetryException("Failed to execute on attempt=$attempt", throwable) + }, + ) { + block() } } - - private fun logCompleteFailure(attempt: Int, opId: String) { - logger.warn("Failed to execute (opId={},attempt={})", opId, attempt) - } - - protected open fun execute(block: () -> R): R = block() } diff --git a/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutorWithTimeout.kt b/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutorWithTimeout.kt deleted file mode 100644 index 0f46a0f3349..00000000000 --- a/libs/crypto/crypto-impl/src/main/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutorWithTimeout.kt +++ /dev/null @@ -1,19 +0,0 @@ -package net.corda.crypto.impl.retrying - -import net.corda.utilities.concurrent.getOrThrow -import net.corda.utilities.concurrent.SecManagerForkJoinPool -import org.slf4j.Logger -import java.time.Duration -import java.util.concurrent.CompletableFuture - -/** - * Block executor with the retry behaviour and timeout limit for each attempt. - */ -class CryptoRetryingExecutorWithTimeout( - logger: Logger, - strategy: BackoffStrategy, - private val attemptTimeout: Duration?, -) : CryptoRetryingExecutor(logger, strategy) { - override fun execute(block: () -> R): R = - CompletableFuture.supplyAsync(block, SecManagerForkJoinPool.pool).getOrThrow(attemptTimeout) -} diff --git a/libs/crypto/crypto-impl/src/test/kotlin/net/corda/crypto/impl/retrying/BackoffStrategyTests.kt b/libs/crypto/crypto-impl/src/test/kotlin/net/corda/crypto/impl/retrying/BackoffStrategyTests.kt deleted file mode 100644 index b4183a90db8..00000000000 --- a/libs/crypto/crypto-impl/src/test/kotlin/net/corda/crypto/impl/retrying/BackoffStrategyTests.kt +++ /dev/null @@ -1,52 +0,0 @@ -package net.corda.crypto.impl.retrying - -import net.corda.crypto.impl.retrying.BackoffStrategy.Companion.createBackoff -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test - -class BackoffStrategyTests { - @Test - fun `Should return customizes backoff`() { - val strategy = createBackoff(3, listOf(100, 200)) - var backoff = strategy.getBackoff(1) - assertEquals(100L, backoff) - backoff = strategy.getBackoff(2) - assertEquals(200L, backoff) - backoff = strategy.getBackoff(3) - assertEquals(-1L, backoff) - } - - @Test - fun `Should return customizes backoff with repeating value`() { - val strategy = createBackoff(3, listOf(300)) - var backoff = strategy.getBackoff(1) - assertEquals(300L, backoff) - backoff = strategy.getBackoff(2) - assertEquals(300L, backoff) - backoff = strategy.getBackoff(3) - assertEquals(-1L, backoff) - } - - @Test - fun `Should return customizes backoff for attempts 0 and empty list`() { - val strategy = createBackoff(0, emptyList()) - val backoff = strategy.getBackoff(1) - assertEquals(-1L, backoff) - } - - @Test - fun `Should return customizes backoff for attempts 1 and empty list`() { - val strategy = createBackoff(1, emptyList()) - val backoff = strategy.getBackoff(1) - assertEquals(-1L, backoff) - } - - @Test - fun `Should return customizes backoff for attempts 2 and empty list`() { - val strategy = createBackoff(2, emptyList()) - var backoff = strategy.getBackoff(1) - assertEquals(0L, backoff) - backoff = strategy.getBackoff(2) - assertEquals(-1L, backoff) - } -} \ No newline at end of file diff --git a/libs/crypto/crypto-impl/src/test/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutorsTests.kt b/libs/crypto/crypto-impl/src/test/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutorsTests.kt index 84e066faed5..32c5e05e162 100644 --- a/libs/crypto/crypto-impl/src/test/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutorsTests.kt +++ b/libs/crypto/crypto-impl/src/test/kotlin/net/corda/crypto/impl/retrying/CryptoRetryingExecutorsTests.kt @@ -8,7 +8,6 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource import org.slf4j.LoggerFactory -import java.time.Duration import java.util.concurrent.TimeoutException import javax.persistence.LockTimeoutException import javax.persistence.OptimisticLockException @@ -17,7 +16,6 @@ import javax.persistence.PessimisticLockException import javax.persistence.QueryTimeoutException class CryptoRetryingExecutorsTests { - private val defaultRetryTimeout = Duration.ofSeconds(5) companion object { private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) @@ -65,12 +63,16 @@ class CryptoRetryingExecutorsTests { PersistenceException("error", PessimisticLockException()), PersistenceException("error", java.sql.SQLTransientException()), PersistenceException("error", java.sql.SQLTimeoutException()), - PersistenceException("error", org.hibernate.exception.LockAcquisitionException( - "error", java.sql.SQLException() - )), - PersistenceException("error", org.hibernate.exception.LockTimeoutException( - "error", java.sql.SQLException() - )) + PersistenceException( + "error", org.hibernate.exception.LockAcquisitionException( + "error", java.sql.SQLException() + ) + ), + PersistenceException( + "error", org.hibernate.exception.LockTimeoutException( + "error", java.sql.SQLException() + ) + ) ) } @@ -78,24 +80,7 @@ class CryptoRetryingExecutorsTests { fun `Should execute without retrying`() { var called = 0 val result = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(3, listOf(100L)) - ).executeWithRetry { - called++ - "Hello World!" - } - - assertThat(called).isEqualTo(1) - assertThat(result).isEqualTo("Hello World!") - } - - @Test - fun `Should execute withTimeout without retrying`() { - var called = 0 - val result = CryptoRetryingExecutorWithTimeout( - logger, - BackoffStrategy.createBackoff(3, listOf(100L)), - defaultRetryTimeout + logger, 3, listOf(100L) ).executeWithRetry { called++ "Hello World!" @@ -105,65 +90,12 @@ class CryptoRetryingExecutorsTests { assertThat(result).isEqualTo("Hello World!") } - @Test - fun `CryptoRetryingExecutorWithTimeout should throw CryptoRetryException`() { - var called = 0 - assertThrows { - CryptoRetryingExecutorWithTimeout( - logger, - BackoffStrategy.createBackoff(1, listOf(100L)), - Duration.ofMillis(10) - ).executeWithRetry { - called++ - Thread.sleep(100) - } - } - - assertThat(called).isEqualTo(1) - } - - @ParameterizedTest - @MethodSource("mostCommonUnrecoverableExceptions") - fun `CryptoRetryingExecutorWithTimeout should not retry common exceptions`(e: Throwable) { - var called = 0 - val actual = assertThrows { - CryptoRetryingExecutorWithTimeout( - logger, - BackoffStrategy.createBackoff(3, listOf(100L)), - defaultRetryTimeout - ).executeWithRetry { - called++ - throw e - } - } - - assertThat(called).isEqualTo(1) - assertThat(actual::class.java).isEqualTo(e::class.java) - } - - @Test - fun `CryptoRetryingExecutorWithTimeout should not retry unrecoverable crypto library exception`() { - var called = 0 - assertThrows { - CryptoRetryingExecutorWithTimeout( - logger, BackoffStrategy.createBackoff(3, listOf(100L)), - defaultRetryTimeout - ).executeWithRetry { - called++ - throw CryptoException("error") - } - } - - assertThat(called).isEqualTo(1) - } - @Test fun `Should eventually fail with retrying`() { var called = 0 val actual = assertThrows { CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(3, listOf(10L)) + logger, 3, listOf(10L) ).executeWithRetry { called++ throw TimeoutException() @@ -178,8 +110,7 @@ class CryptoRetryingExecutorsTests { fun `Should eventually succeed after retrying TimeoutException`() { var called = 0 val result = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(3, listOf(10L)) + logger, 3, listOf(10L) ).executeWithRetry { called++ if (called <= 2) { @@ -199,8 +130,7 @@ class CryptoRetryingExecutorsTests { ) { var called = 0 val result = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(3, listOf(10L)) + logger, 3, listOf(10L) ).executeWithRetry { called++ if (called <= 2) { @@ -218,8 +148,7 @@ class CryptoRetryingExecutorsTests { fun `Should retry all recoverable exceptions`(e: Throwable) { var called = 0 val result = CryptoRetryingExecutor( - logger, - BackoffStrategy.createBackoff(2, listOf(10L)) + logger, 2, listOf(10L) ).executeWithRetry { called++ if (called < 2) { diff --git a/libs/utilities/src/main/java/net/corda/utilities/retry/package-info.java b/libs/utilities/src/main/java/net/corda/utilities/retry/package-info.java new file mode 100644 index 00000000000..44a588637ec --- /dev/null +++ b/libs/utilities/src/main/java/net/corda/utilities/retry/package-info.java @@ -0,0 +1,4 @@ +@Export +package net.corda.utilities.retry; + +import org.osgi.annotation.bundle.Export; diff --git a/libs/utilities/src/main/kotlin/net/corda/utilities/retry/BackoffStrategy.kt b/libs/utilities/src/main/kotlin/net/corda/utilities/retry/BackoffStrategy.kt new file mode 100644 index 00000000000..019a263362b --- /dev/null +++ b/libs/utilities/src/main/kotlin/net/corda/utilities/retry/BackoffStrategy.kt @@ -0,0 +1,59 @@ +package net.corda.utilities.retry + +import kotlin.math.pow + +/** + * Strategy to provide backoff delays when handling transient failures. + */ +interface BackoffStrategy { + /** + * Calculate next wait period in milliseconds for the given attempt. + * + * @param attempt current failed attempt, starts at 1. + * @return wait time, in milliseconds, for the given attempt number. A negative value forces the retry to stop. + */ + fun delay(attempt: Int): Long +} + +/** + * Fixed backoff strategy, delay between retries remains constant between attempts. + * + * @param delay the constant delay, in milliseconds, to be applied on each attempt. + */ +class Fixed(private val delay: Long = 1000L) : BackoffStrategy { + override fun delay(attempt: Int) = delay +} + +/** + * Fixed sequence backoff strategy, delay between retries is fixed and pre-configured for any given attempt number. + * + * @param delays the array of delays, in milliseconds, to be returned for each attempt. + */ +class FixedSequence(private val delays: List) : BackoffStrategy { + override fun delay(attempt: Int) = + // Halt the retry process if there are not enough delays configured. + if (attempt > delays.size) { + -1 + } else { + delays[attempt - 1] + } +} + +/** + * Linear backoff strategy, the delay between retries increases linearly with each attempt. + * + * @param growthFactor constant increment added to the delay, in milliseconds, with each attempt. + */ +class Linear(private val growthFactor: Long = 1000L) : BackoffStrategy { + override fun delay(attempt: Int) = growthFactor * attempt +} + +/** + * Exponential backoff strategy, the delay between retries increases exponentially with each attempt. + * + * @param base the base value for exponential growth. + * @param growthFactor the multiplier, in milliseconds, to scale the exponential growth. + */ +class Exponential(private val base: Double = 2.0, private val growthFactor: Long = 1000L) : BackoffStrategy { + override fun delay(attempt: Int) = (base.pow(attempt)).toLong() * growthFactor +} diff --git a/libs/utilities/src/main/kotlin/net/corda/utilities/retry/RetryException.kt b/libs/utilities/src/main/kotlin/net/corda/utilities/retry/RetryException.kt new file mode 100644 index 00000000000..64ec7f38199 --- /dev/null +++ b/libs/utilities/src/main/kotlin/net/corda/utilities/retry/RetryException.kt @@ -0,0 +1,5 @@ +package net.corda.utilities.retry + +import net.corda.v5.base.exceptions.CordaRuntimeException + +class RetryException(message: String, cause: Throwable? = null) : CordaRuntimeException(message, cause) diff --git a/libs/utilities/src/main/kotlin/net/corda/utilities/retry/RetryUtils.kt b/libs/utilities/src/main/kotlin/net/corda/utilities/retry/RetryUtils.kt new file mode 100644 index 00000000000..5c1f3258458 --- /dev/null +++ b/libs/utilities/src/main/kotlin/net/corda/utilities/retry/RetryUtils.kt @@ -0,0 +1,82 @@ +@file:JvmName("RetryUtils") + +package net.corda.utilities.retry + +import net.corda.v5.base.exceptions.CordaRuntimeException +import org.slf4j.Logger + +/** + * Attempts to execute the provided [operation] and returns the result if no errors occur. + * If an error occurs and the [shouldRetry] function classifies the error as non retryable, the original exception is + * thrown. If an error occurs and the [shouldRetry] function classifies the error as retryable, though, the + * [backoffStrategy] is used to determine delay intervals for automatically retrying the [operation]. This process + * continues until [operation] succeeds, or up to a maximum of [maxRetries] or [maxTimeMillis], whichever happens first. + * + * The [shouldRetry] callback function is used to determine (based on the attempt number, elapsed time and exception + * type) whether the error is retryable or not. + * + * Functions [onRetryAttempt] and [onRetryExhaustion] can be leveraged by users to control what happens in between + * attempts and what exception should be thrown if all retry attempts fail. + * + * @param logger Logger instance used to register messages, if any. + * @param maxRetries Maximum amount of retries to spend across all retries before giving up. A zero or negative value + * prevents retries. + * @param maxTimeMillis Maximum amount of time to spend across all retries before giving up. A zero or negative value + * prevents retries. + * @param backoffStrategy Strategy to use when calculating next delay interval. The retry process is halted if the + * returned delay is negative. + * @param shouldRetry A function to determine whether the exception can be automatically retried or not based on the + * attempt number, amount of milliseconds elapsed since the first attempt and the Throwable itself. + * @param onRetryAttempt Callback function to be executed before a retry attempt is made. By default, it simply logs + * (using WARN log level) the current attempt number, the delay that will be used before the next retry attempt, + * and the exception thrown by the last attempt. + * @param onRetryExhaustion Callback function that returns a [CordaRuntimeException] ([RetryException] by default) to + * be executed once all retry attempts have been exhausted. By default, it logs (under WARN log level) the total + * amount of attempts, total elapsed time and the exception thrown by the last attempt. + * @param operation Block to execute. + * @throws RetryException, or the exception type created through [onRetryExhaustion], if the original exception was + * marked as retryable but all retry attempts failed. + */ +@Suppress("LongParameterList") +fun tryWithBackoff( + logger: Logger, + maxRetries: Long, + maxTimeMillis: Long, + backoffStrategy: BackoffStrategy, + shouldRetry: (Int, Long, Throwable) -> Boolean = { _, _, _ -> false }, + onRetryAttempt: (Int, Long, Throwable) -> Unit = { attempt, delayMillis, throwable -> + logger.warn("Attempt $attempt failed with \"${throwable.message}\", will try again after $delayMillis milliseconds") + }, + onRetryExhaustion: (Int, Long, Throwable) -> CordaRuntimeException = { attempts, elapsedMillis, throwable -> + val errorMessage = + "Execution failed with \"${throwable.message}\" after retrying $attempts times for $elapsedMillis milliseconds." + logger.warn(errorMessage) + RetryException(errorMessage, throwable) + }, + operation: () -> T, +): T { + var attempt = 1 + var originalException: Throwable? + val startTime = System.currentTimeMillis() + + while (true) { + try { + return operation() + } catch (throwable: Throwable) { + val elapsedMillis = System.currentTimeMillis() - startTime + if (!shouldRetry(attempt, elapsedMillis, throwable) || (maxRetries <= 0) || (maxTimeMillis <= 0)) { + throw throwable + } + + originalException = throwable + val delayMillis = backoffStrategy.delay(attempt) + if ((delayMillis < 0) || (attempt >= maxRetries) || ((elapsedMillis + delayMillis) >= maxTimeMillis)) { + throw onRetryExhaustion(attempt, elapsedMillis, originalException) + } + + onRetryAttempt(attempt, delayMillis, originalException) + Thread.sleep(delayMillis) + attempt++ + } + } +} diff --git a/libs/utilities/src/test/kotlin/net/corda/utilities/retry/BackoffStrategyTest.kt b/libs/utilities/src/test/kotlin/net/corda/utilities/retry/BackoffStrategyTest.kt new file mode 100644 index 00000000000..488609d1201 --- /dev/null +++ b/libs/utilities/src/test/kotlin/net/corda/utilities/retry/BackoffStrategyTest.kt @@ -0,0 +1,57 @@ +package net.corda.utilities.retry + +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +class BackoffStrategyTest { + + @Test + fun preSetBackoffStrategyReturnsPreConfiguredDelayValue() { + val backoffStrategy = FixedSequence(listOf(1, 2, 3, 4, 5, 6, 7)) + for (i in 1..7) { + assertThat(backoffStrategy.delay(i)).isEqualTo(i.toLong()) + } + } + + @Test + fun preSetBackoffStrategyReturnsNegativeDelayIfNotEnoughValuesConfigured() { + val backoffStrategy = FixedSequence(emptyList()) + assertThat(backoffStrategy.delay(1)).isNegative() + } + + @Test + fun constantBackoffStrategyReturnsConstantDelay() { + val backoffStrategy = Fixed(10) + + repeat(10) { + assertThat(backoffStrategy.delay(it)).isEqualTo(10) + } + } + + @Test + fun linearBackoffStrategyReturnsLinearDelay() { + val backoffStrategy = Linear(growthFactor = 1000L) + + assertThat(backoffStrategy.delay(1)).isEqualTo(1.seconds.inWholeMilliseconds) + assertThat(backoffStrategy.delay(2)).isEqualTo(2.seconds.inWholeMilliseconds) + assertThat(backoffStrategy.delay(3)).isEqualTo(3.seconds.inWholeMilliseconds) + } + + @Test + fun exponentialBackoffStrategyReturnsExponentialDelayWhenUsingDefaults() { + val backoffStrategy = Exponential() + assertThat(backoffStrategy.delay(1)).isEqualTo(2.seconds.inWholeMilliseconds) + assertThat(backoffStrategy.delay(2)).isEqualTo(4.seconds.inWholeMilliseconds) + assertThat(backoffStrategy.delay(3)).isEqualTo(8.seconds.inWholeMilliseconds) + } + + @Test + fun exponentialBackoffStrategyReturnsExponentialDelayWhenUsingCustomValues() { + val backoffStrategy = Exponential(base = 3.0, growthFactor = 100L) + assertThat(backoffStrategy.delay(1)).isEqualTo(300.milliseconds.inWholeMilliseconds) + assertThat(backoffStrategy.delay(2)).isEqualTo(900.milliseconds.inWholeMilliseconds) + assertThat(backoffStrategy.delay(3)).isEqualTo(2700.milliseconds.inWholeMilliseconds) + } +} diff --git a/libs/utilities/src/test/kotlin/net/corda/utilities/retry/RetryUtilsTest.kt b/libs/utilities/src/test/kotlin/net/corda/utilities/retry/RetryUtilsTest.kt new file mode 100644 index 00000000000..e4e0d0617d6 --- /dev/null +++ b/libs/utilities/src/test/kotlin/net/corda/utilities/retry/RetryUtilsTest.kt @@ -0,0 +1,228 @@ +package net.corda.utilities.retry + +import net.corda.v5.base.exceptions.CordaRuntimeException +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.assertj.core.api.Assertions.catchThrowable +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import org.mockito.kotlin.any +import org.mockito.kotlin.atLeast +import org.mockito.kotlin.mock +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoInteractions +import org.mockito.kotlin.whenever +import org.slf4j.LoggerFactory +import kotlin.time.Duration.Companion.seconds + +class RetryUtilsTest { + private val backoffStrategy = mock() + + companion object { + private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + } + + @Test + fun tryWithBackoffDoesNotRetryOnSuccessfulExecution() { + val executor = Executor(1) + val executionResult = + tryWithBackoff(logger, maxRetries = 3, maxTimeMillis = Long.MAX_VALUE, backoffStrategy = backoffStrategy) { + executor.execute("dummy") + } + + verifyNoInteractions(backoffStrategy) + assertThat(executor.calls).isEqualTo(1) + assertThat(executionResult).isEqualTo("dummy") + } + + @Test + fun tryWithBackoffDoesNotRetryForNonRetryableExceptions() { + val executor = Executor(2) + assertThatThrownBy { + tryWithBackoff(logger, maxRetries = 3, maxTimeMillis = Long.MAX_VALUE, backoffStrategy = backoffStrategy) { + executor.execute("") + } + }.isInstanceOf(RecoverableException::class.java).hasMessage("Dummy Exception") + + verifyNoInteractions(backoffStrategy) + assertThat(executor.calls).isEqualTo(1) + } + + @ParameterizedTest + @ValueSource(longs = [0, -10]) + fun tryWithBackoffDoesNotRetryWhenMaxAttemptsIsNegativeOrZero(maxRetries: Long) { + val executor = Executor(Int.MAX_VALUE) + assertThatThrownBy { + tryWithBackoff( + logger, + maxRetries = maxRetries, + maxTimeMillis = Long.MAX_VALUE, + backoffStrategy = backoffStrategy, + shouldRetry = { _, _, t -> t is RecoverableException } + ) { + executor.execute("") + } + }.isInstanceOf(RecoverableException::class.java).hasMessage("Dummy Exception") + + verifyNoInteractions(backoffStrategy) + assertThat(executor.calls).isEqualTo(1) + } + + @ParameterizedTest + @ValueSource(longs = [0, -10]) + fun tryWithBackoffDoesNotRetryWhenMaxTimeMillisIsNegativeOrZero(maxTimeMillis: Long) { + val executor = Executor(Int.MAX_VALUE) + assertThatThrownBy { + tryWithBackoff( + logger, + maxRetries = Long.MAX_VALUE, + maxTimeMillis = maxTimeMillis, + backoffStrategy = backoffStrategy, + shouldRetry = { _, _, t -> t is RecoverableException } + ) { + executor.execute("") + } + }.isInstanceOf(RecoverableException::class.java).hasMessage("Dummy Exception") + + verifyNoInteractions(backoffStrategy) + assertThat(executor.calls).isEqualTo(1) + } + + @Test + fun tryWithBackoffRetriesOnFailureAndStopsRetryingOnSuccessfulExecution() { + val executor = Executor(5) + val executionResult = tryWithBackoff( + logger, + maxRetries = 10, + maxTimeMillis = Long.MAX_VALUE, + backoffStrategy = backoffStrategy, + shouldRetry = { _, _, t -> t is RecoverableException }, + ) { + executor.execute("dummy") + } + + assertThat(executor.calls).isEqualTo(5) + assertThat(executionResult).isEqualTo("dummy") + verify(backoffStrategy, times(4)).delay(any()) + } + + @Test + fun tryWithBackoffRetriesOnFailureAndStopsRetryingWhenDelayIsNegative() { + val executor = Executor(10) + whenever(backoffStrategy.delay(any())) + .thenReturn(1) + .thenReturn(2) + .thenReturn(-1) + + val thrownException = catchThrowable { + tryWithBackoff( + logger, + maxRetries = Long.MAX_VALUE, + maxTimeMillis = Long.MAX_VALUE, + backoffStrategy = backoffStrategy, + shouldRetry = { _, _, t -> t is RecoverableException }, + ) { + executor.execute("") + } + } + + assertThat(executor.calls).isEqualTo(3) + assertThat(thrownException) + .isInstanceOf(RetryException::class.java) + .hasCauseInstanceOf(RecoverableException::class.java) + .hasRootCauseMessage("Dummy Exception") + verify(backoffStrategy, times(3)).delay(any()) + } + + @Test + fun tryWithBackoffRetriesOnFailureAndStopsRetryingWhenMaxAttemptsIsReached() { + val executor = Executor(5) + val thrownException = catchThrowable { + tryWithBackoff( + logger, + maxRetries = 3, + maxTimeMillis = Long.MAX_VALUE, + backoffStrategy = backoffStrategy, + shouldRetry = { _, _, t -> t is RecoverableException }, + ) { + executor.execute("") + } + } + + assertThat(executor.calls).isEqualTo(3) + assertThat(thrownException) + .isInstanceOf(RetryException::class.java) + .hasCauseInstanceOf(RecoverableException::class.java) + .hasRootCauseMessage("Dummy Exception") + verify(backoffStrategy, times(3)).delay(any()) + } + + @Test + fun tryWithBackoffRetriesOnFailureAndStopsRetryingWhenMaxTimeIsReached() { + val executor = Executor(Int.MAX_VALUE) + whenever(backoffStrategy.delay(any())).thenReturn(1.seconds.inWholeMilliseconds) + + val thrownException = catchThrowable { + tryWithBackoff( + logger, + maxRetries = Long.MAX_VALUE, + maxTimeMillis = 2.seconds.inWholeMilliseconds, + backoffStrategy = backoffStrategy, + shouldRetry = { _, _, t -> t is RecoverableException }, + ) { + executor.execute("") + } + } + + assertThat(executor.calls).isGreaterThanOrEqualTo(1) + assertThat(thrownException) + .isInstanceOf(RetryException::class.java) + .hasCauseInstanceOf(RecoverableException::class.java) + .hasRootCauseMessage("Dummy Exception") + verify(backoffStrategy, atLeast(1)).delay(any()) + } + + @Test + fun tryWithBackoffRetriesOnFailureAndThrowsCustomException() { + val executor = Executor(Int.MAX_VALUE) + val thrownException = catchThrowable { + tryWithBackoff( + logger, + maxRetries = 1, + maxTimeMillis = Long.MAX_VALUE, + backoffStrategy = backoffStrategy, + shouldRetry = { _, _, t -> t is RecoverableException }, + onRetryExhaustion = { _, _, t -> CustomException("", t) } + ) { + executor.execute("") + } + } + + assertThat(executor.calls).isEqualTo(1) + assertThat(thrownException) + .isInstanceOf(CustomException::class.java) + .hasCauseInstanceOf(RecoverableException::class.java) + .hasRootCauseMessage("Dummy Exception") + verify(backoffStrategy, times(1)).delay(any()) + } +} + +class RecoverableException(message: String) : RuntimeException(message) + +class CustomException(message: String, throwable: Throwable) : CordaRuntimeException(message, throwable) + +class Executor(private val timesToSuccess: Int = 1) { + var calls = 0 + + fun execute(value: Any): Any { + calls++ + + if (calls < timesToSuccess) { + throw RecoverableException("Dummy Exception") + } + + return value + } +} diff --git a/testing/virtual-node-info-read-service-fake/src/main/kotlin/net/corda/virtualnode/read/fake/VirtualNodeInfoReadServiceFake.kt b/testing/virtual-node-info-read-service-fake/src/main/kotlin/net/corda/virtualnode/read/fake/VirtualNodeInfoReadServiceFake.kt index 4a1e503343c..aae839a9dad 100644 --- a/testing/virtual-node-info-read-service-fake/src/main/kotlin/net/corda/virtualnode/read/fake/VirtualNodeInfoReadServiceFake.kt +++ b/testing/virtual-node-info-read-service-fake/src/main/kotlin/net/corda/virtualnode/read/fake/VirtualNodeInfoReadServiceFake.kt @@ -22,6 +22,9 @@ import org.osgi.service.component.annotations.Reference import org.osgi.service.component.propertytypes.ServiceRanking import org.slf4j.LoggerFactory import java.io.File +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.ConcurrentMap import java.util.stream.Stream @@ -45,8 +48,8 @@ class VirtualNodeInfoReadServiceFake internal constructor( private val file = File("virtual-node-info-read-service-fake.yaml") } - private val map: MutableMap = virtualNodeInfos.toMutableMap() - private val callbacks: MutableList = callbacks.toMutableList() + private val map: ConcurrentMap = ConcurrentHashMap(virtualNodeInfos) + private val callbacks: ConcurrentLinkedQueue = ConcurrentLinkedQueue(callbacks) init { map += VirtualNodeInfoReadServiceFakeParser.loadFrom(file).associateBy { it.holdingIdentity } diff --git a/testing/virtual-node-info-read-service-fake/src/test/kotlin/net/corda/virtualnode/read/fake/VirtualNodeInfoReadServiceFakeTest.kt b/testing/virtual-node-info-read-service-fake/src/test/kotlin/net/corda/virtualnode/read/fake/VirtualNodeInfoReadServiceFakeTest.kt index 0b656b2ccb0..1092ed4c34e 100644 --- a/testing/virtual-node-info-read-service-fake/src/test/kotlin/net/corda/virtualnode/read/fake/VirtualNodeInfoReadServiceFakeTest.kt +++ b/testing/virtual-node-info-read-service-fake/src/test/kotlin/net/corda/virtualnode/read/fake/VirtualNodeInfoReadServiceFakeTest.kt @@ -1,8 +1,6 @@ package net.corda.virtualnode.read.fake -import net.corda.virtualnode.VirtualNodeInfo -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNull +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test internal class VirtualNodeInfoReadServiceFakeTest { @@ -24,31 +22,27 @@ internal class VirtualNodeInfoReadServiceFakeTest { @Test fun getAll() { - assertEquals( - listOf(alice, bob), - createService(alice, bob).getAll(), - "Onboarded virtual nodes" - ) - - assertEquals( - emptyList(), - createService().getAll(), - "Onboarded virtual nodes" - ) + assertThat(createService(alice, bob).getAll()) + .containsExactlyInAnyOrder(alice, bob) + + assertThat(createService().getAll()) + .isEmpty() } @Test fun get() { val service = createService(alice, bob) - assertEquals(bob, service.get(bob.holdingIdentity), "Virtual Node Info") - assertNull(service.get(carol.holdingIdentity), "Virtual Node Info") + + assertThat(service.get(carol.holdingIdentity)).isNull() + assertThat(service.get(bob.holdingIdentity)).isEqualTo(bob) } @Test fun getById() { val service = createService(alice, bob) - assertEquals(bob, service.getByHoldingIdentityShortHash(bob.holdingIdentity.shortHash), "Virtual Node Info") - assertNull(service.getByHoldingIdentityShortHash(carol.holdingIdentity.shortHash), "Virtual Node Info") + + assertThat(service.getByHoldingIdentityShortHash(carol.holdingIdentity.shortHash)).isNull() + assertThat(service.getByHoldingIdentityShortHash(bob.holdingIdentity.shortHash)).isEqualTo(bob) } @Test @@ -56,10 +50,9 @@ internal class VirtualNodeInfoReadServiceFakeTest { val listener = VirtualNodeInfoListenerSpy() createService(alice, bob).registerCallback(listener) - - assertEquals(1, listener.timesCalled, "times called") - assertEquals(keys(alice, bob), listener.keys[0], "changed keys") - assertEquals(snapshot(alice, bob), listener.snapshots[0], "current snapshot") + assertThat(listener.timesCalled).isEqualTo(1) + assertThat(listener.keys[0]).isEqualTo(keys(alice, bob)) + assertThat(listener.snapshots[0]).isEqualTo(snapshot(alice, bob)) } @Test @@ -68,11 +61,10 @@ internal class VirtualNodeInfoReadServiceFakeTest { val service = createService(alice, callbacks = listOf(listener)) service.addOrUpdate(bob) - - assertEquals(listOf(alice, bob), service.getAll(), "all vnodes") - assertEquals(1, listener.timesCalled, "times called listener1") - assertEquals(keys(bob), listener.keys[0], "keys added") - assertEquals(snapshot(alice, bob), listener.snapshots[0], "snapshot") + assertThat(service.getAll()).containsExactlyInAnyOrder(alice, bob) + assertThat(listener.timesCalled).isEqualTo(1) + assertThat(listener.keys[0]).isEqualTo(keys(bob)) + assertThat(listener.snapshots[0]).isEqualTo(snapshot(alice, bob)) } @Test @@ -81,10 +73,9 @@ internal class VirtualNodeInfoReadServiceFakeTest { val service = createService(alice, bob, callbacks = listOf(listener)) service.remove(alice.holdingIdentity) - - assertEquals(listOf(bob), service.getAll(), "all vnodes") - assertEquals(1, listener.timesCalled, "times called listener1") - assertEquals(keys(alice), listener.keys[0], "keys removed") - assertEquals(snapshot(bob), listener.snapshots[0], "snapshot") + assertThat(service.getAll()).containsExactlyInAnyOrder(bob) + assertThat(listener.timesCalled).isEqualTo(1) + assertThat(listener.keys[0]).isEqualTo(keys(alice)) + assertThat(listener.snapshots[0]).isEqualTo(snapshot(bob)) } }