From c438bc6b87025af25ba35653984103dc1ba0a167 Mon Sep 17 00:00:00 2001 From: LWogan Date: Fri, 25 Oct 2024 16:18:22 +0100 Subject: [PATCH 01/18] CORE-20867 Implement retry topic to handle persistent transient RPC Client errors --- .../events/impl/ExternalEventManager.kt | 12 +- .../events/impl/ExternalEventManagerImpl.kt | 14 ++- .../mediator/FlowEventMediatorFactoryImpl.kt | 32 ++++- .../events/ExternalEventResponseHandler.kt | 24 +++- .../ExternalEventRetryRequestHandler.kt | 109 ++++++++++++++++++ .../impl/FlowGlobalPostProcessorImpl.kt | 40 ++++--- .../flow/state/impl/CheckpointMetadataKeys.kt | 6 + gradle.properties | 2 +- .../mediator/processor/EventProcessor.kt | 63 ++++++---- .../mediator/config/EventMediatorConfig.kt | 3 + .../config/EventMediatorConfigBuilder.kt | 15 +++ 11 files changed, 272 insertions(+), 48 deletions(-) create mode 100644 components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt index 35ae3298aed..8f6ad26bc3b 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt @@ -1,6 +1,5 @@ package net.corda.flow.external.events.impl -import java.time.Instant import net.corda.data.flow.event.external.ExternalEvent import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.data.flow.state.external.ExternalEventState @@ -8,6 +7,7 @@ import net.corda.flow.external.events.factory.ExternalEventFactory import net.corda.flow.external.events.factory.ExternalEventRecord import net.corda.messaging.api.records.Record import java.time.Duration +import java.time.Instant /** * 
[ExternalEventManager] encapsulates external event behaviour by creating and modifying [ExternalEventState]s. @@ -86,4 +86,14 @@ interface ExternalEventManager { instant: Instant, retryWindow: Duration ): Pair?> + + /** + * Get the external event to send for the transient error retry scenario. + * Returns the event as is from the state. No additional checks required. + * @param externalEventState The [ExternalEventState] to get the event from. + * @return The record to resend, built from the stored event without changing its timestamp. + * */ + fun getRetryEvent( + externalEventState: ExternalEventState + ): Record<*, *> } \ No newline at end of file diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt index 6927f481deb..183dcbfbf3e 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt @@ -174,6 +174,14 @@ class ExternalEventManagerImpl( return externalEventState to record } + override fun getRetryEvent( + externalEventState: ExternalEventState, + ): Record<*, *> { + //Don't update ExternalEventState with new timestamp as this will result in State change being detected and potentially + // additional checkpoints saved for cases where multiple sequential RPC calls have transient retry errors + return generateRecord(externalEventState, null) + } + private fun checkRetry(externalEventState: ExternalEventState, instant: Instant, retryWindow: Duration) { when { (externalEventState.sendTimestamp + retryWindow) >= instant -> { @@ -194,9 +202,11 @@ class ExternalEventManagerImpl( } } - private fun generateRecord(externalEventState: ExternalEventState, instant: Instant) : Record<*, *> { + private fun generateRecord(externalEventState: 
ExternalEventState, instant: Instant?) : Record<*, *> { val eventToSend = externalEventState.eventToSend - eventToSend.timestamp = instant + if (instant != null) { + eventToSend.timestamp = instant + } val topic = eventToSend.topic log.trace { "Dispatching external event with id '${externalEventState.requestId}' to '$topic'" } return Record(topic, eventToSend.key.array(), eventToSend.payload.array()) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 374f2fce4fc..9d31a27099d 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -1,8 +1,10 @@ package net.corda.flow.messaging.mediator +import com.typesafe.config.ConfigValueFactory import net.corda.avro.serialization.CordaAvroSerializationFactory import net.corda.data.crypto.wire.ops.flow.FlowOpsRequest import net.corda.data.flow.event.FlowEvent +import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.output.FlowStatus import net.corda.data.flow.state.checkpoint.Checkpoint @@ -47,6 +49,7 @@ import net.corda.schema.configuration.BootConfig.TOKEN_SELECTION_WORKER_REST_END import net.corda.schema.configuration.BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.VERIFICATION_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.WORKER_MEDIATOR_REPLICAS_FLOW_SESSION +import net.corda.schema.configuration.MessagingConfig.Bus.KAFKA_CONSUMER_MAX_POLL_RECORDS import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT import 
net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_THREAD_POOL_SIZE import org.osgi.service.component.annotations.Activate @@ -77,6 +80,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor( private const val CONSUMER_GROUP = "FlowEventConsumer" private const val MESSAGE_BUS_CLIENT = "MessageBusClient" private const val RPC_CLIENT = "RpcClient" + private const val RETRY_TOPIC_POLL_LIMIT = 5 + private const val RETRY_TOPIC = FLOW_EVENT_TOPIC + private const val REQUEST_ID_HEADER = "request_id" private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } @@ -123,12 +129,28 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .threadName("flow-event-mediator") .stateManager(stateManager) .minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT)) + .retryConfig(EventMediatorConfigBuilder.RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) .build() + private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage) : MediatorMessage { + //TODO - is this ok. this header comes from tracing. 
+ val requestId = syncRpcRequest.getProperty(REQUEST_ID_HEADER).toString() + val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder() + .setRequestId(requestId) + .build() + val flowEvent = FlowEvent.newBuilder() + .setFlowId(key) + .setPayload(externalEventRetryRequest) + .build() + return MediatorMessage(flowEvent, syncRpcRequest.properties) + } + private fun createMediatorConsumerFactories(messagingConfig: SmartConfig, bootConfig: SmartConfig): List { + + val retryTopicMessagingConfig = getRetryTopicConfig(messagingConfig) val mediatorConsumerFactory: MutableList = mutableListOf( mediatorConsumerFactory(FLOW_START, messagingConfig), - mediatorConsumerFactory(FLOW_EVENT_TOPIC, messagingConfig) + mediatorConsumerFactory(FLOW_EVENT_TOPIC, retryTopicMessagingConfig) ) val mediatorReplicas = bootConfig.getIntOrDefault(WORKER_MEDIATOR_REPLICAS_FLOW_SESSION, 1) @@ -140,6 +162,11 @@ class FlowEventMediatorFactoryImpl @Activate constructor( return mediatorConsumerFactory } + private fun getRetryTopicConfig(messagingConfig: SmartConfig): SmartConfig { + // TODO - perhaps we should configure consumer to poll less frequently than primary topic consumers + return messagingConfig.withValue(KAFKA_CONSUMER_MAX_POLL_RECORDS, ConfigValueFactory.fromAnyRef(RETRY_TOPIC_POLL_LIMIT)) + } + private fun mediatorConsumerFactory( topic: String, messagingConfig: SmartConfig @@ -178,8 +205,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor( rpcEndpoint(VERIFICATION_WORKER_REST_ENDPOINT, VERIFICATION_PATH), SYNCHRONOUS) is UniquenessCheckRequestAvro -> routeTo(rpcClient, rpcEndpoint(UNIQUENESS_WORKER_REST_ENDPOINT, UNIQUENESS_PATH), SYNCHRONOUS) + //RETRIES will appear as FlowEvents is FlowEvent -> routeTo(messageBusClient, - FLOW_EVENT_TOPIC, ASYNCHRONOUS) + RETRY_TOPIC, ASYNCHRONOUS) is String -> routeTo(messageBusClient, // Handling external messaging message.properties[MSG_PROP_TOPIC] as String, ASYNCHRONOUS) else -> { diff --git 
a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventResponseHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventResponseHandler.kt index 2aa86686f1f..ec8bbdc4658 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventResponseHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventResponseHandler.kt @@ -4,6 +4,8 @@ import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.flow.external.events.impl.ExternalEventManager import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowEventException +import net.corda.flow.state.impl.CheckpointMetadataKeys.RETRY_EXPIRY +import net.corda.libs.statemanager.api.Metadata import net.corda.utilities.debug import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component @@ -40,6 +42,7 @@ class ExternalEventResponseHandler @Activate constructor( } val externalEventState = checkpoint.externalEventState + var metadata = context.metadata if (externalEventState == null) { log.debug { @@ -62,6 +65,25 @@ class ExternalEventResponseHandler @Activate constructor( checkpoint.externalEventState = updatedExternalEventState - return context + //if an ExternalEventResponse is received then clear the expiry time + val transientRetryExpiry = getExpiry(metadata) + if (transientRetryExpiry != null && externalEventState.response != null) { + metadata = clearExpiry(metadata) + } + + return context.copy(metadata = metadata) + } + + private fun clearExpiry(metadata: Metadata?): Metadata? { + if (metadata == null) return null + val newMap = metadata.toMutableMap() + newMap.remove(RETRY_EXPIRY) + return Metadata(newMap) + } + + private fun getExpiry(metaData: Metadata?): Long? 
{ + if (metaData == null) return null + val expiry = metaData[RETRY_EXPIRY] ?: return null + return expiry as Long } } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt new file mode 100644 index 00000000000..9a8fdc65281 --- /dev/null +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -0,0 +1,109 @@ +package net.corda.flow.pipeline.handlers.events + +import net.corda.data.flow.event.external.ExternalEventResponse +import net.corda.data.flow.event.external.ExternalEventRetryRequest +import net.corda.flow.pipeline.events.FlowEventContext +import net.corda.flow.pipeline.exceptions.FlowEventException +import net.corda.flow.state.impl.CheckpointMetadataKeys +import net.corda.libs.configuration.SmartConfig +import net.corda.libs.configuration.helper.getConfig +import net.corda.libs.statemanager.api.Metadata +import net.corda.schema.configuration.ConfigKeys +import net.corda.schema.configuration.MessagingConfig +import net.corda.utilities.debug +import org.osgi.service.component.annotations.Component +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.time.Instant + +@Component(service = [FlowEventHandler::class]) +class ExternalEventRetryRequestHandler : FlowEventHandler { + + private companion object { + val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + } + + override val type = ExternalEventRetryRequest::class.java + + override fun preProcess(context: FlowEventContext): FlowEventContext { + val checkpoint = context.checkpoint + val now = Instant.now() + val externalEventRetryRequest = context.inputEventPayload + var metaData = context.metadata + val messagingConfig = context.configs.getConfig(ConfigKeys.MESSAGING_CONFIG) + + if 
(!checkpoint.doesExist) { + log.debug { + "Received a ${ExternalEventRetryRequest::class.simpleName} for flow [${context.inputEvent.flowId}] that " + + "does not exist. The event will be discarded. ${ExternalEventRetryRequest::class.simpleName}: " + + externalEventRetryRequest + } + throw FlowEventException( + "ExternalEventResponseHandler received a ${ExternalEventRetryRequest::class.simpleName} for flow" + + " [${context.inputEvent.flowId}] that does not exist" + ) + } + + val externalEventState = checkpoint.externalEventState + val retryRequestId: String = externalEventRetryRequest.requestId + val externalEventStateRequestId = externalEventState?.requestId + if (externalEventState == null) { + log.debug { + "Received an ${ExternalEventRetryRequest::class.simpleName} with request id: " + + "$retryRequestId while flow [${context.inputEvent.flowId} is not waiting " + + "for an ${ExternalEventResponse::class.simpleName}. " + + "${ExternalEventRetryRequest::class.simpleName}: $externalEventRetryRequest" + } + throw FlowEventException( + "ExternalEventResponseHandler received an ${ExternalEventRetryRequest::class.simpleName} with request id: " + + "$retryRequestId while flow [${context.inputEvent.flowId} is not waiting " + + "for an ${ExternalEventResponse::class.simpleName}" + ) + } else if (externalEventStateRequestId == retryRequestId) { + val expiryTime = getExpiry(metaData) + if (expiryTime == null) { + //first retry so time to set an expiry + metaData = setExpiry(metaData, messagingConfig, now) + } else if (retryIsExpired(expiryTime, now)) { + //retry timeout is exceeded so fail the flow + log.debug { + "Received an ${ExternalEventRetryRequest::class.simpleName} with request id: " + + "$retryRequestId while flow [${context.inputEvent.flowId} is waiting " + + "for an ${ExternalEventResponse::class.simpleName}. " + + "${ExternalEventRetryRequest::class.simpleName}: $externalEventRetryRequest. 
" + + "However, transient error retry timeout has expired" + } + // Fail the flow gracefully as weve expired the retry timeout. + throw FlowEventException( + "ExternalEventResponseHandler received an ${ExternalEventRetryRequest::class.simpleName} with request id: " + + "$retryRequestId, however retry time limit has expired." + ) + } + } else { + log.info("Discarding retry request received with requestId $retryRequestId. This is likely a stale record polled. Checkpoint " + + "is currently waiting to receive a response for requestId $externalEventStateRequestId") + } + + return context.copy(metadata = metaData) + } + + private fun retryIsExpired(retryTimeout: Long, now: Instant): Boolean { + return retryTimeout > now.toEpochMilli() + } + + private fun getExpiry(metaData: Metadata?): Long? { + if (metaData == null) return null + val expiry = metaData[CheckpointMetadataKeys.RETRY_EXPIRY] ?: return null + return expiry as Long + } + + private fun setExpiry(metaData: Metadata?, messagingConfig: SmartConfig, now: Instant): Metadata { + val retryTimeout = retryTimeout(messagingConfig) + now.toEpochMilli() + val newEntry = mapOf(CheckpointMetadataKeys.RETRY_EXPIRY to retryTimeout) + if (metaData == null) return Metadata(newEntry) + return Metadata(metaData + newEntry) + } + + private fun retryTimeout(config: SmartConfig) = + config.getLong(MessagingConfig.Subscription.MEDIATOR_PROCESSING_TRANSIENT_ERROR_TIMEOUT) +} diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt index 8b8801f3615..7100ccff6c7 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt @@ -1,7 +1,9 @@ package net.corda.flow.pipeline.impl +import 
net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.event.mapper.ScheduleCleanup +import net.corda.data.flow.state.external.ExternalEventState import net.corda.data.flow.state.session.SessionState import net.corda.data.flow.state.session.SessionStateType import net.corda.flow.external.events.impl.ExternalEventManager @@ -51,11 +53,11 @@ class FlowGlobalPostProcessorImpl @Activate constructor( postProcessPendingPlatformError(context) val outputRecords = getSessionEvents(context, now) + - getFlowMapperSessionCleanupEvents(context, now) + - getExternalEvent(context, now) + getFlowMapperSessionCleanupEvents(context, now) + + getExternalEvent(context, now) context.flowMetrics.flowEventCompleted(context.inputEvent.payload::class.java.name) - val metadata = getStateMetadata(context) + val metadata = updateFlowSessionMetadata(context) return context.copy( outputRecords = context.outputRecords + outputRecords, @@ -111,7 +113,7 @@ class FlowGlobalPostProcessorImpl @Activate constructor( if (!counterpartyExists) { val msg = "[${context.checkpoint.holdingIdentity.x500Name}] has failed to create a flow with counterparty: " + - "[${counterparty}] as the recipient doesn't exist in the network." + "[${counterparty}] as the recipient doesn't exist in the network." sessionManager.errorSession(sessionState) if (doesCheckpointExist) { log.debug { "$msg. Throwing FlowFatalException" } @@ -157,24 +159,26 @@ class FlowGlobalPostProcessorImpl @Activate constructor( * Check to see if any external events needs to be sent or resent. 
*/ private fun getExternalEvent(context: FlowEventContext, now: Instant): List> { - val externalEventState = context.checkpoint.externalEventState - return if (externalEventState == null) { - listOf() - } else { - val retryWindow = context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW) - externalEventManager.getEventToSend(externalEventState, now, Duration.ofMillis(retryWindow)) - .let { (updatedExternalEventState, record) -> - context.checkpoint.externalEventState = updatedExternalEventState - if (record != null) { - listOf(record) - } else { - listOf() - } + val externalEventState = context.checkpoint.externalEventState ?: return emptyList() + + return when (context.inputEvent.payload) { + is ExternalEventRetryRequest -> getTransientRetryRequest(externalEventState) + else -> { + val retryWindow = Duration.ofMillis(context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW)) + externalEventManager.getEventToSend(externalEventState, now, retryWindow).let { (updatedState, record) -> + context.checkpoint.externalEventState = updatedState + listOfNotNull(record) } + } } } - private fun getStateMetadata(context: FlowEventContext): Metadata? { + private fun getTransientRetryRequest(externalEventState: ExternalEventState): + List> { + return listOf(externalEventManager.getRetryEvent(externalEventState)) + } + + private fun updateFlowSessionMetadata(context: FlowEventContext): Metadata? { val checkpoint = context.checkpoint // Find the earliest expiry time for any open sessions. 
val lastReceivedMessageTime = checkpoint.sessions.filter { diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/state/impl/CheckpointMetadataKeys.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/state/impl/CheckpointMetadataKeys.kt index 9bf6b872882..acfa8cc3070 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/state/impl/CheckpointMetadataKeys.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/state/impl/CheckpointMetadataKeys.kt @@ -17,4 +17,10 @@ object CheckpointMetadataKeys { * Checkpoints will be deleted by a cleanup processor based on a configurable time/ */ const val STATE_META_CHECKPOINT_TERMINATED_KEY = "checkpoint.terminated" + + /** + * Records how long to retry external events that suffered transient errors in the message pattern. + * These retry events originate from the message pattern and are not related to external event responses with error of type TRANSIENT + */ + const val RETRY_EXPIRY = "retry.expiry" } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 6afaca45353..aa2649ebd0a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.1.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.1.53-beta+ +cordaApiVersion=5.2.1.54-alpha-1729858718520 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt index 7463fcf6136..b2dcf3487ad 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt +++ 
b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt @@ -31,6 +31,7 @@ class EventProcessor( ) { private val metrics = EventMediatorMetrics(config.name) + private val retryConfig = config.retryConfig /** * Process a group of events. @@ -141,9 +142,9 @@ class EventProcessor( } newAsyncOutputs.addAll(asyncEvents) try { - queue.addAll(processSyncEvents(key, syncEvents)) - } catch (e: CordaMessageAPIIntermittentException) { - throw EventProcessorSyncEventsIntermittentException(processorStateUpdated, e) + val (newQueueEvents, updateProcessorState) = processSyncEvents(key, syncEvents, newAsyncOutputs, processorStateUpdated) + processorStateUpdated = updateProcessorState + queue.addAll(newQueueEvents) } catch (e: Exception) { throw EventProcessorSyncEventsFatalException(processorStateUpdated, e) } @@ -164,6 +165,7 @@ class EventProcessor( processed: State? ) = when { state == null && processed != null -> StateChangeAndOperation.Create(processed) + state != null && processed == state -> StateChangeAndOperation.Noop state != null && processed != null -> StateChangeAndOperation.Update(processed) state != null && processed == null -> StateChangeAndOperation.Delete(state) else -> StateChangeAndOperation.Noop @@ -177,32 +179,47 @@ class EventProcessor( /** * Send any synchronous events immediately and feed results back onto the queue. - */ + * If a sync request returns from the RPC client with a transient error and retry is enabled + * then push a retry event onto the retry topic + **/ private fun processSyncEvents( key: K, - syncEvents: List> - ): List> { - return syncEvents.mapNotNull { message -> + syncEvents: List>, + newAsyncOutputs: MutableList>, + processorStateUpdated: StateAndEventProcessor.State? 
+ ): Pair>, StateAndEventProcessor.State?> { + var latestProcessorStateUpdated = processorStateUpdated + val outputEvents = syncEvents.mapNotNull { message -> val destination = messageRouter.getDestination(message) - @Suppress("UNCHECKED_CAST") - val reply = with(destination) { - message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) - client.send(message) as MediatorMessage? - } - reply?.let { - addTraceContextToRecord( - Record( - "", - key, - reply.payload, - 0, - listOf(Pair(SYNC_RESPONSE_HEADER, "true")) - ), - message.properties - ) + try { + @Suppress("UNCHECKED_CAST") + val reply = with(destination) { + message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) + client.send(message) as MediatorMessage? + } + reply?.let { + addTraceContextToRecord( + Record( + "", + key, + reply.payload, + 0, + listOf(Pair(SYNC_RESPONSE_HEADER, "true")) + ), + message.properties + ) + } + } catch (e: CordaMessageAPIIntermittentException) { + if (retryConfig != null) { + retryConfig.buildRetryRequest?.let { it(key, message) }?.let { + newAsyncOutputs.add(it) + } + } + null } } + return Pair(outputEvents, latestProcessorStateUpdated) } private fun convertToMessage(record: Record<*, *>): MediatorMessage { diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt index 1d49c60bfb9..1174c28a60c 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt @@ -28,6 +28,8 @@ import java.time.Duration * @property stateManager State manager. * @property minGroupSize Minimum size for group of records passed to task manager for processing in a single thread. Does not block if * group size is not met by polled record count. 
+ * @property retryConfig Topic to push retry events to as well as the function to build retry events. Set to null to not retry transient + * errors originating from the message pattern and fail early instead. */ data class EventMediatorConfig( val name: String, @@ -40,6 +42,7 @@ data class EventMediatorConfig( val threadName: String, val stateManager: StateManager, val minGroupSize: Int, + val retryConfig: EventMediatorConfigBuilder.RetryConfig? = null ) { /** * Timeout for polling consumers. diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt index a6e502a6b5c..765f4c28079 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt @@ -2,6 +2,7 @@ package net.corda.messaging.api.mediator.config import net.corda.libs.configuration.SmartConfig import net.corda.libs.statemanager.api.StateManager +import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MultiSourceEventMediator import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory import net.corda.messaging.api.mediator.factory.MessageRouterFactory @@ -27,6 +28,7 @@ class EventMediatorConfigBuilder { private var threadName: String? = null private var stateManager: StateManager? = null private var minGroupSize: Int? = null + private var retryConfig: RetryConfig? = null /** Sets name for [MultiSourceEventMediator]. */ fun name(name: String) = @@ -74,6 +76,13 @@ class EventMediatorConfigBuilder { fun stateManager(stateManager: StateManager) = apply { this.stateManager = stateManager } + /** + * Sets the topic to push retry events triggered by transient errors in the message pattern when sending RPC calls. 
+ * As well as setting how to build a retry event from the sync request + */ + fun retryConfig(retryConfig: RetryConfig) = + apply { this.retryConfig = retryConfig } + /** Builds [EventMediatorConfig]. */ fun build(): EventMediatorConfig { check(consumerFactories.isNotEmpty()) { "At least on consumer factory has to be set" } @@ -89,6 +98,12 @@ class EventMediatorConfigBuilder { threadName = checkNotNull(threadName) { "Thread name not set" }, stateManager = checkNotNull(stateManager) { "State manager not set" }, minGroupSize = checkNotNull(minGroupSize) { "Min group size not set" }, + retryConfig = retryConfig ) } + + data class RetryConfig( + val retryTopic: String, + val buildRetryRequest: ((K, MediatorMessage) -> MediatorMessage)? = null, + ) } \ No newline at end of file From b0e8cf997944be056ecba39432ec36610292de68 Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 29 Oct 2024 09:21:40 +0000 Subject: [PATCH 02/18] CORE-20867 Update unit test for EventProcessor --- .../mediator/processor/EventProcessorTest.kt | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt index 0969349bd3f..2c2b1290f38 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt @@ -10,6 +10,7 @@ import net.corda.messaging.api.mediator.MessageRouter import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.RoutingDestination import net.corda.messaging.api.mediator.config.EventMediatorConfig +import net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder import net.corda.messaging.api.mediator.factory.MessageRouterFactory import 
net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.processor.StateAndEventProcessor.Response @@ -34,6 +35,7 @@ import java.util.UUID @Execution(ExecutionMode.SAME_THREAD) class EventProcessorTest { private lateinit var eventMediatorConfig: EventMediatorConfig + private lateinit var eventMediatorRetryConfig: EventMediatorConfig private lateinit var stateManagerHelper: StateManagerHelper private lateinit var client: MessagingClient private lateinit var messageRouter: MessageRouter @@ -65,6 +67,8 @@ class EventProcessorTest { } else RoutingDestination(client, "endpoint", RoutingDestination.Type.ASYNCHRONOUS) } eventMediatorConfig = buildTestConfig() + val retryConfig = EventMediatorConfigBuilder.RetryConfig("retry.topic", buildRetryRequest) + eventMediatorRetryConfig = buildTestConfig(retryConfig) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { Response( @@ -181,7 +185,7 @@ class EventProcessorTest { } @Test - fun `when sync processing fails with a transient error, a transient state change signal is sent`() { + fun `when sync processing fails with a transient error, retry is OFF, a NOOP state change signal is set and no retry event is sent`() { val mockedState = mock() val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key"), null)) @@ -201,10 +205,34 @@ class EventProcessorTest { val output = outputMap["key"] assertEquals(emptyList>(), output?.asyncOutputs) assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(null) - assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Transient::class.java) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Noop::class.java) } - private fun buildTestConfig() = EventMediatorConfig( + @Test + fun `when sync processing fails with a transient error, retry is ON, a NOOP state change signal is set and a retry event is sent`() { + val mockedState = mock() + val input = mapOf("key" 
to EventProcessingInput("key", getStringRecords(1, "key"), null)) + + whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) + whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { + Response( + null, + listOf( + Record("", "key", syncMessage) + ) + ) + } + whenever(stateManagerHelper.failStateProcessing(any(), eq(null), any())).thenReturn(mockedState) + eventProcessor = EventProcessor(eventMediatorRetryConfig, stateManagerHelper, messageRouter, mediatorInputService) + val outputMap = eventProcessor.processEvents(input) + + val output = outputMap["key"] + assertEquals(1, output?.asyncOutputs?.size) + assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(null) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Noop::class.java) + } + + private fun buildTestConfig(retryConfig: EventMediatorConfigBuilder.RetryConfig? = null) = EventMediatorConfig( "", SmartConfigImpl.empty(), emptyList(), @@ -214,6 +242,11 @@ class EventProcessorTest { 1, "", mock(), - 20 + 20, + retryConfig ) + + private val buildRetryRequest: ((String, MediatorMessage) -> MediatorMessage) = { _, message -> + message + } } From 67eef2fc2a853f82bd21bde601218913169fa04e Mon Sep 17 00:00:00 2001 From: LWogan Date: Thu, 7 Nov 2024 18:02:13 +0000 Subject: [PATCH 03/18] CORE-20867: update entity request to be unique via a timestamp to avoid replay logic and set request id correctly --- .../flow/external/events/impl/ExternalEventManagerImpl.kt | 1 + .../messaging/mediator/FlowEventMediatorFactoryImpl.kt | 7 +++++-- gradle.properties | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt index 183dcbfbf3e..273d1a86400 100644 --- 
a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt @@ -205,6 +205,7 @@ class ExternalEventManagerImpl( private fun generateRecord(externalEventState: ExternalEventState, instant: Instant?) : Record<*, *> { val eventToSend = externalEventState.eventToSend if (instant != null) { + //dont update timestamp in some scenarios to avoid updating the state eventToSend.timestamp = instant } val topic = eventToSend.topic diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 9d31a27099d..852fbdb27a7 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -56,6 +56,7 @@ import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference import org.slf4j.LoggerFactory +import java.time.Instant import java.util.UUID @Suppress("LongParameterList") @@ -133,10 +134,12 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .build() private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage) : MediatorMessage { - //TODO - is this ok. this header comes from tracing. 
- val requestId = syncRpcRequest.getProperty(REQUEST_ID_HEADER).toString() + // TODO are they all entity requests + val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray) as EntityRequest + val requestId = entityRequest.flowExternalEventContext.requestId val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder() .setRequestId(requestId) + .setTimestamp(Instant.now()) .build() val flowEvent = FlowEvent.newBuilder() .setFlowId(key) diff --git a/gradle.properties b/gradle.properties index aa2649ebd0a..79d366ecb35 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.1.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.1.54-alpha-1729858718520 +cordaApiVersion=5.2.1.54-alpha-1731001992820 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 From a9979c551ea31fd71fe1b509b53c91d7822d801a Mon Sep 17 00:00:00 2001 From: LWogan Date: Thu, 7 Nov 2024 19:01:46 +0000 Subject: [PATCH 04/18] CORE-20867: invert operator --- .../handlers/events/ExternalEventRetryRequestHandler.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt index 9a8fdc65281..d31d598cb48 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -88,7 +88,7 @@ class ExternalEventRetryRequestHandler : FlowEventHandler now.toEpochMilli() + return retryTimeout < now.toEpochMilli() } private fun getExpiry(metaData: Metadata?): Long? 
{ From 7a2d71b42efd72a6eb96fec4109ee6273233ad9c Mon Sep 17 00:00:00 2001 From: LWogan Date: Fri, 8 Nov 2024 09:12:18 +0000 Subject: [PATCH 05/18] CORE-20867: update to the latest api version. some self review updates --- .../corda/flow/external/events/impl/ExternalEventManager.kt | 1 + .../flow/external/events/impl/ExternalEventManagerImpl.kt | 4 ++-- .../flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt | 3 --- .../handlers/events/ExternalEventRetryRequestHandler.kt | 4 ++-- gradle.properties | 2 +- 5 files changed, 6 insertions(+), 8 deletions(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt index 8f6ad26bc3b..a1c770f4d5b 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt @@ -92,6 +92,7 @@ interface ExternalEventManager { * Returns the event as is from the state. No additional checks required. * @param externalEventState The [ExternalEventState] to get the event from. * @param instant The current time. Used to set timestamp. 
+ * @return The external event request to resend * */ fun getRetryEvent( externalEventState: ExternalEventState diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt index 273d1a86400..63e8f6d2aa8 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt @@ -177,8 +177,8 @@ class ExternalEventManagerImpl( override fun getRetryEvent( externalEventState: ExternalEventState, ): Record<*, *> { - //Don't update ExternalEventState with new timestamp as this will result in State change being detected and potentially - // additional checkpoints saved for cases where multiple sequential RPC calls have transient retry errors + //Don't update ExternalEventState with new timestamp as this will result in State change being detected by the message pattern + // and additional checkpoints saved for cases where multiple sequential RPC calls have transient retry errors return generateRecord(externalEventState, null) } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 852fbdb27a7..3195b214939 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -83,7 +83,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor( private const val RPC_CLIENT = "RpcClient" private const val RETRY_TOPIC_POLL_LIMIT = 5 private const val RETRY_TOPIC = FLOW_EVENT_TOPIC - 
private const val REQUEST_ID_HEADER = "request_id" private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } @@ -134,7 +133,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .build() private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage) : MediatorMessage { - // TODO are they all entity requests val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray) as EntityRequest val requestId = entityRequest.flowExternalEventContext.requestId val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder() @@ -166,7 +164,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor( } private fun getRetryTopicConfig(messagingConfig: SmartConfig): SmartConfig { - // TODO - perhaps we should configure consumer to poll less frequently than primary topic consumers return messagingConfig.withValue(KAFKA_CONSUMER_MAX_POLL_RECORDS, ConfigValueFactory.fromAnyRef(RETRY_TOPIC_POLL_LIMIT)) } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt index d31d598cb48..69cecdf8d7d 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -9,7 +9,7 @@ import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.helper.getConfig import net.corda.libs.statemanager.api.Metadata import net.corda.schema.configuration.ConfigKeys -import net.corda.schema.configuration.MessagingConfig +import net.corda.schema.configuration.FlowConfig import net.corda.utilities.debug import org.osgi.service.component.annotations.Component import org.slf4j.Logger @@ -105,5 +105,5 @@ class 
ExternalEventRetryRequestHandler : FlowEventHandler Date: Mon, 11 Nov 2024 11:43:51 +0000 Subject: [PATCH 06/18] CORE-20867: update to the latest api version. some self review updates --- .../handlers/events/ExternalEventRetryRequestHandler.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt index 69cecdf8d7d..dbd8b6d6b02 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -30,7 +30,7 @@ class ExternalEventRetryRequestHandler : FlowEventHandler Date: Mon, 11 Nov 2024 14:17:03 +0000 Subject: [PATCH 07/18] CORE-20867: remove config value not required. Update mediator to detect when not to save. 
--- .../events/impl/ExternalEventManager.kt | 3 +- .../events/impl/ExternalEventManagerImpl.kt | 12 ++--- .../events/ExternalEventResponseHandler.kt | 24 +-------- .../ExternalEventRetryRequestHandler.kt | 54 +------------------ .../impl/FlowGlobalPostProcessorImpl.kt | 6 +-- .../flow/state/impl/CheckpointMetadataKeys.kt | 6 --- gradle.properties | 2 +- .../mediator/processor/EventProcessor.kt | 50 ++++++++++++----- 8 files changed, 49 insertions(+), 108 deletions(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt index a1c770f4d5b..352ea549847 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt @@ -95,6 +95,7 @@ interface ExternalEventManager { * @return The external event request to resend * */ fun getRetryEvent( - externalEventState: ExternalEventState + externalEventState: ExternalEventState, + instant: Instant, ): Record<*, *> } \ No newline at end of file diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt index 63e8f6d2aa8..15d08da0deb 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt @@ -176,10 +176,9 @@ class ExternalEventManagerImpl( override fun getRetryEvent( externalEventState: ExternalEventState, + instant: Instant, ): Record<*, *> { - //Don't update ExternalEventState with new timestamp as this will result in State change being detected by 
the message pattern - // and additional checkpoints saved for cases where multiple sequential RPC calls have transient retry errors - return generateRecord(externalEventState, null) + return generateRecord(externalEventState, instant) } private fun checkRetry(externalEventState: ExternalEventState, instant: Instant, retryWindow: Duration) { @@ -202,12 +201,9 @@ class ExternalEventManagerImpl( } } - private fun generateRecord(externalEventState: ExternalEventState, instant: Instant?) : Record<*, *> { + private fun generateRecord(externalEventState: ExternalEventState, instant: Instant) : Record<*, *> { val eventToSend = externalEventState.eventToSend - if (instant != null) { - //dont update timestamp in some scenarios to avoid updating the state - eventToSend.timestamp = instant - } + eventToSend.timestamp = instant val topic = eventToSend.topic log.trace { "Dispatching external event with id '${externalEventState.requestId}' to '$topic'" } return Record(topic, eventToSend.key.array(), eventToSend.payload.array()) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventResponseHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventResponseHandler.kt index ec8bbdc4658..2aa86686f1f 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventResponseHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventResponseHandler.kt @@ -4,8 +4,6 @@ import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.flow.external.events.impl.ExternalEventManager import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowEventException -import net.corda.flow.state.impl.CheckpointMetadataKeys.RETRY_EXPIRY -import net.corda.libs.statemanager.api.Metadata import net.corda.utilities.debug import 
org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component @@ -42,7 +40,6 @@ class ExternalEventResponseHandler @Activate constructor( } val externalEventState = checkpoint.externalEventState - var metadata = context.metadata if (externalEventState == null) { log.debug { @@ -65,25 +62,6 @@ class ExternalEventResponseHandler @Activate constructor( checkpoint.externalEventState = updatedExternalEventState - //if an ExternalEventResponse is received then clear the expiry time - val transientRetryExpiry = getExpiry(metadata) - if (transientRetryExpiry != null && externalEventState.response != null) { - metadata = clearExpiry(metadata) - } - - return context.copy(metadata = metadata) - } - - private fun clearExpiry(metadata: Metadata?): Metadata? { - if (metadata == null) return null - val newMap = metadata.toMutableMap() - newMap.remove(RETRY_EXPIRY) - return Metadata(newMap) - } - - private fun getExpiry(metaData: Metadata?): Long? { - if (metaData == null) return null - val expiry = metaData[RETRY_EXPIRY] ?: return null - return expiry as Long + return context } } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt index dbd8b6d6b02..e0af4c0a4a5 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -4,17 +4,10 @@ import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowEventException -import net.corda.flow.state.impl.CheckpointMetadataKeys 
-import net.corda.libs.configuration.SmartConfig -import net.corda.libs.configuration.helper.getConfig -import net.corda.libs.statemanager.api.Metadata -import net.corda.schema.configuration.ConfigKeys -import net.corda.schema.configuration.FlowConfig import net.corda.utilities.debug import org.osgi.service.component.annotations.Component import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.time.Instant @Component(service = [FlowEventHandler::class]) class ExternalEventRetryRequestHandler : FlowEventHandler { @@ -27,10 +20,7 @@ class ExternalEventRetryRequestHandler : FlowEventHandler): FlowEventContext { val checkpoint = context.checkpoint - val now = Instant.now() val externalEventRetryRequest = context.inputEventPayload - var metaData = context.metadata - val flowConfig = context.configs.getConfig(ConfigKeys.FLOW_CONFIG) if (!checkpoint.doesExist) { log.debug { @@ -59,51 +49,11 @@ class ExternalEventRetryRequestHandler : FlowEventHandler getTransientRetryRequest(externalEventState) + is ExternalEventRetryRequest -> getTransientRetryRequest(externalEventState, now) else -> { val retryWindow = Duration.ofMillis(context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW)) externalEventManager.getEventToSend(externalEventState, now, retryWindow).let { (updatedState, record) -> @@ -173,9 +173,9 @@ class FlowGlobalPostProcessorImpl @Activate constructor( } } - private fun getTransientRetryRequest(externalEventState: ExternalEventState): + private fun getTransientRetryRequest(externalEventState: ExternalEventState, now: Instant): List> { - return listOf(externalEventManager.getRetryEvent(externalEventState)) + return listOf(externalEventManager.getRetryEvent(externalEventState, now)) } private fun updateFlowSessionMetadata(context: FlowEventContext): Metadata? 
{ diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/state/impl/CheckpointMetadataKeys.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/state/impl/CheckpointMetadataKeys.kt index acfa8cc3070..9bf6b872882 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/state/impl/CheckpointMetadataKeys.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/state/impl/CheckpointMetadataKeys.kt @@ -17,10 +17,4 @@ object CheckpointMetadataKeys { * Checkpoints will be deleted by a cleanup processor based on a configurable time/ */ const val STATE_META_CHECKPOINT_TERMINATED_KEY = "checkpoint.terminated" - - /** - * Records how long to retry external events that suffered transient errors in the message pattern. - * These retry events originate from the message pattern and are not related to external event responses with error of type TRANSIENT - */ - const val RETRY_EXPIRY = "retry.expiry" } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 6de7022cb16..e8ece6bc0f1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.1.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.1.54-alpha-1731056624538 +cordaApiVersion=5.2.1.54-alpha-1731333866451 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt index b2dcf3487ad..e52153be947 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt @@ -128,10 +128,11 @@ class 
EventProcessor( consumerInputEvent: Record, processorState: StateAndEventProcessor.State?, key: K, - ): Pair?, List>> { + ): ConsumerInputOutput { var processorStateUpdated = processorState val newAsyncOutputs = mutableListOf>() val consumerInputHash = mediatorInputService.getHash(consumerInputEvent) + val isRetryTopic = consumerInputEvent.topic == config.retryConfig?.retryTopic val queue = ArrayDeque(listOf(consumerInputEvent)) while (queue.isNotEmpty()) { val event = getNextEvent(queue, consumerInputHash) @@ -142,16 +143,32 @@ class EventProcessor( } newAsyncOutputs.addAll(asyncEvents) try { - val (newQueueEvents, updateProcessorState) = processSyncEvents(key, syncEvents, newAsyncOutputs, processorStateUpdated) - processorStateUpdated = updateProcessorState - queue.addAll(newQueueEvents) + val (syncResponses, isNoopRetry, asyncOutputs) = processSyncEvents(key, syncEvents, isRetryTopic) + newAsyncOutputs.addAll(asyncOutputs) + queue.addAll(syncResponses) + if (isNoopRetry) { + // return early if no state update is needed + return ConsumerInputOutput(processorStateUpdated, newAsyncOutputs, true) + } } catch (e: Exception) { throw EventProcessorSyncEventsFatalException(processorStateUpdated, e) } } - return Pair(processorStateUpdated, newAsyncOutputs) + return ConsumerInputOutput(processorStateUpdated, newAsyncOutputs) } + data class ConsumerInputOutput( + val updatedState: StateAndEventProcessor.State?, + val outputEvents: List>, + val isNoop: Boolean = false + ) + + data class SyncProcessingOutput( + val syncResponses: List>, + val isNoopRetry: Boolean = false, + val asyncOutputs: List> = emptyList() + ) + private fun getNextEvent( queue: ArrayDeque>, consumerInputHash: String @@ -165,7 +182,6 @@ class EventProcessor( processed: State? 
) = when { state == null && processed != null -> StateChangeAndOperation.Create(processed) - state != null && processed == state -> StateChangeAndOperation.Noop state != null && processed != null -> StateChangeAndOperation.Update(processed) state != null && processed == null -> StateChangeAndOperation.Delete(state) else -> StateChangeAndOperation.Noop @@ -180,15 +196,14 @@ class EventProcessor( /** * Send any synchronous events immediately and feed results back onto the queue. * If a sync request returns from the RPC client with a transient error and retry is enabled - * then push a retry event onto the retry topic + * then push a retry event onto the retry topic. + * If retrying again via the retry topic and transient errors occur, resend retry event and do not update state **/ private fun processSyncEvents( key: K, syncEvents: List>, - newAsyncOutputs: MutableList>, - processorStateUpdated: StateAndEventProcessor.State? - ): Pair>, StateAndEventProcessor.State?> { - var latestProcessorStateUpdated = processorStateUpdated + isRetryTopic: Boolean + ): SyncProcessingOutput { val outputEvents = syncEvents.mapNotNull { message -> val destination = messageRouter.getDestination(message) @@ -211,15 +226,22 @@ class EventProcessor( ) } } catch (e: CordaMessageAPIIntermittentException) { + val outputEvents: MutableList> = mutableListOf() if (retryConfig != null) { retryConfig.buildRetryRequest?.let { it(key, message) }?.let { - newAsyncOutputs.add(it) + outputEvents.add(it) } } - null + // If we're on the retry topic and run into another transient error, exit early and do not update the state to save + // performance. + // If we are not on the retry topic then we need to save the state before adding the retry event. 
This will allow the + // flow cleanup processors to execute on an idle flow checkpoint + if (isRetryTopic) { + return SyncProcessingOutput(emptyList(), true, outputEvents) + } else null } } - return Pair(outputEvents, latestProcessorStateUpdated) + return SyncProcessingOutput(outputEvents) } private fun convertToMessage(record: Record<*, *>): MediatorMessage { From bed624e2d8e0a263aa0021ff45ee7ca730dfd209 Mon Sep 17 00:00:00 2001 From: LWogan Date: Mon, 11 Nov 2024 15:48:48 +0000 Subject: [PATCH 08/18] CORE-20867: exit early from processing when state change is a retry noop --- .../mediator/processor/ConsumerProcessor.kt | 1 + .../mediator/processor/EventProcessor.kt | 11 +++-- .../kotlin/net/corda/messaging/TestUtils.kt | 4 +- .../mediator/processor/EventProcessorTest.kt | 48 ++++++++++++++----- 4 files changed, 45 insertions(+), 19 deletions(-) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt index 446f5bcbab0..21785d88f14 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt @@ -118,6 +118,7 @@ class ConsumerProcessor( metrics.processorTimer.recordCallable { try { val inputs = getInputs(consumer) + if (inputs.isEmpty()) return@recordCallable val outputs = processInputs(inputs) categorizeOutputs(outputs, failureCounts) commit(consumer, outputs, failureCounts) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt index e52153be947..807ddaa77e5 100644 --- 
a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt @@ -75,13 +75,15 @@ class EventProcessor( var processorState = inputProcessorState val asyncOutputs = mutableMapOf, MutableList>>() val stateChangeAndOperation = try { + var isNoopState = false input.records.forEach { consumerInputEvent -> - val (updatedProcessorState, newAsyncOutputs) = processConsumerInput(consumerInputEvent, processorState, key) + val (updatedProcessorState, newAsyncOutputs, isNoop) = processConsumerInput(consumerInputEvent, processorState, key) processorState = updatedProcessorState asyncOutputs.addOutputs(consumerInputEvent, newAsyncOutputs) + if (isNoop) isNoopState = isNoop } val state = stateManagerHelper.createOrUpdateState(key.toString(), inputState, processorState) - stateChangeAndOperation(inputState, state) + if (isNoopState) StateChangeAndOperation.Noop else stateChangeAndOperation(inputState, state) } catch (e: EventProcessorSyncEventsIntermittentException) { asyncOutputs.clear() StateChangeAndOperation.Transient @@ -232,13 +234,12 @@ class EventProcessor( outputEvents.add(it) } } + // If we're on the retry topic and run into another transient error, exit early and do not update the state to save // performance. // If we are not on the retry topic then we need to save the state before adding the retry event. 
This will allow the // flow cleanup processors to execute on an idle flow checkpoint - if (isRetryTopic) { - return SyncProcessingOutput(emptyList(), true, outputEvents) - } else null + return SyncProcessingOutput(emptyList(), isRetryTopic, outputEvents) } } return SyncProcessingOutput(outputEvents) diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt index 928ebeea7b2..7180fa6f25e 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt @@ -50,10 +50,10 @@ fun generateMockCordaConsumerRecordList(numberOfRecords: Long, topic: String, pa /** * Generate [recordCount] string key/value records */ -fun getStringRecords(recordCount: Int, key: String): List> { +fun getStringRecords(recordCount: Int, key: String, topic: String = "topic"): List> { val records = mutableListOf>() for (j in 1..recordCount) { - records.add(Record("topic", key, j.toString())) + records.add(Record(topic, key, j.toString())) } return records diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt index 2c2b1290f38..48840d95a9d 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt @@ -1,6 +1,7 @@ package net.corda.messaging.mediator.processor import net.corda.libs.configuration.SmartConfigImpl +import net.corda.libs.statemanager.api.Metadata import net.corda.libs.statemanager.api.State import net.corda.messaging.api.exception.CordaMessageAPIFatalException import 
net.corda.messaging.api.exception.CordaMessageAPIIntermittentException @@ -44,6 +45,7 @@ class EventProcessorTest { private lateinit var eventProcessor: EventProcessor private val inputState1: State = mock() + private val retryTopic: String = "flow.event" private val asyncMessage: String = "ASYNC_PAYLOAD" private val syncMessage: String = "SYNC_PAYLOAD" private val updatedProcessingState = StateAndEventProcessor.State("bar", null) @@ -67,7 +69,7 @@ class EventProcessorTest { } else RoutingDestination(client, "endpoint", RoutingDestination.Type.ASYNCHRONOUS) } eventMediatorConfig = buildTestConfig() - val retryConfig = EventMediatorConfigBuilder.RetryConfig("retry.topic", buildRetryRequest) + val retryConfig = EventMediatorConfigBuilder.RetryConfig(retryTopic, buildRetryRequest) eventMediatorRetryConfig = buildTestConfig(retryConfig) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { @@ -185,9 +187,11 @@ class EventProcessorTest { } @Test - fun `when sync processing fails with a transient error, retry is OFF, a NOOP state change signal is set and no retry event is sent`() { - val mockedState = mock() + fun `when sync processing fails with a transient error, retry is OFF, a CREATE state change signal is set and no retry event is sent` + () { val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key"), null)) + val mockedState = mock() + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), anyOrNull())).thenReturn(mockedState) whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { @@ -198,31 +202,28 @@ class EventProcessorTest { ) ) } - whenever(stateManagerHelper.failStateProcessing(any(), eq(null), any())).thenReturn(mockedState) - val outputMap = eventProcessor.processEvents(input) val output = outputMap["key"] assertEquals(emptyList>(), output?.asyncOutputs) - 
assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(null) - assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Noop::class.java) + assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(mockedState) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Create::class.java) } @Test - fun `when sync processing fails with a transient error, retry is ON, a NOOP state change signal is set and a retry event is sent`() { + fun `when transient error while processing retry topic, retry is ON, a NOOP state change signal is set and a retry event is sent`() { + val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key", retryTopic), null)) val mockedState = mock() - val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key"), null)) - + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), anyOrNull())).thenReturn(mockedState) whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { - Response( + Response( null, listOf( Record("", "key", syncMessage) ) ) } - whenever(stateManagerHelper.failStateProcessing(any(), eq(null), any())).thenReturn(mockedState) eventProcessor = EventProcessor(eventMediatorRetryConfig, stateManagerHelper, messageRouter, mediatorInputService) val outputMap = eventProcessor.processEvents(input) @@ -232,6 +233,29 @@ class EventProcessorTest { assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Noop::class.java) } + @Test + fun `when transient error while processing event topic, retry is ON, a NOOP state change signal is set and a retry event is sent`() { + val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key", "flow.start"), null)) + val mockedState = mock() + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), any())).thenReturn(mockedState) + 
whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) + whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { + Response( + StateAndEventProcessor.State("", Metadata(mapOf())), + listOf( + Record("", "key", syncMessage) + ) + ) + } + eventProcessor = EventProcessor(eventMediatorRetryConfig, stateManagerHelper, messageRouter, mediatorInputService) + val outputMap = eventProcessor.processEvents(input) + + val output = outputMap["key"] + assertEquals(1, output?.asyncOutputs?.size) + assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(mockedState) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Create::class.java) + } + private fun buildTestConfig(retryConfig: EventMediatorConfigBuilder.RetryConfig? = null) = EventMediatorConfig( "", SmartConfigImpl.empty(), From 8b4039d412a95b14eebd3a46af49dc384721f0c4 Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 12 Nov 2024 09:32:54 +0000 Subject: [PATCH 09/18] CORE-20867: add additional logging --- .../corda/messaging/mediator/processor/EventProcessor.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt index 807ddaa77e5..8a7e1d96d58 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt @@ -15,6 +15,7 @@ import net.corda.messaging.api.records.Record import net.corda.messaging.mediator.StateManagerHelper import net.corda.messaging.mediator.metrics.EventMediatorMetrics import net.corda.tracing.addTraceContextToRecord +import org.slf4j.LoggerFactory /** * Class to process records received from the consumer. 
@@ -32,6 +33,8 @@ class EventProcessor( private val metrics = EventMediatorMetrics(config.name) private val retryConfig = config.retryConfig + private val logger = LoggerFactory.getLogger("${this.javaClass.name}-${config.name}") + /** * Process a group of events. @@ -80,7 +83,7 @@ class EventProcessor( val (updatedProcessorState, newAsyncOutputs, isNoop) = processConsumerInput(consumerInputEvent, processorState, key) processorState = updatedProcessorState asyncOutputs.addOutputs(consumerInputEvent, newAsyncOutputs) - if (isNoop) isNoopState = isNoop + if (isNoop) isNoopState = true } val state = stateManagerHelper.createOrUpdateState(key.toString(), inputState, processorState) if (isNoopState) StateChangeAndOperation.Noop else stateChangeAndOperation(inputState, state) @@ -153,6 +156,7 @@ class EventProcessor( return ConsumerInputOutput(processorStateUpdated, newAsyncOutputs, true) } } catch (e: Exception) { + logger.warn("Failed process synchronous events for key $key", e) throw EventProcessorSyncEventsFatalException(processorStateUpdated, e) } } From a7e94d7d26ed783a0ef5753cf60d3b57fb8ad959 Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 12 Nov 2024 12:27:13 +0000 Subject: [PATCH 10/18] CORE-20867: fix retries for other event types --- .../mediator/FlowEventMediatorFactoryImpl.kt | 86 +++++++++++-------- .../ExternalEventRetryRequestHandler.kt | 6 +- .../mediator/processor/EventProcessor.kt | 2 +- .../config/EventMediatorConfigBuilder.kt | 2 +- 4 files changed, 59 insertions(+), 37 deletions(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 3195b214939..3f63b74226f 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ 
b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -83,7 +83,7 @@ class FlowEventMediatorFactoryImpl @Activate constructor( private const val RPC_CLIENT = "RpcClient" private const val RETRY_TOPIC_POLL_LIMIT = 5 private const val RETRY_TOPIC = FLOW_EVENT_TOPIC - + private const val TOKEN_RETRY = "TokenRetry" private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } @@ -110,44 +110,62 @@ class FlowEventMediatorFactoryImpl @Activate constructor( messageProcessor: StateAndEventProcessor, stateManager: StateManager, ) = EventMediatorConfigBuilder() - .name("FlowEventMediator") - .messagingConfig(messagingConfig) - .consumerFactories( - *createMediatorConsumerFactories(messagingConfig, bootConfig).toTypedArray() - ) - .clientFactories( - messagingClientFactoryFactory.createMessageBusClientFactory( - MESSAGE_BUS_CLIENT, messagingConfig - ), - messagingClientFactoryFactory.createRPCClientFactory( - RPC_CLIENT + .name("FlowEventMediator") + .messagingConfig(messagingConfig) + .consumerFactories( + *createMediatorConsumerFactories(messagingConfig, bootConfig).toTypedArray() ) - ) - .messageProcessor(messageProcessor) - .messageRouterFactory(createMessageRouterFactory(messagingConfig)) - .threads(messagingConfig.getInt(MEDIATOR_PROCESSING_THREAD_POOL_SIZE)) - .threadName("flow-event-mediator") - .stateManager(stateManager) - .minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT)) - .retryConfig(EventMediatorConfigBuilder.RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) - .build() - - private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage) : MediatorMessage { - val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray) as EntityRequest - val requestId = entityRequest.flowExternalEventContext.requestId - val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder() - .setRequestId(requestId) - 
.setTimestamp(Instant.now()) - .build() - val flowEvent = FlowEvent.newBuilder() - .setFlowId(key) - .setPayload(externalEventRetryRequest) + .clientFactories( + messagingClientFactoryFactory.createMessageBusClientFactory( + MESSAGE_BUS_CLIENT, messagingConfig + ), + messagingClientFactoryFactory.createRPCClientFactory( + RPC_CLIENT + ) + ) + .messageProcessor(messageProcessor) + .messageRouterFactory(createMessageRouterFactory(messagingConfig)) + .threads(messagingConfig.getInt(MEDIATOR_PROCESSING_THREAD_POOL_SIZE)) + .threadName("flow-event-mediator") + .stateManager(stateManager) + .minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT)) + .retryConfig(EventMediatorConfigBuilder.RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) .build() - return MediatorMessage(flowEvent, syncRpcRequest.properties) + + + private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage) : List> { + return try { + val requestId = getRequestId(syncRpcRequest) + val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder() + .setRequestId(requestId) + .setTimestamp(Instant.now()) + .build() + val flowEvent = FlowEvent.newBuilder() + .setFlowId(key) + .setPayload(externalEventRetryRequest) + .build() + listOf(MediatorMessage(flowEvent, syncRpcRequest.properties)) + } catch (ex: Exception) { + //In this scenario we failed to build the retry event. This will likely result in the flow hanging until the idle processor + // kicks in. This shouldn't be possible as is just a safety net. + logger.warn("Failed to generate a retry event for key $key. 
No retry will be triggered.", ex) + emptyList() + } } - private fun createMediatorConsumerFactories(messagingConfig: SmartConfig, bootConfig: SmartConfig): List { + private fun getRequestId(syncRpcRequest: MediatorMessage): String { + return when (val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray)) { + is EntityRequest -> entityRequest.flowExternalEventContext.requestId + is FlowOpsRequest -> entityRequest.flowExternalEventContext.requestId + is LedgerPersistenceRequest -> entityRequest.flowExternalEventContext.requestId + is TransactionVerificationRequest -> entityRequest.flowExternalEventContext.requestId + is UniquenessCheckRequestAvro -> entityRequest.flowExternalEventContext.requestId + is TokenPoolCacheEvent -> TOKEN_RETRY + else -> "InvalidEntityType" + } + } + private fun createMediatorConsumerFactories(messagingConfig: SmartConfig, bootConfig: SmartConfig): List { val retryTopicMessagingConfig = getRetryTopicConfig(messagingConfig) val mediatorConsumerFactory: MutableList = mutableListOf( mediatorConsumerFactory(FLOW_START, messagingConfig), diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt index e0af4c0a4a5..3209be4cb60 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -14,6 +14,7 @@ class ExternalEventRetryRequestHandler : FlowEventHandler( val outputEvents: MutableList> = mutableListOf() if (retryConfig != null) { retryConfig.buildRetryRequest?.let { it(key, message) }?.let { - outputEvents.add(it) + outputEvents.addAll(it) } } diff --git 
a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt index 765f4c28079..d2ed98ddd6a 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt @@ -104,6 +104,6 @@ class EventMediatorConfigBuilder { data class RetryConfig( val retryTopic: String, - val buildRetryRequest: ((K, MediatorMessage) -> MediatorMessage)? = null, + val buildRetryRequest: ((K, MediatorMessage) -> List>)? = null, ) } \ No newline at end of file From 8182255d080b1a78fb870fa379a0795e8bd36406 Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 12 Nov 2024 13:36:48 +0000 Subject: [PATCH 11/18] CORE-20867: self review comments --- .../mediator/FlowEventMediatorFactoryImpl.kt | 58 ++++++++++++------- .../ExternalEventRetryRequestHandler.kt | 4 ++ .../mediator/processor/ConsumerProcessor.kt | 1 + 3 files changed, 42 insertions(+), 21 deletions(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 3f63b74226f..09eebf4a679 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -110,29 +110,38 @@ class FlowEventMediatorFactoryImpl @Activate constructor( messageProcessor: StateAndEventProcessor, stateManager: StateManager, ) = EventMediatorConfigBuilder() - .name("FlowEventMediator") - .messagingConfig(messagingConfig) - .consumerFactories( - 
*createMediatorConsumerFactories(messagingConfig, bootConfig).toTypedArray() - ) - .clientFactories( - messagingClientFactoryFactory.createMessageBusClientFactory( - MESSAGE_BUS_CLIENT, messagingConfig - ), - messagingClientFactoryFactory.createRPCClientFactory( - RPC_CLIENT - ) + .name("FlowEventMediator") + .messagingConfig(messagingConfig) + .consumerFactories( + *createMediatorConsumerFactories(messagingConfig, bootConfig).toTypedArray() + ) + .clientFactories( + messagingClientFactoryFactory.createMessageBusClientFactory( + MESSAGE_BUS_CLIENT, messagingConfig + ), + messagingClientFactoryFactory.createRPCClientFactory( + RPC_CLIENT ) - .messageProcessor(messageProcessor) - .messageRouterFactory(createMessageRouterFactory(messagingConfig)) - .threads(messagingConfig.getInt(MEDIATOR_PROCESSING_THREAD_POOL_SIZE)) - .threadName("flow-event-mediator") - .stateManager(stateManager) - .minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT)) - .retryConfig(EventMediatorConfigBuilder.RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) - .build() + ) + .messageProcessor(messageProcessor) + .messageRouterFactory(createMessageRouterFactory(messagingConfig)) + .threads(messagingConfig.getInt(MEDIATOR_PROCESSING_THREAD_POOL_SIZE)) + .threadName("flow-event-mediator") + .stateManager(stateManager) + .minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT)) + .retryConfig(EventMediatorConfigBuilder.RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) + .build() + /** + * Build a request to trigger a resend of external events via the flow event pipeline. + * Request id is calculated from the previous request payload when possible to allow for some validation in the pipeline. + * This validation is an enhancement and not strictly required. + * A new Timestamp is set on each request to ensure each request is unique for replay logic handling. 
+ * @param key the key of the input record + * @param syncRpcRequest the previous sync request which failed. + * @return list of output retry events. + */ private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage) : List> { return try { val requestId = getRequestId(syncRpcRequest) @@ -147,12 +156,19 @@ class FlowEventMediatorFactoryImpl @Activate constructor( listOf(MediatorMessage(flowEvent, syncRpcRequest.properties)) } catch (ex: Exception) { //In this scenario we failed to build the retry event. This will likely result in the flow hanging until the idle processor - // kicks in. This shouldn't be possible as is just a safety net. + // kicks in. This shouldn't be possible and is just a safety net. logger.warn("Failed to generate a retry event for key $key. No retry will be triggered.", ex) emptyList() } } + /** + * Determine the external event request id where possible. + * Note, some token events have no request id as there is no response. + * For these use a hardcoded request id which will be ignored at the validation step. + * @param syncRpcRequest the previous request + * @return Request ID to set in the retry event. 
+ */ private fun getRequestId(syncRpcRequest: MediatorMessage): String { return when (val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray)) { is EntityRequest -> entityRequest.flowExternalEventContext.requestId diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt index 3209be4cb60..7571aa6b238 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -9,6 +9,10 @@ import org.osgi.service.component.annotations.Component import org.slf4j.Logger import org.slf4j.LoggerFactory +/** + * Handles pre-processing of events that are intended to trigger a resend of an external event. + * This can be triggered by the mediator in the event of transient errors. + */ @Component(service = [FlowEventHandler::class]) class ExternalEventRetryRequestHandler : FlowEventHandler { diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt index 21785d88f14..a0e3f7674f8 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt @@ -118,6 +118,7 @@ class ConsumerProcessor( metrics.processorTimer.recordCallable { try { val inputs = getInputs(consumer) + // If no records polled return early. 
if (inputs.isEmpty()) return@recordCallable val outputs = processInputs(inputs) categorizeOutputs(outputs, failureCounts) From bb69c144116e213529d27d59973281c03ba665e2 Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 12 Nov 2024 14:21:39 +0000 Subject: [PATCH 12/18] CORE-20867: fix test compilation --- .../corda/messaging/mediator/processor/EventProcessorTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt index 48840d95a9d..c21110dd60d 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt @@ -270,7 +270,7 @@ class EventProcessorTest { retryConfig ) - private val buildRetryRequest: ((String, MediatorMessage) -> MediatorMessage) = { _, message -> - message + private val buildRetryRequest: ((String, MediatorMessage) -> List>) = { _, message -> + listOf(message) } } From bb6a16c0381513915b04a90213842704dca7823c Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 12 Nov 2024 19:54:17 +0000 Subject: [PATCH 13/18] CORE-20867: fix discard logic and ensure key is set correctly --- .../flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt | 4 +++- .../handlers/events/ExternalEventRetryRequestHandler.kt | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 09eebf4a679..ed7919e80c6 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ 
b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -153,7 +153,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .setFlowId(key) .setPayload(externalEventRetryRequest) .build() - listOf(MediatorMessage(flowEvent, syncRpcRequest.properties)) + //ensure key is set correctly on new message destined for flow topic + val properties = syncRpcRequest.properties.toMutableMap().apply { this["key"] = key } + listOf(MediatorMessage(flowEvent, properties)) } catch (ex: Exception) { //In this scenario we failed to build the retry event. This will likely result in the flow hanging until the idle processor // kicks in. This shouldn't be possible and is just a safety net. diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt index 7571aa6b238..559b8387fb5 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -58,8 +58,10 @@ class ExternalEventRetryRequestHandler : FlowEventHandler Date: Thu, 14 Nov 2024 10:19:24 +0000 Subject: [PATCH 14/18] CORE-20867: bump avro version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index e8ece6bc0f1..c6f9f6c2203 100644 --- a/gradle.properties +++ b/gradle.properties @@ -33,7 +33,7 @@ activationVersion=1.2.0 ariesDynamicFrameworkExtensionVersion=1.3.6 antlrVersion=2.7.7 asmVersion=9.5 -avroVersion=1.11.3 +avroVersion=1.12.0 commonsVersion = 1.7 commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 From 14815c09fd813eb09fb5c9ad745ebaa896750a77 Mon Sep 17 00:00:00 2001 From: LWogan Date: 
Thu, 14 Nov 2024 10:28:52 +0000 Subject: [PATCH 15/18] CORE-20867: bump avro version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index c6f9f6c2203..5b1feddc9d0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -33,7 +33,7 @@ activationVersion=1.2.0 ariesDynamicFrameworkExtensionVersion=1.3.6 antlrVersion=2.7.7 asmVersion=9.5 -avroVersion=1.12.0 +avroVersion=1.11.4 commonsVersion = 1.7 commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 From b0cb5437dd7d1da9182dc0a23b0537bba6216b93 Mon Sep 17 00:00:00 2001 From: LWogan Date: Thu, 14 Nov 2024 11:10:36 +0000 Subject: [PATCH 16/18] CORE-20867: some pr comments --- .../mediator/FlowEventMediatorFactoryImpl.kt | 3 +- .../mediator/processor/EventProcessor.kt | 41 ++++++++++++------- .../mediator/processor/EventProcessorTest.kt | 6 +-- .../mediator/config/EventMediatorConfig.kt | 2 +- .../config/EventMediatorConfigBuilder.kt | 5 --- .../api/mediator/config/RetryConfig.kt | 14 +++++++ 6 files changed, 46 insertions(+), 25 deletions(-) create mode 100644 libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index ed7919e80c6..0c4e8aaa676 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -32,6 +32,7 @@ import net.corda.messaging.api.mediator.RoutingDestination.Companion.routeTo import net.corda.messaging.api.mediator.RoutingDestination.Type.ASYNCHRONOUS import net.corda.messaging.api.mediator.RoutingDestination.Type.SYNCHRONOUS import 
net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder +import net.corda.messaging.api.mediator.config.RetryConfig import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory import net.corda.messaging.api.mediator.factory.MediatorConsumerFactoryFactory import net.corda.messaging.api.mediator.factory.MessageRouterFactory @@ -129,7 +130,7 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .threadName("flow-event-mediator") .stateManager(stateManager) .minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT)) - .retryConfig(EventMediatorConfigBuilder.RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) + .retryConfig(RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) .build() diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt index 8490bb2089d..24a67a269c7 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt @@ -163,18 +163,6 @@ class EventProcessor( return ConsumerInputOutput(processorStateUpdated, newAsyncOutputs) } - data class ConsumerInputOutput( - val updatedState: StateAndEventProcessor.State?, - val outputEvents: List>, - val isNoop: Boolean = false - ) - - data class SyncProcessingOutput( - val syncResponses: List>, - val isNoopRetry: Boolean = false, - val asyncOutputs: List> = emptyList() - ) - private fun getNextEvent( queue: ArrayDeque>, consumerInputHash: String @@ -232,10 +220,10 @@ class EventProcessor( ) } } catch (e: CordaMessageAPIIntermittentException) { - val outputEvents: MutableList> = mutableListOf() + val asyncOutputEvents: MutableList> = mutableListOf() if (retryConfig != null) { retryConfig.buildRetryRequest?.let { it(key, message) }?.let { - 
outputEvents.addAll(it) + asyncOutputEvents.addAll(it) } } @@ -243,7 +231,7 @@ class EventProcessor( // performance. // If we are not on the retry topic then we need to save the state before adding the retry event. This will allow the // flow cleanup processors to execute on an idle flow checkpoint - return SyncProcessingOutput(emptyList(), isRetryTopic, outputEvents) + return SyncProcessingOutput(emptyList(), isRetryTopic, asyncOutputEvents) } } return SyncProcessingOutput(outputEvents) @@ -263,4 +251,27 @@ class EventProcessor( private fun List>.toMessageProperties() = associateTo(mutableMapOf()) { (key, value) -> key to (value as Any) } + + + /** + * The outputs from processing a single consumer input event from the bus. + * This will includes updates from all sync events which are processed as a result of a consumer input + * @property updatedState The state and event processor output state after processing + */ + data class ConsumerInputOutput( + val updatedState: StateAndEventProcessor.State?, + val outputEvents: List>, + val isNoop: Boolean = false + ) + + /** + * The outputs from processing a single consumer input event from the bus. 
+ * This includes updates from all sync events which are processed as a result of a consumer input + * @property syncResponses The response records from the sync events that were processed + */ + data class SyncProcessingOutput( + val syncResponses: List>, + val isNoopRetry: Boolean = false, + val asyncOutputs: List> = emptyList() + ) } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt index c21110dd60d..bd33f7cf4d8 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt @@ -11,7 +11,7 @@ import net.corda.messaging.api.mediator.MessageRouter import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.RoutingDestination import net.corda.messaging.api.mediator.config.EventMediatorConfig -import net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder +import net.corda.messaging.api.mediator.config.RetryConfig import net.corda.messaging.api.mediator.factory.MessageRouterFactory import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.processor.StateAndEventProcessor.Response @@ -69,7 +69,7 @@ class EventProcessorTest { } else RoutingDestination(client, "endpoint", RoutingDestination.Type.ASYNCHRONOUS) } eventMediatorConfig = buildTestConfig() - val retryConfig = EventMediatorConfigBuilder.RetryConfig(retryTopic, buildRetryRequest) + val retryConfig = RetryConfig(retryTopic, buildRetryRequest) eventMediatorRetryConfig = buildTestConfig(retryConfig) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { @@ -256,7 +256,7 @@ class EventProcessorTest { 
assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Create::class.java) } - private fun buildTestConfig(retryConfig: EventMediatorConfigBuilder.RetryConfig? = null) = EventMediatorConfig( + private fun buildTestConfig(retryConfig: RetryConfig? = null) = EventMediatorConfig( "", SmartConfigImpl.empty(), emptyList(), diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt index 1174c28a60c..032ce9455b0 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt @@ -42,7 +42,7 @@ data class EventMediatorConfig( val threadName: String, val stateManager: StateManager, val minGroupSize: Int, - val retryConfig: EventMediatorConfigBuilder.RetryConfig? = null + val retryConfig: RetryConfig? = null ) { /** * Timeout for polling consumers. 
diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt index d2ed98ddd6a..b928ea3eb78 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt @@ -2,7 +2,6 @@ package net.corda.messaging.api.mediator.config import net.corda.libs.configuration.SmartConfig import net.corda.libs.statemanager.api.StateManager -import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MultiSourceEventMediator import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory import net.corda.messaging.api.mediator.factory.MessageRouterFactory @@ -102,8 +101,4 @@ class EventMediatorConfigBuilder { ) } - data class RetryConfig( - val retryTopic: String, - val buildRetryRequest: ((K, MediatorMessage) -> List>)? = null, - ) } \ No newline at end of file diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt new file mode 100644 index 00000000000..bf7009abd8a --- /dev/null +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt @@ -0,0 +1,14 @@ +package net.corda.messaging.api.mediator.config + +import net.corda.messaging.api.mediator.MediatorMessage + +/** + * Config used to setup retry events in the mediator. + * These are used to retry transient errors that occur. 
+ * @property retryTopic The topic to send retry events to + * @property buildRetryRequest lambda used to generate the retry requests + */ +data class RetryConfig( + val retryTopic: String, + val buildRetryRequest: ((K, MediatorMessage) -> List>)? = null, +) From 8b806953b72a64179d06e766e11a01f4dab247c5 Mon Sep 17 00:00:00 2001 From: LWogan Date: Thu, 14 Nov 2024 12:29:23 +0000 Subject: [PATCH 17/18] CORE-20867: some pr comments and unit tests --- .../ExternalEventRetryRequestHandler.kt | 4 +- .../impl/ExternalEventManagerImplTest.kt | 39 +++++++++- .../FlowEventMediatorFactoryImplTest.kt | 4 + .../ExternalEventRetryRequestHandlerTest.kt | 74 +++++++++++++++++++ .../impl/FlowGlobalPostProcessorImplTest.kt | 19 ++++- 5 files changed, 133 insertions(+), 7 deletions(-) create mode 100644 components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandlerTest.kt diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt index 559b8387fb5..b1d6cf8c8c5 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -34,7 +34,7 @@ class ExternalEventRetryRequestHandler : FlowEventHandler() + private val externalEventRetryRequestHandler = ExternalEventRetryRequestHandler() + + @Test + fun `does not throw a flow event exception if the checkpoint exists and it is the correct request id`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "requestId" }) + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + 
externalEventRetryRequestHandler.preProcess(context) + } + + @Test + fun `does not throw a flow event exception if the checkpoint exists and it a token request id`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "requestId" }) + + val context = buildFlowEventContext(checkpoint, ExternalEventRetryRequest("TokenRetry", Instant.now())) + externalEventRetryRequestHandler.preProcess(context) + } + + @Test + fun `throws a flow event exception if the checkpoint does not exist`() { + whenever(checkpoint.doesExist).thenReturn(false) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + + @Test + fun `throws a flow event exception if the flow is not waiting for an external event response`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(null) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + + @Test + fun `throws a flow event exception if the flow is waiting for a different external event response`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "OtherRequestId" }) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + +} \ No newline at end of file diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt index 38b41ed43a3..dd1daf6280f 100644 --- 
a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt @@ -2,6 +2,7 @@ package net.corda.flow.pipeline.impl import net.corda.data.flow.FlowKey import net.corda.data.flow.event.SessionEvent +import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.event.mapper.ScheduleCleanup import net.corda.data.flow.event.session.SessionData @@ -15,6 +16,7 @@ import net.corda.flow.BOB_X500_NAME import net.corda.flow.FLOW_ID_1 import net.corda.flow.REQUEST_ID_1 import net.corda.flow.external.events.impl.ExternalEventManager +import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowFatalException import net.corda.flow.pipeline.factory.FlowRecordFactory import net.corda.flow.state.FlowCheckpoint @@ -103,7 +105,7 @@ class FlowGlobalPostProcessorImplTest { private val membershipGroupReaderProvider = mock() private val membershipGroupReader = mock() private val checkpoint = mock() - private val testContext = buildFlowEventContext(checkpoint, Any()) + private lateinit var testContext: FlowEventContext private val flowGlobalPostProcessor = FlowGlobalPostProcessorImpl( externalEventManager, sessionManager, @@ -114,6 +116,7 @@ class FlowGlobalPostProcessorImplTest { @Suppress("Unused") @BeforeEach fun setup() { + testContext = buildFlowEventContext(checkpoint, Any()) whenever(checkpoint.sessions).thenReturn(listOf(sessionState1, sessionState2)) whenever(checkpoint.flowKey).thenReturn(FlowKey(FLOW_ID_1, ALICE_X500_HOLDING_IDENTITY)) whenever(checkpoint.holdingIdentity).thenReturn(ALICE_X500_HOLDING_IDENTITY.toCorda()) @@ -262,6 +265,20 @@ class FlowGlobalPostProcessorImplTest { verify(checkpoint).clearPendingPlatformError() } + @Test + fun `Adds external event record when there is a 
retry instruction`() { + val externalEventState = ExternalEventState() + + testContext = buildFlowEventContext(checkpoint, ExternalEventRetryRequest(REQUEST_ID_1, Instant.now())) + whenever(checkpoint.externalEventState).thenReturn(externalEventState) + whenever(externalEventManager.getRetryEvent(eq(externalEventState), any())) + .thenReturn(externalEventRecord) + + val outputContext = flowGlobalPostProcessor.postProcess(testContext) + + assertThat(outputContext.outputRecords).contains(externalEventRecord) + } + @Test fun `Adds external event record when there is an external event to send`() { val externalEventState = ExternalEventState() From 18cbbc9b0136dd0f3597098ff91cac8224bc9538 Mon Sep 17 00:00:00 2001 From: LWogan Date: Thu, 14 Nov 2024 12:39:20 +0000 Subject: [PATCH 18/18] CORE-20867:bump api version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 5b1feddc9d0..6c6287a28ae 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.1.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.1.54-alpha-1731333866451 +cordaApiVersion=5.2.1.54-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26