diff --git a/.ci/dev/forward-merge/JenkinsfileMergeAutomation b/.ci/dev/forward-merge/JenkinsfileMergeAutomation
new file mode 100644
index 00000000000..d87cec96325
--- /dev/null
+++ b/.ci/dev/forward-merge/JenkinsfileMergeAutomation
@@ -0,0 +1,32 @@
+#! groovy
+@Library('corda-shared-build-pipeline-steps@5.1') _
+
+/**
+ * Forward merge any changes in the current branch to the branch with the following version.
+ *
+ * Please note, the branch names are intentionally separated as variables to minimise conflicts
+ * during automated merges of this file.
+ *
+ * These variables should be updated when a new version is cut
+ */
+
+/**
+ * the branch name of the origin branch; it should match the current branch
+ * and acts as a fail-safe inside the {@code forwardMerger} pipeline
+ */
+String originBranch = 'release/os/5.1'
+
+/**
+ * the branch name of the target branch; it should be the branch with the next version
+ * after the one in the current branch.
+ */
+String targetBranch = 'release/os/5.2'
+
+/**
+ * Forward merge any changes between {@code originBranch} and {@code targetBranch}
+ */
+forwardMerger(
+    targetBranch: targetBranch,
+    originBranch: originBranch,
+    slackChannel: '#c5-forward-merge-bot-notifications'
+)
diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 734204008da..861051ac939 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1 +1,107 @@
-* @driessamyn @jasonbyrner3 @dimosr @ronanbrowne @rick-r3 @simon-johnson-r3 @blsemo @Omar-awad @aditisdesai @vinir3 @vkolomeyko @thiagoviana @Sakpal
+
+# Build scripts should be audited by BLT
+
+Jenkinsfile @corda/blt
+.ci/** @corda/blt
+
+*.gradle @corda/blt
+gradle.properties @corda/corda5-team-leads
+gradle/* @corda/blt
+
+.github/** @corda/blt
+CODEOWNERS @corda/blt @corda/corda5-team-leads
+
+# Modules to be audited by REST team
+/applications/workers/release/rest-worker/ @corda/rest
+/components/rest-gateway-comp/ @corda/rest
+/components/permissions/ @corda/rest
+/components/rbac-security-manager-service/ @corda/rest
+/libs/rest/ @corda/rest
+/libs/permissions/ @corda/rest
+/processors/rest-processor/ @corda/rest
+/tools/plugins/initial-rbac/ @corda/rest
+/tools/plugins/plugins-rest/ @corda/rest
+/tools/plugins/virtual-node/ @corda/rest
+
+# Corda Helm chart for cluster management team
+/charts/corda/ @corda/cluster-management
+
+# Modules to be audited by the Network team
+/applications/workers/release/p2p-gateway-worker/ @corda/corda-platform-network-team
+/applications/workers/release/p2p-link-manager-worker/ @corda/corda-platform-network-team
+/applications/workers/release/member-worker/ @corda/corda-platform-network-team
+/processors/link-manager-processor/ @corda/corda-platform-network-team
+/processors/gateway-processor/ @corda/corda-platform-network-team
+/processors/member-processor/ @corda/corda-platform-network-team
+/components/gateway/ @corda/corda-platform-network-team
+/components/link-manager/ @corda/corda-platform-network-team
+/components/membership/ @corda/corda-platform-network-team
+/libs/membership/ @corda/corda-platform-network-team
+/libs/p2p-crypto/ @corda/corda-platform-network-team
+/libs/layered-property-map/ @corda/corda-platform-network-team
+/tools/plugins/mgm/ @corda/corda-platform-network-team
+/tools/plugins/network/ @corda/corda-platform-network-team
+/applications/tools/p2p-test/ @corda/corda-platform-network-team
+
+# Modules to be audited by Sandboxing SMEs
+/components/security-manager/ @corda/sandboxing
+/components/virtual-node/sandbox-* @corda/sandboxing
+/components/sandbox*
@corda/sandboxing +/libs/virtual-node/sandbox-* @corda/sandboxing +/osgi-* @corda/sandboxing +/testing/sandboxes/ @corda/sandboxing +/testing/sandboxes-testkit/ @corda/sandboxing +/testing/security-manager-utilities/ @corda/sandboxing + +# Modules to be audited by Crypto SMEs +/components/crypto/ @corda/crypto +/libs/crypto/ @corda/crypto +/processors/crypto/ @corda/crypto + +# Modules to be audited by Packaging SMEs +/components/chunking/ @corda/packaging +/components/virtual-node/cpi-* @corda/packaging +/components/virtual-node/cpk-* @corda/packaging +/libs/chunking/ @corda/packaging +/libs/packaging/ @corda/packaging +/libs/serialization/ @corda/packaging +/libs/virtual-node/cpi-* @corda/packaging +/testing/packaging-test-utilities/ @corda/packaging +/tools/plugins/package @corda/packaging + +# Modules to be audited by DB SMEs +/components/db/ @corda/db +/components/persistence/ @corda/db +/components/reconciliation/ @corda/db +/libs/db/ @corda/db +/processors/db/ @corda/db +/testing/persistence-testkit/ @corda/db +/tools/plugins/db-config @corda/db + +# Modules to be audited by Flow Worker team +/components/flow/ @corda/flow-worker +/libs/flows/ @corda/flow-worker +/libs/lifecycle/ @corda/flow-worker +/libs/messaging/ @corda/flow-worker +/libs/application/application-impl/ @corda/flow-worker +/processors/flow-processor/ @corda/flow-worker +/testing/flow/ @corda/flow-worker +/testing/message-patterns/ @corda/flow-worker +/applications/workers/release/flow-worker @corda/flow-worker + +# Modules to be audited by Ledger SMEs +/components/ledger/ @corda/ledger +/libs/ledger/ @corda/ledger +/testing/ledger/ @corda/ledger + +# Modules to be audited by Notary SMEs +/components/uniqueness/ @corda/notaries +/libs/uniqueness/ @corda/notaries +/notary-plugins/ @corda/notaries +/processors/uniqueness-processor/ @corda/notaries +/testing/uniqueness/ @corda/notaries + +# Ledger token selection files to be reviewed by the REST team +# This needs to be after the ledger rules to partially override those +/components/ledger/ledger-utxo-token-cache @corda/rest +/components/ledger/ledger-utxo-flow/src/main/kotlin/net/corda/ledger/utxo/impl/token @corda/rest diff --git a/applications/workers/workers-smoketest/src/smokeTest/kotlin/net/corda/applications/workers/smoketest/services/CryptoRPCSmokeTests.kt b/applications/workers/workers-smoketest/src/smokeTest/kotlin/net/corda/applications/workers/smoketest/services/CryptoRPCSmokeTests.kt index 1dde1fbfe77..77a4043b2e0 100644 --- a/applications/workers/workers-smoketest/src/smokeTest/kotlin/net/corda/applications/workers/smoketest/services/CryptoRPCSmokeTests.kt +++ b/applications/workers/workers-smoketest/src/smokeTest/kotlin/net/corda/applications/workers/smoketest/services/CryptoRPCSmokeTests.kt @@ -225,9 +225,9 @@ class CryptoRPCSmokeTests { assertEquals(expected.requestId, actual.requestId) assertEquals(expected.requestingComponent, actual.requestingComponent) assertEquals(expected.requestTimestamp, actual.requestTimestamp) - assertThat(actual.responseTimestamp.epochSecond) - .isGreaterThanOrEqualTo(expected.requestTimestamp.epochSecond) - .isLessThanOrEqualTo(now.epochSecond) + assertThat(actual.responseTimestamp.toEpochMilli()) + .isGreaterThanOrEqualTo(expected.requestTimestamp.toEpochMilli()) + .isLessThanOrEqualTo(now.toEpochMilli()) assertSoftly { softly -> softly.assertThat(actual.other.items.size == expected.other.items.size) softly.assertThat(actual.other.items.containsAll(expected.other.items)) diff --git 
a/components/configuration/configuration-write-service-impl/src/main/kotlin/net/corda/configuration/write/impl/publish/ConfigPublishServiceImpl.kt b/components/configuration/configuration-write-service-impl/src/main/kotlin/net/corda/configuration/write/impl/publish/ConfigPublishServiceImpl.kt index f603e502531..80df16543b4 100644 --- a/components/configuration/configuration-write-service-impl/src/main/kotlin/net/corda/configuration/write/impl/publish/ConfigPublishServiceImpl.kt +++ b/components/configuration/configuration-write-service-impl/src/main/kotlin/net/corda/configuration/write/impl/publish/ConfigPublishServiceImpl.kt @@ -19,6 +19,7 @@ import net.corda.v5.base.versioning.Version import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference +import org.slf4j.LoggerFactory // This needs to be a `Lifecycle` for reconciliation, maybe not only for that. However it cannot really wait on // `ConfigurationReadService`, because it will be used by `ConfigWriteService` which needs to be started before @@ -36,6 +37,10 @@ class ConfigPublishServiceImpl @Activate constructor( configurationValidatorFactory: ConfigurationValidatorFactory ) : ConfigPublishService { + companion object { + private val logger = LoggerFactory.getLogger(ConfigPublishServiceImpl::class.java) + } + private val handler = ConfigPublishServiceHandler(publisherFactory, configMerger) override val lifecycleCoordinatorName = LifecycleCoordinatorName.forComponent() @@ -102,6 +107,56 @@ class ConfigPublishServiceImpl @Activate constructor( TODO("Not yet implemented") } + override fun valuesMisalignedAfterDefaults( + recordKey: String, + dbRecordValue: Configuration, + kafkaRecordValue: Configuration + ): Boolean { + require(dbRecordValue.version == kafkaRecordValue.version) + require(dbRecordValue.schemaVersion.majorVersion == kafkaRecordValue.schemaVersion.majorVersion) + val schemaMajorVersion = dbRecordValue.schemaVersion.majorVersion + require(dbRecordValue.schemaVersion.minorVersion == kafkaRecordValue.schemaVersion.minorVersion) + val schemaMinorVersion = dbRecordValue.schemaVersion.minorVersion + + val dbConfigValueWithDefaults = + smartConfigFactory.create(ConfigFactory.parseString(dbRecordValue.value)).run { + validator.validate( + recordKey, + Version(schemaMajorVersion, schemaMinorVersion), + this, + applyDefaults = true + ) + } + + val kafkaConfigValue = + smartConfigFactory.create(ConfigFactory.parseString(kafkaRecordValue.value)).run { + validator.validate( + recordKey, + Version(schemaMajorVersion, schemaMinorVersion), + this, + applyDefaults = false, + ) + } + + + val configsAreNotEqual = dbConfigValueWithDefaults != kafkaConfigValue + if (configsAreNotEqual) { + logger.info( + "Configuration for key $recordKey is misaligned on Kafka after applying defaults (Kafka will be updated).\n" + + "DB config value: ${ + dbConfigValueWithDefaults.toSafeConfig().root() + .render(ConfigRenderOptions.concise().setFormatted(true)) + }\n" + + "Kafka config value: ${ + kafkaConfigValue.toSafeConfig().root() + .render(ConfigRenderOptions.concise().setFormatted(true)) + }" + ) + } + + return configsAreNotEqual + } + override val isRunning get() = coordinator.isRunning override fun start() = coordinator.start() diff --git a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt 
b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt index 45adf54e903..797c3ccc6c1 100644 --- a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt +++ b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/CryptoOpsBusProcessorTests.kt @@ -165,9 +165,9 @@ class CryptoOpsBusProcessorTests { assertEquals(expected.requestId, actual.requestId) assertEquals(expected.requestingComponent, actual.requestingComponent) assertEquals(expected.requestTimestamp, actual.requestTimestamp) - assertThat(actual.responseTimestamp.epochSecond) - .isGreaterThanOrEqualTo(expected.requestTimestamp.epochSecond) - .isLessThanOrEqualTo(now.epochSecond) + assertThat(actual.responseTimestamp.toEpochMilli()) + .isGreaterThanOrEqualTo(expected.requestTimestamp.toEpochMilli()) + .isLessThanOrEqualTo(now.toEpochMilli()) assertTrue( actual.other.items.size == expected.other.items.size && actual.other.items.containsAll(expected.other.items) && diff --git a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessorTests.kt b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessorTests.kt index c5b5fe57618..715f7b6c2b7 100644 --- a/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessorTests.kt +++ b/components/crypto/crypto-service-impl/src/test/kotlin/net/corda/crypto/service/impl/bus/HSMRegistrationBusProcessorTests.kt @@ -72,9 +72,9 @@ class HSMRegistrationBusProcessorTests { assertEquals(expected.requestId, actual.requestId) assertEquals(expected.requestingComponent, actual.requestingComponent) assertEquals(expected.requestTimestamp, actual.requestTimestamp) - assertThat(actual.responseTimestamp.epochSecond) - .isGreaterThanOrEqualTo(expected.requestTimestamp.epochSecond) - .isLessThanOrEqualTo(now.epochSecond) + assertThat(actual.responseTimestamp.toEpochMilli()) + .isGreaterThanOrEqualTo(expected.requestTimestamp.toEpochMilli()) + .isLessThanOrEqualTo(now.toEpochMilli()) assertTrue( actual.other.items.size == expected.other.items.size && actual.other.items.containsAll(expected.other.items) && diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/ExternalEventAcceptanceTest.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/ExternalEventAcceptanceTest.kt index 7b4e31f07fe..cf629c9b91d 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/ExternalEventAcceptanceTest.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/ExternalEventAcceptanceTest.kt @@ -3,17 +3,19 @@ package net.corda.flow.testing.tests import net.corda.data.KeyValuePairList import net.corda.data.flow.event.external.ExternalEventContext import net.corda.data.flow.event.external.ExternalEventResponseErrorType +import net.corda.data.flow.output.FlowStates import net.corda.data.persistence.EntityRequest import net.corda.data.persistence.EntityResponse import net.corda.data.persistence.FindEntities import net.corda.flow.external.events.factory.ExternalEventFactory import net.corda.flow.external.events.factory.ExternalEventRecord import net.corda.flow.fiber.FlowIORequest +import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.FLOW_FAILED 
import net.corda.flow.state.FlowCheckpoint +import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER import net.corda.flow.testing.context.FlowServiceTestBase import net.corda.flow.testing.context.flowResumedWithError import net.corda.schema.configuration.FlowConfig -import net.corda.utilities.seconds import net.corda.v5.base.exceptions.CordaRuntimeException import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -400,6 +402,9 @@ class ExternalEventAcceptanceTest : FlowServiceTestBase() { markedForDlq() flowDidNotResume() flowFiberCacheDoesNotContainKey(ALICE_HOLDING_IDENTITY, REQUEST_ID1) + scheduleFlowMapperCleanupEvents(ALICE_FLOW_KEY_MAPPER) + nullStateRecord() + flowStatus(state = FlowStates.FAILED, errorType = FLOW_FAILED, errorMessage = "message") } } } diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowKilledAcceptanceTest.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowKilledAcceptanceTest.kt index 84052973ff2..343443410df 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowKilledAcceptanceTest.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/FlowKilledAcceptanceTest.kt @@ -3,6 +3,8 @@ package net.corda.flow.testing.tests import net.corda.data.flow.output.FlowStates import net.corda.flow.application.sessions.SessionInfo import net.corda.flow.fiber.FlowIORequest +import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER +import net.corda.flow.testing.context.BOB_FLOW_KEY_MAPPER import net.corda.flow.testing.context.FlowServiceTestBase import net.corda.virtualnode.OperationalStatus import org.junit.jupiter.api.BeforeEach @@ -41,8 +43,10 @@ class FlowKilledAcceptanceTest : FlowServiceTestBase() { then { expectOutputForFlow(FLOW_ID1) { + sessionErrorEvents() nullStateRecord() flowKilledStatus(flowTerminatedReason = "Flow operational status is INACTIVE") + scheduleFlowMapperCleanupEvents(BOB_FLOW_KEY_MAPPER) flowFiberCacheDoesNotContainKey(BOB_HOLDING_IDENTITY, REQUEST_ID1) } } @@ -94,8 +98,10 @@ class FlowKilledAcceptanceTest : FlowServiceTestBase() { then { expectOutputForFlow(FLOW_ID1) { + sessionErrorEvents(SESSION_ID_1) nullStateRecord() flowKilledStatus(flowTerminatedReason = "Flow operational status is INACTIVE") + scheduleFlowMapperCleanupEvents(SESSION_ID_1, ALICE_FLOW_KEY_MAPPER) flowFiberCacheDoesNotContainKey(ALICE_HOLDING_IDENTITY, REQUEST_ID1) } } diff --git a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/SubFlowFailedAcceptanceTest.kt b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/SubFlowFailedAcceptanceTest.kt index 6be827721a8..7da21140bcd 100644 --- a/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/SubFlowFailedAcceptanceTest.kt +++ b/components/flow/flow-service/src/integrationTest/kotlin/net/corda/flow/testing/tests/SubFlowFailedAcceptanceTest.kt @@ -1,7 +1,9 @@ package net.corda.flow.testing.tests +import net.corda.data.flow.output.FlowStates import net.corda.flow.application.sessions.SessionInfo import net.corda.flow.fiber.FlowIORequest +import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.FLOW_FAILED import net.corda.flow.testing.context.ALICE_FLOW_KEY_MAPPER import net.corda.flow.testing.context.FlowServiceTestBase import net.corda.flow.testing.context.startFlow @@ -33,15 +35,19 @@ class SubFlowFailedAcceptanceTest : 
FlowServiceTestBase() { initiatingToInitiatedFlow(PROTOCOL, FAKE_FLOW_NAME, FAKE_FLOW_NAME) } } + @Test fun `Given a subFlow contains sessions when the subFlow fails, session error events are sent and session cleanup is scheduled`() { `when` { startFlow(this) - .suspendsWith(FlowIORequest.Send( - mapOf( - SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, - SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, - ))) + .suspendsWith( + FlowIORequest.Send( + mapOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, + SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, + ) + ) + ) .suspendsWith( FlowIORequest.SubFlowFailed( RuntimeException(), @@ -58,50 +64,56 @@ class SubFlowFailedAcceptanceTest : FlowServiceTestBase() { } } } -@Test -fun `Given a subFlow contains an initiated and closed session when the subFlow fails a single session error event is sent to the initiated session and session cleanup is scheduled`() { - `when` { - startFlow(this) - .suspendsWith(FlowIORequest.Send( - mapOf( - SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, - SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, - ))) - .suspendsWith(FlowIORequest.CloseSessions(setOf(SESSION_ID_1))) - - sessionCloseEventReceived(FLOW_ID1, SESSION_ID_1, 1, ALICE_HOLDING_IDENTITY, BOB_HOLDING_IDENTITY) - .suspendsWith( - FlowIORequest.SubFlowFailed( - RuntimeException(), - listOf(SESSION_ID_1, SESSION_ID_2) + + @Test + fun `Given a subFlow contains an initiated and closed session when the subFlow fails a single session error event is sent to the initiated session and session cleanup is scheduled`() { + `when` { + startFlow(this) + .suspendsWith( + FlowIORequest.Send( + mapOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, + SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, + ) + ) ) - ) - .completedWithError(CordaRuntimeException("error")) - } + .suspendsWith(FlowIORequest.CloseSessions(setOf(SESSION_ID_1))) - then { - expectOutputForFlow(FLOW_ID1) { - noOutputEvent() + sessionCloseEventReceived(FLOW_ID1, SESSION_ID_1, 1, ALICE_HOLDING_IDENTITY, BOB_HOLDING_IDENTITY) + .suspendsWith( + FlowIORequest.SubFlowFailed( + RuntimeException(), + listOf(SESSION_ID_1, SESSION_ID_2) + ) + ) + .completedWithError(CordaRuntimeException("error")) } - expectOutputForFlow(FLOW_ID1) { - sessionErrorEvents(SESSION_ID_2) - scheduleFlowMapperCleanupEvents(ALICE_FLOW_KEY_MAPPER, SESSION_ID_1, SESSION_ID_2) + then { + expectOutputForFlow(FLOW_ID1) { + noOutputEvent() + } + + expectOutputForFlow(FLOW_ID1) { + sessionErrorEvents(SESSION_ID_2) + scheduleFlowMapperCleanupEvents(ALICE_FLOW_KEY_MAPPER, SESSION_ID_1, SESSION_ID_2) + } } } -} - @Test fun `Given a subFlow contains only closed sessions when the subFlow fails no session error events are sent`() { `when` { startFlow(this) - .suspendsWith(FlowIORequest.Send( - mapOf( - SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, - SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, - ))) + .suspendsWith( + FlowIORequest.Send( + mapOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, + SessionInfo(SESSION_ID_2, initiatedIdentityMemberName) to DATA_MESSAGE_2, + ) + ) + ) .suspendsWith(FlowIORequest.CloseSessions(setOf(SESSION_ID_1, SESSION_ID_2))) .suspendsWith( FlowIORequest.SubFlowFailed( @@ -119,13 +131,17 @@ fun `Given a subFlow contains an 
initiated and closed session when the subFlow f } @Test - fun `Given a subFlow contains only errored sessions when the subFlow fails a wakeup event is scheduled and no session error events are sent`() { + fun `Given a subFlow contains only errored sessions when the subFlow fails no session error events are sent`() { given { startFlow(this) - .suspendsWith(FlowIORequest.Receive(setOf( - SessionInfo(SESSION_ID_1, initiatedIdentityMemberName), - SessionInfo(SESSION_ID_2, initiatedIdentityMemberName), - ))) + .suspendsWith( + FlowIORequest.Receive( + setOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName), + SessionInfo(SESSION_ID_2, initiatedIdentityMemberName), + ) + ) + ) sessionErrorEventReceived(FLOW_ID1, SESSION_ID_1) } @@ -151,7 +167,7 @@ fun `Given a subFlow contains an initiated and closed session when the subFlow f fun `Given a subFlow contains no sessions when the subFlow fails and flow finishes, requestid is cleaned up and no session errors are sent`() { `when` { startFlow(this) - .suspendsWith(FlowIORequest.SubFlowFailed(RuntimeException(), emptyList())) + .suspendsWith(FlowIORequest.SubFlowFailed(RuntimeException(), emptyList())) .completedWithError(CordaRuntimeException("Error")) } @@ -217,15 +233,19 @@ fun `Given a subFlow contains an initiated and closed session when the subFlow f } @Test - fun `Given an initiated top level flow with an errored session when it finishes and calls SubFlowFailed a wakeup event is scheduled and no session error event is sent`() { + fun `Given an initiated top level flow with an errored session when it finishes and calls SubFlowFailed, schedules cleanup and does not send a session error event`() { given { membershipGroupFor(BOB_HOLDING_IDENTITY) initiatingToInitiatedFlow(PROTOCOL_2, FLOW_NAME, FLOW_NAME_2) sessionCounterpartyInfoRequestReceived(FLOW_ID1, INITIATED_SESSION_ID_1, CPI1, PROTOCOL_2) - .suspendsWith(FlowIORequest.Receive(setOf( - SessionInfo(INITIATED_SESSION_ID_1, initiatedIdentityMemberName), - ))) + .suspendsWith( + FlowIORequest.Receive( + setOf( + SessionInfo(INITIATED_SESSION_ID_1, initiatedIdentityMemberName), + ) + ) + ) } `when` { @@ -246,4 +266,34 @@ fun `Given a subFlow contains an initiated and closed session when the subFlow f } } } + + @Test + fun `Given a subFlow contains sessions when the subFlow fails, and session is not found, FlowFatalException is thrown`() { + `when` { + startFlow(this) + .suspendsWith( + FlowIORequest.Send( + mapOf( + SessionInfo(SESSION_ID_1, initiatedIdentityMemberName) to DATA_MESSAGE_1, + ) + ) + ) + .suspendsWith( + FlowIORequest.SubFlowFailed( + RuntimeException(), + listOf(SESSION_ID_1, "BrokenSession") + ) + ) + } + + then { + expectOutputForFlow(FLOW_ID1) { + sessionErrorEvents(SESSION_ID_1) + scheduleFlowMapperCleanupEvents(SESSION_ID_1, ALICE_FLOW_KEY_MAPPER) + nullStateRecord() + flowStatus(state = FlowStates.FAILED, errorType = FLOW_FAILED, errorMessage = "Session: BrokenSession does not exist when executing session operation that requires an existing session") + } + } + } + } \ No newline at end of file diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/CheckpointCleanupHandlerImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/CheckpointCleanupHandlerImpl.kt index 240e4ff84b3..4a175703d4f 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/CheckpointCleanupHandlerImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/CheckpointCleanupHandlerImpl.kt @@ 
-37,7 +37,7 @@ class CheckpointCleanupHandlerImpl @Activate constructor( val records = errorActiveSessions(checkpoint, config, exception, time) + cleanupSessions(checkpoint, config, time) + generateStatus(checkpoint, exception) + - cleanupInitiatingFlow(checkpoint, config, time) + cleanupRpcFlowMapperState(checkpoint, config, time) checkpoint.markDeleted() return records } @@ -100,7 +100,7 @@ class CheckpointCleanupHandlerImpl @Activate constructor( } } - private fun cleanupInitiatingFlow( + private fun cleanupRpcFlowMapperState( checkpoint: FlowCheckpoint, config: SmartConfig, currentTime: Instant diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt index 590045312b3..d2b28b4c091 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImpl.kt @@ -1,21 +1,17 @@ package net.corda.flow.pipeline.impl import net.corda.data.flow.event.StartFlow -import net.corda.data.flow.event.mapper.FlowMapperEvent -import net.corda.data.flow.event.mapper.ScheduleCleanup import net.corda.data.flow.output.FlowStates import net.corda.data.flow.output.FlowStatus -import net.corda.data.flow.state.session.SessionState -import net.corda.data.flow.state.session.SessionStateType import net.corda.data.flow.state.waiting.WaitingFor import net.corda.flow.fiber.cache.FlowFiberCache +import net.corda.flow.maintenance.CheckpointCleanupHandler import net.corda.flow.pipeline.FlowEventExceptionProcessor import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowEventException import net.corda.flow.pipeline.exceptions.FlowFatalException import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException import net.corda.flow.pipeline.exceptions.FlowPlatformException -import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.FLOW_FAILED import net.corda.flow.pipeline.exceptions.FlowProcessingExceptionTypes.PLATFORM_ERROR import net.corda.flow.pipeline.exceptions.FlowTransientException import net.corda.flow.pipeline.factory.FlowMessageFactory @@ -25,7 +21,6 @@ import net.corda.flow.state.FlowCheckpoint import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.getLongOrDefault import net.corda.messaging.api.records.Record -import net.corda.schema.configuration.FlowConfig import net.corda.schema.configuration.FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component @@ -45,6 +40,8 @@ class FlowEventExceptionProcessorImpl @Activate constructor( private val flowSessionManager: FlowSessionManager, @Reference(service = FlowFiberCache::class) private val flowFiberCache: FlowFiberCache, + @Reference(service = CheckpointCleanupHandler::class) + private val checkpointCleanupHandler: CheckpointCleanupHandler ) : FlowEventExceptionProcessor { private companion object { @@ -125,8 +122,6 @@ class FlowEventExceptionProcessorImpl @Activate constructor( exception: FlowFatalException, context: FlowEventContext<*> ): FlowEventContext<*> = withEscalation(context) { - - val exceptionHandlingStartTime = Instant.now() val checkpoint = context.checkpoint val msg = if (!checkpoint.doesExist) { @@ -138,36 +133,11 @@ class 
FlowEventExceptionProcessorImpl @Activate constructor( } log.warn(msg, exception) - val activeSessionIds = getActiveSessionIds(checkpoint) - - if(activeSessionIds.isNotEmpty()) { - checkpoint.putSessionStates( - flowSessionManager.sendErrorMessages( - context.checkpoint, activeSessionIds, exception, exceptionHandlingStartTime - ) - ) - } - - val errorEvents = - flowSessionManager.getSessionErrorEventRecords(checkpoint, context.flowConfig, exceptionHandlingStartTime) - val cleanupEvents = createCleanupEventsForSessions( - getScheduledCleanupExpiryTime(context, exceptionHandlingStartTime), - checkpoint.sessions.filterNot { it.hasScheduledCleanup } - ) - removeCachedFlowFiber(checkpoint) - checkpoint.markDeleted() - - val records = createStatusRecord(checkpoint.flowId) { - flowMessageFactory.createFlowFailedStatusMessage( - checkpoint, - FLOW_FAILED, - exception.message - ) - } + val cleanupRecords = checkpointCleanupHandler.cleanupCheckpoint(checkpoint, context.flowConfig, exception) context.copy( - outputRecords = records + errorEvents + cleanupEvents, + outputRecords = cleanupRecords, sendToDlq = true ) } @@ -221,7 +191,6 @@ class FlowEventExceptionProcessorImpl @Activate constructor( context: FlowEventContext<*> ): FlowEventContext<*> { return withEscalation(context) { - val exceptionHandlingStartTime = Instant.now() val checkpoint = context.checkpoint if (!checkpoint.doesExist) { @@ -237,42 +206,16 @@ class FlowEventExceptionProcessorImpl @Activate constructor( ) } - val activeSessionIds = getActiveSessionIds(checkpoint) - - if (activeSessionIds.isNotEmpty()) { - checkpoint.putSessionStates( - flowSessionManager.sendErrorMessages( - context.checkpoint, activeSessionIds, exception, exceptionHandlingStartTime - ) - ) - } - val errorEvents = flowSessionManager.getSessionErrorEventRecords( - context.checkpoint, - context.flowConfig, - exceptionHandlingStartTime - ) - val cleanupEvents = createCleanupEventsForSessions( - getScheduledCleanupExpiryTime(context, exceptionHandlingStartTime), - checkpoint.sessions.filterNot { it.hasScheduledCleanup } - ) - val statusRecord = - createFlowKilledStatusRecord(checkpoint, exception.message ?: "No exception message provided.") - removeCachedFlowFiber(checkpoint) - - checkpoint.markDeleted() + val cleanupRecords = checkpointCleanupHandler.cleanupCheckpoint(checkpoint, context.flowConfig, exception) context.copy( - outputRecords = errorEvents + cleanupEvents + statusRecord, + outputRecords = cleanupRecords, sendToDlq = false // killed flows do not go to DLQ ) } } - private fun getActiveSessionIds(checkpoint: FlowCheckpoint) = checkpoint.sessions.filterNot { sessionState -> - sessionState.status == SessionStateType.CLOSED || sessionState.status == SessionStateType.ERROR - }.map { it.sessionId } - private fun withEscalation(context: FlowEventContext<*>, handler: () -> FlowEventContext<*>): FlowEventContext<*> { return try { handler() @@ -282,20 +225,6 @@ class FlowEventExceptionProcessorImpl @Activate constructor( } } - private fun createCleanupEventsForSessions( - expiryTime: Long, - sessionsToCleanup: List - ): List> { - return sessionsToCleanup - .onEach { it.hasScheduledCleanup = true } - .map { - flowRecordFactory.createFlowMapperEventRecord( - it.sessionId, - ScheduleCleanup(expiryTime) - ) - } - } - private fun createFlowKilledStatusRecord(checkpoint: FlowCheckpoint, message: String?): List> { return createStatusRecord(checkpoint.flowId) { flowMessageFactory.createFlowKilledStatusMessage(checkpoint, message) @@ -326,11 +255,6 @@ class 
FlowEventExceptionProcessorImpl @Activate constructor( } } - private fun getScheduledCleanupExpiryTime(context: FlowEventContext<*>, now: Instant): Long { - val flowCleanupTime = context.flowConfig.getLong(FlowConfig.SESSION_FLOW_CLEANUP_TIME) - return now.plusMillis(flowCleanupTime).toEpochMilli() - } - /** * Remove cached flow fiber for this checkpoint, if it exists. */ diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt index 373db46dc0c..0653760d8c7 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowEventExceptionProcessorImplTest.kt @@ -3,6 +3,7 @@ package net.corda.flow.pipeline.impl import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigValueFactory import net.corda.data.flow.FlowKey +import net.corda.data.flow.FlowStartContext import net.corda.data.flow.event.FlowEvent import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.data.flow.event.mapper.FlowMapperEvent @@ -12,6 +13,7 @@ import net.corda.data.flow.state.session.SessionState import net.corda.data.flow.state.session.SessionStateType import net.corda.data.flow.state.waiting.WaitingFor import net.corda.flow.fiber.cache.FlowFiberCache +import net.corda.flow.maintenance.CheckpointCleanupHandler import net.corda.flow.pipeline.converters.FlowEventContextConverter import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowEventException @@ -33,7 +35,7 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.kotlin.any -import org.mockito.kotlin.anyOrNull +import org.mockito.kotlin.eq import org.mockito.kotlin.mock import org.mockito.kotlin.times import org.mockito.kotlin.verify @@ -60,6 +62,8 @@ class FlowEventExceptionProcessorImplTest { private val flowFiberCache = mock() private val serializedFiber = ByteBuffer.wrap("mock fiber".toByteArray()) + private val checkpointCleanupHandler = mock() + private val sessionIdOpen = "sesh-id" private val sessionIdClosed = "sesh-id-closed" private val flowActiveSessionState = SessionState().apply { @@ -74,7 +78,8 @@ class FlowEventExceptionProcessorImplTest { flowMessageFactory, flowRecordFactory, flowSessionManager, - flowFiberCache + flowFiberCache, + checkpointCleanupHandler ) @BeforeEach @@ -159,40 +164,30 @@ class FlowEventExceptionProcessorImplTest { ).thenReturn(flowStatusUpdate) whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) - val result = target.process(error, context) + target.process(error, context) - verify(result.checkpoint).markDeleted() - assertThat(result.outputRecords).containsOnly(flowStatusUpdateRecord) - assertThat(result.sendToDlq).isTrue + verify(checkpointCleanupHandler).cleanupCheckpoint(eq(flowCheckpoint), any(), any()) } @Test fun `flow fatal exception marks flow for dlq and publishes status update`() { + val flowId = "f1" val error = FlowFatalException("error") - val flowStatusUpdate = FlowStatus() val key = FlowKey() - val flowStatusUpdateRecord = Record("", key, flowStatusUpdate) val flowMapperEvent = mock() val flowMapperRecord = Record(Schemas.Flow.FLOW_MAPPER_SESSION_OUT, "key", 
flowMapperEvent) - - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - flowCheckpoint, - FlowProcessingExceptionTypes.FLOW_FAILED, - error.message - ) - ).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) whenever(flowCheckpoint.doesExist).thenReturn(true) + whenever(flowCheckpoint.flowId).thenReturn(flowId) + val startContext = mock() + whenever(flowCheckpoint.flowStartContext).thenReturn(startContext) whenever(flowCheckpoint.flowKey).thenReturn(key) - whenever(flowCheckpoint.sessions).thenReturn(listOf(flowActiveSessionState, flowInactiveSessionState)) - whenever(flowCheckpoint.suspendCount).thenReturn(123) - whenever(flowRecordFactory.createFlowMapperEventRecord(any(), any())).thenReturn(flowMapperRecord) + val cleanupRecords = listOf (flowMapperRecord) + whenever(checkpointCleanupHandler.cleanupCheckpoint(any(), any(), any())).thenReturn(cleanupRecords) val result = target.process(error, context) - verify(result.checkpoint).markDeleted() - assertThat(result.outputRecords).contains(flowStatusUpdateRecord, flowMapperRecord) + verify(checkpointCleanupHandler).cleanupCheckpoint(eq(flowCheckpoint), any(), eq(error)) + assertThat(result.outputRecords).containsOnly(flowMapperRecord) assertThat(result.sendToDlq).isTrue verify(flowFiberCache).remove(key) } @@ -248,25 +243,6 @@ class FlowEventExceptionProcessorImplTest { assertThat(result.outputRecords).containsOnly(flowEventRecord) } - @Test - fun `failure to create a status message does not prevent fatal failure handling from succeeding`() { - val error = FlowFatalException("error") - - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - any(), - any(), - any() - ) - ).thenThrow(IllegalStateException()) - - val result = target.process(error, context) - - verify(result.checkpoint).markDeleted() - assertThat(result.outputRecords).isEmpty() - assertThat(result.sendToDlq).isTrue - } - @Test fun `throwable triggered during transient exception processing does not escape the processor`() { val throwable = RuntimeException() @@ -278,18 +254,6 @@ class FlowEventExceptionProcessorImplTest { assertEmptyDLQdResult(transientResult) } - @Test - fun `throwable triggered during fatal exception processing does not escape the processor`() { - val throwable = RuntimeException() - val fatalError = FlowFatalException("error") - whenever( - flowSessionManager.getSessionErrorEventRecords(anyOrNull(),anyOrNull(),anyOrNull())) - .thenThrow(throwable) - - val fatalResult = target.process(fatalError, context) - assertEmptyDLQdResult(fatalResult) - } - @Test fun `throwable triggered during platform exception processing does not escape the processor`() { val throwable = RuntimeException() @@ -313,19 +277,8 @@ class FlowEventExceptionProcessorImplTest { fun `flow fatal exception with false doesExist confirms flow checkpoint not called`() { val flowCheckpoint = mock() whenever(flowCheckpoint.doesExist).thenReturn(false) - val error = FlowFatalException("error") - val flowStatusUpdate = FlowStatus() - val flowStatusUpdateRecord = Record("", FlowKey(), flowStatusUpdate) - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - flowCheckpoint, - FlowProcessingExceptionTypes.FLOW_FAILED, - error.message - ) - ).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) target.process(error, context) verify(flowCheckpoint, times(0)).flowStartContext @@ -338,17 +291,7 @@ class 
FlowEventExceptionProcessorImplTest { val context = buildFlowEventContext(checkpoint = flowCheckpoint, inputEventPayload = inputEvent) val error = FlowFatalException("error") - val flowStatusUpdate = FlowStatus() - val flowStatusUpdateRecord = Record("", FlowKey(), flowStatusUpdate) - whenever( - flowMessageFactory.createFlowFailedStatusMessage( - flowCheckpoint, - FlowProcessingExceptionTypes.FLOW_FAILED, - error.message - ) - ).thenReturn(flowStatusUpdate) - whenever(flowRecordFactory.createFlowStatusRecord(flowStatusUpdate)).thenReturn(flowStatusUpdateRecord) target.process(error, context) verify(flowCheckpoint, times(1)).flowStartContext diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt index f7a08ca52a5..54d67ce4290 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowToBeKilledExceptionProcessingTest.kt @@ -6,16 +6,11 @@ import net.corda.data.flow.FlowKey import net.corda.data.flow.FlowStartContext import net.corda.data.flow.event.StartFlow import net.corda.data.flow.event.mapper.FlowMapperEvent -import net.corda.data.flow.event.mapper.ScheduleCleanup import net.corda.data.flow.output.FlowStates import net.corda.data.flow.output.FlowStatus -import net.corda.data.flow.state.checkpoint.Checkpoint -import net.corda.data.flow.state.external.ExternalEventState -import net.corda.data.flow.state.session.SessionState -import net.corda.data.flow.state.session.SessionStateType import net.corda.data.identity.HoldingIdentity import net.corda.flow.fiber.cache.FlowFiberCache -import net.corda.flow.pipeline.converters.FlowEventContextConverter +import net.corda.flow.maintenance.CheckpointCleanupHandler import net.corda.flow.pipeline.exceptions.FlowMarkedForKillException import net.corda.flow.pipeline.factory.FlowMessageFactory import net.corda.flow.pipeline.factory.FlowRecordFactory @@ -23,67 +18,28 @@ import net.corda.flow.pipeline.sessions.FlowSessionManager import net.corda.flow.state.FlowCheckpoint import net.corda.flow.test.utils.buildFlowEventContext import net.corda.libs.configuration.SmartConfigFactory -import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.records.Record +import net.corda.schema.Schemas import net.corda.schema.configuration.FlowConfig import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.kotlin.any -import org.mockito.kotlin.eq import org.mockito.kotlin.mock -import org.mockito.kotlin.times -import org.mockito.kotlin.verify import org.mockito.kotlin.whenever class FlowToBeKilledExceptionProcessingTest { - private companion object { - const val SESSION_ID_1 = "s1" - const val SESSION_ID_2 = "s2" - const val SESSION_ID_3 = "s3" - const val SESSION_ID_4 = "s4" - const val SESSION_ID_5 = "s5" const val FLOW_ID = "flowId" } private val flowRecordFactory = mock() private val flowMessageFactory = mock() - private val flowEventContextConverter = mock() private val flowSessionManager = mock() private val flowConfig = ConfigFactory.empty().withValue( FlowConfig.PROCESSING_MAX_RETRY_WINDOW_DURATION, ConfigValueFactory.fromAnyRef(20000L) ) private val smartFlowConfig = 
SmartConfigFactory.createWithoutSecurityServices().create(flowConfig) - - // starting session states - private val sessionState1 = createSessionState(SESSION_ID_1, true, SessionStateType.CLOSING) - private val sessionState2 = createSessionState(SESSION_ID_2, true, SessionStateType.ERROR) - private val sessionState3 = createSessionState(SESSION_ID_3, true, SessionStateType.CLOSED) - private val sessionState4 = createSessionState(SESSION_ID_4, false, SessionStateType.CONFIRMED) - private val sessionState5 = createSessionState(SESSION_ID_5, false, SessionStateType.CREATED) - - // session states which have had their state updated to ERROR - private val erroredSessionState1 = createSessionState(SESSION_ID_1, false, SessionStateType.ERROR) - private val erroredSessionState4 = createSessionState(SESSION_ID_4, false, SessionStateType.ERROR) - private val erroredSessionState5 = createSessionState(SESSION_ID_5, false, SessionStateType.ERROR) - - private val allFlowSessions = listOf(sessionState1, sessionState2, sessionState3, sessionState4, sessionState5) - private val activeFlowSessionIds = listOf(SESSION_ID_1, SESSION_ID_4, SESSION_ID_5) - private val erroredFlowSessions = listOf(erroredSessionState1, erroredSessionState4, erroredSessionState5) - private val allFlowSessionsAfterErrorsSent = listOf( - erroredSessionState1, sessionState2, sessionState3, erroredSessionState4, erroredSessionState5 - ) - - private fun createSessionState(sessionId: String, hasScheduledCleanup: Boolean, status: SessionStateType): SessionState = - SessionState().apply { - this.sessionId = sessionId - this.hasScheduledCleanup = hasScheduledCleanup - this.status = status - } - - private val scheduleCleanupRecord4 = Record("t", SESSION_ID_4, FlowMapperEvent(ScheduleCleanup(1000))) - private val scheduleCleanupRecord5 = Record("t", SESSION_ID_5, FlowMapperEvent(ScheduleCleanup(1000))) private val flowKey = FlowKey("id", HoldingIdentity("x500", "grp1")) private val checkpoint = mock { whenever(it.flowKey).thenReturn(flowKey) @@ -95,14 +51,10 @@ class FlowToBeKilledExceptionProcessingTest { processingTerminatedReason = "reason" } private val flowKilledStatusRecord = Record("s", flowKey, flowKilledStatus) - private val mockResponse = mock>() private val flowFiberCache = mock() - + private val checkpointCleanupHandler = mock() private val target = FlowEventExceptionProcessorImpl( - flowMessageFactory, - flowRecordFactory, - flowSessionManager, - flowFiberCache + flowMessageFactory, flowRecordFactory, flowSessionManager, flowFiberCache, checkpointCleanupHandler ) @BeforeEach @@ -112,127 +64,49 @@ class FlowToBeKilledExceptionProcessingTest { } @Test - fun `processing MarkedForKillException sends error events to all sessions then schedules cleanup for any not yet scheduled`() { + fun `processing FlowMarkedForKillException calls checkpoint cleanup handler and copies cleanup records to context`() { val testContext = buildFlowEventContext(checkpoint, Any()) val exception = FlowMarkedForKillException("reasoning") - whenever(checkpoint.doesExist).thenReturn(true) - - // The first call returns all sessions. 
- whenever(checkpoint.sessions) - .thenReturn(allFlowSessions) - .thenReturn(allFlowSessionsAfterErrorsSent) - - // we send error messages for all active flow sessions, returning the flow sessions with state updated to ERROR - whenever(flowSessionManager.sendErrorMessages(eq(checkpoint), eq(activeFlowSessionIds), eq(exception), any())) - .thenReturn(erroredFlowSessions) - - // we clean up all flow sessions that have not already been cleaned up - whenever(flowRecordFactory.createFlowMapperEventRecord(eq(SESSION_ID_4), any())).thenReturn(scheduleCleanupRecord4) - whenever(flowRecordFactory.createFlowMapperEventRecord(eq(SESSION_ID_5), any())).thenReturn(scheduleCleanupRecord5) - - // callouts to factories to create flow killed status record - whenever(flowMessageFactory.createFlowKilledStatusMessage(any(), any())).thenReturn(flowKilledStatus) - whenever(flowRecordFactory.createFlowStatusRecord(flowKilledStatus)).thenReturn(flowKilledStatusRecord) + val flowMapperEvent = mock() + val cleanupRecord = Record(Schemas.Flow.FLOW_MAPPER_SESSION_OUT, "key", flowMapperEvent) + val cleanupRecords = listOf(cleanupRecord) + whenever(checkpointCleanupHandler.cleanupCheckpoint(any(), any(), any())).thenReturn(cleanupRecords) val response = target.process(exception, testContext) - verify(checkpoint, times(1)).putSessionStates(erroredFlowSessions) - verify(checkpoint, times(1)).markDeleted() - - assertThat(response.outputRecords) - .withFailMessage("Output records should contain cleanup records for sessions that aren't already scheduled") - .contains(scheduleCleanupRecord4, scheduleCleanupRecord5) - - assertThat(response.outputRecords) - .withFailMessage("Output records should contain the flow killed status record") - .contains(flowKilledStatusRecord) - - val updatedFlowSessions = response.checkpoint.sessions.associateBy { it.sessionId } - assertThat(updatedFlowSessions.values.all { it.hasScheduledCleanup }) - .withFailMessage("All flow sessions should now be marked as scheduled for cleanup") - .isTrue - assertThat(updatedFlowSessions[SESSION_ID_1]?.status).isEqualTo(SessionStateType.ERROR) - assertThat(updatedFlowSessions[SESSION_ID_2]?.status).isEqualTo(SessionStateType.ERROR) - assertThat(updatedFlowSessions[SESSION_ID_3]?.status).isEqualTo(SessionStateType.CLOSED) - assertThat(updatedFlowSessions[SESSION_ID_4]?.status).isEqualTo(SessionStateType.ERROR) - assertThat(updatedFlowSessions[SESSION_ID_5]?.status).isEqualTo(SessionStateType.ERROR) + assertThat(response.outputRecords).contains(cleanupRecord) } @Test - fun `processing MarkedForKillException removes retries and external events from output records`() { - whenever(checkpoint.doesExist).thenReturn(true) - - val externalEventState = ExternalEventState() - whenever(checkpoint.externalEventState).thenReturn(externalEventState) - whenever(checkpoint.inRetryState).thenReturn(true) - - val testContext = buildFlowEventContext(checkpoint, Any()) - val exception = FlowMarkedForKillException("reasoning") - - whenever(checkpoint.sessions) - .thenReturn(emptyList()) - .thenReturn(emptyList()) - - // callouts to factories to create flow killed status record - whenever(flowMessageFactory.createFlowKilledStatusMessage(any(), any())).thenReturn(flowKilledStatus) - whenever(flowRecordFactory.createFlowStatusRecord(flowKilledStatus)).thenReturn(flowKilledStatusRecord) - - val response = target.process(exception, testContext) - - verify(checkpoint, times(1)).markDeleted() - - assertThat(response.outputRecords) - .withFailMessage("Output records should only have flow 
status record") - .hasSize(1) - - assertThat(response.outputRecords) - .withFailMessage("Output records should contain the flow killed status record") - .contains(flowKilledStatusRecord) - } - - @Test - fun `processing MarkedForKillException when checkpoint does not exist only outputs flow killed status record`() { + fun `processing FlowMarkedForKillException when checkpoint does not exist only outputs flow killed status record`() { whenever(checkpoint.doesExist).thenReturn(false) - - val inputEventPayload = StartFlow(FlowStartContext().apply {statusKey = flowKey}, "") - + val inputEventPayload = StartFlow(FlowStartContext().apply { statusKey = flowKey }, "") val testContext = buildFlowEventContext(checkpoint, inputEventPayload) val exception = FlowMarkedForKillException("reasoning") - whenever(flowRecordFactory.createFlowStatusRecord(any())).thenReturn(flowKilledStatusRecord) val response = target.process(exception, testContext) - assertThat(response.outputRecords) - .withFailMessage("Output records should only have flow status record") - .hasSize(1) - - assertThat(response.outputRecords) - .withFailMessage("Output records should contain the flow killed status record") - .contains(flowKilledStatusRecord) + assertThat(response.outputRecords).hasSize(1).contains(flowKilledStatusRecord) } @Test - fun `error processing MarkedForKillException falls back to null state record, empty response events and marked for DLQ`() { + fun `error processing FlowMarkedForKillException falls back to null state record, empty response events and marked for DLQ`() { val testContext = buildFlowEventContext(checkpoint, Any()) val exception = FlowMarkedForKillException("reasoning") - - whenever(checkpoint.doesExist).thenReturn(true) - - // The first call returns all sessions. 
- whenever(checkpoint.sessions) - .thenReturn(allFlowSessions) - .thenReturn(allFlowSessionsAfterErrorsSent) - - // simulating exception thrown during processing of the flow session - whenever(flowSessionManager.sendErrorMessages(eq(checkpoint), eq(activeFlowSessionIds), eq(exception), any())) - .thenThrow(IllegalArgumentException("some error message while sending errors to peers")) + // simulating exception thrown during processing + whenever( + checkpointCleanupHandler.cleanupCheckpoint( + any(), + any(), + any() + ) + ).thenThrow(IllegalArgumentException("some error message while sending errors to peers")) val response = target.process(exception, testContext) - verify(response.checkpoint).markDeleted() assertThat(response.outputRecords).isEmpty() assertThat(response.sendToDlq).isTrue } diff --git a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt index b66e5e5b056..5b289ed5e95 100644 --- a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt +++ b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/query/registration/impl/VaultNamedQueryFactoryProvider.kt @@ -48,7 +48,7 @@ class VaultNamedQueryFactoryProvider @Activate constructor( private fun registerPlatformQueries(vaultNamedQueryBuilderFactory: VaultNamedQueryBuilderFactory) { vaultNamedQueryBuilderFactory .create(FIND_UNCONSUMED_STATES_BY_EXACT_TYPE) - .whereJson("WHERE visible_states.type = :type") + .whereJson("WHERE visible_states.type = :type AND visible_states.consumed IS NULL") .register() } } diff --git a/components/membership/membership-service-impl/src/test/kotlin/net/corda/membership/service/impl/MemberOpsServiceProcessorTest.kt b/components/membership/membership-service-impl/src/test/kotlin/net/corda/membership/service/impl/MemberOpsServiceProcessorTest.kt index 578e70c50f9..1fc4afffcc8 100644 --- a/components/membership/membership-service-impl/src/test/kotlin/net/corda/membership/service/impl/MemberOpsServiceProcessorTest.kt +++ b/components/membership/membership-service-impl/src/test/kotlin/net/corda/membership/service/impl/MemberOpsServiceProcessorTest.kt @@ -141,8 +141,8 @@ class MemberOpsServiceProcessorTest { private fun assertResponseContext(expected: MembershipRpcRequestContext, actual: MembershipRpcResponseContext) { assertEquals(expected.requestId, actual.requestId) assertEquals(expected.requestTimestamp, actual.requestTimestamp) - assertThat(actual.responseTimestamp.epochSecond).isGreaterThanOrEqualTo(expected.requestTimestamp.epochSecond) - assertThat(actual.responseTimestamp.epochSecond).isLessThanOrEqualTo(now.epochSecond) + assertThat(actual.responseTimestamp.toEpochMilli()).isGreaterThanOrEqualTo(expected.requestTimestamp.toEpochMilli()) + assertThat(actual.responseTimestamp.toEpochMilli()).isLessThanOrEqualTo(now.toEpochMilli()) } diff --git a/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt b/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt index c6d92b889ff..4178f8dc91e 100644 --- 
a/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt +++ b/components/membership/registration-impl/src/main/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandler.kt @@ -2,6 +2,10 @@ package net.corda.membership.impl.registration.dynamic.handler.member import net.corda.data.identity.HoldingIdentity import net.corda.data.membership.command.registration.member.PersistMemberRegistrationState +import net.corda.data.membership.common.RegistrationStatus as RegistrationStatusV1 +import net.corda.data.membership.common.v2.RegistrationStatus as RegistrationStatusV2 +import net.corda.data.membership.p2p.SetOwnRegistrationStatus as SetOwnRegistrationStatusV1 +import net.corda.data.membership.p2p.v2.SetOwnRegistrationStatus as SetOwnRegistrationStatusV2 import net.corda.data.membership.state.RegistrationState import net.corda.membership.impl.registration.dynamic.handler.RegistrationHandler import net.corda.membership.impl.registration.dynamic.handler.RegistrationHandlerResult @@ -17,11 +21,12 @@ internal class PersistMemberRegistrationStateHandler( command: PersistMemberRegistrationState, ): RegistrationHandlerResult { val member = command.member.toCorda() + val request = command.request() val commands = membershipPersistenceClient.setRegistrationRequestStatus( member, - command.setStatusRequest.registrationId, - command.setStatusRequest.newStatus, - command.setStatusRequest.reason + request.registrationId, + request.newStatus, + request.reason ).createAsyncCommands() return RegistrationHandlerResult( null, @@ -35,4 +40,34 @@ internal class PersistMemberRegistrationStateHandler( state: RegistrationState?, command: PersistMemberRegistrationState ): HoldingIdentity = command.member + + private fun PersistMemberRegistrationState.request(): SetOwnRegistrationStatusV2 { + val request = this.setStatusRequest + return when (request) { + is SetOwnRegistrationStatusV2 -> request + is SetOwnRegistrationStatusV1 -> SetOwnRegistrationStatusV2( + request.registrationId, + request.newStatus.toV2(), + null + ) + else -> throw IllegalArgumentException("Unknown request status '${request.javaClass}' received.") + } + } + + private fun RegistrationStatusV1.toV2(): RegistrationStatusV2 { + return when(this) { + RegistrationStatusV1.NEW -> RegistrationStatusV2.NEW + RegistrationStatusV1.SENT_TO_MGM -> RegistrationStatusV2.SENT_TO_MGM + RegistrationStatusV1.RECEIVED_BY_MGM -> RegistrationStatusV2.RECEIVED_BY_MGM + RegistrationStatusV1.PENDING_MEMBER_VERIFICATION -> RegistrationStatusV2.PENDING_MEMBER_VERIFICATION + RegistrationStatusV1.PENDING_MANUAL_APPROVAL -> RegistrationStatusV2.PENDING_MANUAL_APPROVAL + RegistrationStatusV1.PENDING_AUTO_APPROVAL -> RegistrationStatusV2.PENDING_AUTO_APPROVAL + RegistrationStatusV1.APPROVED -> RegistrationStatusV2.APPROVED + RegistrationStatusV1.DECLINED -> RegistrationStatusV2.DECLINED + RegistrationStatusV1.INVALID -> RegistrationStatusV2.INVALID + RegistrationStatusV1.FAILED -> RegistrationStatusV2.FAILED + else -> throw IllegalArgumentException("Unknown status '${this.name}' received.") + } + + } } diff --git a/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt 
b/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt index f7acb9236d3..8386019d3ad 100644 --- a/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt +++ b/components/membership/registration-impl/src/test/kotlin/net/corda/membership/impl/registration/dynamic/handler/member/PersistMemberRegistrationStateHandlerTest.kt @@ -38,13 +38,14 @@ class PersistMemberRegistrationStateHandlerTest { } doReturn operation } private val reason = "some reason" - val command = PersistMemberRegistrationState( + private val request = SetOwnRegistrationStatus( + UUID(1,2).toString(), + RegistrationStatus.DECLINED, + reason + ) + private val command = PersistMemberRegistrationState( HoldingIdentity("O=Alice, L=London, C=GB", "GroupId"), - SetOwnRegistrationStatus( - UUID(1,2).toString(), - RegistrationStatus.DECLINED, - reason - ) + request ) private val handler = PersistMemberRegistrationStateHandler( @@ -66,8 +67,8 @@ class PersistMemberRegistrationStateHandlerTest { verify(membershipPersistenceClient).setRegistrationRequestStatus( command.member.toCorda(), - command.setStatusRequest.registrationId, - command.setStatusRequest.newStatus, + request.registrationId, + request.newStatus, reason ) } diff --git a/components/reconciliation/reconciliation-impl/src/main/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandler.kt b/components/reconciliation/reconciliation-impl/src/main/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandler.kt index 4fe7d46f77f..6ed76e03831 100644 --- a/components/reconciliation/reconciliation-impl/src/main/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandler.kt +++ b/components/reconciliation/reconciliation-impl/src/main/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandler.kt @@ -137,7 +137,15 @@ internal class ReconcilerEventHandler( // therefore through the defaulting config process which will add the property(ies) and subsequently // will publish them to Kafka. We only need to force the first reconciliation. 
if (forceInitialReconciliation && firstRun) { - dbRecord.version >= matchedKafkaRecord.version // reconcile all db records again (forced reconciliation) + dbRecord.version > matchedKafkaRecord.version + // reconcile all db records again (forced reconciliation) + || (dbRecord.version == matchedKafkaRecord.version + && writer.valuesMisalignedAfterDefaults( + dbRecord.key, + dbRecord.value, + matchedKafkaRecord.value + ) + ) } else { dbRecord.version > matchedKafkaRecord.version // reconcile db updated records } || dbRecord.isDeleted // reconcile db soft deleted records diff --git a/components/reconciliation/reconciliation-impl/src/test/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandlerTest.kt b/components/reconciliation/reconciliation-impl/src/test/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandlerTest.kt index cbf23559c4c..dc6d265df98 100644 --- a/components/reconciliation/reconciliation-impl/src/test/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandlerTest.kt +++ b/components/reconciliation/reconciliation-impl/src/test/kotlin/net/corda/reconciliation/impl/ReconcilerEventHandlerTest.kt @@ -2,6 +2,7 @@ package net.corda.reconciliation.impl import net.corda.lifecycle.LifecycleCoordinator import net.corda.reconciliation.ReconcilerReader +import net.corda.reconciliation.ReconcilerWriter import net.corda.reconciliation.VersionedRecord import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertEquals @@ -92,11 +93,15 @@ internal class ReconcilerEventHandlerTest { } } + val writer = mock<ReconcilerWriter<String, Int>>().also { + whenever(it.valuesMisalignedAfterDefaults(any(), any(), any())).thenReturn(true) + } + reconcilerEventHandler = ReconcilerEventHandler( dbReader, kafkaReader, - writer = mock(), + writer, keyClass = String::class.java, valueClass = Int::class.java, 10L, diff --git a/components/reconciliation/reconciliation/src/main/kotlin/net/corda/reconciliation/ReconcilerWriter.kt b/components/reconciliation/reconciliation/src/main/kotlin/net/corda/reconciliation/ReconcilerWriter.kt index 73da8a3a8b0..8d05c6180ab 100644 --- a/components/reconciliation/reconciliation/src/main/kotlin/net/corda/reconciliation/ReconcilerWriter.kt +++ b/components/reconciliation/reconciliation/src/main/kotlin/net/corda/reconciliation/ReconcilerWriter.kt @@ -18,4 +18,14 @@ interface ReconcilerWriter { fun remove(recordKey: K) val lifecycleCoordinatorName: LifecycleCoordinatorName + + /** + * Compares a DB record value with its corresponding Kafka record value to determine whether, + * after defaults are applied to the DB record, the two values differ.
+ */ + fun valuesMisalignedAfterDefaults( + recordKey: K, + dbRecordValue: V, + kafkaRecordValue: V + ): Boolean = false } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 5cf01b55410..2f2b96c64e1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,7 +46,7 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.76 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.1.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.1.0.37-beta+ +cordaApiVersion=5.1.0.39-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/membership/membership-impl/build.gradle b/libs/membership/membership-impl/build.gradle index 96285545f43..6965bd5bd8b 100644 --- a/libs/membership/membership-impl/build.gradle +++ b/libs/membership/membership-impl/build.gradle @@ -17,7 +17,9 @@ dependencies { implementation project(':libs:crypto:crypto-core') implementation project(":libs:membership:membership-common") implementation project(":libs:utilities") + implementation project(':libs:sandbox-types') implementation project(":libs:serialization:serialization-avro") + implementation project(':libs:serialization:serialization-internal') implementation "net.corda:corda-avro-schema" implementation "net.corda:corda-base" diff --git a/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt new file mode 100644 index 00000000000..b287f9aed0b --- /dev/null +++ b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MGMContextSerializer.kt @@ -0,0 +1,72 @@ +package net.corda.membership.lib.impl.serializer.amqp + +import net.corda.layeredpropertymap.LayeredPropertyMapFactory +import net.corda.membership.lib.impl.MGMContextImpl +import net.corda.sandbox.type.SandboxConstants +import net.corda.sandbox.type.UsedByFlow +import net.corda.sandbox.type.UsedByPersistence +import net.corda.sandbox.type.UsedByVerification +import net.corda.serialization.BaseProxySerializer +import net.corda.serialization.InternalCustomSerializer +import net.corda.v5.base.exceptions.CordaRuntimeException +import net.corda.v5.base.types.LayeredPropertyMap +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.osgi.service.component.annotations.ServiceScope + +@Component( + service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ], + property = [SandboxConstants.CORDA_UNINJECTABLE_SERVICE], + scope = ServiceScope.PROTOTYPE +) +class MGMContextSerializer @Activate constructor( + @Reference(service = LayeredPropertyMapFactory::class) + private val layeredPropertyMapFactory: LayeredPropertyMapFactory, +) : BaseProxySerializer(), UsedByFlow, UsedByPersistence, UsedByVerification { + private companion object { + private const val VERSION_1 = 1 + } + + override fun toProxy(obj: MGMContextImpl): MGMContextProxy { + return MGMContextProxy( + VERSION_1, + obj.toMap() + ) + } + + override fun fromProxy(proxy: MGMContextProxy): MGMContextImpl { + return when(proxy.version) { + VERSION_1 -> + MGMContextImpl(layeredPropertyMapFactory.createMap(proxy.map)) + else -> + throw CordaRuntimeException("Unable to create MGMContextImpl with 
Version='${proxy.version}'") + } + } + + override val proxyType: Class + get() = MGMContextProxy::class.java + + override val type: Class + get() = MGMContextImpl::class.java + + override val withInheritance: Boolean + get() = false + + private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value } + +} + +/** + * The class that actually gets serialized on the wire. + */ +data class MGMContextProxy( + /** + * Version of container. + */ + val version: Int, + /** + * Properties for [MGMContextImpl] serialization. + */ + val map: Map +) diff --git a/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt new file mode 100644 index 00000000000..a6b795e0bff --- /dev/null +++ b/libs/membership/membership-impl/src/main/kotlin/net/corda/membership/lib/impl/serializer/amqp/MemberContextSerializer.kt @@ -0,0 +1,72 @@ +package net.corda.membership.lib.impl.serializer.amqp + +import net.corda.layeredpropertymap.LayeredPropertyMapFactory +import net.corda.membership.lib.impl.MemberContextImpl +import net.corda.sandbox.type.SandboxConstants.CORDA_UNINJECTABLE_SERVICE +import net.corda.sandbox.type.UsedByFlow +import net.corda.sandbox.type.UsedByPersistence +import net.corda.sandbox.type.UsedByVerification +import net.corda.serialization.BaseProxySerializer +import net.corda.serialization.InternalCustomSerializer +import net.corda.v5.base.exceptions.CordaRuntimeException +import net.corda.v5.base.types.LayeredPropertyMap +import org.osgi.service.component.annotations.Activate +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.osgi.service.component.annotations.ServiceScope + +@Component( + service = [ InternalCustomSerializer::class, UsedByFlow::class, UsedByPersistence::class, UsedByVerification::class ], + property = [ CORDA_UNINJECTABLE_SERVICE ], + scope = ServiceScope.PROTOTYPE +) +class MemberContextSerializer @Activate constructor( + @Reference(service = LayeredPropertyMapFactory::class) + private val layeredPropertyMapFactory: LayeredPropertyMapFactory, +) : BaseProxySerializer(), UsedByFlow, UsedByPersistence, UsedByVerification { + private companion object { + private const val VERSION_1 = 1 + } + + override fun toProxy(obj: MemberContextImpl): MemberContextProxy { + return MemberContextProxy( + VERSION_1, + obj.toMap() + ) + } + + override fun fromProxy(proxy: MemberContextProxy): MemberContextImpl { + return when(proxy.version) { + VERSION_1 -> + MemberContextImpl(layeredPropertyMapFactory.createMap(proxy.map)) + else -> + throw CordaRuntimeException("Unable to create MemberContextImpl with Version='${proxy.version}'") + } + } + + override val proxyType: Class + get() = MemberContextProxy::class.java + + override val type: Class + get() = MemberContextImpl::class.java + + override val withInheritance: Boolean + get() = false + + private fun LayeredPropertyMap.toMap() = this.entries.associate { it.key to it.value } + +} + +/** + * The class that actually gets serialized on the wire. + */ +data class MemberContextProxy( + /** + * Version of container. + */ + val version: Int, + /** + * Properties for [MemberContextImpl] serialization. 
+ */ + val map: Map +) diff --git a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolver.kt b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolver.kt index ed7bcd43a3e..0d80edf5db7 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolver.kt +++ b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolver.kt @@ -1,7 +1,6 @@ package net.corda.messagebus.kafka.config import com.typesafe.config.ConfigFactory -import java.util.Properties import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory import net.corda.messagebus.api.configuration.AdminConfig @@ -16,6 +15,7 @@ import net.corda.utilities.debug import org.apache.kafka.clients.producer.ProducerConfig.PARTITIONER_CLASS_CONFIG import org.osgi.framework.FrameworkUtil import org.slf4j.LoggerFactory +import java.util.Properties /** * Resolve a Kafka bus configuration against the enforced and default configurations provided by the library. @@ -71,6 +71,7 @@ internal class MessageBusConfigResolver(private val smartConfigFactory: SmartCon val properties = roleConfig.toKafkaProperties() logger.debug {"Kafka properties for role $rolePath: $properties" } + return properties } @@ -97,13 +98,15 @@ internal class MessageBusConfigResolver(private val smartConfigFactory: SmartCon * @return Resolved user configurable consumer values and kafka properties to be used for the given role type */ fun resolve(messageBusConfig: SmartConfig, consumerConfig: ConsumerConfig): Pair { - val kafkaProperties = resolve(messageBusConfig, consumerConfig.role.configPath, consumerConfig.toSmartConfig()) - val resolvedConfig = ResolvedConsumerConfig( - consumerConfig.group, - consumerConfig.clientId, - messageBusConfig.getString(BootConfig.TOPIC_PREFIX) - ) - return Pair(resolvedConfig, kafkaProperties) + val topicPrefix = messageBusConfig.getString(BootConfig.TOPIC_PREFIX) + val amendedConfig = consumerConfig.addGroupPrefix(topicPrefix) + val kafkaProperties = resolve(messageBusConfig, amendedConfig.role.configPath, amendedConfig.toSmartConfig()) + + return ResolvedConsumerConfig( + amendedConfig.group, + amendedConfig.clientId, + topicPrefix + ) to kafkaProperties } /** @@ -119,13 +122,13 @@ internal class MessageBusConfigResolver(private val smartConfigFactory: SmartCon val kafkaProperties = resolve(messageBusConfig, producerConfig.role.configPath, producerConfig.toSmartConfig()) //enforce the partitioner to be our custom partitioner for producers only kafkaProperties[PARTITIONER_CLASS_CONFIG] = KafkaProducerPartitioner::class.java - val resolvedConfig = ResolvedProducerConfig( + + return ResolvedProducerConfig( producerConfig.clientId, producerConfig.transactional, messageBusConfig.getString(BootConfig.TOPIC_PREFIX), producerConfig.throwOnSerializationError - ) - return Pair(resolvedConfig, kafkaProperties) + ) to kafkaProperties } /** @@ -166,6 +169,10 @@ internal class MessageBusConfigResolver(private val smartConfigFactory: SmartCon ) } + private fun ConsumerConfig.addGroupPrefix(prefix: String) : ConsumerConfig { + return this.copy(group = prefix + this.group) + } + private fun ProducerConfig.toSmartConfig(): SmartConfig { val transactionalId = if (transactional) { "$clientId-$instanceId" diff --git 
a/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf b/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf index bd11c722869..6a89e3f335b 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf +++ b/libs/messaging/kafka-message-bus-impl/src/main/resources/kafka-messaging-defaults.conf @@ -29,7 +29,7 @@ consumer = ${common} { heartbeat.interval.ms = 20000 # The maximum amount of time kafka broker will wait before answering a consumer request if there hasn't been # an update to the topic - fetch.max.wait.ms = 5 + fetch.max.wait.ms = 500 } # Defaults for all producers. diff --git a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt index e64b50d8a20..43ae5ddbeea 100644 --- a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt +++ b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/MessageBusConfigResolverTest.kt @@ -1,8 +1,6 @@ package net.corda.messagebus.kafka.config import com.typesafe.config.ConfigFactory -import java.util.Properties -import java.util.stream.Stream import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory import net.corda.messagebus.api.configuration.AdminConfig @@ -17,6 +15,8 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource +import java.util.Properties +import java.util.stream.Stream class MessageBusConfigResolverTest { @@ -74,7 +74,7 @@ class MessageBusConfigResolverTest { BOOTSTRAP_SERVERS_PROP to "kafka:1001", SSL_KEYSTORE_PROP to "foo/bar", CLIENT_ID_PROP to "eventConsumer--$CLIENT_ID", - GROUP_ID_PROP to "group-cooperative" + GROUP_ID_PROP to "testgroup-cooperative" ) ), ConsumerRoles.EVENT_LOG to getExpectedConsumerProperties( @@ -85,7 +85,7 @@ class MessageBusConfigResolverTest { ), ConsumerRoles.RPC_SENDER to getExpectedConsumerProperties( mapOf( - GROUP_ID_PROP to "$GROUP_NAME-sender", + GROUP_ID_PROP to "test$GROUP_NAME-sender", BOOTSTRAP_SERVERS_PROP to "kafka:1001", SSL_KEYSTORE_PROP to "foo/bar", AUTO_OFFSET_RESET_PROP to "latest" @@ -93,7 +93,7 @@ class MessageBusConfigResolverTest { ), ConsumerRoles.RPC_RESPONDER to getExpectedConsumerProperties( mapOf( - GROUP_ID_PROP to "$GROUP_NAME-responder", + GROUP_ID_PROP to "test$GROUP_NAME-responder", BOOTSTRAP_SERVERS_PROP to "kafka:1001", SSL_KEYSTORE_PROP to "foo/bar", AUTO_OFFSET_RESET_PROP to "latest" @@ -149,7 +149,7 @@ class MessageBusConfigResolverTest { private fun getExpectedConsumerProperties(overrides: Map): Properties { val defaults = mapOf( - GROUP_ID_PROP to GROUP_NAME, + GROUP_ID_PROP to "test$GROUP_NAME", CLIENT_ID_PROP to "consumer--$CLIENT_ID", ISOLATION_LEVEL_PROP to "read_committed", BOOTSTRAP_SERVERS_PROP to "localhost:9092", diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt index 116ca1579dd..8b2c43f8818 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt +++ 
b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MessageBusClient.kt @@ -2,12 +2,14 @@ package net.corda.messaging.mediator import net.corda.messagebus.api.producer.CordaProducer import net.corda.messagebus.api.producer.CordaProducerRecord +import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_KEY import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.util.concurrent.CompletableFuture class MessageBusClient( override val id: String, @@ -18,9 +20,31 @@ class MessageBusClient( private val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - override fun send(message: MediatorMessage<*>): MediatorMessage<*>? { - producer.send(message.toCordaProducerRecord(), null) - return null + override fun send(message: MediatorMessage<*>): MediatorMessage<*> { + val future = CompletableFuture() + val record = message.toCordaProducerRecord() + producer.send(record) { ex -> + setFutureFromResponse(ex, future, record.topic) + } + + return MediatorMessage(future) + } + + /** + * Helper function to set a [future] result based on the presence of an [exception] + */ + private fun setFutureFromResponse( + exception: Exception?, + future: CompletableFuture, + topic: String + ) { + if (exception == null) { + future.complete(Unit) + } else { + val message = "Producer clientId $id for topic $topic failed to send." + log.warn(message, exception) + future.completeExceptionally(CordaMessageAPIFatalException(message, exception)) + } } override fun close() { @@ -34,6 +58,9 @@ class MessageBusClient( } } +/** + * Helper function to convert a [MediatorMessage] of a specific format to a [CordaProducerRecord] + */ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, *> { return CordaProducerRecord( topic = this.getProperty(MSG_PROP_ENDPOINT), @@ -43,5 +70,8 @@ private fun MediatorMessage<*>.toCordaProducerRecord(): CordaProducerRecord<*, * ) } +/** + * Helper function to extract headers from message props + */ private fun Map.toHeaders() = map { (key, value) -> (key to value.toString()) } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index 339ee7e5027..e83c896467c 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -69,7 +69,7 @@ class MultiSourceEventMediatorImpl( private val running = AtomicBoolean(false) // TODO This timeout was set with CORE-17768 (changing configuration value would affect other messaging patterns) // This should be reverted to use configuration value once event mediator polling is refactored (planned for 5.2) - private val pollTimeout = Duration.ofMillis(5) + private val pollTimeout = Duration.ofMillis(50) override fun start() { log.debug { "Starting multi-source event mediator with config: $config" } @@ -165,19 +165,21 @@ class MultiSourceEventMediatorImpl( if (messages.isNotEmpty()) { var groups = allocateGroups(messages.map { it.toRecord() }) 
var states = stateManager.get(messages.map { it.key.toString() }.distinct()) + while (groups.isNotEmpty()) { - val asynchronousOutputs = mutableListOf>() - val newStates = ConcurrentHashMap() - val updateStates = ConcurrentHashMap() - val deleteStates = ConcurrentHashMap() + val asynchronousOutputs = ConcurrentHashMap>>() + val statesToCreate = ConcurrentHashMap() + val statesToUpdate = ConcurrentHashMap() + val statesToDelete = ConcurrentHashMap() val flowEvents = ConcurrentHashMap>>() + // Process each group on a thread groups.filter { it.isNotEmpty() }.map { group -> taskManager.executeShortRunningTask { // Process all same flow events in one go - group.map { it -> + group.map { // Keep track of all records belonging to one flow flowEvents.compute(it.key.toString()) { _, v -> if (v == null) { @@ -199,7 +201,7 @@ class MultiSourceEventMediatorImpl( val event = queue.removeFirst() val response = config.messageProcessor.onNext(processorState, event) processorState = response.updatedState - processOutputEvents(response, asynchronousOutputs, queue, event) + processOutputEvents(it.key.toString(), response, asynchronousOutputs, queue, event) } // ---- Manage the state ---- @@ -209,38 +211,26 @@ class MultiSourceEventMediatorImpl( processorState, ) - // New state - if (state == null && processedState != null) { - newStates[it.key.toString()] = processedState - } - - // Update state - if (state != null && processedState != null) { - updateStates[it.key.toString()] = processedState - } - - // Delete state - if (state != null && processorState == null) { - deleteStates[it.key.toString()] = state - } + qualifyState(it.key.toString(), state, processedState, statesToCreate, statesToUpdate, statesToDelete) } } }.map { it.join() } - sendAsynchronousEvents(asynchronousOutputs) // Persist states changes - val failedToCreateKeys = stateManager.create(newStates.values.mapNotNull { it }) + val failedToCreateKeys = stateManager.create(statesToCreate.values.mapNotNull { it }) val failedToCreate = stateManager.get(failedToCreateKeys.keys) - val failedToDelete = stateManager.delete(deleteStates.values.mapNotNull { it }) - val failedToUpdate = stateManager.update(updateStates.values.mapNotNull { it }) + val failedToDelete = stateManager.delete(statesToDelete.values.mapNotNull { it }) + val failedToUpdate = stateManager.update(statesToUpdate.values.mapNotNull { it }) states = failedToCreate + failedToDelete + failedToUpdate groups = if (states.isNotEmpty()) { allocateGroups(flowEvents.filterKeys { states.containsKey(it) }.values.flatten()) } else { listOf() } + states.keys.forEach { asynchronousOutputs.remove(it) } + sendAsynchronousEvents(asynchronousOutputs.values.flatten()) } metrics.commitTimer.recordCallable { consumer.syncCommitOffsets() @@ -249,7 +239,35 @@ class MultiSourceEventMediatorImpl( metrics.processorTimer.record(System.nanoTime() - startTimestamp, TimeUnit.NANOSECONDS) } - private fun sendAsynchronousEvents(busEvents: MutableList>) { + /** + * Decide, based on the original and processed state values, whether the state must be created, updated or + * deleted, and add the relevant state value to the corresponding map.
+ */ + fun qualifyState( + groupId: String, + original: State?, + processed: State?, + toCreate : MutableMap, + toUpdate : MutableMap, + toDelete : MutableMap + ) { + // New state + if (original == null && processed != null) { + toCreate[groupId] = processed + } + + // Update state + if (original != null && processed != null) { + toUpdate[groupId] = processed + } + + // Delete state + if (original != null && processed == null) { + toDelete[groupId] = original + } + } + + private fun sendAsynchronousEvents(busEvents: Collection>) { busEvents.forEach { message -> with(messageRouter.getDestination(message)) { message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) @@ -262,8 +280,9 @@ class MultiSourceEventMediatorImpl( * Send any synchronous events immediately, add asynchronous events to the busEvents collection to be sent later */ private fun processOutputEvents( + key: String, response: StateAndEventProcessor.Response, - busEvents: MutableList>, + busEvents: MutableMap>>, queue: ArrayDeque>, event: Record ) { @@ -271,7 +290,11 @@ class MultiSourceEventMediatorImpl( output.forEach { message -> val destination = messageRouter.getDestination(message) if (destination.type == RoutingDestination.Type.ASYNCHRONOUS) { - busEvents.add(message) + busEvents.compute(key) { _, value -> + val list = value ?: mutableListOf() + list.add(message) + list + } } else { @Suppress("UNCHECKED_CAST") val reply = with(destination) { diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt index 5c9f2646aca..fa0b94f75eb 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/publisher/CordaPublisherImpl.kt @@ -232,7 +232,7 @@ internal class CordaPublisherImpl( if (!config.transactional) { future.complete(Unit) } else { - log.debug { "Asynchronous send completed completed successfully." } + log.debug { "Asynchronous send completed successfully." 
} } } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt index 58559d25ba4..1cf81e3a2e6 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MessageBusClientTest.kt @@ -2,19 +2,26 @@ package net.corda.messaging.mediator import net.corda.messagebus.api.producer.CordaProducer import net.corda.messagebus.api.producer.CordaProducerRecord +import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.mediator.MediatorMessage import net.corda.messaging.api.mediator.MessagingClient.Companion.MSG_PROP_ENDPOINT import net.corda.v5.base.exceptions.CordaRuntimeException +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.mockito.Mockito +import org.mockito.Mockito.any +import org.mockito.Mockito.doAnswer import org.mockito.Mockito.times import org.mockito.kotlin.eq -import org.mockito.kotlin.isNull import org.mockito.kotlin.mock import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import java.util.concurrent.CompletableFuture class MessageBusClientTest { private companion object { @@ -31,6 +38,12 @@ class MessageBusClientTest { MSG_PROP_KEY to TEST_KEY, ) private val message: MediatorMessage = MediatorMessage("value", messageProps) + private val record: CordaProducerRecord<*, *> = CordaProducerRecord( + TEST_ENDPOINT, + TEST_KEY, + message.payload, + messageProps.toHeaders(), + ) @BeforeEach @@ -39,22 +52,26 @@ class MessageBusClientTest { messageBusClient = MessageBusClient("client-id", cordaProducer) } + @Suppress("UNCHECKED_CAST") @Test - fun testSend() { - messageBusClient.send(message) + fun `test send`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(null) + }.whenever(cordaProducer).send(eq(record), any()) - val expected = CordaProducerRecord( - TEST_ENDPOINT, - TEST_KEY, - message.payload, - messageProps.toHeaders(), - ) + val result = messageBusClient.send(message) as MediatorMessage> - verify(cordaProducer).send(eq(expected), isNull()) + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + result.payload?.let { + assertTrue(it.isDone) + assertFalse(it.isCompletedExceptionally) + } } @Test - fun testSendWithError() { + fun `send should handle synchronous error`() { val record = CordaProducerRecord( TEST_ENDPOINT, TEST_KEY, @@ -62,14 +79,56 @@ class MessageBusClientTest { messageProps.toHeaders(), ) - Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), isNull()) + Mockito.doThrow(CordaRuntimeException("")).whenever(cordaProducer).send(eq(record), any()) assertThrows { messageBusClient.send(message) } } + @Suppress("UNCHECKED_CAST") + @Test + fun `send should handle asynchronous CordaMessageAPIFatalException`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(CordaMessageAPIFatalException("test")) + }.whenever(cordaProducer).send(eq(record), any()) + + val result = messageBusClient.send(message) as MediatorMessage> + + 
verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + + result.payload?.isCompletedExceptionally?.let { assertTrue(it) } + + result.payload?.handle { _, exception -> + assertTrue(exception is CordaMessageAPIFatalException) + assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message) + }?.get() + } + + @Suppress("UNCHECKED_CAST") + @Test + fun `send should wrap unknown exceptions`() { + doAnswer { + val callback = it.getArgument(1) + callback.onCompletion(CordaRuntimeException("test")) + }.whenever(cordaProducer).send(eq(record), any()) + + val result = messageBusClient.send(message) as MediatorMessage> + + verify(cordaProducer).send(eq(record), any()) + assertNotNull(result.payload) + + result.payload?.isCompletedExceptionally?.let { assertTrue(it) } + + result.payload?.handle { _, exception -> + assertTrue(exception is CordaMessageAPIFatalException) + assertEquals("Producer clientId client-id for topic topic failed to send.", exception.message) + }?.get() + } + @Test - fun testClose() { + fun `test close`() { messageBusClient.close() verify(cordaProducer, times(1)).close() } diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt index 62b2e99dacb..b446a17f605 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt @@ -3,6 +3,7 @@ package net.corda.messaging.mediator import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializer import net.corda.libs.configuration.SmartConfig +import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.lifecycle.LifecycleCoordinatorFactory import net.corda.messagebus.api.consumer.CordaConsumerRecord @@ -10,7 +11,6 @@ import net.corda.messaging.api.exception.CordaMessageAPIIntermittentException import net.corda.messaging.api.mediator.MediatorConsumer import net.corda.messaging.api.mediator.MessageRouter import net.corda.messaging.api.mediator.MessagingClient -import net.corda.messaging.api.mediator.MultiSourceEventMediator import net.corda.messaging.api.mediator.RoutingDestination import net.corda.messaging.api.mediator.config.EventMediatorConfig import net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder @@ -25,7 +25,9 @@ import net.corda.messaging.api.records.Record import net.corda.schema.configuration.MessagingConfig import net.corda.taskmanager.TaskManager import net.corda.test.util.waitWhile +import org.assertj.core.api.Assertions import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test import org.mockito.kotlin.any import org.mockito.kotlin.anyOrNull import org.mockito.kotlin.atLeast @@ -36,6 +38,7 @@ import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import java.time.Duration import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger class MultiSourceEventMediatorImplTest { @@ -48,7 +51,7 @@ class MultiSourceEventMediatorImplTest { private val messagingConfig = mock() private lateinit var config: EventMediatorConfig - private lateinit var mediator: MultiSourceEventMediator + private lateinit var 
mediator: MultiSourceEventMediatorImpl private val mediatorConsumerFactory = mock() private val consumer = mock>() private val messagingClientFactory = mock() @@ -207,6 +210,48 @@ class MultiSourceEventMediatorImplTest { verify(messagingClient, times(expectedProcessingCount)).send(any()) } + @Test + fun qualifyStateMarksStateAsNewWhenOriginalIsNullAndProcessedIsNot() { + val toCreate = ConcurrentHashMap() + val toUpdate = ConcurrentHashMap() + val toDelete = ConcurrentHashMap() + val originalState = null + val processedState = State("key1", "".toByteArray()) + mediator.qualifyState("groupKey", originalState, processedState, toCreate, toUpdate, toDelete) + + Assertions.assertThat(toCreate).containsExactly(Assertions.entry("groupKey", processedState)) + Assertions.assertThat(toUpdate).isEmpty() + Assertions.assertThat(toDelete).isEmpty() + } + + @Test + fun qualifyStateMarksStateAsUpdatableWhenOriginalAndProcessedAreNotNull() { + val toCreate = ConcurrentHashMap() + val toUpdate = ConcurrentHashMap() + val toDelete = ConcurrentHashMap() + val originalState = State("key1", "".toByteArray()) + val processedState = State("key1", "nonEmpty".toByteArray()) + mediator.qualifyState("groupKey", originalState, processedState, toCreate, toUpdate, toDelete) + + Assertions.assertThat(toCreate).isEmpty() + Assertions.assertThat(toUpdate).containsExactly(Assertions.entry("groupKey", processedState)) + Assertions.assertThat(toDelete).isEmpty() + } + + @Test + fun qualifyStateMarksStateAsDeletableWhenProcessedIsNullAndOriginalIsNot() { + val toCreate = ConcurrentHashMap() + val toUpdate = ConcurrentHashMap() + val toDelete = ConcurrentHashMap() + val originalState = State("key1", "".toByteArray()) + val processedState = null + mediator.qualifyState("groupKey", originalState, processedState, toCreate, toUpdate, toDelete) + + Assertions.assertThat(toCreate).isEmpty() + Assertions.assertThat(toUpdate).isEmpty() + Assertions.assertThat(toDelete).containsExactly(Assertions.entry("groupKey", originalState)) + } + private fun cordaConsumerRecords(key: String, event: String) = CordaConsumerRecord( topic = "", partition = 0, offset = 0, key, event, timestamp = 0 diff --git a/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt b/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt index ba725ed7d8d..47e671a072b 100644 --- a/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt +++ b/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt @@ -127,6 +127,24 @@ class StateManagerIntegrationTest { } } + private fun getIntervalBetweenEntities(startEntityKey: String, finishEntityKey: String): Pair { + return dataSource.connection.transaction { connection -> + val loadedEntities = connection.prepareStatement(queryProvider.findStatesByKey(2)) + .use { + it.setString(1, startEntityKey) + it.setString(2, finishEntityKey) + it.executeQuery().resultSetAsStateEntityCollection() + }.sortedBy { + it.modifiedTime + } + + Pair( + loadedEntities.elementAt(0).modifiedTime, + loadedEntities.elementAt(1).modifiedTime + ) + } + } + @ValueSource(ints = [1, 10]) @ParameterizedTest(name = "can create basic states (batch size: {0})") fun canCreateBasicStates(stateCount: Int) { @@ -457,22 +475,6 @@ class 
StateManagerIntegrationTest { } } - private fun getIntervalBetweenEntities(startEntityKey: String, finishEntityKey: String): Pair { - return dataSource.connection.transaction { connection -> - val loadedEntities = connection.prepareStatement(queryProvider.findStatesByKey(2)) - .use { - it.setString(1, startEntityKey) - it.setString(2, finishEntityKey) - it.executeQuery().resultSetAsStateEntityCollection() - } - - Pair( - loadedEntities.elementAt(0).modifiedTime, - loadedEntities.elementAt(1).modifiedTime - ) - } - } - @Test @DisplayName(value = "can filter states using simple comparisons on metadata values") fun canFilterStatesUsingSimpleComparisonsOnMetadataValues() { diff --git a/libs/web/web-impl/src/main/kotlin/net/corda/web/server/JavalinServer.kt b/libs/web/web-impl/src/main/kotlin/net/corda/web/server/JavalinServer.kt index 4c19c4bfa37..fb3d41c6d57 100644 --- a/libs/web/web-impl/src/main/kotlin/net/corda/web/server/JavalinServer.kt +++ b/libs/web/web-impl/src/main/kotlin/net/corda/web/server/JavalinServer.kt @@ -15,7 +15,6 @@ import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -55,12 +54,12 @@ class JavalinServer( private val apiPathPrefix: String = "/api/${platformInfoProvider.localWorkerSoftwareShortVersion}" private var server: Javalin? = null private val coordinator = coordinatorFactory.createCoordinator { _, _ -> } - private val serverStartLock = ReentrantLock() + private val serverLock = ReentrantLock() - override val endpoints: MutableSet = ConcurrentHashMap.newKeySet() + override val endpoints: MutableSet = mutableSetOf() override fun start(port: Int) { - serverStartLock.withLock { + serverLock.withLock { check(null == server) { "The Javalin webserver is already initialized" } coordinator.start() startServer(port) @@ -100,7 +99,7 @@ class JavalinServer( } private fun restartServer() { - serverStartLock.withLock { + serverLock.withLock { // restart server without marking the component down. 
checkNotNull(server) { "Cannot restart a non-existing server" } val port = server?.port() @@ -111,7 +110,7 @@ class JavalinServer( } override fun stop() { - serverStartLock.withLock { + serverLock.withLock { coordinator.updateStatus(LifecycleStatus.DOWN) stopServer() coordinator.stop() @@ -119,23 +118,27 @@ class JavalinServer( } override fun registerEndpoint(endpoint: Endpoint) { - if(endpoints.any { it.path == endpoint.path && it.methodType == endpoint.methodType }) - throw IllegalArgumentException("Endpoint with path ${endpoint.path} and method ${endpoint.methodType} already exists.") - // register immediately when the server has been started - if(null != server) registerEndpointInternal(endpoint) - // record the path in case we need to register when it's already started - endpoints.add(endpoint) + serverLock.withLock { + if (endpoints.any { it.path == endpoint.path && it.methodType == endpoint.methodType }) + throw IllegalArgumentException("Endpoint with path ${endpoint.path} and method ${endpoint.methodType} already exists.") + // register immediately when the server has been started + if (null != server) registerEndpointInternal(endpoint) + // record the path in case we need to register when it's already started + endpoints.add(endpoint) + } } override fun removeEndpoint(endpoint: Endpoint) { - endpoints.remove(endpoint) - // NOTE: - // The server needs to be restarted to un-register the path. However, this means everything dependent on - // this is impacted by a restart, which doesn't feel quite right. - // This also means we can't really DOWN/UP the lifecycle status of this because this would end up in a - // relentless yoyo-ing of this component as dependent components keep calling this function. - // TODO - review if it is really needed to de-register a path when a Subscription goes down, for example. - if(null != server) restartServer() + serverLock.withLock { + endpoints.remove(endpoint) + // NOTE: + // The server needs to be restarted to un-register the path. However, this means everything dependent on + // this is impacted by a restart, which doesn't feel quite right. + // This also means we can't really DOWN/UP the lifecycle status of this because this would end up in a + // relentless yoyo-ing of this component as dependent components keep calling this function. + // TODO - review if it is really needed to de-register a path when a Subscription goes down, for example. + if (null != server) restartServer() + } } private fun registerEndpointInternal(endpoint: Endpoint) { diff --git a/state-manager.yaml b/state-manager.yaml index b333b5c2f83..f1d52962b51 100644 --- a/state-manager.yaml +++ b/state-manager.yaml @@ -2,16 +2,8 @@ # Helm chart and a state-manager deployed via bitnami postgres helm chart with the overrides from # `state-manager-postgres.yaml`. 
# -# First use `./gradlew publishOSGiImage --parallel` to create local docker images -# -# Then deploy the prereqs using: -# helm upgrade --install prereqs -n corda \ -# oci://corda-os-docker.software.r3.com/helm-charts/corda-dev-prereqs \ -# --render-subchart-notes \ -# --timeout 10m \ -# # Then deploy the state manager using the bitnami helm chart -# helm install state-manager-db oci://docker-remotes.software.r3.com/bitnamicharts/postgresql -n corda --version "12.1.0" \ +# helm install state-manager-db oci://registry-1.docker.io/bitnamicharts/postgresql -n corda --version "12.1.0" \ # -f ./state-manager-postgres.yaml \ # --timeout 10m \ # --wait @@ -46,6 +38,14 @@ bootstrap: secretKeyRef: name: "state-manager-db-postgresql" key: "postgres-password" + tokenSelection: + username: + value: "postgres" + password: + valueFrom: + secretKeyRef: + name: "state-manager-db-postgresql" + key: "postgres-password" workers: flow: stateManager: diff --git a/tools/plugins/network/src/main/kotlin/net/corda/cli/plugins/network/GenerateGroupPolicy.kt b/tools/plugins/network/src/main/kotlin/net/corda/cli/plugins/network/GenerateGroupPolicy.kt index cada2a5502c..a006d186594 100644 --- a/tools/plugins/network/src/main/kotlin/net/corda/cli/plugins/network/GenerateGroupPolicy.kt +++ b/tools/plugins/network/src/main/kotlin/net/corda/cli/plugins/network/GenerateGroupPolicy.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.exc.MismatchedInputException import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import net.corda.cli.plugins.network.output.ConsoleOutput +import net.corda.cli.plugins.network.output.Output import net.corda.cli.plugins.network.utils.PrintUtils.printJsonOutput import net.corda.cli.plugins.network.utils.PrintUtils.verifyAndPrintError import org.yaml.snakeyaml.Yaml @@ -20,7 +21,7 @@ import java.util.UUID description = ["Generates GroupPolicy.json file."], mixinStandardHelpOptions = true ) -class GenerateGroupPolicy(private val output: ConsoleOutput = ConsoleOutput()) : Runnable { +class GenerateGroupPolicy(private val output: Output = ConsoleOutput()) : Runnable { @CommandLine.Option( names = ["--endpoint"],