Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-20867 Implement retry topic to handle persistent transient RPC Client errors #6385

Merged
merged 19 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package net.corda.flow.external.events.impl

import java.time.Instant
import net.corda.data.flow.event.external.ExternalEvent
import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.data.flow.state.external.ExternalEventState
import net.corda.flow.external.events.factory.ExternalEventFactory
import net.corda.flow.external.events.factory.ExternalEventRecord
import net.corda.messaging.api.records.Record
import java.time.Duration
import java.time.Instant

/**
* [ExternalEventManager] encapsulates external event behaviour by creating and modifying [ExternalEventState]s.
Expand Down Expand Up @@ -86,4 +86,16 @@ interface ExternalEventManager {
instant: Instant,
retryWindow: Duration
): Pair<ExternalEventState, Record<*, *>?>

/**
* Get the external event to send for the transient error retry scenario.
* Returns the event as is from the state. No additional checks required.
* @param externalEventState The [ExternalEventState] to get the event from.
* @param instant The current time. Used to set timestamp.
* @return The external event request to resend
* */
fun getRetryEvent(
externalEventState: ExternalEventState,
instant: Instant,
): Record<*, *>
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ class ExternalEventManagerImpl(
return externalEventState to record
}

override fun getRetryEvent(
externalEventState: ExternalEventState,
instant: Instant,
): Record<*, *> {
return generateRecord(externalEventState, instant)
}

private fun checkRetry(externalEventState: ExternalEventState, instant: Instant, retryWindow: Duration) {
when {
(externalEventState.sendTimestamp + retryWindow) >= instant -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package net.corda.flow.messaging.mediator

import com.typesafe.config.ConfigValueFactory
import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.data.crypto.wire.ops.flow.FlowOpsRequest
import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.event.external.ExternalEventRetryRequest
import net.corda.data.flow.event.mapper.FlowMapperEvent
import net.corda.data.flow.output.FlowStatus
import net.corda.data.flow.state.checkpoint.Checkpoint
Expand Down Expand Up @@ -30,6 +32,7 @@ import net.corda.messaging.api.mediator.RoutingDestination.Companion.routeTo
import net.corda.messaging.api.mediator.RoutingDestination.Type.ASYNCHRONOUS
import net.corda.messaging.api.mediator.RoutingDestination.Type.SYNCHRONOUS
import net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder
import net.corda.messaging.api.mediator.config.RetryConfig
import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory
import net.corda.messaging.api.mediator.factory.MediatorConsumerFactoryFactory
import net.corda.messaging.api.mediator.factory.MessageRouterFactory
Expand All @@ -47,12 +50,14 @@ import net.corda.schema.configuration.BootConfig.TOKEN_SELECTION_WORKER_REST_END
import net.corda.schema.configuration.BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.VERIFICATION_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.WORKER_MEDIATOR_REPLICAS_FLOW_SESSION
import net.corda.schema.configuration.MessagingConfig.Bus.KAFKA_CONSUMER_MAX_POLL_RECORDS
import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT
import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_THREAD_POOL_SIZE
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.slf4j.LoggerFactory
import java.time.Instant
import java.util.UUID

@Suppress("LongParameterList")
Expand All @@ -77,7 +82,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
private const val CONSUMER_GROUP = "FlowEventConsumer"
private const val MESSAGE_BUS_CLIENT = "MessageBusClient"
private const val RPC_CLIENT = "RpcClient"

private const val RETRY_TOPIC_POLL_LIMIT = 5
private const val RETRY_TOPIC = FLOW_EVENT_TOPIC
private const val TOKEN_RETRY = "TokenRetry"
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

Expand Down Expand Up @@ -123,12 +130,65 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
.threadName("flow-event-mediator")
.stateManager(stateManager)
.minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT))
.retryConfig(RetryConfig(RETRY_TOPIC, ::buildRetryRequest))
.build()


/**
* Build a request to trigger a resend of external events via the flow event pipeline.
* Request id is calculated from the previous request payload when possible to allow for some validation in the pipeline.
* This validation is an enhancement and not strictly required.
* A new Timestamp is set on each request to ensure each request is unique for replay logic handling.
* @param key the key of the input record
* @param syncRpcRequest the previous sync request which failed.
* @return list of output retry events.
*/
private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage<Any>) : List<MediatorMessage<Any>> {
return try {
val requestId = getRequestId(syncRpcRequest)
val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder()
.setRequestId(requestId)
.setTimestamp(Instant.now())
.build()
val flowEvent = FlowEvent.newBuilder()
.setFlowId(key)
.setPayload(externalEventRetryRequest)
.build()
//ensure key is set correctly on new message destined for flow topic
val properties = syncRpcRequest.properties.toMutableMap().apply { this["key"] = key }
listOf(MediatorMessage(flowEvent, properties))
} catch (ex: Exception) {
//In this scenario we failed to build the retry event. This will likely result in the flow hanging until the idle processor
// kicks in. This shouldn't be possible and is just a safety net.
logger.warn("Failed to generate a retry event for key $key. No retry will be triggered.", ex)
emptyList()
}
}

/**
* Determine the external event request id where possible.
* Note, some token events have no request id as there is no response.
* For these use a hardcoded request id which will be ignored at the validation step.
* @param syncRpcRequest the previous request
* @return Request ID to set in the retry event.
*/
private fun getRequestId(syncRpcRequest: MediatorMessage<Any>): String {
return when (val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray)) {
is EntityRequest -> entityRequest.flowExternalEventContext.requestId
is FlowOpsRequest -> entityRequest.flowExternalEventContext.requestId
is LedgerPersistenceRequest -> entityRequest.flowExternalEventContext.requestId
is TransactionVerificationRequest -> entityRequest.flowExternalEventContext.requestId
is UniquenessCheckRequestAvro -> entityRequest.flowExternalEventContext.requestId
is TokenPoolCacheEvent -> TOKEN_RETRY
else -> "InvalidEntityType"
LWogan marked this conversation as resolved.
Show resolved Hide resolved
}
}

private fun createMediatorConsumerFactories(messagingConfig: SmartConfig, bootConfig: SmartConfig): List<MediatorConsumerFactory> {
val retryTopicMessagingConfig = getRetryTopicConfig(messagingConfig)
val mediatorConsumerFactory: MutableList<MediatorConsumerFactory> = mutableListOf(
mediatorConsumerFactory(FLOW_START, messagingConfig),
mediatorConsumerFactory(FLOW_EVENT_TOPIC, messagingConfig)
mediatorConsumerFactory(FLOW_EVENT_TOPIC, retryTopicMessagingConfig)
)

val mediatorReplicas = bootConfig.getIntOrDefault(WORKER_MEDIATOR_REPLICAS_FLOW_SESSION, 1)
Expand All @@ -140,6 +200,10 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
return mediatorConsumerFactory
}

private fun getRetryTopicConfig(messagingConfig: SmartConfig): SmartConfig {
return messagingConfig.withValue(KAFKA_CONSUMER_MAX_POLL_RECORDS, ConfigValueFactory.fromAnyRef(RETRY_TOPIC_POLL_LIMIT))
}

private fun mediatorConsumerFactory(
topic: String,
messagingConfig: SmartConfig
Expand Down Expand Up @@ -178,8 +242,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
rpcEndpoint(VERIFICATION_WORKER_REST_ENDPOINT, VERIFICATION_PATH), SYNCHRONOUS)
is UniquenessCheckRequestAvro -> routeTo(rpcClient,
rpcEndpoint(UNIQUENESS_WORKER_REST_ENDPOINT, UNIQUENESS_PATH), SYNCHRONOUS)
//RETRIES will appear as FlowEvents
is FlowEvent -> routeTo(messageBusClient,
FLOW_EVENT_TOPIC, ASYNCHRONOUS)
RETRY_TOPIC, ASYNCHRONOUS)
is String -> routeTo(messageBusClient, // Handling external messaging
message.properties[MSG_PROP_TOPIC] as String, ASYNCHRONOUS)
else -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package net.corda.flow.pipeline.handlers.events

import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.data.flow.event.external.ExternalEventRetryRequest
import net.corda.flow.pipeline.events.FlowEventContext
import net.corda.flow.pipeline.exceptions.FlowEventException
import net.corda.utilities.debug
import org.osgi.service.component.annotations.Component
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/**
* Handles pre-processing of events that are intended to trigger a resend of an external event.
* This can be triggered by the mediator in the event of transient errors.
*/
@Component(service = [FlowEventHandler::class])
class ExternalEventRetryRequestHandler : FlowEventHandler<ExternalEventRetryRequest> {

private companion object {
val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
private const val TOKEN_RETRY = "TokenRetry"
}

override val type = ExternalEventRetryRequest::class.java

override fun preProcess(context: FlowEventContext<ExternalEventRetryRequest>): FlowEventContext<ExternalEventRetryRequest> {
val checkpoint = context.checkpoint
val externalEventRetryRequest = context.inputEventPayload

if (!checkpoint.doesExist) {
log.debug {
"Received a ${ExternalEventRetryRequest::class.simpleName} for flow [${context.inputEvent.flowId}] that " +
"does not exist. The event will be discarded. ${ExternalEventRetryRequest::class.simpleName}: " +
externalEventRetryRequest
}
throw FlowEventException(
"ExternalEventRetryRequestHandler received a ${ExternalEventRetryRequest::class.simpleName} for flow" +
" [${context.inputEvent.flowId}] that does not exist"
)
}

val externalEventState = checkpoint.externalEventState
val retryRequestId: String = externalEventRetryRequest.requestId
val externalEventStateRequestId = externalEventState?.requestId
if (externalEventState == null) {
log.debug {
"Received an ${ExternalEventRetryRequest::class.simpleName} with request id: " +
"$retryRequestId while flow [${context.inputEvent.flowId} is not waiting " +
"for an ${ExternalEventResponse::class.simpleName}. " +
"${ExternalEventRetryRequest::class.simpleName}: $externalEventRetryRequest"
}
throw FlowEventException(
"ExternalEventRetryRequestHandler received an ${ExternalEventRetryRequest::class.simpleName} with request id: " +
"$retryRequestId while flow [${context.inputEvent.flowId} is not waiting " +
"for an ${ExternalEventResponse::class.simpleName}"
)
}
//Discard events not related. Some token requests do not contain the external event id so this validation will allow all token
// requests to be resent. e.g TokenForceClaimRelease
else if (externalEventStateRequestId != retryRequestId && retryRequestId != TOKEN_RETRY) {
throw FlowEventException(
"Discarding retry request received with requestId $retryRequestId. This is likely a stale record polled. Checkpoint " +
"is currently waiting to receive a response for requestId $externalEventStateRequestId"
)
}

return context
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package net.corda.flow.pipeline.impl

import net.corda.data.flow.event.external.ExternalEventRetryRequest
import net.corda.data.flow.event.mapper.FlowMapperEvent
import net.corda.data.flow.event.mapper.ScheduleCleanup
import net.corda.data.flow.state.external.ExternalEventState
import net.corda.data.flow.state.session.SessionState
import net.corda.data.flow.state.session.SessionStateType
import net.corda.flow.external.events.impl.ExternalEventManager
Expand Down Expand Up @@ -51,11 +53,11 @@ class FlowGlobalPostProcessorImpl @Activate constructor(
postProcessPendingPlatformError(context)

val outputRecords = getSessionEvents(context, now) +
getFlowMapperSessionCleanupEvents(context, now) +
getExternalEvent(context, now)
getFlowMapperSessionCleanupEvents(context, now) +
getExternalEvent(context, now)

context.flowMetrics.flowEventCompleted(context.inputEvent.payload::class.java.name)
val metadata = getStateMetadata(context)
val metadata = updateFlowSessionMetadata(context)

return context.copy(
outputRecords = context.outputRecords + outputRecords,
Expand Down Expand Up @@ -111,7 +113,7 @@ class FlowGlobalPostProcessorImpl @Activate constructor(
if (!counterpartyExists) {
val msg =
"[${context.checkpoint.holdingIdentity.x500Name}] has failed to create a flow with counterparty: " +
"[${counterparty}] as the recipient doesn't exist in the network."
"[${counterparty}] as the recipient doesn't exist in the network."
sessionManager.errorSession(sessionState)
if (doesCheckpointExist) {
log.debug { "$msg. Throwing FlowFatalException" }
Expand Down Expand Up @@ -157,24 +159,26 @@ class FlowGlobalPostProcessorImpl @Activate constructor(
* Check to see if any external events needs to be sent or resent.
*/
private fun getExternalEvent(context: FlowEventContext<Any>, now: Instant): List<Record<*, *>> {
val externalEventState = context.checkpoint.externalEventState
return if (externalEventState == null) {
listOf()
} else {
val retryWindow = context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW)
externalEventManager.getEventToSend(externalEventState, now, Duration.ofMillis(retryWindow))
.let { (updatedExternalEventState, record) ->
context.checkpoint.externalEventState = updatedExternalEventState
if (record != null) {
listOf(record)
} else {
listOf()
}
val externalEventState = context.checkpoint.externalEventState ?: return emptyList()

return when (context.inputEvent.payload) {
is ExternalEventRetryRequest -> getTransientRetryRequest(externalEventState, now)
else -> {
val retryWindow = Duration.ofMillis(context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW))
externalEventManager.getEventToSend(externalEventState, now, retryWindow).let { (updatedState, record) ->
context.checkpoint.externalEventState = updatedState
listOfNotNull(record)
}
}
}
}

private fun getStateMetadata(context: FlowEventContext<Any>): Metadata? {
private fun getTransientRetryRequest(externalEventState: ExternalEventState, now: Instant):
List<Record<*, *>> {
return listOf(externalEventManager.getRetryEvent(externalEventState, now))
}

private fun updateFlowSessionMetadata(context: FlowEventContext<Any>): Metadata? {
val checkpoint = context.checkpoint
// Find the earliest expiry time for any open sessions.
val lastReceivedMessageTime = checkpoint.sessions.filter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package net.corda.flow.external.events.impl

import java.nio.ByteBuffer
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.stream.Stream
import net.corda.avro.serialization.CordaAvroDeserializer
import net.corda.avro.serialization.CordaAvroSerializer
import net.corda.data.ExceptionEnvelope
Expand Down Expand Up @@ -34,7 +30,11 @@ import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoInteractions
import org.mockito.kotlin.whenever
import java.nio.ByteBuffer
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.stream.Stream

class ExternalEventManagerImplTest {

Expand Down Expand Up @@ -530,4 +530,35 @@ class ExternalEventManagerImplTest {

assertEquals(null, record)
}

@Test
fun `getRetryEvent returns an external event`() {
val now = Instant.now().truncatedTo(ChronoUnit.MILLIS)
val key = ByteBuffer.wrap(KEY.toByteArray())
val payload = ByteBuffer.wrap(byteArrayOf(1, 2, 3))

val externalEvent = ExternalEvent().apply {
this.topic = TOPIC
this.key = key
this.payload = payload
this.timestamp = now.minusSeconds(10)
}

val externalEventState = ExternalEventState().apply {
requestId = REQUEST_ID_1
eventToSend = externalEvent
sendTimestamp = null
status = ExternalEventStateStatus(ExternalEventStateType.OK, null)
}

val record = externalEventManager.getRetryEvent(
externalEventState,
now,
)


assertEquals(TOPIC, record.topic)
assertEquals(key.array(), record.key)
assertEquals(payload.array(), record.value)
}
}
Loading
Loading