Skip to content

Commit

Permalink
CORE-20543 Retry sending unauthenticated message to MGM (#6140)
Browse files Browse the repository at this point in the history
An outbound unauthenticated message (only sent during first-time communication from member to MGM) is dropped if the MGM-info has not propagated through to the group reader after it has been published by the registration service. Although the membership worker retries registration when it does not hear back from the MGM after 40 seconds, this delay causes test pipelines to fail. This change implements retries on the link manager worker so that the message is scheduled to be republished to p2p.out (up to 3 times with backoff logic).
  • Loading branch information
YashNabar authored May 17, 2024
1 parent b44805c commit 4881fe9
Showing 4 changed files with 152 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -3,10 +3,13 @@ package net.corda.p2p.linkmanager.outbound
import net.corda.configuration.read.ConfigurationReadService
import net.corda.libs.configuration.SmartConfig
import net.corda.lifecycle.LifecycleCoordinatorFactory
import net.corda.lifecycle.domino.logic.ComplexDominoTile
import net.corda.lifecycle.domino.logic.LifecycleWithDominoTile
import net.corda.lifecycle.domino.logic.util.PublisherWithDominoLogic
import net.corda.lifecycle.domino.logic.util.SubscriptionDominoTile
import net.corda.membership.grouppolicy.GroupPolicyProvider
import net.corda.membership.read.MembershipGroupReaderProvider
import net.corda.messaging.api.publisher.config.PublisherConfig
import net.corda.messaging.api.publisher.factory.PublisherFactory
import net.corda.messaging.api.subscription.config.SubscriptionConfig
import net.corda.messaging.api.subscription.factory.SubscriptionFactory
@@ -15,6 +18,7 @@ import net.corda.p2p.linkmanager.hosting.LinkManagerHostingMap
import net.corda.p2p.linkmanager.delivery.DeliveryTracker
import net.corda.schema.Schemas
import net.corda.utilities.time.Clock
import java.util.concurrent.Executors

@Suppress("LongParameterList")
internal class OutboundLinkManager(
@@ -31,7 +35,17 @@ internal class OutboundLinkManager(
) : LifecycleWithDominoTile {
companion object {
private const val OUTBOUND_MESSAGE_PROCESSOR_GROUP = "outbound_message_processor_group"
private const val OUTBOUND_MESSAGE_PROCESSOR_ID = "outbound_message_processor"

}
private val publisher = PublisherWithDominoLogic(
publisherFactory,
lifecycleCoordinatorFactory,
PublisherConfig(OUTBOUND_MESSAGE_PROCESSOR_ID),
messagingConfiguration
)
private val scheduledExecutor =
Executors.newSingleThreadScheduledExecutor { runnable -> Thread(runnable, OUTBOUND_MESSAGE_PROCESSOR_ID) }
private val outboundMessageProcessor = OutboundMessageProcessor(
commonComponents.sessionManager,
linkManagerHostingMap,
@@ -41,6 +55,8 @@ internal class OutboundLinkManager(
commonComponents.messagesPendingSession,
clock,
commonComponents.messageConverter,
publisher,
scheduledExecutor,
)
private val deliveryTracker = DeliveryTracker(
lifecycleCoordinatorFactory,
@@ -63,15 +79,31 @@ internal class OutboundLinkManager(
)
}

override val dominoTile = SubscriptionDominoTile(
private val subscriptionTile = SubscriptionDominoTile(
lifecycleCoordinatorFactory,
outboundMessageSubscription,
subscriptionConfig,
dependentChildren = listOf(
deliveryTracker.dominoTile.coordinatorName,
commonComponents.dominoTile.coordinatorName,
commonComponents.inboundAssignmentListener.dominoTile.coordinatorName,
publisher.dominoTile.coordinatorName,
),
managedChildren = setOf(
deliveryTracker.dominoTile.toNamedLifecycle(),
publisher.dominoTile.toNamedLifecycle(),
)
)

override val dominoTile = ComplexDominoTile(
this.javaClass.simpleName,
lifecycleCoordinatorFactory,
onClose = { scheduledExecutor.shutdown() },
dependentChildren = setOf(
subscriptionTile.coordinatorName,
),
managedChildren = setOf(deliveryTracker.dominoTile.toNamedLifecycle())
managedChildren = setOf(
subscriptionTile.toNamedLifecycle(),
)
)
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ import net.corda.data.p2p.markers.LinkManagerProcessedMarker
import net.corda.data.p2p.markers.LinkManagerReceivedMarker
import net.corda.data.p2p.markers.LinkManagerSentMarker
import net.corda.data.p2p.markers.TtlExpiredMarker
import net.corda.lifecycle.domino.logic.util.PublisherWithDominoLogic
import net.corda.membership.grouppolicy.GroupPolicyProvider
import net.corda.membership.read.MembershipGroupReaderProvider
import net.corda.messaging.api.processor.EventLogProcessor
@@ -45,6 +46,9 @@ import net.corda.membership.lib.exceptions.BadGroupPolicyException
import net.corda.p2p.linkmanager.TraceableItem
import net.corda.p2p.linkmanager.metrics.recordOutboundMessagesMetric
import net.corda.p2p.linkmanager.metrics.recordOutboundSessionMessagesMetric
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

@Suppress("LongParameterList", "TooManyFunctions")
internal class OutboundMessageProcessor(
@@ -56,6 +60,8 @@ internal class OutboundMessageProcessor(
private val messagesPendingSession: PendingSessionMessageQueues,
private val clock: Clock,
private val messageConverter: MessageConverter,
private val publisher: PublisherWithDominoLogic,
private val scheduledExecutor: ScheduledExecutorService,
private val networkMessagingValidator: NetworkMessagingValidator =
NetworkMessagingValidator(membershipGroupReaderProvider),
) : EventLogProcessor<String, AppMessage> {
@@ -66,6 +72,8 @@ internal class OutboundMessageProcessor(

companion object {
private const val tracingEventName = "P2P Link Manager Outbound Event"
private const val FIRST_RETRY_DELAY = 500L
private const val MAX_RETRY_DELAY = 2000L
fun recordsForNewSessions(
state: SessionManager.SessionState.NewSessionsNeeded,
inboundAssignmentListener: InboundAssignmentListener,
@@ -93,6 +101,11 @@ internal class OutboundMessageProcessor(
}
}

/**
* Map of unauthenticated message ID to the previous republishing delay
*/
private val unauthenticatedMessageReplays = ConcurrentHashMap<String, Long>()

private fun ttlExpired(ttl: Instant?): Boolean {
if (ttl == null) return false
val currentTimeInTimeMillis = clock.instant()
@@ -122,7 +135,7 @@ internal class OutboundMessageProcessor(
}

val results = unauthenticatedMessages.map { (message, event) ->
TraceableItem(processUnauthenticatedMessage(message), event)
TraceableItem(processUnauthenticatedMessage(message, event), event)
} + processAuthenticatedMessages(authenticatedMessages)

for (result in results) {
@@ -192,7 +205,10 @@ internal class OutboundMessageProcessor(
return outResult ?: inResult
}

private fun processUnauthenticatedMessage(message: OutboundUnauthenticatedMessage): List<Record<String, *>> {
private fun processUnauthenticatedMessage(
message: OutboundUnauthenticatedMessage,
originalRecord: EventLogRecord<String, AppMessage>?,
): List<Record<String, *>> {
logger.debug { "Processing outbound message ${message.header.messageId} to ${message.header.destination}." }

val discardReason = checkSourceAndDestinationValid(
@@ -204,7 +220,10 @@ internal class OutboundMessageProcessor(
"from ${message.header.source} to ${message.header.destination} as the " +
discardReason
)
originalRecord?.scheduleRepublish(message.header.messageId)
return emptyList()
} else {
unauthenticatedMessageReplays.remove(message.header.messageId)
}

val destinationMemberInfo = membershipGroupReaderProvider.lookup(
@@ -251,6 +270,41 @@ internal class OutboundMessageProcessor(
}
}

private fun EventLogRecord<String, AppMessage>.scheduleRepublish(messageId: String) {
val delay = unauthenticatedMessageReplays.compute(messageId) { _, previous -> getRepublishDelay(previous) }
if (delay == null) {
logger.debug { "Stopping republishing of outbound unauthenticated message '$messageId'." }
unauthenticatedMessageReplays.remove(messageId)
} else {
scheduledExecutor.schedule(
{
logger.debug { "Republishing outbound unauthenticated message '$messageId'." }
publisher.publish(
listOf(
Record(
topic = this.topic,
key = this.key,
value = this.value,
timestamp = this.timestamp,
headers = this.headers
)
)
)
},
delay,
TimeUnit.MILLISECONDS,
)
}
}

private fun getRepublishDelay(previousDelay: Long?): Long? = previousDelay?.let {
if (it >= MAX_RETRY_DELAY) {
return null
} else {
it * 2
}
} ?: FIRST_RETRY_DELAY

fun processReplayedAuthenticatedMessage(messageAndKey: AuthenticatedMessageAndKey): List<Record<String, *>> =
processAuthenticatedMessages(listOf(TraceableItem(messageAndKey, null)), true).flatMap { it.item }

Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package net.corda.p2p.linkmanager.outbound

import net.corda.data.identity.HoldingIdentity
import net.corda.membership.grouppolicy.GroupPolicyProvider
import net.corda.messaging.api.records.EventLogRecord
import net.corda.data.p2p.AuthenticatedMessageAndKey
import net.corda.data.p2p.LinkOutMessage
import net.corda.data.p2p.SessionPartitions
@@ -14,19 +12,26 @@ import net.corda.data.p2p.app.InboundUnauthenticatedMessageHeader
import net.corda.data.p2p.app.MembershipStatusFilter
import net.corda.data.p2p.app.OutboundUnauthenticatedMessage
import net.corda.data.p2p.app.OutboundUnauthenticatedMessageHeader
import net.corda.data.p2p.markers.AppMessageMarker
import net.corda.data.p2p.markers.LinkManagerDiscardedMarker
import net.corda.data.p2p.markers.LinkManagerProcessedMarker
import net.corda.data.p2p.markers.LinkManagerReceivedMarker
import net.corda.data.p2p.markers.TtlExpiredMarker
import net.corda.lifecycle.domino.logic.util.PublisherWithDominoLogic
import net.corda.membership.grouppolicy.GroupPolicyProvider
import net.corda.membership.lib.exceptions.BadGroupPolicyException
import net.corda.messaging.api.records.EventLogRecord
import net.corda.messaging.api.records.Record
import net.corda.p2p.crypto.protocol.api.AuthenticatedSession
import net.corda.p2p.crypto.protocol.api.AuthenticationResult
import net.corda.p2p.linkmanager.TraceableItem
import net.corda.p2p.linkmanager.common.MessageConverter
import net.corda.p2p.linkmanager.hosting.LinkManagerHostingMap
import net.corda.p2p.linkmanager.inbound.InboundAssignmentListener
import net.corda.p2p.linkmanager.membership.NetworkMessagingValidator
import net.corda.p2p.linkmanager.sessions.PendingSessionMessageQueues
import net.corda.p2p.linkmanager.sessions.SessionManager
import net.corda.p2p.linkmanager.utilities.mockMembersAndGroups
import net.corda.data.p2p.markers.AppMessageMarker
import net.corda.data.p2p.markers.LinkManagerDiscardedMarker
import net.corda.data.p2p.markers.LinkManagerReceivedMarker
import net.corda.data.p2p.markers.LinkManagerProcessedMarker
import net.corda.data.p2p.markers.TtlExpiredMarker
import net.corda.p2p.linkmanager.membership.NetworkMessagingValidator
import net.corda.schema.Schemas
import net.corda.test.util.identity.createTestHoldingIdentity
import net.corda.test.util.time.MockTimeFacilitiesProvider
@@ -37,19 +42,18 @@ import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.SoftAssertions.assertSoftly
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.doThrow
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import java.nio.ByteBuffer
import java.time.Instant
import net.corda.membership.lib.exceptions.BadGroupPolicyException
import net.corda.messaging.api.records.Record
import net.corda.p2p.linkmanager.TraceableItem
import net.corda.p2p.linkmanager.common.MessageConverter
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.doThrow
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

class OutboundMessageProcessorTest {
private val myIdentity = createTestHoldingIdentity("CN=PartyA, O=Corp, L=LDN, C=GB", "Group")
@@ -86,6 +90,8 @@ class OutboundMessageProcessorTest {
on { validateInbound(any(), any()) } doReturn Either.Left(Unit)
on { validateOutbound(any(), any()) } doReturn Either.Left(Unit)
}
private val publisher = mock<PublisherWithDominoLogic>()
private val scheduledExecutorService = mock<ScheduledExecutorService>()

private val processor = OutboundMessageProcessor(
sessionManager,
@@ -96,7 +102,9 @@ class OutboundMessageProcessorTest {
messagesPendingSession,
mockTimeFacilitiesProvider.clock,
messageConverter,
networkMessagingValidator
publisher,
scheduledExecutorService,
networkMessagingValidator,
)

private fun setupSessionManager(response: SessionManager.SessionState) {
@@ -604,6 +612,8 @@ class OutboundMessageProcessorTest {
messagesPendingSession,
mockTimeFacilitiesProvider.clock,
messageConverter,
publisher,
scheduledExecutorService,
networkMessagingValidator,
)

@@ -649,6 +659,8 @@ class OutboundMessageProcessorTest {
messagesPendingSession,
mockTimeFacilitiesProvider.clock,
messageConverter,
publisher,
scheduledExecutorService,
networkMessagingValidator,
)

@@ -779,6 +791,41 @@ class OutboundMessageProcessorTest {
assertThat(records).isEmpty()
}

@Test
fun `dropped outbound unauthenticated messages are scheduled to be republished`() {
whenever(
networkMessagingValidator.validateOutbound(any(), any())
).doReturn(Either.Right("foo-bar"))
val payload = "test"
val unauthenticatedMsg = OutboundUnauthenticatedMessage(
OutboundUnauthenticatedMessageHeader(
remoteIdentity.toAvro(),
localIdentity.toAvro(),
"subsystem",
"messageId",
),
ByteBuffer.wrap(payload.toByteArray()),
)
val appMessage = AppMessage(unauthenticatedMsg)

processor.onNext(
listOf(
EventLogRecord(
Schemas.P2P.P2P_OUT_TOPIC,
"key",
appMessage,
1,
0
)
)
)

val publishCaptor = argumentCaptor<Runnable>()
verify(scheduledExecutorService).schedule(publishCaptor.capture(), eq(500L), eq(TimeUnit.MILLISECONDS))
publishCaptor.firstValue.run()
verify(publisher).publish(listOf(Record(Schemas.P2P.P2P_OUT_TOPIC, "key", appMessage)))
}

@Test
fun `onNext produces only a LinkManagerProcessed marker (per flowMessage) if SessionAlreadyPending`() {
setupSessionManager(SessionManager.SessionState.SessionAlreadyPending(sessionCounterparties))
Original file line number Diff line number Diff line change
@@ -13,13 +13,11 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import picocli.CommandLine
import java.io.File
import java.util.UUID

@Disabled
class OnboardMemberTest {
companion object {
private const val CPB_FILE = "test-cordapp.cpb"

0 comments on commit 4881fe9

Please sign in to comment.