From ff9c7f37de40e34aa98be21725218ebee7641e2a Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Wed, 8 Nov 2023 13:33:08 +0000 Subject: [PATCH 01/18] CORE-18194 - Revert `fetch.max.wait.ms` default config (#5050) * Temporary revert Kafka config changes to test impact on CPU load. * Set fetch.max.wait.ms = 500 * Revert kafka configs back to release version, except from `fetch.max.wait.ms` * Increase mediator pull frequency. --- .../src/main/resources/kafka-messaging-defaults.conf | 2 +- .../corda/messaging/mediator/MultiSourceEventMediatorImpl.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf b/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf index bd11c722869..6a89e3f335b 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf +++ b/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf @@ -29,7 +29,7 @@ consumer = ${common} { heartbeat.interval.ms = 20000 # The maximum amount of time kafka broker will wait before answering a consumer request if there hasn't been # an update to the topic - fetch.max.wait.ms = 5 + fetch.max.wait.ms = 500 } # Defaults for all producers. diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index 339ee7e5027..73a25b032f9 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -69,7 +69,7 @@ class MultiSourceEventMediatorImpl( private val running = AtomicBoolean(false) // TODO This timeout was set with CORE-17768 (changing configuration value would affect other messaging patterns) // This should be reverted to use configuration value once event mediator polling is refactored (planned for 5.2) - private val pollTimeout = Duration.ofMillis(5) + private val pollTimeout = Duration.ofMillis(50) override fun start() { log.debug { "Starting multi-source event mediator with config: $config" } From 576888d7deff3af6e98c7b0ddd1773d513e7d521 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Jos=C3=A9=20Ramos?= Date: Wed, 8 Nov 2023 17:22:18 +0000 Subject: [PATCH 02/18] CORE-18239: Delete Correct State Within Mediator (#5063) Update event mediator so it uses the State returned by the processor, instead of the original one, when making the decision on whether to delete it from the persistent storage or not. Failing to do so leaves orphaned States in the persistent storage, which in turn ends up causing other issues in the system. 
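In short, the decision is now a function of two values: the state originally loaded from persistent storage and the state handed back by the processor. A rough, self-contained Kotlin sketch of that rule (the MediatorState stand-in type and the qualify name are illustrative; the real logic is the qualifyState function added in the diff below):

// Illustration only: simplified stand-in for the state manager's State type.
data class MediatorState(val key: String, val value: ByteArray)

// original  = state loaded from persistent storage before processing (null if none existed)
// processed = state returned by the processor (null if the processor cleared it)
fun qualify(
    key: String,
    original: MediatorState?,
    processed: MediatorState?,
    toCreate: MutableMap<String, MediatorState>,
    toUpdate: MutableMap<String, MediatorState>,
    toDelete: MutableMap<String, MediatorState>
) {
    when {
        original == null && processed != null -> toCreate[key] = processed // brand-new state
        original != null && processed != null -> toUpdate[key] = processed // existing state evolved
        original != null && processed == null -> toDelete[key] = original  // processor cleared it, so remove the stored copy
    }
}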
--------- Co-authored-by: James Higgs --- .../mediator/MultiSourceEventMediatorImpl.kt | 57 ++++++++++++------- .../MultiSourceEventMediatorImplTest.kt | 49 +++++++++++++++- 2 files changed, 84 insertions(+), 22 deletions(-) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index 73a25b032f9..67e2e4f7053 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -165,12 +165,14 @@ class MultiSourceEventMediatorImpl( if (messages.isNotEmpty()) { var groups = allocateGroups(messages.map { it.toRecord() }) var states = stateManager.get(messages.map { it.key.toString() }.distinct()) + while (groups.isNotEmpty()) { val asynchronousOutputs = mutableListOf>() - val newStates = ConcurrentHashMap() - val updateStates = ConcurrentHashMap() - val deleteStates = ConcurrentHashMap() + val statesToCreate = ConcurrentHashMap() + val statesToUpdate = ConcurrentHashMap() + val statesToDelete = ConcurrentHashMap() val flowEvents = ConcurrentHashMap>>() + // Process each group on a thread groups.filter { it.isNotEmpty() @@ -209,20 +211,7 @@ class MultiSourceEventMediatorImpl( processorState, ) - // New state - if (state == null && processedState != null) { - newStates[it.key.toString()] = processedState - } - - // Update state - if (state != null && processedState != null) { - updateStates[it.key.toString()] = processedState - } - - // Delete state - if (state != null && processorState == null) { - deleteStates[it.key.toString()] = state - } + qualifyState(it.key.toString(), state, processedState, statesToCreate, statesToUpdate, statesToDelete) } } }.map { @@ -231,10 +220,10 @@ class MultiSourceEventMediatorImpl( sendAsynchronousEvents(asynchronousOutputs) // Persist states changes - val failedToCreateKeys = stateManager.create(newStates.values.mapNotNull { it }) + val failedToCreateKeys = stateManager.create(statesToCreate.values.mapNotNull { it }) val failedToCreate = stateManager.get(failedToCreateKeys.keys) - val failedToDelete = stateManager.delete(deleteStates.values.mapNotNull { it }) - val failedToUpdate = stateManager.update(updateStates.values.mapNotNull { it }) + val failedToDelete = stateManager.delete(statesToDelete.values.mapNotNull { it }) + val failedToUpdate = stateManager.update(statesToUpdate.values.mapNotNull { it }) states = failedToCreate + failedToDelete + failedToUpdate groups = if (states.isNotEmpty()) { allocateGroups(flowEvents.filterKeys { states.containsKey(it) }.values.flatten()) @@ -249,6 +238,34 @@ class MultiSourceEventMediatorImpl( metrics.processorTimer.record(System.nanoTime() - startTimestamp, TimeUnit.NANOSECONDS) } + /** + * Decide, based on the original and processed state values, whether the state must be deleted, updated or + * deleted; and add the relevant state value to the specific Map. 
+ */ + fun qualifyState( + groupId: String, + original: State?, + processed: State?, + toCreate : MutableMap, + toUpdate : MutableMap, + toDelete : MutableMap + ) { + // New state + if (original == null && processed != null) { + toCreate[groupId] = processed + } + + // Update state + if (original != null && processed != null) { + toUpdate[groupId] = processed + } + + // Delete state + if (original != null && processed == null) { + toDelete[groupId] = original + } + } + private fun sendAsynchronousEvents(busEvents: MutableList>) { busEvents.forEach { message -> with(messageRouter.getDestination(message)) { diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt index 62b2e99dacb..b446a17f605 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt @@ -3,6 +3,7 @@ package net.corda.messaging.mediator import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializer import net.corda.libs.configuration.SmartConfig +import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.lifecycle.LifecycleCoordinatorFactory import net.corda.messagebus.api.consumer.CordaConsumerRecord @@ -10,7 +11,6 @@ import net.corda.messaging.api.exception.CordaMessageAPIIntermittentException import net.corda.messaging.api.mediator.MediatorConsumer import net.corda.messaging.api.mediator.MessageRouter import net.corda.messaging.api.mediator.MessagingClient -import net.corda.messaging.api.mediator.MultiSourceEventMediator import net.corda.messaging.api.mediator.RoutingDestination import net.corda.messaging.api.mediator.config.EventMediatorConfig import net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder @@ -25,7 +25,9 @@ import net.corda.messaging.api.records.Record import net.corda.schema.configuration.MessagingConfig import net.corda.taskmanager.TaskManager import net.corda.test.util.waitWhile +import org.assertj.core.api.Assertions import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test import org.mockito.kotlin.any import org.mockito.kotlin.anyOrNull import org.mockito.kotlin.atLeast @@ -36,6 +38,7 @@ import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import java.time.Duration import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger class MultiSourceEventMediatorImplTest { @@ -48,7 +51,7 @@ class MultiSourceEventMediatorImplTest { private val messagingConfig = mock() private lateinit var config: EventMediatorConfig - private lateinit var mediator: MultiSourceEventMediator + private lateinit var mediator: MultiSourceEventMediatorImpl private val mediatorConsumerFactory = mock() private val consumer = mock>() private val messagingClientFactory = mock() @@ -207,6 +210,48 @@ class MultiSourceEventMediatorImplTest { verify(messagingClient, times(expectedProcessingCount)).send(any()) } + @Test + fun qualifyStateMarksStateAsNewWhenOriginalIsNullAndProcessedIsNot() { + val toCreate = ConcurrentHashMap() + val toUpdate = ConcurrentHashMap() + val toDelete = ConcurrentHashMap() + val originalState = null + val processedState = 
State("key1", "".toByteArray()) + mediator.qualifyState("groupKey", originalState, processedState, toCreate, toUpdate, toDelete) + + Assertions.assertThat(toCreate).containsExactly(Assertions.entry("groupKey", processedState)) + Assertions.assertThat(toUpdate).isEmpty() + Assertions.assertThat(toDelete).isEmpty() + } + + @Test + fun qualifyStateMarksStateAsUpdatableWhenOriginalAndProcessedAreNotNull() { + val toCreate = ConcurrentHashMap() + val toUpdate = ConcurrentHashMap() + val toDelete = ConcurrentHashMap() + val originalState = State("key1", "".toByteArray()) + val processedState = State("key1", "nonEmpty".toByteArray()) + mediator.qualifyState("groupKey", originalState, processedState, toCreate, toUpdate, toDelete) + + Assertions.assertThat(toCreate).isEmpty() + Assertions.assertThat(toUpdate).containsExactly(Assertions.entry("groupKey", processedState)) + Assertions.assertThat(toDelete).isEmpty() + } + + @Test + fun qualifyStateMarksStateAsDeletableWhenProcessedIsNullAndOriginalIsNot() { + val toCreate = ConcurrentHashMap() + val toUpdate = ConcurrentHashMap() + val toDelete = ConcurrentHashMap() + val originalState = State("key1", "".toByteArray()) + val processedState = null + mediator.qualifyState("groupKey", originalState, processedState, toCreate, toUpdate, toDelete) + + Assertions.assertThat(toCreate).isEmpty() + Assertions.assertThat(toUpdate).isEmpty() + Assertions.assertThat(toDelete).containsExactly(Assertions.entry("groupKey", originalState)) + } + private fun cordaConsumerRecords(key: String, event: String) = CordaConsumerRecord( topic = "", partition = 0, offset = 0, key, event, timestamp = 0 From 2cd6a290b3e8a3858542e6e14dbd0a78581d2177 Mon Sep 17 00:00:00 2001 From: Ben Millar <44114751+ben-millar@users.noreply.github.com> Date: Thu, 9 Nov 2023 11:33:58 +0000 Subject: [PATCH 03/18] CORE-18255 Replacing epochSecond check with epochMilli reducing test flakiness (#5064) Comparing the timestamp between requests and responses was, in some cases, causing test failures if the times were too close together. Moving to a more granular comparison to prevent this. 
--- .../workers/smoketest/services/CryptoRPCSmokeTests.kt | 6 +++--- .../crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt | 6 +++--- .../service/impl/bus/HSMRegistrationBusProcessorTests.kt | 6 +++--- .../service/impl/MemberOpsServiceProcessorTest.kt | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/applications/workers/workers-smoketest/src/smokeTest/kotlin/net/corda/applications/workers/smoketest/services/CryptoRPCSmokeTests.kt b/applications/workers/workers-smoketest/src/smokeTest/kotlin/net/corda/applications/workers/smoketest/services/CryptoRPCSmokeTests.kt index 1dde1fbfe77..77a4043b2e0 100644 --- a/applications/workers/workers-smoketest/src/smokeTest/kotlin/net/corda/applications/workers/smoketest/services/CryptoRPCSmokeTests.kt +++ b/applications/workers/workers-smoketest/src/smokeTest/kotlin/net/corda/applications/workers/smoketest/services/CryptoRPCSmokeTests.kt @@ -225,9 +225,9 @@ class CryptoRPCSmokeTests { assertEquals(expected.requestId, actual.requestId) assertEquals(expected.requestingComponent, actual.requestingComponent) assertEquals(expected.requestTimestamp, actual.requestTimestamp) - assertThat(actual.responseTimestamp.epochSecond) - .isGreaterThanOrEqualTo(expected.requestTimestamp.epochSecond) - .isLessThanOrEqualTo(now.epochSecond) + assertThat(actual.responseTimestamp.toEpochMilli()) + .isGreaterThanOrEqualTo(expected.requestTimestamp.toEpochMilli()) + .isLessThanOrEqualTo(now.toEpochMilli()) assertSoftly { softly -> softly.assertThat(actual.other.items.size == expected.other.items.size) softly.assertThat(actual.other.items.containsAll(expected.other.items)) diff --git a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt index 45adf54e903..797c3ccc6c1 100644 --- a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt +++ b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt @@ -165,9 +165,9 @@ class CryptoOpsBusProcessorTests { assertEquals(expected.requestId, actual.requestId) assertEquals(expected.requestingComponent, actual.requestingComponent) assertEquals(expected.requestTimestamp, actual.requestTimestamp) - assertThat(actual.responseTimestamp.epochSecond) - .isGreaterThanOrEqualTo(expected.requestTimestamp.epochSecond) - .isLessThanOrEqualTo(now.epochSecond) + assertThat(actual.responseTimestamp.toEpochMilli()) + .isGreaterThanOrEqualTo(expected.requestTimestamp.toEpochMilli()) + .isLessThanOrEqualTo(now.toEpochMilli()) assertTrue( actual.other.items.size == expected.other.items.size && actual.other.items.containsAll(expected.other.items) && diff --git a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessorTests.kt b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessorTests.kt index c5b5fe57618..715f7b6c2b7 100644 --- a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessorTests.kt +++ b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessorTests.kt @@ -72,9 +72,9 @@ class HSMRegistrationBusProcessorTests { assertEquals(expected.requestId, actual.requestId) 
assertEquals(expected.requestingComponent, actual.requestingComponent) assertEquals(expected.requestTimestamp, actual.requestTimestamp) - assertThat(actual.responseTimestamp.epochSecond) - .isGreaterThanOrEqualTo(expected.requestTimestamp.epochSecond) - .isLessThanOrEqualTo(now.epochSecond) + assertThat(actual.responseTimestamp.toEpochMilli()) + .isGreaterThanOrEqualTo(expected.requestTimestamp.toEpochMilli()) + .isLessThanOrEqualTo(now.toEpochMilli()) assertTrue( actual.other.items.size == expected.other.items.size && actual.other.items.containsAll(expected.other.items) && diff --git a/components/membership/membership-service-impl/src/test/kotlin/net/corda/membership/service/impl/MemberOpsServiceProcessorTest.kt b/components/membership/membership-service-impl/src/test/kotlin/net/corda/membership/service/impl/MemberOpsServiceProcessorTest.kt index 578e70c50f9..1fc4afffcc8 100644 --- a/components/membership/membership-service-impl/src/test/kotlin/net/corda/membership/service/impl/MemberOpsServiceProcessorTest.kt +++ b/components/membership/membership-service-impl/src/test/kotlin/net/corda/membership/service/impl/MemberOpsServiceProcessorTest.kt @@ -141,8 +141,8 @@ class MemberOpsServiceProcessorTest { private fun assertResponseContext(expected: MembershipRpcRequestContext, actual: MembershipRpcResponseContext) { assertEquals(expected.requestId, actual.requestId) assertEquals(expected.requestTimestamp, actual.requestTimestamp) - assertThat(actual.responseTimestamp.epochSecond).isGreaterThanOrEqualTo(expected.requestTimestamp.epochSecond) - assertThat(actual.responseTimestamp.epochSecond).isLessThanOrEqualTo(now.epochSecond) + assertThat(actual.responseTimestamp.toEpochMilli()).isGreaterThanOrEqualTo(expected.requestTimestamp.toEpochMilli()) + assertThat(actual.responseTimestamp.toEpochMilli()).isLessThanOrEqualTo(now.toEpochMilli()) } From c8ee6430aae654d15b18e4a0783e341ac90ca3f6 Mon Sep 17 00:00:00 2001 From: James Higgs <45565019+JamesHR3@users.noreply.github.com> Date: Fri, 10 Nov 2023 14:38:35 +0000 Subject: [PATCH 04/18] CORE-18228: Swap the send to after the state storage in the event mediator (#5069) Ensures that states are stored before publishing events back to Kafka. By doing these operations in this order, the messaging layer ensures that any visible effects outside the processor being invoked do not become apparent until after a record of the current state has been made. This ensures that certain types of race conditions or ledger mutation bugs cannot occur. This change also fixes missing synchronization on the data structure holding output events, which is required when processing in a multi-threaded environment. This can manifest as lost messages when placed under load. 
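A condensed sketch of the resulting commit ordering, with heavily simplified types (illustration only; the real implementation is the MultiSourceEventMediatorImpl change in the diff below):

import java.util.concurrent.ConcurrentHashMap

// Asynchronous outputs are collected per state key in a thread-safe map while groups
// are processed concurrently on multiple threads.
val asynchronousOutputs = ConcurrentHashMap<String, MutableList<String>>()

fun commitRound(
    persistFailedKeys: Set<String>,    // keys the state manager failed to persist
    send: (Collection<String>) -> Unit // stand-in for routing messages to the message bus
) {
    // State changes are persisted before this point; outputs for keys that failed to
    // persist are dropped here because those events will be reprocessed.
    persistFailedKeys.forEach { asynchronousOutputs.remove(it) }
    // Only now do externally visible sends happen, after a record of the state exists.
    send(asynchronousOutputs.values.flatten())
}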
--- .../mediator/MultiSourceEventMediatorImpl.kt | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index 67e2e4f7053..e83c896467c 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -167,7 +167,7 @@ class MultiSourceEventMediatorImpl( var states = stateManager.get(messages.map { it.key.toString() }.distinct()) while (groups.isNotEmpty()) { - val asynchronousOutputs = mutableListOf>() + val asynchronousOutputs = ConcurrentHashMap>>() val statesToCreate = ConcurrentHashMap() val statesToUpdate = ConcurrentHashMap() val statesToDelete = ConcurrentHashMap() @@ -179,7 +179,7 @@ class MultiSourceEventMediatorImpl( }.map { group -> taskManager.executeShortRunningTask { // Process all same flow events in one go - group.map { it -> + group.map { // Keep track of all records belonging to one flow flowEvents.compute(it.key.toString()) { _, v -> if (v == null) { @@ -201,7 +201,7 @@ class MultiSourceEventMediatorImpl( val event = queue.removeFirst() val response = config.messageProcessor.onNext(processorState, event) processorState = response.updatedState - processOutputEvents(response, asynchronousOutputs, queue, event) + processOutputEvents(it.key.toString(), response, asynchronousOutputs, queue, event) } // ---- Manage the state ---- @@ -218,7 +218,6 @@ class MultiSourceEventMediatorImpl( it.join() } - sendAsynchronousEvents(asynchronousOutputs) // Persist states changes val failedToCreateKeys = stateManager.create(statesToCreate.values.mapNotNull { it }) val failedToCreate = stateManager.get(failedToCreateKeys.keys) @@ -230,6 +229,8 @@ class MultiSourceEventMediatorImpl( } else { listOf() } + states.keys.forEach { asynchronousOutputs.remove(it) } + sendAsynchronousEvents(asynchronousOutputs.values.flatten()) } metrics.commitTimer.recordCallable { consumer.syncCommitOffsets() @@ -266,7 +267,7 @@ class MultiSourceEventMediatorImpl( } } - private fun sendAsynchronousEvents(busEvents: MutableList>) { + private fun sendAsynchronousEvents(busEvents: Collection>) { busEvents.forEach { message -> with(messageRouter.getDestination(message)) { message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) @@ -279,8 +280,9 @@ class MultiSourceEventMediatorImpl( * Send any synchronous events immediately, add asynchronous events to the busEvents collection to be sent later */ private fun processOutputEvents( + key: String, response: StateAndEventProcessor.Response, - busEvents: MutableList>, + busEvents: MutableMap>>, queue: ArrayDeque>, event: Record ) { @@ -288,7 +290,11 @@ class MultiSourceEventMediatorImpl( output.forEach { message -> val destination = messageRouter.getDestination(message) if (destination.type == RoutingDestination.Type.ASYNCHRONOUS) { - busEvents.add(message) + busEvents.compute(key) { _, value -> + val list = value ?: mutableListOf() + list.add(message) + list + } } else { @Suppress("UNCHECKED_CAST") val reply = with(destination) { From 0179d913ba11d2b355d801d9e83ab4ada8274305 Mon Sep 17 00:00:00 2001 From: Emily Bowe Date: Mon, 13 Nov 2023 11:58:04 +0000 Subject: [PATCH 05/18] CORE-17966: Implement checkpoint cleanup handler when processing 
FlowFatalException (#5044) Change: This PR will implement checkpoint cleanup handler when processing FlowFatalException. Reasoning: In order to address CORE-17966: When exceptions such as FlowFatalException are thrown in the Flow Engine the exception handlers cleanup the checkpoints and schedule cleanup of the session states. However they currently do not cleanup the StartFlow Flow Mapper state for RPC started flows the checkpoint cleanup handler will be integrated in a series of small PRs. --- .../tests/ExternalEventAcceptanceTest.kt | 7 +- .../tests/SubFlowFailedAcceptanceTest.kt | 144 ++++++++++++------ .../CheckpointCleanupHandlerImpl.kt | 4 +- .../impl/FlowEventExceptionProcessorImpl.kt | 35 +---- .../FlowEventExceptionProcessorImplTest.kt | 91 +++-------- .../FlowToBeKilledExceptionProcessingTest.kt | 6 +- 6 files changed, 132 insertions(+), 155 deletions(-) diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/ExternalEventAcceptanceTest.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/ExternalEventAcceptanceTest.kt index 7b4e31f07fe..cf629c9b91d 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/ExternalEventAcceptanceTest.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/ExternalEventAcceptanceTest.kt @@ -3,17 +3,19 @@ package net.corda.flow.testing.tests import net.corda.data.KeyValuePairList import net.corda.data.flow.event.external.ExternalEventContext import net.corda.data.flow.event.external.ExternalEventResponseErrorType +import net.corda.data.flow.output.FlowStates import net.corda.data.persistence.EntityRequest import net.corda.data.persistence.EntityResponse import net.corda.data.persistence.FindEntities import net.corda.flow.external.events.factory.ExternalEventFactory import net.corda.flow.external.events.factory.ExternalEventRecord import net.corda.flow.fiber.FlowIORequest +import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.FLOW_FAILED import net.corda.flow.state.FlowCheckpoint +import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER import net.corda.flow.testing.context.FlowServiceTestBase import net.corda.flow.testing.context.flowResumedWithError import net.corda.schema.configuration.FlowConfig -import net.corda.utilities.seconds import net.corda.v5.base.exceptions.CordaRuntimeException import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -400,6 +402,9 @@ class ExternalEventAcceptanceTest : FlowServiceTestBase() { markedForDlq() flowDidNotResume() flowFiberCacheDoesNotContainKey(ALICE_HOLDING_IDENTITY, REQUEST_ID1) + scheduleFlowMapperCleanupEvents(ALICE_FLOW_KEY_MAPPER) + nullStateRecord() + flowStatus(state = FlowStates.FAILED, errorType = FLOW_FAILED, errorMessage = "message") } } } diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/SubFlowFailedAcceptanceTest.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/SubFlowFailedAcceptanceTest.kt index 6be827721a8..7da21140bcd 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/SubFlowFailedAcceptanceTest.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/SubFlowFailedAcceptanceTest.kt @@ -1,7 +1,9 @@ package net.corda.flow.testing.tests +import net.corda.data.flow.output.FlowStates import 
net.corda.flow.application.sessions.SessionInfo import net.corda.flow.fiber.FlowIORequest +import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.FLOW_FAILED import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER import net.corda.flow.testing.context.FlowServiceTestBase import net.corda.flow.testing.context.startFlow @@ -33,15 +35,19 @@ class SubFlowFailedAcceptanceTest : FlowServiceTestBase() { initiatingToInitiatedFlow(PROTOCOL, FAKE_FLOW_NAME, FAKE_FLOW_NAME) } } + @Test fun `Given a subFlow contains sessions when the subFlow fails, session error events are sent and session cleanup is scheduled`() { `when` { startFlow(this) - .suspendsWith(FlowIORequest.Send( - mapOf( - SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, - SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, - ))) + .suspendsWith( + FlowIORequest.Send( + mapOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, + SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, + ) + ) + ) .suspendsWith( FlowIORequest.SubFlowFailed( RuntimeException(), @@ -58,50 +64,56 @@ class SubFlowFailedAcceptanceTest : FlowServiceTestBase() { } } } -@Test -fun `Given a subFlow contains an initiated and closed session when the subFlow fails a single session error event is sent to the initiated session and session cleanup is scheduled`() { - `when` { - startFlow(this) - .suspendsWith(FlowIORequest.Send( - mapOf( - SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, - SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, - ))) - .suspendsWith(FlowIORequest.CloseSessions(setOf(SESSION_ID_1))) - - sessionCloseEventReceived(FLOW_ID1, SESSION_ID_1, 1, ALICE_HOLDING_IDENTITY, BOB_HOLDING_IDENTITY) - .suspendsWith( - FlowIORequest.SubFlowFailed( - RuntimeException(), - listOf(SESSION_ID_1, SESSION_ID_2) + + @Test + fun `Given a subFlow contains an initiated and closed session when the subFlow fails a single session error event is sent to the initiated session and session cleanup is scheduled`() { + `when` { + startFlow(this) + .suspendsWith( + FlowIORequest.Send( + mapOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, + SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, + ) + ) ) - ) - .completedWithError(CordaRuntimeException("error")) - } + .suspendsWith(FlowIORequest.CloseSessions(setOf(SESSION_ID_1))) - then { - expectOutputForFlow(FLOW_ID1) { - noOutputEvent() + sessionCloseEventReceived(FLOW_ID1, SESSION_ID_1, 1, ALICE_HOLDING_IDENTITY, BOB_HOLDING_IDENTITY) + .suspendsWith( + FlowIORequest.SubFlowFailed( + RuntimeException(), + listOf(SESSION_ID_1, SESSION_ID_2) + ) + ) + .completedWithError(CordaRuntimeException("error")) } - expectOutputForFlow(FLOW_ID1) { - sessionErrorEvents(SESSION_ID_2) - scheduleFlowMapperCleanupEvents(ALICE_FLOW_KEY_MAPPER, SESSION_ID_1, SESSION_ID_2) + then { + expectOutputForFlow(FLOW_ID1) { + noOutputEvent() + } + + expectOutputForFlow(FLOW_ID1) { + sessionErrorEvents(SESSION_ID_2) + scheduleFlowMapperCleanupEvents(ALICE_FLOW_KEY_MAPPER, SESSION_ID_1, SESSION_ID_2) + } } } -} - @Test fun `Given a subFlow contains only closed sessions when the subFlow fails no session error events are sent`() { `when` { startFlow(this) - .suspendsWith(FlowIORequest.Send( - mapOf( - SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, - SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, - ))) + 
.suspendsWith( + FlowIORequest.Send( + mapOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, + SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, + ) + ) + ) .suspendsWith(FlowIORequest.CloseSessions(setOf(SESSION_ID_1, SESSION_ID_2))) .suspendsWith( FlowIORequest.SubFlowFailed( @@ -119,13 +131,17 @@ fun `Given a subFlow contains an initiated and closed session when the subFlow f } @Test - fun `Given a subFlow contains only errored sessions when the subFlow fails a wakeup event is scheduled and no session error events are sent`() { + fun `Given a subFlow contains only errored sessions when the subFlow fails no session error events are sent`() { given { startFlow(this) - .suspendsWith(FlowIORequest.Receive(setOf( - SessionInfo(SESSION_ID_1, initiatedIdentityMemberName), - SessionInfo(SESSION_ID_2, initiatedIdentityMemberName), - ))) + .suspendsWith( + FlowIORequest.Receive( + setOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName), + SessionInfo(SESSION_ID_2, initiatedIdentityMemberName), + ) + ) + ) sessionErrorEventReceived(FLOW_ID1, SESSION_ID_1) } @@ -151,7 +167,7 @@ fun `Given a subFlow contains an initiated and closed session when the subFlow f fun `Given a subFlow contains no sessions when the subFlow fails and flow finishes, requestid is cleaned up and no session errors are sent`() { `when` { startFlow(this) - .suspendsWith(FlowIORequest.SubFlowFailed(RuntimeException(), emptyList())) + .suspendsWith(FlowIORequest.SubFlowFailed(RuntimeException(), emptyList())) .completedWithError(CordaRuntimeException("Error")) } @@ -217,15 +233,19 @@ fun `Given a subFlow contains an initiated and closed session when the subFlow f } @Test - fun `Given an initiated top level flow with an errored session when it finishes and calls SubFlowFailed a wakeup event is scheduled and no session error event is sent`() { + fun `Given an initiated top level flow with an errored session when it finishes and calls SubFlowFailed, schedules cleanup and does not send a session error event`() { given { membershipGroupFor(BOB_HOLDING_IDENTITY) initiatingToInitiatedFlow(PROTOCOL_2, FLOW_NAME, FLOW_NAME_2) sessionCounterpartyInfoRequestReceived(FLOW_ID1, INITIATED_SESSION_ID_1, CPI1, PROTOCOL_2) - .suspendsWith(FlowIORequest.Receive(setOf( - SessionInfo(INITIATED_SESSION_ID_1, initiatedIdentityMemberName), - ))) + .suspendsWith( + FlowIORequest.Receive( + setOf( + SessionInfo(INITIATED_SESSION_ID_1, initiatedIdentityMemberName), + ) + ) + ) } `when` { @@ -246,4 +266,34 @@ fun `Given a subFlow contains an initiated and closed session when the subFlow f } } } + + @Test + fun `Given a subFlow contains sessions when the subFlow fails, and session is not found, FlowFatalException is thrown`() { + `when` { + startFlow(this) + .suspendsWith( + FlowIORequest.Send( + mapOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, + ) + ) + ) + .suspendsWith( + FlowIORequest.SubFlowFailed( + RuntimeException(), + listOf(SESSION_ID_1, "BrokenSession") + ) + ) + } + + then { + expectOutputForFlow(FLOW_ID1) { + sessionErrorEvents(SESSION_ID_1) + scheduleFlowMapperCleanupEvents(SESSION_ID_1, ALICE_FLOW_KEY_MAPPER) + nullStateRecord() + flowStatus(state = FlowStates.FAILED, errorType = FLOW_FAILED, errorMessage = "Session: BrokenSession does not exist when executing session operation that requires an existing session") + } + } + } + } \ No newline at end of file diff --git 
a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/CheckpointCleanupHandlerImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/CheckpointCleanupHandlerImpl.kt index 240e4ff84b3..4a175703d4f 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/CheckpointCleanupHandlerImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/CheckpointCleanupHandlerImpl.kt @@ -37,7 +37,7 @@ class CheckpointCleanupHandlerImpl @Activate constructor( val records = errorActiveSessions(checkpoint, config, exception, time) + cleanupSessions(checkpoint, config, time) + generateStatus(checkpoint, exception) + - cleanupInitiatingFlow(checkpoint, config, time) + cleanupRpcFlowMapperState(checkpoint, config, time) checkpoint.markDeleted() return records } @@ -100,7 +100,7 @@ class CheckpointCleanupHandlerImpl @Activate constructor( } } - private fun cleanupInitiatingFlow( + private fun cleanupRpcFlowMapperState( checkpoint: FlowCheckpoint, config: SmartConfig, currentTime: Instant diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt index 590045312b3..95eca1f1abc 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt @@ -9,13 +9,13 @@ import net.corda.data.flow.state.session.SessionState import net.corda.data.flow.state.session.SessionStateType import net.corda.data.flow.state.waiting.WaitingFor import net.corda.flow.fiber.cache.FlowFiberCache +import net.corda.flow.maintenance.CheckpointCleanupHandler import net.corda.flow.pipeline.FlowEventExceptionProcessor import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowEventException import net.corda.flow.pipeline.exceptions.FlowFatalException import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException import net.corda.flow.pipeline.exceptions.FlowPlatformException -import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.FLOW_FAILED import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.PLATFORM_ERROR import net.corda.flow.pipeline.exceptions.FlowTransientException import net.corda.flow.pipeline.factory.FlowMessageFactory @@ -45,6 +45,8 @@ class FlowEventExceptionProcessorImpl @Activate constructor( private val flowSessionManager: FlowSessionManager, @Reference(service = FlowFiberCache::class) private val flowFiberCache: FlowFiberCache, + @Reference(service = CheckpointCleanupHandler::class) + private val checkpointCleanupHandler: CheckpointCleanupHandler ) : FlowEventExceptionProcessor { private companion object { @@ -125,8 +127,6 @@ class FlowEventExceptionProcessorImpl @Activate constructor( exception: FlowFatalException, context: FlowEventContext<*> ): FlowEventContext<*> = withEscalation(context) { - - val exceptionHandlingStartTime = Instant.now() val checkpoint = context.checkpoint val msg = if (!checkpoint.doesExist) { @@ -138,36 +138,11 @@ class FlowEventExceptionProcessorImpl @Activate constructor( } log.warn(msg, exception) - val activeSessionIds = getActiveSessionIds(checkpoint) - - if(activeSessionIds.isNotEmpty()) { - checkpoint.putSessionStates( - flowSessionManager.sendErrorMessages( - context.checkpoint, 
activeSessionIds, exception, exceptionHandlingStartTime - ) - ) - } - - val errorEvents = - flowSessionManager.getSessionErrorEventRecords(checkpoint, context.flowConfig, exceptionHandlingStartTime) - val cleanupEvents = createCleanupEventsForSessions( - getScheduledCleanupExpiryTime(context, exceptionHandlingStartTime), - checkpoint.sessions.filterNot { it.hasScheduledCleanup } - ) - removeCachedFlowFiber(checkpoint) - checkpoint.markDeleted() - - val records = createStatusRecord(checkpoint.flowId) { - flowMessageFactory.createFlowFailedStatusMessage( - checkpoint, - FLOW_FAILED, - exception.message - ) - } + val cleanupRecords = checkpointCleanupHandler.cleanupCheckpoint(checkpoint, context.flowConfig, exception) context.copy( - outputRecords = records + errorEvents + cleanupEvents, + outputRecords = cleanupRecords, sendToDlq = true ) } diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt index 373db46dc0c..0653760d8c7 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt @@ -3,6 +3,7 @@ package net.corda.flow.pipeline.impl import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigValueFactory import net.corda.data.flow.FlowKey +import net.corda.data.flow.FlowStartContext import net.corda.data.flow.event.FlowEvent import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.data.flow.event.mapper.FlowMapperEvent @@ -12,6 +13,7 @@ import net.corda.data.flow.state.session.SessionState import net.corda.data.flow.state.session.SessionStateType import net.corda.data.flow.state.waiting.WaitingFor import net.corda.flow.fiber.cache.FlowFiberCache +import net.corda.flow.maintenance.CheckpointCleanupHandler import net.corda.flow.pipeline.converters.FlowEventContextConverter import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowEventException @@ -33,7 +35,7 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.kotlin.any -import org.mockito.kotlin.anyOrNull +import org.mockito.kotlin.eq import org.mockito.kotlin.mock import org.mockito.kotlin.times import org.mockito.kotlin.verify @@ -60,6 +62,8 @@ class FlowEventExceptionProcessorImplTest { private val flowFiberCache = mock() private val serializedFiber = ByteBuffer.wrap("mock fiber".toByteArray()) + private val checkpointCleanupHandler = mock() + private val sessionIdOpen = "sesh-id" private val sessionIdClosed = "sesh-id-closed" private val flowActiveSessionState = SessionState().apply { @@ -74,7 +78,8 @@ class FlowEventExceptionProcessorImplTest { flowMessageFactory, flowRecordFactory, flowSessionManager, - flowFiberCache + flowFiberCache, + checkpointCleanupHandler ) @BeforeEach @@ -159,40 +164,30 @@ class FlowEventExceptionProcessorImplTest { ).thenReturn(flowStatusUpdate) whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) - val result = target.process(error, context) + target.process(error, context) - verify(result.checkpoint).markDeleted() - assertThat(result.outputRecords).containsOnly(flowStatusUpdateRecord) - 
assertThat(result.sendToDlq).isTrue + verify(checkpointCleanupHandler).cleanupCheckpoint(eq(flowCheckpoint), any(), any()) } @Test fun `flow fatal exception marks flow for dlq and publishes status update`() { + val flowId = "f1" val error = FlowFatalException("error") - val flowStatusUpdate = FlowStatus() val key = FlowKey() - val flowStatusUpdateRecord = Record("", key, flowStatusUpdate) val flowMapperEvent = mock() val flowMapperRecord = Record(Schemas.Flow.FLOW_MAPPER_SESSION_OUT, "key", flowMapperEvent) - - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - flowCheckpoint, - FlowProcessingExceptionTypes.FLOW_FAILED, - error.message - ) - ).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) whenever(flowCheckpoint.doesExist).thenReturn(true) + whenever(flowCheckpoint.flowId).thenReturn(flowId) + val startContext = mock() + whenever(flowCheckpoint.flowStartContext).thenReturn(startContext) whenever(flowCheckpoint.flowKey).thenReturn(key) - whenever(flowCheckpoint.sessions).thenReturn(listOf(flowActiveSessionState, flowInactiveSessionState)) - whenever(flowCheckpoint.suspendCount).thenReturn(123) - whenever(flowRecordFactory.createFlowMapperEventRecord(any(), any())).thenReturn(flowMapperRecord) + val cleanupRecords = listOf (flowMapperRecord) + whenever(checkpointCleanupHandler.cleanupCheckpoint(any(), any(), any())).thenReturn(cleanupRecords) val result = target.process(error, context) - verify(result.checkpoint).markDeleted() - assertThat(result.outputRecords).contains(flowStatusUpdateRecord, flowMapperRecord) + verify(checkpointCleanupHandler).cleanupCheckpoint(eq(flowCheckpoint), any(), eq(error)) + assertThat(result.outputRecords).containsOnly(flowMapperRecord) assertThat(result.sendToDlq).isTrue verify(flowFiberCache).remove(key) } @@ -248,25 +243,6 @@ class FlowEventExceptionProcessorImplTest { assertThat(result.outputRecords).containsOnly(flowEventRecord) } - @Test - fun `failure to create a status message does not prevent fatal failure handling from succeeding`() { - val error = FlowFatalException("error") - - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - any(), - any(), - any() - ) - ).thenThrow(IllegalStateException()) - - val result = target.process(error, context) - - verify(result.checkpoint).markDeleted() - assertThat(result.outputRecords).isEmpty() - assertThat(result.sendToDlq).isTrue - } - @Test fun `throwable triggered during transient exception processing does not escape the processor`() { val throwable = RuntimeException() @@ -278,18 +254,6 @@ class FlowEventExceptionProcessorImplTest { assertEmptyDLQdResult(transientResult) } - @Test - fun `throwable triggered during fatal exception processing does not escape the processor`() { - val throwable = RuntimeException() - val fatalError = FlowFatalException("error") - whenever( - flowSessionManager.getSessionErrorEventRecords(anyOrNull(),anyOrNull(),anyOrNull())) - .thenThrow(throwable) - - val fatalResult = target.process(fatalError, context) - assertEmptyDLQdResult(fatalResult) - } - @Test fun `throwable triggered during platform exception processing does not escape the processor`() { val throwable = RuntimeException() @@ -313,19 +277,8 @@ class FlowEventExceptionProcessorImplTest { fun `flow fatal exception with false doesExist confirms flow checkpoint not called`() { val flowCheckpoint = mock() whenever(flowCheckpoint.doesExist).thenReturn(false) - val error = FlowFatalException("error") - val 
flowStatusUpdate = FlowStatus() - val flowStatusUpdateRecord = Record("", FlowKey(), flowStatusUpdate) - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - flowCheckpoint, - FlowProcessingExceptionTypes.FLOW_FAILED, - error.message - ) - ).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) target.process(error, context) verify(flowCheckpoint, times(0)).flowStartContext @@ -338,17 +291,7 @@ class FlowEventExceptionProcessorImplTest { val context = buildFlowEventContext(checkpoint = flowCheckpoint, inputEventPayload = inputEvent) val error = FlowFatalException("error") - val flowStatusUpdate = FlowStatus() - val flowStatusUpdateRecord = Record("", FlowKey(), flowStatusUpdate) - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - flowCheckpoint, - FlowProcessingExceptionTypes.FLOW_FAILED, - error.message - ) - ).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) target.process(error, context) verify(flowCheckpoint, times(1)).flowStartContext diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt index f7a08ca52a5..1a600a01458 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt @@ -15,6 +15,7 @@ import net.corda.data.flow.state.session.SessionState import net.corda.data.flow.state.session.SessionStateType import net.corda.data.identity.HoldingIdentity import net.corda.flow.fiber.cache.FlowFiberCache +import net.corda.flow.maintenance.CheckpointCleanupHandler import net.corda.flow.pipeline.converters.FlowEventContextConverter import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException import net.corda.flow.pipeline.factory.FlowMessageFactory @@ -97,12 +98,15 @@ class FlowToBeKilledExceptionProcessingTest { private val flowKilledStatusRecord = Record("s", flowKey, flowKilledStatus) private val mockResponse = mock>() private val flowFiberCache = mock() + private val checkpointCleanupHandler = mock() + private val target = FlowEventExceptionProcessorImpl( flowMessageFactory, flowRecordFactory, flowSessionManager, - flowFiberCache + flowFiberCache, + checkpointCleanupHandler ) @BeforeEach From 92fff548a7ec92a7511f8e8d88c1dd9c6866d823 Mon Sep 17 00:00:00 2001 From: Miljenko Brkic <97448832+mbrkic-r3@users.noreply.github.com> Date: Mon, 13 Nov 2023 12:32:06 +0000 Subject: [PATCH 06/18] CORE-18088 Race condition registering RPC endpoints in JavalinServer (#5076) * CORE-18088 Synchronized registration/removal of endpoints in JavalinServer * CORE-18088 Using mutable set for "endpoints" in JavalinServer since access is now synchronized with "serverLock" --- .../net/corda/web/server/JavalinServer.kt | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/libs/web/web-impl/src/main/kotlin/net/corda/web/server/JavalinServer.kt b/libs/web/web-impl/src/main/kotlin/net/corda/web/server/JavalinServer.kt index 4c19c4bfa37..fb3d41c6d57 100644 --- a/libs/web/web-impl/src/main/kotlin/net/corda/web/server/JavalinServer.kt +++ b/libs/web/web-impl/src/main/kotlin/net/corda/web/server/JavalinServer.kt @@ -15,7 
+15,6 @@ import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -55,12 +54,12 @@ class JavalinServer( private val apiPathPrefix: String = "/api/${platformInfoProvider.localWorkerSoftwareShortVersion}" private var server: Javalin? = null private val coordinator = coordinatorFactory.createCoordinator { _, _ -> } - private val serverStartLock = ReentrantLock() + private val serverLock = ReentrantLock() - override val endpoints: MutableSet = ConcurrentHashMap.newKeySet() + override val endpoints: MutableSet = mutableSetOf() override fun start(port: Int) { - serverStartLock.withLock { + serverLock.withLock { check(null == server) { "The Javalin webserver is already initialized" } coordinator.start() startServer(port) @@ -100,7 +99,7 @@ class JavalinServer( } private fun restartServer() { - serverStartLock.withLock { + serverLock.withLock { // restart server without marking the component down. checkNotNull(server) { "Cannot restart a non-existing server" } val port = server?.port() @@ -111,7 +110,7 @@ class JavalinServer( } override fun stop() { - serverStartLock.withLock { + serverLock.withLock { coordinator.updateStatus(LifecycleStatus.DOWN) stopServer() coordinator.stop() @@ -119,23 +118,27 @@ class JavalinServer( } override fun registerEndpoint(endpoint: Endpoint) { - if(endpoints.any { it.path == endpoint.path && it.methodType == endpoint.methodType }) - throw IllegalArgumentException("Endpoint with path ${endpoint.path} and method ${endpoint.methodType} already exists.") - // register immediately when the server has been started - if(null != server) registerEndpointInternal(endpoint) - // record the path in case we need to register when it's already started - endpoints.add(endpoint) + serverLock.withLock { + if (endpoints.any { it.path == endpoint.path && it.methodType == endpoint.methodType }) + throw IllegalArgumentException("Endpoint with path ${endpoint.path} and method ${endpoint.methodType} already exists.") + // register immediately when the server has been started + if (null != server) registerEndpointInternal(endpoint) + // record the path in case we need to register when it's already started + endpoints.add(endpoint) + } } override fun removeEndpoint(endpoint: Endpoint) { - endpoints.remove(endpoint) - // NOTE: - // The server needs to be restarted to un-register the path. However, this means everything dependent on - // this is impacted by a restart, which doesn't feel quite right. - // This also means we can't really DOWN/UP the lifecycle status of this because this would end up in a - // relentless yoyo-ing of this component as dependent components keep calling this function. - // TODO - review if it is really needed to de-register a path when a Subscription goes down, for example. - if(null != server) restartServer() + serverLock.withLock { + endpoints.remove(endpoint) + // NOTE: + // The server needs to be restarted to un-register the path. However, this means everything dependent on + // this is impacted by a restart, which doesn't feel quite right. + // This also means we can't really DOWN/UP the lifecycle status of this because this would end up in a + // relentless yoyo-ing of this component as dependent components keep calling this function. 
+ // TODO - review if it is really needed to de-register a path when a Subscription goes down, for example. + if (null != server) restartServer() + } } private fun registerEndpointInternal(endpoint: Endpoint) { From 43e26fe537ecb95506f00737f9931ed28c3a3ebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Jos=C3=A9=20Ramos?= Date: Mon, 13 Nov 2023 12:47:47 +0000 Subject: [PATCH 07/18] CORE-18299: Use Interface Instead of Concrete Type (#5082) --- .../net/corda/cli/plugins/network/GenerateGroupPolicy.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/plugins/network/src/main/kotlin/net/corda/cli/plugins/network/GenerateGroupPolicy.kt b/tools/plugins/network/src/main/kotlin/net/corda/cli/plugins/network/GenerateGroupPolicy.kt index cada2a5502c..a006d186594 100644 --- a/tools/plugins/network/src/main/kotlin/net/corda/cli/plugins/network/GenerateGroupPolicy.kt +++ b/tools/plugins/network/src/main/kotlin/net/corda/cli/plugins/network/GenerateGroupPolicy.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.exc.MismatchedInputException import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import net.corda.cli.plugins.network.output.ConsoleOutput +import net.corda.cli.plugins.network.output.Output import net.corda.cli.plugins.network.utils.PrintUtils.printJsonOutput import net.corda.cli.plugins.network.utils.PrintUtils.verifyAndPrintError import org.yaml.snakeyaml.Yaml @@ -20,7 +21,7 @@ import java.util.UUID description = ["Generates GroupPolicy.json file."], mixinStandardHelpOptions = true ) -class GenerateGroupPolicy(private val output: ConsoleOutput = ConsoleOutput()) : Runnable { +class GenerateGroupPolicy(private val output: Output = ConsoleOutput()) : Runnable { @CommandLine.Option( names = ["--endpoint"], From 3719df555cbab76d7f80fdda081ee1eb8d9da192 Mon Sep 17 00:00:00 2001 From: Ben Millar <44114751+ben-millar@users.noreply.github.com> Date: Mon, 13 Nov 2023 13:54:53 +0000 Subject: [PATCH 08/18] CORE-17194 Adding prefix to consumer group names (#5055) This change adds a prefix to consumer group IDs to support multi-tenancy (>1 Corda cluster within a single Kafka cluster), in the same way that we currently do for topic names. Without this, consumers in different Corda deployments were being considered to be part of the same consumer group; this causes unnecessary re-balances, and was logging authz exceptions on partition assignment. 
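The fix itself is small: the same cluster-specific prefix that is already applied to topic names is now also prepended to the consumer group id while the consumer configuration is resolved. Roughly (simplified; the type name and example values here are illustrative, the real change is the addGroupPrefix extension in the resolver diff below):

// Illustration only: a per-cluster prefix keeps consumer groups from colliding when
// several Corda clusters share a single Kafka cluster.
data class ConsumerCfg(val group: String, val clientId: String)

fun ConsumerCfg.addGroupPrefix(topicPrefix: String) = copy(group = topicPrefix + group)

fun main() {
    val resolved = ConsumerCfg(group = "flow.event.consumer", clientId = "consumer-1")
        .addGroupPrefix("clusterA-")
    println(resolved.group) // clusterA-flow.event.consumer
}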
--- .../kafka/config/MessageBusConfigResolver.kt | 29 ++++++++++++------- .../config/MessageBusConfigResolverTest.kt | 12 ++++---- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolver.kt b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolver.kt index ed7bcd43a3e..0d80edf5db7 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolver.kt +++ b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolver.kt @@ -1,7 +1,6 @@ package net.corda.messagebus.kafka.config import com.typesafe.config.ConfigFactory -import java.util.Properties import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory import net.corda.messagebus.api.configuration.AdminConfig @@ -16,6 +15,7 @@ import net.corda.utilities.debug import org.apache.kafka.clients.producer.ProducerConfig.PARTITIONER_CLASS_CONFIG import org.osgi.framework.FrameworkUtil import org.slf4j.LoggerFactory +import java.util.Properties /** * Resolve a Kafka bus configuration against the enforced and default configurations provided by the library. @@ -71,6 +71,7 @@ internal class MessageBusConfigResolver(private val smartConfigFactory: SmartCon val properties = roleConfig.toKafkaProperties() logger.debug {"Kafka properties for role $rolePath: $properties" } + return properties } @@ -97,13 +98,15 @@ internal class MessageBusConfigResolver(private val smartConfigFactory: SmartCon * @return Resolved user configurable consumer values and kafka properties to be used for the given role type */ fun resolve(messageBusConfig: SmartConfig, consumerConfig: ConsumerConfig): Pair { - val kafkaProperties = resolve(messageBusConfig, consumerConfig.role.configPath, consumerConfig.toSmartConfig()) - val resolvedConfig = ResolvedConsumerConfig( - consumerConfig.group, - consumerConfig.clientId, - messageBusConfig.getString(BootConfig.TOPIC_PREFIX) - ) - return Pair(resolvedConfig, kafkaProperties) + val topicPrefix = messageBusConfig.getString(BootConfig.TOPIC_PREFIX) + val amendedConfig = consumerConfig.addGroupPrefix(topicPrefix) + val kafkaProperties = resolve(messageBusConfig, amendedConfig.role.configPath, amendedConfig.toSmartConfig()) + + return ResolvedConsumerConfig( + amendedConfig.group, + amendedConfig.clientId, + topicPrefix + ) to kafkaProperties } /** @@ -119,13 +122,13 @@ internal class MessageBusConfigResolver(private val smartConfigFactory: SmartCon val kafkaProperties = resolve(messageBusConfig, producerConfig.role.configPath, producerConfig.toSmartConfig()) //enforce the partitioner to be our custom partitioner for producers only kafkaProperties[PARTITIONER_CLASS_CONFIG] = KafkaProducerPartitioner::class.java - val resolvedConfig = ResolvedProducerConfig( + + return ResolvedProducerConfig( producerConfig.clientId, producerConfig.transactional, messageBusConfig.getString(BootConfig.TOPIC_PREFIX), producerConfig.throwOnSerializationError - ) - return Pair(resolvedConfig, kafkaProperties) + ) to kafkaProperties } /** @@ -166,6 +169,10 @@ internal class MessageBusConfigResolver(private val smartConfigFactory: SmartCon ) } + private fun ConsumerConfig.addGroupPrefix(prefix: String) : ConsumerConfig { + return this.copy(group = prefix + this.group) + } + private fun ProducerConfig.toSmartConfig(): SmartConfig { val 
transactionalId = if (transactional) { "$clientId-$instanceId" diff --git a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt index e64b50d8a20..43ae5ddbeea 100644 --- a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt +++ b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt @@ -1,8 +1,6 @@ package net.corda.messagebus.kafka.config import com.typesafe.config.ConfigFactory -import java.util.Properties -import java.util.stream.Stream import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory import net.corda.messagebus.api.configuration.AdminConfig @@ -17,6 +15,8 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource +import java.util.Properties +import java.util.stream.Stream class MessageBusConfigResolverTest { @@ -74,7 +74,7 @@ class MessageBusConfigResolverTest { BOOTSTRAP_SERVERS_PROP to "kafka:1001", SSL_KEYSTORE_PROP to "foo/bar", CLIENT_ID_PROP to "eventConsumer--$CLIENT_ID", - GROUP_ID_PROP to "group-cooperative" + GROUP_ID_PROP to "testgroup-cooperative" ) ), ConsumerRoles.EVENT_LOG to getExpectedConsumerProperties( @@ -85,7 +85,7 @@ class MessageBusConfigResolverTest { ), ConsumerRoles.RPC_SENDER to getExpectedConsumerProperties( mapOf( - GROUP_ID_PROP to "$GROUP_NAME-sender", + GROUP_ID_PROP to "test$GROUP_NAME-sender", BOOTSTRAP_SERVERS_PROP to "kafka:1001", SSL_KEYSTORE_PROP to "foo/bar", AUTO_OFFSET_RESET_PROP to "latest" @@ -93,7 +93,7 @@ class MessageBusConfigResolverTest { ), ConsumerRoles.RPC_RESPONDER to getExpectedConsumerProperties( mapOf( - GROUP_ID_PROP to "$GROUP_NAME-responder", + GROUP_ID_PROP to "test$GROUP_NAME-responder", BOOTSTRAP_SERVERS_PROP to "kafka:1001", SSL_KEYSTORE_PROP to "foo/bar", AUTO_OFFSET_RESET_PROP to "latest" @@ -149,7 +149,7 @@ class MessageBusConfigResolverTest { private fun getExpectedConsumerProperties(overrides: Map): Properties { val defaults = mapOf( - GROUP_ID_PROP to GROUP_NAME, + GROUP_ID_PROP to "test$GROUP_NAME", CLIENT_ID_PROP to "consumer--$CLIENT_ID", ISOLATION_LEVEL_PROP to "read_committed", BOOTSTRAP_SERVERS_PROP to "localhost:9092", From a880b11ae88dd464f8e8b633b5cd37980d154bf4 Mon Sep 17 00:00:00 2001 From: Ronan Browne Date: Mon, 13 Nov 2023 16:47:15 +0000 Subject: [PATCH 09/18] ES-1621: active forward merge automation (#5086) --- .../forward-merge/JenkinsfileMergeAutomation | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 .ci/dev/forward-merge/JenkinsfileMergeAutomation diff --git a/.ci/dev/forward-merge/JenkinsfileMergeAutomation b/.ci/dev/forward-merge/JenkinsfileMergeAutomation new file mode 100644 index 00000000000..d87cec96325 --- /dev/null +++ b/.ci/dev/forward-merge/JenkinsfileMergeAutomation @@ -0,0 +1,32 @@ +#! groovy +@Library('corda-shared-build-pipeline-steps@5.1') _ + +/** + * Forward merge any changes in current branch to the branch with following version. + * + * Please note, the branches names are intentionally separated as variables, to minimised conflicts + * during automated merges for this file. 
+ * + * These variables should be updated when a new version is cut + */ + +/** + * the branch name of origin branch, it should match the current branch + * and it acts as a fail-safe inside {@code forwardMerger} pipeline + */ +String originBranch = 'release/os/5.1' + +/** + * the branch name of target branch, it should be the branch with the next version + * after the one in current branch. + */ +String targetBranch = 'release/os/5.2' + +/** + * Forward merge any changes between {@code originBranch} and {@code targetBranch} + */ +forwardMerger( + targetBranch: targetBranch, + originBranch: originBranch, + slackChannel: '#c5-forward-merge-bot-notifications' +) From 570dcc4ab7a8fc025d30794a9086e19e350eab45 Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Mon, 13 Nov 2023 22:21:23 +0000 Subject: [PATCH 10/18] CORE-18268 db-worker restart causes loss of REST API High Availability (#5078) The issue was that on forced config reconciliation we would reconcile all config sections and therefore propagate them across the cluster no matter if their config value actually changed or not as part of the defaulting process. This caused components across the cluster to restart on db-worker restarts (one of which is the http server within the rest-worker which was causing its unavailability). * Contains the above config propagation on the db-worker side. On forced reconciliation checks if after applying the defaulting process to a config value read from the DB, it changes (expands) compared to its Kafka corresponding one and only then reconciles it --- .../impl/publish/ConfigPublishServiceImpl.kt | 55 +++++++++++++++++++ .../impl/ReconcilerEventHandler.kt | 10 +++- .../impl/ReconcilerEventHandlerTest.kt | 7 ++- .../corda/reconciliation/ReconcilerWriter.kt | 10 ++++ 4 files changed, 80 insertions(+), 2 deletions(-) diff --git a/components/configuration/configuration-write-service-impl/src/main/kotlin/net/corda/configuration/write/impl/publish/ConfigPublishServiceImpl.kt b/components/configuration/configuration-write-service-impl/src/main/kotlin/net/corda/configuration/write/impl/publish/ConfigPublishServiceImpl.kt index f603e502531..80df16543b4 100644 --- a/components/configuration/configuration-write-service-impl/src/main/kotlin/net/corda/configuration/write/impl/publish/ConfigPublishServiceImpl.kt +++ b/components/configuration/configuration-write-service-impl/src/main/kotlin/net/corda/configuration/write/impl/publish/ConfigPublishServiceImpl.kt @@ -19,6 +19,7 @@ import net.corda.v5.base.versioning.Version import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference +import org.slf4j.LoggerFactory // This needs to be a `Lifecycle` for reconciliation, maybe not only for that. 
However it cannot really wait on // `ConfigurationReadService`, because it will be used by `ConfigWriteService` which needs to be started before @@ -36,6 +37,10 @@ class ConfigPublishServiceImpl @Activate constructor( configurationValidatorFactory: ConfigurationValidatorFactory ) : ConfigPublishService { + companion object { + private val logger = LoggerFactory.getLogger(ConfigPublishServiceImpl::class.java) + } + private val handler = ConfigPublishServiceHandler(publisherFactory, configMerger) override val lifecycleCoordinatorName = LifecycleCoordinatorName.forComponent() @@ -102,6 +107,56 @@ class ConfigPublishServiceImpl @Activate constructor( TODO("Not yet implemented") } + override fun valuesMisalignedAfterDefaults( + recordKey: String, + dbRecordValue: Configuration, + kafkaRecordValue: Configuration + ): Boolean { + require(dbRecordValue.version == kafkaRecordValue.version) + require(dbRecordValue.schemaVersion.majorVersion == kafkaRecordValue.schemaVersion.majorVersion) + val schemaMajorVersion = dbRecordValue.schemaVersion.majorVersion + require(dbRecordValue.schemaVersion.minorVersion == kafkaRecordValue.schemaVersion.minorVersion) + val schemaMinorVersion = dbRecordValue.schemaVersion.minorVersion + + val dbConfigValueWithDefaults = + smartConfigFactory.create(ConfigFactory.parseString(dbRecordValue.value)).run { + validator.validate( + recordKey, + Version(schemaMajorVersion, schemaMinorVersion), + this, + applyDefaults = true + ) + } + + val kafkaConfigValue = + smartConfigFactory.create(ConfigFactory.parseString(kafkaRecordValue.value)).run { + validator.validate( + recordKey, + Version(schemaMajorVersion, schemaMinorVersion), + this, + applyDefaults = false, + ) + } + + + val configsAreNotEqual = dbConfigValueWithDefaults != kafkaConfigValue + if (configsAreNotEqual) { + logger.info( + "Configuration for key $recordKey is misaligned on Kafka after applying defaults (Kafka will be updated).\n" + + "DB config value: ${ + dbConfigValueWithDefaults.toSafeConfig().root() + .render(ConfigRenderOptions.concise().setFormatted(true)) + }\n" + + "Kafka config value: ${ + kafkaConfigValue.toSafeConfig().root() + .render(ConfigRenderOptions.concise().setFormatted(true)) + }" + ) + } + + return configsAreNotEqual + } + override val isRunning get() = coordinator.isRunning override fun start() = coordinator.start() diff --git a/components/reconciliation/reconciliation-impl/src/main/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandler.kt b/components/reconciliation/reconciliation-impl/src/main/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandler.kt index 4fe7d46f77f..6ed76e03831 100644 --- a/components/reconciliation/reconciliation-impl/src/main/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandler.kt +++ b/components/reconciliation/reconciliation-impl/src/main/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandler.kt @@ -137,7 +137,15 @@ internal class ReconcilerEventHandler( // therefore through the defaulting config process which will add the property(ies) and subsequently // will publish them to Kafka. We only need to force the first reconciliation. 
if (forceInitialReconciliation && firstRun) { - dbRecord.version >= matchedKafkaRecord.version // reconcile all db records again (forced reconciliation) + dbRecord.version > matchedKafkaRecord.version + // reconcile all db records again (forced reconciliation) + || (dbRecord.version == matchedKafkaRecord.version + && writer.valuesMisalignedAfterDefaults( + dbRecord.key, + dbRecord.value, + matchedKafkaRecord.value + ) + ) } else { dbRecord.version > matchedKafkaRecord.version // reconcile db updated records } || dbRecord.isDeleted // reconcile db soft deleted records diff --git a/components/reconciliation/reconciliation-impl/src/test/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandlerTest.kt b/components/reconciliation/reconciliation-impl/src/test/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandlerTest.kt index cbf23559c4c..dc6d265df98 100644 --- a/components/reconciliation/reconciliation-impl/src/test/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandlerTest.kt +++ b/components/reconciliation/reconciliation-impl/src/test/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandlerTest.kt @@ -2,6 +2,7 @@ package net.corda.reconciliation.impl import net.corda.lifecycle.LifecycleCoordinator import net.corda.reconciliation.ReconcilerReader +import net.corda.reconciliation.ReconcilerWriter import net.corda.reconciliation.VersionedRecord import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertEquals @@ -92,11 +93,15 @@ internal class ReconcilerEventHandlerTest { } } + val writer = mock>().also { + whenever(it.valuesMisalignedAfterDefaults(any(), any(), any())).thenReturn(true) + } + reconcilerEventHandler = ReconcilerEventHandler( dbReader, kafkaReader, - writer = mock(), + writer, keyClass = String::class.java, valueClass = Int::class.java, 10L, diff --git a/components/reconciliation/reconciliation/src/main/kotlin/net/corda/reconciliation/ReconcilerWriter.kt b/components/reconciliation/reconciliation/src/main/kotlin/net/corda/reconciliation/ReconcilerWriter.kt index 73da8a3a8b0..8d05c6180ab 100644 --- a/components/reconciliation/reconciliation/src/main/kotlin/net/corda/reconciliation/ReconcilerWriter.kt +++ b/components/reconciliation/reconciliation/src/main/kotlin/net/corda/reconciliation/ReconcilerWriter.kt @@ -18,4 +18,14 @@ interface ReconcilerWriter { fun remove(recordKey: K) val lifecycleCoordinatorName: LifecycleCoordinatorName + + /** + * Compare DB record value to its Kafka respective one to see if after applying defaults + * to the DB record, the DB record is different to the Kafka one. + */ + fun valuesMisalignedAfterDefaults( + recordKey: K, + dbRecordValue: V, + kafkaRecordValue: V + ): Boolean = false } \ No newline at end of file From 4bbf3dba18d9506687d370d828aa9e6733167575 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Jos=C3=A9=20Ramos?= Date: Tue, 14 Nov 2023 09:12:06 +0000 Subject: [PATCH 11/18] CORE-18282: Fix Flaky Tests (#5090) Sort the returned states before building the filters to use so that the lower bound is always older in time than the upper bound. 
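For illustration, a minimal sketch of the ordering guarantee described above, assuming a simple row type with a modifiedTime field; StateRow and intervalOf are illustrative names only and do not mirror the exact entities used in the integration test below.

    import java.time.Instant

    // Illustrative row type standing in for the state entities loaded by the test.
    data class StateRow(val key: String, val modifiedTime: Instant)

    // Sort by modification time (assumes at least one row) so the first element is always
    // the older bound and the last is always the newer bound, regardless of the order the
    // database happens to return rows in.
    fun intervalOf(rows: List<StateRow>): Pair<Instant, Instant> {
        val sorted = rows.sortedBy { it.modifiedTime }
        return sorted.first().modifiedTime to sorted.last().modifiedTime
    }

Building interval filters from an unsorted pair is presumably what made the test flaky: whenever rows came back newest-first, the "lower" bound was actually the later timestamp.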
--- .../impl/tests/StateManagerIntegrationTest.kt | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt b/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt index ba725ed7d8d..47e671a072b 100644 --- a/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt +++ b/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt @@ -127,6 +127,24 @@ class StateManagerIntegrationTest { } } + private fun getIntervalBetweenEntities(startEntityKey: String, finishEntityKey: String): Pair { + return dataSource.connection.transaction { connection -> + val loadedEntities = connection.prepareStatement(queryProvider.findStatesByKey(2)) + .use { + it.setString(1, startEntityKey) + it.setString(2, finishEntityKey) + it.executeQuery().resultSetAsStateEntityCollection() + }.sortedBy { + it.modifiedTime + } + + Pair( + loadedEntities.elementAt(0).modifiedTime, + loadedEntities.elementAt(1).modifiedTime + ) + } + } + @ValueSource(ints = [1, 10]) @ParameterizedTest(name = "can create basic states (batch size: {0})") fun canCreateBasicStates(stateCount: Int) { @@ -457,22 +475,6 @@ class StateManagerIntegrationTest { } } - private fun getIntervalBetweenEntities(startEntityKey: String, finishEntityKey: String): Pair { - return dataSource.connection.transaction { connection -> - val loadedEntities = connection.prepareStatement(queryProvider.findStatesByKey(2)) - .use { - it.setString(1, startEntityKey) - it.setString(2, finishEntityKey) - it.executeQuery().resultSetAsStateEntityCollection() - } - - Pair( - loadedEntities.elementAt(0).modifiedTime, - loadedEntities.elementAt(1).modifiedTime - ) - } - } - @Test @DisplayName(value = "can filter states using simple comparisons on metadata values") fun canFilterStatesUsingSimpleComparisonsOnMetadataValues() { From 9c258cb3cfa9c14f4e860b3c2d1cc23702e34e0a Mon Sep 17 00:00:00 2001 From: Emily Bowe Date: Tue, 14 Nov 2023 15:36:35 +0000 Subject: [PATCH 12/18] CORE-17966: Implement checkpoint cleanup handler when processing FlowMarkedForKillException (#5057) Change: This PR will implement checkpoint cleanup handler when processing FlowMarkedForKillException. Reasoning: In order to address CORE-17966: When exceptions such as FlowFatalException are thrown in the Flow Engine the exception handlers cleanup the checkpoints and schedule cleanup of the session states. However they currently do not cleanup the StartFlow Flow Mapper state for RPC started flows the checkpoint cleanup handler will be integrated in a series of small PRs. 
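For illustration, a simplified sketch of the pattern this change moves towards: the exception processor hands the whole teardown to a single cleanup handler and forwards whatever records it produces. CleanupRecord and the simplified signatures below are illustrative stand-ins; the real code uses FlowCheckpoint, SmartConfig and Record<*, *> as shown in the diff that follows.

    // Illustrative stand-in for the messaging Record type.
    data class CleanupRecord(val topic: String, val key: String, val payload: Any?)

    // Single place that knows how to tear a checkpoint down: session error events,
    // flow-mapper ScheduleCleanup events and the flow-killed status record.
    fun interface CheckpointCleanupHandler {
        fun cleanupCheckpoint(flowId: String, cause: Exception): List<CleanupRecord>
    }

    // The exception path for a killed flow then collapses to one delegation;
    // killed flows are not sent to the DLQ.
    fun onFlowMarkedForKill(
        handler: CheckpointCleanupHandler,
        flowId: String,
        cause: Exception
    ): List<CleanupRecord> = handler.cleanupCheckpoint(flowId, cause)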
--- .../testing/tests/FlowKilledAcceptanceTest.kt | 6 + .../impl/FlowEventExceptionProcessorImpl.kt | 55 +----- .../FlowToBeKilledExceptionProcessingTest.kt | 170 +++--------------- 3 files changed, 28 insertions(+), 203 deletions(-) diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowKilledAcceptanceTest.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowKilledAcceptanceTest.kt index 84052973ff2..343443410df 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowKilledAcceptanceTest.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowKilledAcceptanceTest.kt @@ -3,6 +3,8 @@ package net.corda.flow.testing.tests import net.corda.data.flow.output.FlowStates import net.corda.flow.application.sessions.SessionInfo import net.corda.flow.fiber.FlowIORequest +import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER +import net.corda.flow.testing.context.BOB_FLOW_KEY_MAPPER import net.corda.flow.testing.context.FlowServiceTestBase import net.corda.virtualnode.OperationalStatus import org.junit.jupiter.api.BeforeEach @@ -41,8 +43,10 @@ class FlowKilledAcceptanceTest : FlowServiceTestBase() { then { expectOutputForFlow(FLOW_ID1) { + sessionErrorEvents() nullStateRecord() flowKilledStatus(flowTerminatedReason = "Flow operational status is INACTIVE") + scheduleFlowMapperCleanupEvents(BOB_FLOW_KEY_MAPPER) flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1) } } @@ -94,8 +98,10 @@ class FlowKilledAcceptanceTest : FlowServiceTestBase() { then { expectOutputForFlow(FLOW_ID1) { + sessionErrorEvents(SESSION_ID_1) nullStateRecord() flowKilledStatus(flowTerminatedReason = "Flow operational status is INACTIVE") + scheduleFlowMapperCleanupEvents(SESSION_ID_1, ALICE_FLOW_KEY_MAPPER) flowFiberCacheDoesNotContainKey(ALICE_HOLDING_IDENTITY, REQUEST_ID1) } } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt index 95eca1f1abc..d2b28b4c091 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt @@ -1,12 +1,8 @@ package net.corda.flow.pipeline.impl import net.corda.data.flow.event.StartFlow -import net.corda.data.flow.event.mapper.FlowMapperEvent -import net.corda.data.flow.event.mapper.ScheduleCleanup import net.corda.data.flow.output.FlowStates import net.corda.data.flow.output.FlowStatus -import net.corda.data.flow.state.session.SessionState -import net.corda.data.flow.state.session.SessionStateType import net.corda.data.flow.state.waiting.WaitingFor import net.corda.flow.fiber.cache.FlowFiberCache import net.corda.flow.maintenance.CheckpointCleanupHandler @@ -25,7 +21,6 @@ import net.corda.flow.state.FlowCheckpoint import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.getLongOrDefault import net.corda.messaging.api.records.Record -import net.corda.schema.configuration.FlowConfig import net.corda.schema.configuration.FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component @@ -196,7 +191,6 @@ class FlowEventExceptionProcessorImpl 
@Activate constructor( context: FlowEventContext<*> ): FlowEventContext<*> { return withEscalation(context) { - val exceptionHandlingStartTime = Instant.now() val checkpoint = context.checkpoint if (!checkpoint.doesExist) { @@ -212,42 +206,16 @@ class FlowEventExceptionProcessorImpl @Activate constructor( ) } - val activeSessionIds = getActiveSessionIds(checkpoint) - - if (activeSessionIds.isNotEmpty()) { - checkpoint.putSessionStates( - flowSessionManager.sendErrorMessages( - context.checkpoint, activeSessionIds, exception, exceptionHandlingStartTime - ) - ) - } - val errorEvents = flowSessionManager.getSessionErrorEventRecords( - context.checkpoint, - context.flowConfig, - exceptionHandlingStartTime - ) - val cleanupEvents = createCleanupEventsForSessions( - getScheduledCleanupExpiryTime(context, exceptionHandlingStartTime), - checkpoint.sessions.filterNot { it.hasScheduledCleanup } - ) - val statusRecord = - createFlowKilledStatusRecord(checkpoint, exception.message ?: "No exception message provided.") - removeCachedFlowFiber(checkpoint) - - checkpoint.markDeleted() + val cleanupRecords = checkpointCleanupHandler.cleanupCheckpoint(checkpoint, context.flowConfig, exception) context.copy( - outputRecords = errorEvents + cleanupEvents + statusRecord, + outputRecords = cleanupRecords, sendToDlq = false // killed flows do not go to DLQ ) } } - private fun getActiveSessionIds(checkpoint: FlowCheckpoint) = checkpoint.sessions.filterNot { sessionState -> - sessionState.status == SessionStateType.CLOSED || sessionState.status == SessionStateType.ERROR - }.map { it.sessionId } - private fun withEscalation(context: FlowEventContext<*>, handler: () -> FlowEventContext<*>): FlowEventContext<*> { return try { handler() @@ -257,20 +225,6 @@ class FlowEventExceptionProcessorImpl @Activate constructor( } } - private fun createCleanupEventsForSessions( - expiryTime: Long, - sessionsToCleanup: List - ): List> { - return sessionsToCleanup - .onEach { it.hasScheduledCleanup = true } - .map { - flowRecordFactory.createFlowMapperEventRecord( - it.sessionId, - ScheduleCleanup(expiryTime) - ) - } - } - private fun createFlowKilledStatusRecord(checkpoint: FlowCheckpoint, message: String?): List> { return createStatusRecord(checkpoint.flowId) { flowMessageFactory.createFlowKilledStatusMessage(checkpoint, message) @@ -301,11 +255,6 @@ class FlowEventExceptionProcessorImpl @Activate constructor( } } - private fun getScheduledCleanupExpiryTime(context: FlowEventContext<*>, now: Instant): Long { - val flowCleanupTime = context.flowConfig.getLong(FlowConfig.SESSION_FLOW_CLEANUP_TIME) - return now.plusMillis(flowCleanupTime).toEpochMilli() - } - /** * Remove cached flow fiber for this checkpoint, if it exists. 
*/ diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt index 1a600a01458..54d67ce4290 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt @@ -6,17 +6,11 @@ import net.corda.data.flow.FlowKey import net.corda.data.flow.FlowStartContext import net.corda.data.flow.event.StartFlow import net.corda.data.flow.event.mapper.FlowMapperEvent -import net.corda.data.flow.event.mapper.ScheduleCleanup import net.corda.data.flow.output.FlowStates import net.corda.data.flow.output.FlowStatus -import net.corda.data.flow.state.checkpoint.Checkpoint -import net.corda.data.flow.state.external.ExternalEventState -import net.corda.data.flow.state.session.SessionState -import net.corda.data.flow.state.session.SessionStateType import net.corda.data.identity.HoldingIdentity import net.corda.flow.fiber.cache.FlowFiberCache import net.corda.flow.maintenance.CheckpointCleanupHandler -import net.corda.flow.pipeline.converters.FlowEventContextConverter import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException import net.corda.flow.pipeline.factory.FlowMessageFactory import net.corda.flow.pipeline.factory.FlowRecordFactory @@ -24,67 +18,28 @@ import net.corda.flow.pipeline.sessions.FlowSessionManager import net.corda.flow.state.FlowCheckpoint import net.corda.flow.test.utils.buildFlowEventContext import net.corda.libs.configuration.SmartConfigFactory -import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.records.Record +import net.corda.schema.Schemas import net.corda.schema.configuration.FlowConfig import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.kotlin.any -import org.mockito.kotlin.eq import org.mockito.kotlin.mock -import org.mockito.kotlin.times -import org.mockito.kotlin.verify import org.mockito.kotlin.whenever class FlowToBeKilledExceptionProcessingTest { - private companion object { - const val SESSION_ID_1 = "s1" - const val SESSION_ID_2 = "s2" - const val SESSION_ID_3 = "s3" - const val SESSION_ID_4 = "s4" - const val SESSION_ID_5 = "s5" const val FLOW_ID = "flowId" } private val flowRecordFactory = mock() private val flowMessageFactory = mock() - private val flowEventContextConverter = mock() private val flowSessionManager = mock() private val flowConfig = ConfigFactory.empty().withValue( FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION, ConfigValueFactory.fromAnyRef(20000L) ) private val smartFlowConfig = SmartConfigFactory.createWithoutSecurityServices().create(flowConfig) - - // starting session states - private val sessionState1 = createSessionState(SESSION_ID_1, true, SessionStateType.CLOSING) - private val sessionState2 = createSessionState(SESSION_ID_2, true, SessionStateType.ERROR) - private val sessionState3 = createSessionState(SESSION_ID_3, true, SessionStateType.CLOSED) - private val sessionState4 = createSessionState(SESSION_ID_4, false, SessionStateType.CONFIRMED) - private val sessionState5 = createSessionState(SESSION_ID_5, false, SessionStateType.CREATED) - - // session states which have had their state updated to ERROR - private val erroredSessionState1 = 
createSessionState(SESSION_ID_1, false, SessionStateType.ERROR) - private val erroredSessionState4 = createSessionState(SESSION_ID_4, false, SessionStateType.ERROR) - private val erroredSessionState5 = createSessionState(SESSION_ID_5, false, SessionStateType.ERROR) - - private val allFlowSessions = listOf(sessionState1, sessionState2, sessionState3, sessionState4, sessionState5) - private val activeFlowSessionIds = listOf(SESSION_ID_1, SESSION_ID_4, SESSION_ID_5) - private val erroredFlowSessions = listOf(erroredSessionState1, erroredSessionState4, erroredSessionState5) - private val allFlowSessionsAfterErrorsSent = listOf( - erroredSessionState1, sessionState2, sessionState3, erroredSessionState4, erroredSessionState5 - ) - - private fun createSessionState(sessionId: String, hasScheduledCleanup: Boolean, status: SessionStateType): SessionState = - SessionState().apply { - this.sessionId = sessionId - this.hasScheduledCleanup = hasScheduledCleanup - this.status = status - } - - private val scheduleCleanupRecord4 = Record("t", SESSION_ID_4, FlowMapperEvent(ScheduleCleanup(1000))) - private val scheduleCleanupRecord5 = Record("t", SESSION_ID_5, FlowMapperEvent(ScheduleCleanup(1000))) private val flowKey = FlowKey("id", HoldingIdentity("x500", "grp1")) private val checkpoint = mock { whenever(it.flowKey).thenReturn(flowKey) @@ -96,17 +51,10 @@ class FlowToBeKilledExceptionProcessingTest { processingTerminatedReason = "reason" } private val flowKilledStatusRecord = Record("s", flowKey, flowKilledStatus) - private val mockResponse = mock>() private val flowFiberCache = mock() private val checkpointCleanupHandler = mock() - - private val target = FlowEventExceptionProcessorImpl( - flowMessageFactory, - flowRecordFactory, - flowSessionManager, - flowFiberCache, - checkpointCleanupHandler + flowMessageFactory, flowRecordFactory, flowSessionManager, flowFiberCache, checkpointCleanupHandler ) @BeforeEach @@ -116,127 +64,49 @@ class FlowToBeKilledExceptionProcessingTest { } @Test - fun `processing MarkedForKillException sends error events to all sessions then schedules cleanup for any not yet scheduled`() { + fun `processing FlowMarkedForKillException calls checkpoint cleanup handler and copies cleanup records to context`() { val testContext = buildFlowEventContext(checkpoint, Any()) val exception = FlowMarkedForKillException("reasoning") - - whenever(checkpoint.doesExist).thenReturn(true) - - // The first call returns all sessions. 
- whenever(checkpoint.sessions) - .thenReturn(allFlowSessions) - .thenReturn(allFlowSessionsAfterErrorsSent) - - // we send error messages for all active flow sessions, returning the flow sessions with state updated to ERROR - whenever(flowSessionManager.sendErrorMessages(eq(checkpoint), eq(activeFlowSessionIds), eq(exception), any())) - .thenReturn(erroredFlowSessions) - - // we clean up all flow sessions that have not already been cleaned up - whenever(flowRecordFactory.createFlowMapperEventRecord(eq(SESSION_ID_4), any())).thenReturn(scheduleCleanupRecord4) - whenever(flowRecordFactory.createFlowMapperEventRecord(eq(SESSION_ID_5), any())).thenReturn(scheduleCleanupRecord5) - - // callouts to factories to create flow killed status record - whenever(flowMessageFactory.createFlowKilledStatusMessage(any(), any())).thenReturn(flowKilledStatus) - whenever(flowRecordFactory.createFlowStatusRecord(flowKilledStatus)).thenReturn(flowKilledStatusRecord) - - val response = target.process(exception, testContext) - - verify(checkpoint, times(1)).putSessionStates(erroredFlowSessions) - verify(checkpoint, times(1)).markDeleted() - - assertThat(response.outputRecords) - .withFailMessage("Output records should contain cleanup records for sessions that aren't already scheduled") - .contains(scheduleCleanupRecord4, scheduleCleanupRecord5) - - assertThat(response.outputRecords) - .withFailMessage("Output records should contain the flow killed status record") - .contains(flowKilledStatusRecord) - - val updatedFlowSessions = response.checkpoint.sessions.associateBy { it.sessionId } - assertThat(updatedFlowSessions.values.all { it.hasScheduledCleanup }) - .withFailMessage("All flow sessions should now be marked as scheduled for cleanup") - .isTrue - assertThat(updatedFlowSessions[SESSION_ID_1]?.status).isEqualTo(SessionStateType.ERROR) - assertThat(updatedFlowSessions[SESSION_ID_2]?.status).isEqualTo(SessionStateType.ERROR) - assertThat(updatedFlowSessions[SESSION_ID_3]?.status).isEqualTo(SessionStateType.CLOSED) - assertThat(updatedFlowSessions[SESSION_ID_4]?.status).isEqualTo(SessionStateType.ERROR) - assertThat(updatedFlowSessions[SESSION_ID_5]?.status).isEqualTo(SessionStateType.ERROR) - } - - @Test - fun `processing MarkedForKillException removes retries and external events from output records`() { whenever(checkpoint.doesExist).thenReturn(true) - - val externalEventState = ExternalEventState() - whenever(checkpoint.externalEventState).thenReturn(externalEventState) - whenever(checkpoint.inRetryState).thenReturn(true) - - val testContext = buildFlowEventContext(checkpoint, Any()) - val exception = FlowMarkedForKillException("reasoning") - - whenever(checkpoint.sessions) - .thenReturn(emptyList()) - .thenReturn(emptyList()) - - // callouts to factories to create flow killed status record - whenever(flowMessageFactory.createFlowKilledStatusMessage(any(), any())).thenReturn(flowKilledStatus) - whenever(flowRecordFactory.createFlowStatusRecord(flowKilledStatus)).thenReturn(flowKilledStatusRecord) + val flowMapperEvent = mock() + val cleanupRecord = Record(Schemas.Flow.FLOW_MAPPER_SESSION_OUT, "key", flowMapperEvent) + val cleanupRecords = listOf(cleanupRecord) + whenever(checkpointCleanupHandler.cleanupCheckpoint(any(), any(), any())).thenReturn(cleanupRecords) val response = target.process(exception, testContext) - verify(checkpoint, times(1)).markDeleted() - - assertThat(response.outputRecords) - .withFailMessage("Output records should only have flow status record") - .hasSize(1) - - 
assertThat(response.outputRecords) - .withFailMessage("Output records should contain the flow killed status record") - .contains(flowKilledStatusRecord) + assertThat(response.outputRecords).contains(cleanupRecord) } @Test - fun `processing MarkedForKillException when checkpoint does not exist only outputs flow killed status record`() { + fun `processing FlowMarkedForKillException when checkpoint does not exist only outputs flow killed status record`() { whenever(checkpoint.doesExist).thenReturn(false) - - val inputEventPayload = StartFlow(FlowStartContext().apply {statusKey = flowKey}, "") - + val inputEventPayload = StartFlow(FlowStartContext().apply { statusKey = flowKey }, "") val testContext = buildFlowEventContext(checkpoint, inputEventPayload) val exception = FlowMarkedForKillException("reasoning") - whenever(flowRecordFactory.createFlowStatusRecord(any())).thenReturn(flowKilledStatusRecord) val response = target.process(exception, testContext) - assertThat(response.outputRecords) - .withFailMessage("Output records should only have flow status record") - .hasSize(1) - - assertThat(response.outputRecords) - .withFailMessage("Output records should contain the flow killed status record") - .contains(flowKilledStatusRecord) + assertThat(response.outputRecords).hasSize(1).contains(flowKilledStatusRecord) } @Test - fun `error processing MarkedForKillException falls back to null state record, empty response events and marked for DLQ`() { + fun `error processing FlowMarkedForKillException falls back to null state record, empty response events and marked for DLQ`() { val testContext = buildFlowEventContext(checkpoint, Any()) val exception = FlowMarkedForKillException("reasoning") - - whenever(checkpoint.doesExist).thenReturn(true) - - // The first call returns all sessions. - whenever(checkpoint.sessions) - .thenReturn(allFlowSessions) - .thenReturn(allFlowSessionsAfterErrorsSent) - - // simulating exception thrown during processing of the flow session - whenever(flowSessionManager.sendErrorMessages(eq(checkpoint), eq(activeFlowSessionIds), eq(exception), any())) - .thenThrow(IllegalArgumentException("some error message while sending errors to peers")) + // simulating exception thrown during processing + whenever( + checkpointCleanupHandler.cleanupCheckpoint( + any(), + any(), + any() + ) + ).thenThrow(IllegalArgumentException("some error message while sending errors to peers")) val response = target.process(exception, testContext) - verify(response.checkpoint).markDeleted() assertThat(response.outputRecords).isEmpty() assertThat(response.sendToDlq).isTrue } From cc6fbec796393d467b8caa4ff335cb05151b2f9b Mon Sep 17 00:00:00 2001 From: Conal Smith <68279385+conalsmith-r3@users.noreply.github.com> Date: Thu, 16 Nov 2023 13:57:29 +0000 Subject: [PATCH 13/18] CORE-17502 - enable state-manager with local deployment (#4974) Add token-selection section to local deployment --- state-manager.yaml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/state-manager.yaml b/state-manager.yaml index b333b5c2f83..f1d52962b51 100644 --- a/state-manager.yaml +++ b/state-manager.yaml @@ -2,16 +2,8 @@ # Helm chart and a state-manager deployed via bitnami postgres helm chart with the overrides from # `state-manager-postgres.yaml`. 
# -# First use `./gradlew publishOSGiImage --parallel` to create local docker images -# -# Then deploy the prereqs using: -# helm upgrade --install prereqs -n corda \ -# oci://corda-os-docker.software.r3.com/helm-charts/corda-dev-prereqs \ -# --render-subchart-notes \ -# --timeout 10m \ -# # Then deploy the state manager using the bitnami helm chart -# helm install state-manager-db oci://docker-remotes.software.r3.com/bitnamicharts/postgresql -n corda --version "12.1.0" \ +# helm install state-manager-db oci://registry-1.docker.io/bitnamicharts/postgresql -n corda --version "12.1.0" \ # -f ./state-manager-postgres.yaml \ # --timeout 10m \ # --wait @@ -46,6 +38,14 @@ bootstrap: secretKeyRef: name: "state-manager-db-postgresql" key: "postgres-password" + tokenSelection: + username: + value: "postgres" + password: + valueFrom: + secretKeyRef: + name: "state-manager-db-postgresql" + key: "postgres-password" workers: flow: stateManager: From fb8ddad51d90df99227c424ff4a7553b97137c3e Mon Sep 17 00:00:00 2001 From: Yiftach Kaplan <67583323+yift-r3@users.noreply.github.com> Date: Thu, 16 Nov 2023 19:44:45 +0000 Subject: [PATCH 14/18] CORE-18390: Allow old version of SetOwnRegistrationStatus in the command (#5114) * CORE-18357: Allow old version of SetOwnRegistrationStatus in the command * Set to the correct version --- .../PersistMemberRegistrationStateHandler.kt | 41 +++++++++++++++++-- ...rsistMemberRegistrationStateHandlerTest.kt | 17 ++++---- gradle.properties | 2 +- 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt b/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt index c6d92b889ff..4178f8dc91e 100644 --- a/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt +++ b/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt @@ -2,6 +2,10 @@ package net.corda.membership.impl.registration.dynamic.handler.member import net.corda.data.identity.HoldingIdentity import net.corda.data.membership.command.registration.member.PersistMemberRegistrationState +import net.corda.data.membership.common.RegistrationStatus as RegistrationStatusV1 +import net.corda.data.membership.common.v2.RegistrationStatus as RegistrationStatusV2 +import net.corda.data.membership.p2p.SetOwnRegistrationStatus as SetOwnRegistrationStatusV1 +import net.corda.data.membership.p2p.v2.SetOwnRegistrationStatus as SetOwnRegistrationStatusV2 import net.corda.data.membership.state.RegistrationState import net.corda.membership.impl.registration.dynamic.handler.RegistrationHandler import net.corda.membership.impl.registration.dynamic.handler.RegistrationHandlerResult @@ -17,11 +21,12 @@ internal class PersistMemberRegistrationStateHandler( command: PersistMemberRegistrationState, ): RegistrationHandlerResult { val member = command.member.toCorda() + val request = command.request() val commands = membershipPersistenceClient.setRegistrationRequestStatus( member, - command.setStatusRequest.registrationId, - command.setStatusRequest.newStatus, - command.setStatusRequest.reason + request.registrationId, + request.newStatus, + request.reason ).createAsyncCommands() 
return RegistrationHandlerResult( null, @@ -35,4 +40,34 @@ internal class PersistMemberRegistrationStateHandler( state: RegistrationState?, command: PersistMemberRegistrationState ): HoldingIdentity = command.member + + private fun PersistMemberRegistrationState.request(): SetOwnRegistrationStatusV2 { + val request = this.setStatusRequest + return when (request) { + is SetOwnRegistrationStatusV2 -> request + is SetOwnRegistrationStatusV1 -> SetOwnRegistrationStatusV2( + request.registrationId, + request.newStatus.toV2(), + null + ) + else -> throw IllegalArgumentException("Unknown request status '${request.javaClass}' received.") + } + } + + private fun RegistrationStatusV1.toV2(): RegistrationStatusV2 { + return when(this) { + RegistrationStatusV1.NEW -> RegistrationStatusV2.NEW + RegistrationStatusV1.SENT_TO_MGM -> RegistrationStatusV2.SENT_TO_MGM + RegistrationStatusV1.RECEIVED_BY_MGM -> RegistrationStatusV2.RECEIVED_BY_MGM + RegistrationStatusV1.PENDING_MEMBER_VERIFICATION -> RegistrationStatusV2.PENDING_MEMBER_VERIFICATION + RegistrationStatusV1.PENDING_MANUAL_APPROVAL -> RegistrationStatusV2.PENDING_MANUAL_APPROVAL + RegistrationStatusV1.PENDING_AUTO_APPROVAL -> RegistrationStatusV2.PENDING_AUTO_APPROVAL + RegistrationStatusV1.APPROVED -> RegistrationStatusV2.APPROVED + RegistrationStatusV1.DECLINED -> RegistrationStatusV2.DECLINED + RegistrationStatusV1.INVALID -> RegistrationStatusV2.INVALID + RegistrationStatusV1.FAILED -> RegistrationStatusV2.FAILED + else -> throw IllegalArgumentException("Unknown status '${this.name}' received.") + } + + } } diff --git a/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt b/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt index f7acb9236d3..8386019d3ad 100644 --- a/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt +++ b/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt @@ -38,13 +38,14 @@ class PersistMemberRegistrationStateHandlerTest { } doReturn operation } private val reason = "some reason" - val command = PersistMemberRegistrationState( + private val request = SetOwnRegistrationStatus( + UUID(1,2).toString(), + RegistrationStatus.DECLINED, + reason + ) + private val command = PersistMemberRegistrationState( HoldingIdentity("O=Alice, L=London, C=GB", "GroupId"), - SetOwnRegistrationStatus( - UUID(1,2).toString(), - RegistrationStatus.DECLINED, - reason - ) + request ) private val handler = PersistMemberRegistrationStateHandler( @@ -66,8 +67,8 @@ class PersistMemberRegistrationStateHandlerTest { verify(membershipPersistenceClient).setRegistrationRequestStatus( command.member.toCorda(), - command.setStatusRequest.registrationId, - command.setStatusRequest.newStatus, + request.registrationId, + request.newStatus, reason ) } diff --git a/gradle.properties b/gradle.properties index 5cf01b55410..5a2f4561c8c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,7 +46,7 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.76 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.1.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.1.0.37-beta+ 
+cordaApiVersion=5.1.0.38-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 From e86405d976df295a84b6ab5265a6610b0e167abe Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Fri, 17 Nov 2023 10:14:14 +0000 Subject: [PATCH 15/18] CORE-18370 Fix `findUnconsumedStatesByExactType` (#5111) The query for `findUnconsumedStatesByExactType` did not actually filter by `consumed IS NULL` so it returned all states regardless of if they were consumed or not. Added `consumed IS NULL` to fix this. --- .../query/registration/impl/VaultNamedQueryFactoryProvider.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt index b66e5e5b056..5b289ed5e95 100644 --- a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt +++ b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt @@ -48,7 +48,7 @@ class VaultNamedQueryFactoryProvider @Activate constructor( private fun registerPlatformQueries(vaultNamedQueryBuilderFactory: VaultNamedQueryBuilderFactory) { vaultNamedQueryBuilderFactory .create(FIND_UNCONSUMED_STATES_BY_EXACT_TYPE) - .whereJson("WHERE visible_states.type = :type") + .whereJson("WHERE visible_states.type = :type AND visible_states.consumed IS NULL") .register() } } From f441f27aa12a80f4ee31d814fd3987c148647ebd Mon Sep 17 00:00:00 2001 From: Yash Nabar Date: Fri, 17 Nov 2023 13:27:57 +0000 Subject: [PATCH 16/18] CORE-15267 Make MemberInfo serializable (#5127) Makes `MemberInfo` serializable by registering custom serializers for `MemberContext` and `MGMContext`. The corresponding corda-api change marks these interfaces as `@CordaSerializable`. 
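For illustration, both new serializers in this change follow the same proxy shape: flatten the context to a plain map for the wire and rebuild it through the layered-property-map factory on read. The sketch below uses made-up names (MyContext, MyContextProxy) and plain Kotlin types; the real classes extend BaseProxySerializer as shown in the diff that follows.

    // Illustrative stand-ins; not Corda API types.
    data class MyContext(val entries: Map<String, String>)
    data class MyContextProxy(val version: Int, val map: Map<String, String>)

    const val VERSION_1 = 1

    // Flatten the context into a versioned, map-backed proxy for serialization.
    fun toProxy(obj: MyContext) = MyContextProxy(VERSION_1, obj.entries)

    // Rebuild the context from the proxy, failing loudly on unknown versions.
    fun fromProxy(proxy: MyContextProxy): MyContext = when (proxy.version) {
        VERSION_1 -> MyContext(proxy.map)
        else -> throw IllegalArgumentException("Unknown proxy version '${proxy.version}'")
    }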
--- gradle.properties | 2 +- libs/membership/membership-impl/build.gradle | 2 + .../serializer/amqp/MGMContextSerializer.kt | 72 +++++++++++++++++++ .../amqp/MemberContextSerializer.kt | 72 +++++++++++++++++++ 4 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt create mode 100644 libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt diff --git a/gradle.properties b/gradle.properties index 5a2f4561c8c..2f2b96c64e1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,7 +46,7 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.76 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.1.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.1.0.38-beta+ +cordaApiVersion=5.1.0.39-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/membership/membership-impl/build.gradle b/libs/membership/membership-impl/build.gradle index 96285545f43..6965bd5bd8b 100644 --- a/libs/membership/membership-impl/build.gradle +++ b/libs/membership/membership-impl/build.gradle @@ -17,7 +17,9 @@ dependencies { implementation project(':libs:crypto:crypto-core') implementation project(":libs:membership:membership-common") implementation project(":libs:utilities") + implementation project(':libs:sandbox-types') implementation project(":libs:serialization:serialization-avro") + implementation project(':libs:serialization:serialization-internal') implementation "net.corda:corda-avro-schema" implementation "net.corda:corda-base" diff --git a/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt new file mode 100644 index 00000000000..b287f9aed0b --- /dev/null +++ b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt @@ -0,0 +1,72 @@ +package net.corda.membership.lib.impl.serializer.amqp + +import net.corda.layeredpropertymap.LayeredPropertyMapFactory +import net.corda.membership.lib.impl.MGMContextImpl +import net.corda.sandbox.type.SandboxConstants +import net.corda.sandbox.type.UsedByFlow +import net.corda.sandbox.type.UsedByPersistence +import net.corda.sandbox.type.UsedByVerification +import net.corda.serialization.BaseProxySerializer +import net.corda.serialization.InternalCustomSerializer +import net.corda.v5.base.exceptions.CordaRuntimeException +import net.corda.v5.base.types.LayeredPropertyMap +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.osgi.service.component.annotations.ServiceScope + +@Component( + service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ], + property = [SandboxConstants.CORDA_UNINJECTABLE_SERVICE], + scope = ServiceScope.PROTOTYPE +) +class MGMContextSerializer @Activate constructor( + @Reference(service = LayeredPropertyMapFactory::class) + private val layeredPropertyMapFactory: LayeredPropertyMapFactory, +) : BaseProxySerializer(), UsedByFlow, UsedByPersistence, UsedByVerification { + private companion object { + private const val VERSION_1 = 1 + } + + override fun 
toProxy(obj: MGMContextImpl): MGMContextProxy { + return MGMContextProxy( + VERSION_1, + obj.toMap() + ) + } + + override fun fromProxy(proxy: MGMContextProxy): MGMContextImpl { + return when(proxy.version) { + VERSION_1 -> + MGMContextImpl(layeredPropertyMapFactory.createMap(proxy.map)) + else -> + throw CordaRuntimeException("Unable to create MGMContextImpl with Version='${proxy.version}'") + } + } + + override val proxyType: Class + get() = MGMContextProxy::class.java + + override val type: Class + get() = MGMContextImpl::class.java + + override val withInheritance: Boolean + get() = false + + private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value } + +} + +/** + * The class that actually gets serialized on the wire. + */ +data class MGMContextProxy( + /** + * Version of container. + */ + val version: Int, + /** + * Properties for [MGMContextImpl] serialization. + */ + val map: Map +) diff --git a/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt new file mode 100644 index 00000000000..a6b795e0bff --- /dev/null +++ b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt @@ -0,0 +1,72 @@ +package net.corda.membership.lib.impl.serializer.amqp + +import net.corda.layeredpropertymap.LayeredPropertyMapFactory +import net.corda.membership.lib.impl.MemberContextImpl +import net.corda.sandbox.type.SandboxConstants.CORDA_UNINJECTABLE_SERVICE +import net.corda.sandbox.type.UsedByFlow +import net.corda.sandbox.type.UsedByPersistence +import net.corda.sandbox.type.UsedByVerification +import net.corda.serialization.BaseProxySerializer +import net.corda.serialization.InternalCustomSerializer +import net.corda.v5.base.exceptions.CordaRuntimeException +import net.corda.v5.base.types.LayeredPropertyMap +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.osgi.service.component.annotations.ServiceScope + +@Component( + service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ], + property = [ CORDA_UNINJECTABLE_SERVICE ], + scope = ServiceScope.PROTOTYPE +) +class MemberContextSerializer @Activate constructor( + @Reference(service = LayeredPropertyMapFactory::class) + private val layeredPropertyMapFactory: LayeredPropertyMapFactory, +) : BaseProxySerializer(), UsedByFlow, UsedByPersistence, UsedByVerification { + private companion object { + private const val VERSION_1 = 1 + } + + override fun toProxy(obj: MemberContextImpl): MemberContextProxy { + return MemberContextProxy( + VERSION_1, + obj.toMap() + ) + } + + override fun fromProxy(proxy: MemberContextProxy): MemberContextImpl { + return when(proxy.version) { + VERSION_1 -> + MemberContextImpl(layeredPropertyMapFactory.createMap(proxy.map)) + else -> + throw CordaRuntimeException("Unable to create MemberContextImpl with Version='${proxy.version}'") + } + } + + override val proxyType: Class + get() = MemberContextProxy::class.java + + override val type: Class + get() = MemberContextImpl::class.java + + override val withInheritance: Boolean + get() = false + + private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value } + +} + +/** + * The class that 
actually gets serialized on the wire. + */ +data class MemberContextProxy( + /** + * Version of container. + */ + val version: Int, + /** + * Properties for [MemberContextImpl] serialization. + */ + val map: Map +) From bd1e42602d569d44ab1290b17dab4af794a97044 Mon Sep 17 00:00:00 2001 From: Ben Millar <44114751+ben-millar@users.noreply.github.com> Date: Fri, 17 Nov 2023 16:01:09 +0000 Subject: [PATCH 17/18] CORE-17917 Adding handling to MessageBusClient for async kafka errors (#5107) A bug was observed whereby Kafka producer errors were not being correctly reported back to the MultiSourceEventMediator. This is because our Kafka messages are essentially 'fire-and-forget'; when something goes wrong, Kafka will exceptionally close a future in the background, but we never attempted to capture or track this future. This PR adds a callback which will log this exception clearly for the user. --- .../messaging/mediator/MessageBusClient.kt | 36 +++++++- .../messaging/publisher/CordaPublisherImpl.kt | 2 +- .../mediator/MessageBusClientTest.kt | 85 ++++++++++++++++--- 3 files changed, 106 insertions(+), 17 deletions(-) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt index 116ca1579dd..8b2c43f8818 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt @@ -2,12 +2,14 @@ package net.corda.messaging.mediator import net.corda.messagebus.api.producer.CordaProducer import net.corda.messagebus.api.producer.CordaProducerRecord +import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_KEY import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.util.concurrent.CompletableFuture class MessageBusClient( override val id: String, @@ -18,9 +20,31 @@ class MessageBusClient( private val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - override fun send(message: MediatorMessage<*>): MediatorMessage<*>? { - producer.send(message.toCordaProducerRecord(), null) - return null + override fun send(message: MediatorMessage<*>): MediatorMessage<*> { + val future = CompletableFuture() + val record = message.toCordaProducerRecord() + producer.send(record) { ex -> + setFutureFromResponse(ex, future, record.topic) + } + + return MediatorMessage(future) + } + + /** + * Helper function to set a [future] result based on the presence of an [exception] + */ + private fun setFutureFromResponse( + exception: Exception?, + future: CompletableFuture, + topic: String + ) { + if (exception == null) { + future.complete(Unit) + } else { + val message = "Producer clientId $id for topic $topic failed to send." 
+ log.warn(message, exception) + future.completeExceptionally(CordaMessageAPIFatalException(message, exception)) + } } override fun close() { @@ -34,6 +58,9 @@ class MessageBusClient( } } +/** + * Helper function to convert a [MediatorMessage] of a specific format to a [CordaProducerRecord] + */ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, *> { return CordaProducerRecord( topic = this.getProperty(MSG_PROP_ENDPOINT), @@ -43,5 +70,8 @@ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, * ) } +/** + * Helper function to extract headers from message props + */ private fun Map.toHeaders() = map { (key, value) -> (key to value.toString()) } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt index 5c9f2646aca..fa0b94f75eb 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt @@ -232,7 +232,7 @@ internal class CordaPublisherImpl( if (!config.transactional) { future.complete(Unit) } else { - log.debug { "Asynchronous send completed completed successfully." } + log.debug { "Asynchronous send completed successfully." } } } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt index 58559d25ba4..1cf81e3a2e6 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt @@ -2,19 +2,26 @@ package net.corda.messaging.mediator import net.corda.messagebus.api.producer.CordaProducer import net.corda.messagebus.api.producer.CordaProducerRecord +import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT import net.corda.v5.base.exceptions.CordaRuntimeException +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.mockito.Mockito +import org.mockito.Mockito.any +import org.mockito.Mockito.doAnswer import org.mockito.Mockito.times import org.mockito.kotlin.eq -import org.mockito.kotlin.isNull import org.mockito.kotlin.mock import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import java.util.concurrent.CompletableFuture class MessageBusClientTest { private companion object { @@ -31,6 +38,12 @@ class MessageBusClientTest { MSG_PROP_KEY to TEST_KEY, ) private val message: MediatorMessage = MediatorMessage("value", messageProps) + private val record: CordaProducerRecord<*, *> = CordaProducerRecord( + TEST_ENDPOINT, + TEST_KEY, + message.payload, + messageProps.toHeaders(), + ) @BeforeEach @@ -39,22 +52,26 @@ class MessageBusClientTest { messageBusClient = MessageBusClient("client-id", cordaProducer) } + @Suppress("UNCHECKED_CAST") @Test - fun testSend() { - 
messageBusClient.send(message) + fun `test send`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(null) + }.whenever(cordaProducer).send(eq(record), any()) - val expected = CordaProducerRecord( - TEST_ENDPOINT, - TEST_KEY, - message.payload, - messageProps.toHeaders(), - ) + val result = messageBusClient.send(message) as MediatorMessage> - verify(cordaProducer).send(eq(expected), isNull()) + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + result.payload?.let { + assertTrue(it.isDone) + assertFalse(it.isCompletedExceptionally) + } } @Test - fun testSendWithError() { + fun `send should handle synchronous error`() { val record = CordaProducerRecord( TEST_ENDPOINT, TEST_KEY, @@ -62,14 +79,56 @@ class MessageBusClientTest { messageProps.toHeaders(), ) - Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), isNull()) + Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), any()) assertThrows { messageBusClient.send(message) } } + @Suppress("UNCHECKED_CAST") + @Test + fun `send should handle asynchronous CordaMessageAPIFatalException`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(CordaMessageAPIFatalException("test")) + }.whenever(cordaProducer).send(eq(record), any()) + + val result = messageBusClient.send(message) as MediatorMessage> + + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + + result.payload?.isCompletedExceptionally?.let { assertTrue(it) } + + result.payload?.handle { _, exception -> + assertTrue(exception is CordaMessageAPIFatalException) + assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message) + }?.get() + } + + @Suppress("UNCHECKED_CAST") + @Test + fun `send should wrap unknown exceptions`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(CordaRuntimeException("test")) + }.whenever(cordaProducer).send(eq(record), any()) + + val result = messageBusClient.send(message) as MediatorMessage> + + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + + result.payload?.isCompletedExceptionally?.let { assertTrue(it) } + + result.payload?.handle { _, exception -> + assertTrue(exception is CordaMessageAPIFatalException) + assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message) + }?.get() + } + @Test - fun testClose() { + fun `test close`() { messageBusClient.close() verify(cordaProducer, times(1)).close() } From 3284054ec6bab78f762097e64b604d889df568c7 Mon Sep 17 00:00:00 2001 From: Connel McGovern <100574906+mcgovc@users.noreply.github.com> Date: Fri, 1 Dec 2023 11:37:57 +0000 Subject: [PATCH 18/18] ES-1707: Revert Codeowners files to pre code freeze state (#5190) --- .github/CODEOWNERS | 108 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 734204008da..861051ac939 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1,107 @@ -* @driessamyn @jasonbyrner3 @dimosr @ronanbrowne @rick-r3 @simon-johnson-r3 @blsemo @Omar-awad @aditisdesai @vinir3 @vkolomeyko @thiagoviana @Sakpal + +# Build scripts should be audited by BLT + +Jenkinsfile @corda/blt +.ci/** @corda/blt + +*.gradle @corda/blt +gradle.properties @corda/corda5-team-leads +gradle/* @corda/blt + +.github/** @corda/blt +CODEOWNERS @corda/blt @corda/corda5-team-leads + +# Modules to be audited by REST team 
+/applications/workers/release/rest-worker/ @corda/rest +/components/rest-gateway-comp/ @corda/rest +/components/permissions/ @corda/rest +/components/rbac-security-manager-service/ @corda/rest +/libs/rest/ @corda/rest +/libs/permissions/ @corda/rest +/processors/rest-processor/ @corda/rest +/tools/plugins/initial-rbac/ @corda/rest +/tools/plugins/plugins-rest/ @corda/rest +/tools/plugins/virtual-node/ @corda/rest + +# Corda Helm chart for cluster management team +/charts/corda/ @corda/cluster-management + +# Modules to be audited by the Network team +/applications/workers/release/p2p-gateway-worker/ @corda/corda-platform-network-team +/applications/workers/release/p2p-link-manager-worker/ @corda/corda-platform-network-team +/applications/workers/release/member-worker/ @corda/corda-platform-network-team +/processors/link-manager-processor/ @corda/corda-platform-network-team +/processors/gateway-processor/ @corda/corda-platform-network-team +/processors/member-processor/ @corda/corda-platform-network-team +/components/gateway/ @corda/corda-platform-network-team +/components/link-manager/ @corda/corda-platform-network-team +/components/membership/ @corda/corda-platform-network-team +/libs/membership/ @corda/corda-platform-network-team +/libs/p2p-crypto/ @corda/corda-platform-network-team +/libs/layered-property-map/ @corda/corda-platform-network-team +/tools/plugins/mgm/ @corda/corda-platform-network-team +/tools/plugins/network/ @corda/corda-platform-network-team +/applications/tools/p2p-test/ @corda/corda-platform-network-team + +# Modules to be audited by Sandboxing SMEs +/components/security-manager/ @corda/sandboxing +/components/virtual-node/sandbox-* @corda/sandboxing +/components/sandbox* @corda/sandboxing +/libs/virtual-node/sandbox-* @corda/sandboxing +/osgi-* @corda/sandboxing +/testing/sandboxes/ @corda/sandboxing +/testing/sandboxes-testkit/ @corda/sandboxing +/testing/security-manager-utilities/ @corda/sandboxing + +# Modules to be audited by Crypto SMEs +/components/crypto/ @corda/crypto +/libs/crypto/ @corda/crypto +/processors/crypto/ @corda/crypto + +# Modules to be audited by Packaging SMEs +/components/chunking/ @corda/packaging +/components/virtual-node/cpi-* @corda/packaging +/components/virtual-node/cpk-* @corda/packaging +/libs/chunking/ @corda/packaging +/libs/packaging/ @corda/packaging +/libs/serialization/ @corda/packaging +/libs/virtual-node/cpi-* @corda/packaging +/testing/packaging-test-utilities/ @corda/packaging +/tools/plugins/package @corda/packaging + +# Modules to be audited by DB SMEs +/components/db/ @corda/db +/components/persistence/ @corda/db +/components/reconciliation/ @corda/db +/libs/db/ @corda/db +/processors/db/ @corda/db +/testing/persistence-testkit/ @corda/db +/tools/plugins/db-config @corda/db + +# Modules to be audited by Flow Worker team +/components/flow/ @corda/flow-worker +/libs/flows/ @corda/flow-worker +/libs/lifecycle/ @corda/flow-worker +/libs/messaging/ @corda/flow-worker +/libs/application/application-impl/ @corda/flow-worker +/processors/flow-processor/ @corda/flow-worker +/testing/flow/ @corda/flow-worker +/testing/message-patterns/ @corda/flow-worker +/applications/workers/release/flow-worker @corda/flow-worker + +# Modules to be audited by Ledger SMEs +/components/ledger/ @corda/ledger +/libs/ledger/ @corda/ledger +/testing/ledger/ @corda/ledger + +# Modules to be audited by Notary SMEs +/components/uniqueness/ @corda/notaries +/libs/uniqueness/ @corda/notaries +/notary-plugins/ @corda/notaries 
+/processors/uniqueness-processor/ @corda/notaries +/testing/uniqueness/ @corda/notaries + +# Ledger token selection files to be reviewed by the REST team +# This needs to be after the ledger rules to partially override those +/components/ledger/ledger-utxo-token-cache @corda/rest +/components/ledger/ledger-utxo-flow/src/main/kotlin/net/corda/ledger/utxo/impl/token @corda/rest
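
The two REST-owned entries at the end of this CODEOWNERS file rely on GitHub's matching behaviour: when several patterns match a changed file, the last matching entry in the file takes precedence. A minimal sketch of that precedence, using entries taken from the diff above (CODEOWNERS comments are shown on their own lines):

    # Broad rule earlier in the file: everything under /components/ledger/ goes to the ledger team
    /components/ledger/ @corda/ledger
    # Listed later in the file, so it takes precedence for the token selection subtree
    /components/ledger/ledger-utxo-token-cache @corda/rest

This is why the token selection rules are placed after the general ledger rules, as the comment in the diff notes.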