Skip to content

Commit

Permalink
CORE-20867: update to the latest api version. some self review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
LWogan committed Nov 8, 2024
1 parent a9979c5 commit 7a2d71b
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ interface ExternalEventManager {
* Returns the event as is from the state. No additional checks required.
* @param externalEventState The [ExternalEventState] to get the event from.
* @param instant The current time. Used to set timestamp.
* @return The external event request to resend
* */
fun getRetryEvent(
externalEventState: ExternalEventState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ class ExternalEventManagerImpl(
override fun getRetryEvent(
externalEventState: ExternalEventState,
): Record<*, *> {
//Don't update ExternalEventState with new timestamp as this will result in State change being detected and potentially
// additional checkpoints saved for cases where multiple sequential RPC calls have transient retry errors
//Don't update ExternalEventState with new timestamp as this will result in State change being detected by the message pattern
// and additional checkpoints saved for cases where multiple sequential RPC calls have transient retry errors
return generateRecord(externalEventState, null)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
private const val RPC_CLIENT = "RpcClient"
private const val RETRY_TOPIC_POLL_LIMIT = 5
private const val RETRY_TOPIC = FLOW_EVENT_TOPIC
private const val REQUEST_ID_HEADER = "request_id"

private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}
Expand Down Expand Up @@ -134,7 +133,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
.build()

private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage<Any>) : MediatorMessage<Any> {
// TODO are they all entity requests
val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray) as EntityRequest
val requestId = entityRequest.flowExternalEventContext.requestId
val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder()
Expand Down Expand Up @@ -166,7 +164,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
}

private fun getRetryTopicConfig(messagingConfig: SmartConfig): SmartConfig {
// TODO - perhaps we should configure consumer to poll less frequently than primary topic consumers
return messagingConfig.withValue(KAFKA_CONSUMER_MAX_POLL_RECORDS, ConfigValueFactory.fromAnyRef(RETRY_TOPIC_POLL_LIMIT))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import net.corda.libs.configuration.SmartConfig
import net.corda.libs.configuration.helper.getConfig
import net.corda.libs.statemanager.api.Metadata
import net.corda.schema.configuration.ConfigKeys
import net.corda.schema.configuration.MessagingConfig
import net.corda.schema.configuration.FlowConfig
import net.corda.utilities.debug
import org.osgi.service.component.annotations.Component
import org.slf4j.Logger
Expand Down Expand Up @@ -105,5 +105,5 @@ class ExternalEventRetryRequestHandler : FlowEventHandler<ExternalEventRetryRequ
}

private fun retryTimeout(config: SmartConfig) =
config.getLong(MessagingConfig.Subscription.MEDIATOR_PROCESSING_TRANSIENT_ERROR_TIMEOUT)
config.getLong(FlowConfig.EXTERNAL_EVENT_TRANSIENT_ERROR_TIMEOUT)
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ commonsLangVersion = 3.12.0
commonsTextVersion = 1.10.0
# Corda API libs revision (change in 4th digit indicates a breaking change)
# Change to 5.2.1.xx-SNAPSHOT to pick up maven local published copy
cordaApiVersion=5.2.1.54-alpha-1731001992820
cordaApiVersion=5.2.1.54-alpha-1731056624538

disruptorVersion=3.4.4
felixConfigAdminVersion=1.9.26
Expand Down

0 comments on commit 7a2d71b

Please sign in to comment.