From 6ed83d54d4376bbca0daee18a2d1e6a02334dbd4 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Fri, 29 Sep 2023 16:07:00 +0100 Subject: [PATCH 01/15] SessionTimeoutTaskProcessor implementation. --- components/flow/flow-service/build.gradle | 73 +++++++++--------- .../SessionTimeoutTaskProcessor.kt | 45 +++++++++++ .../SessionTimeoutTaskProcessorTests.kt | 77 +++++++++++++++++++ 3 files changed, 159 insertions(+), 36 deletions(-) create mode 100644 components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt create mode 100644 components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt diff --git a/components/flow/flow-service/build.gradle b/components/flow/flow-service/build.gradle index d13f5d4188d..8b1351f968a 100644 --- a/components/flow/flow-service/build.gradle +++ b/components/flow/flow-service/build.gradle @@ -12,51 +12,52 @@ dependencies { compileOnly 'org.osgi:osgi.annotation' compileOnly "co.paralleluniverse:quasar-osgi-annotations:$quasarVersion" - implementation project(":components:configuration:configuration-read-service") - implementation project(":components:external-messaging-services") - implementation project(":components:membership:membership-group-read") + implementation project(':components:configuration:configuration-read-service') + implementation project(':components:external-messaging-services') + implementation project(':components:membership:membership-group-read') implementation project(':components:membership:group-policy') - implementation project(":components:virtual-node:virtual-node-info-read-service") - implementation project(":components:virtual-node:cpi-info-read-service") - implementation project(":components:virtual-node:cpk-read-service") - implementation project(":components:virtual-node:sandbox-group-context-service") + implementation project(':components:virtual-node:virtual-node-info-read-service') + implementation project(':components:virtual-node:cpi-info-read-service') + implementation project(':components:virtual-node:cpk-read-service') + implementation project(':components:virtual-node:sandbox-group-context-service') runtimeOnly project(':components:virtual-node:sandbox-amqp') runtimeOnly project(':components:virtual-node:sandbox-json') - implementation project(":libs:cache:cache-caffeine") - implementation project(":libs:configuration:configuration-core") - implementation project(":libs:crypto:crypto-core") - implementation project(":libs:crypto:crypto-flow") - implementation project(":libs:external-messaging") + implementation project(':libs:cache:cache-caffeine') + implementation project(':libs:configuration:configuration-core') + implementation project(':libs:crypto:crypto-core') + implementation project(':libs:crypto:crypto-flow') + implementation project(':libs:external-messaging') implementation project(':libs:flows:flow-api') implementation project(':libs:flows:session-manager') implementation project(":libs:flows:flow-utils") - implementation project(":libs:lifecycle:lifecycle") - implementation project(":libs:membership:membership-common") - implementation project(":libs:metrics") + implementation project(':libs:lifecycle:lifecycle') + implementation project(':libs:membership:membership-common') + implementation project(':libs:metrics') implementation project(":libs:messaging:messaging") implementation project(':libs:platform-info') - implementation project(":libs:sandbox") + implementation project(':libs:sandbox') implementation project(':libs:serialization:serialization-amqp') - implementation project(":libs:serialization:serialization-checkpoint-api") - implementation project(":libs:utilities") - implementation project(":libs:virtual-node:sandbox-group-context") + implementation project(':libs:serialization:serialization-checkpoint-api') + implementation project(':libs:state-manager:state-manager-api') + implementation project(':libs:utilities') + implementation project(':libs:virtual-node:sandbox-group-context') implementation project(':libs:virtual-node:virtual-node-info') implementation project(':libs:platform-info') - implementation project(":libs:serialization:serialization-avro") - implementation project(":libs:tracing") + implementation project(':libs:serialization:serialization-avro') + implementation project(':libs:tracing') implementation platform("net.corda:corda-api:$cordaApiVersion") implementation "com.typesafe:config:$typeSafeConfigVersion" - implementation "net.corda:corda-application" - implementation "net.corda:corda-ledger-utxo" - implementation "net.corda:corda-avro-schema" - implementation "net.corda:corda-base" - implementation "net.corda:corda-config-schema" + implementation 'net.corda:corda-application' + implementation 'net.corda:corda-ledger-utxo' + implementation 'net.corda:corda-avro-schema' + implementation 'net.corda:corda-base' + implementation 'net.corda:corda-config-schema' implementation 'net.corda:corda-ledger-common' - implementation project(":libs:packaging:packaging") - implementation "net.corda:corda-topic-schema" + implementation project(':libs:packaging:packaging') + implementation 'net.corda:corda-topic-schema' implementation 'org.jetbrains.kotlin:kotlin-osgi-bundle' implementation "org.slf4j:slf4j-api:$slf4jVersion" @@ -69,11 +70,11 @@ dependencies { testImplementation "org.apache.felix:org.apache.felix.framework:$felixVersion" testImplementation "com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion" - testImplementation project(":libs:flows:session-manager-impl") - testImplementation project(":libs:lifecycle:lifecycle-test-impl") - testImplementation project(":libs:lifecycle:lifecycle-impl") - testImplementation project(":libs:lifecycle:registry") - testImplementation project(":testing:flow:flow-utilities") + testImplementation project(':libs:flows:session-manager-impl') + testImplementation project(':libs:lifecycle:lifecycle-test-impl') + testImplementation project(':libs:lifecycle:lifecycle-impl') + testImplementation project(':libs:lifecycle:registry') + testImplementation project(':testing:flow:flow-utilities') testImplementation project(':testing:test-utilities') testRuntimeOnly "org.slf4j:slf4j-simple:$slf4jVersion" @@ -85,13 +86,13 @@ dependencies { integrationTestRuntimeOnly project(':libs:application:application-impl') integrationTestRuntimeOnly project(':libs:flows:session-manager-impl') - integrationTestRuntimeOnly project(":libs:lifecycle:lifecycle-impl") + integrationTestRuntimeOnly project(':libs:lifecycle:lifecycle-impl') integrationTestRuntimeOnly project(':libs:messaging:db-message-bus-impl') integrationTestRuntimeOnly project(':libs:messaging:messaging-impl') integrationTestRuntimeOnly project(':libs:serialization:serialization-checkpoint-api') integrationTestRuntimeOnly project(':libs:serialization:serialization-kryo') - integrationTestRuntimeOnly project(":components:membership:membership-group-read-impl") - integrationTestRuntimeOnly project(":components:virtual-node:cpk-read-service-impl") + integrationTestRuntimeOnly project(':components:membership:membership-group-read-impl') + integrationTestRuntimeOnly project(':components:virtual-node:cpk-read-service-impl') integrationTestRuntimeOnly "org.apache.aries.spifly:org.apache.aries.spifly.dynamic.framework.extension:$ariesDynamicFrameworkExtensionVersion" } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt new file mode 100644 index 00000000000..a55044d6de2 --- /dev/null +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt @@ -0,0 +1,45 @@ +package net.corda.flow.maintenance + +import net.corda.data.scheduler.ScheduledTaskTrigger +import net.corda.libs.statemanager.api.Operation +import net.corda.libs.statemanager.api.SingleKeyFilter +import net.corda.libs.statemanager.api.StateManager +import net.corda.messaging.api.processor.DurableProcessor +import net.corda.messaging.api.records.Record +import net.corda.schema.Schemas.ScheduledTask +import org.slf4j.LoggerFactory +import java.time.Instant + +class SessionTimeoutTaskProcessor( + private val stateManager: StateManager, + private val now: () -> Instant = Instant::now +) : DurableProcessor { + companion object { + private val logger = LoggerFactory.getLogger(SessionTimeoutTaskProcessor::class.java) + } + override val keyClass: Class + get() = String::class.java + override val valueClass: Class + get() = ScheduledTaskTrigger::class.java + + override fun onNext(events: List>): List> { + // If we receive multiple, there's probably an issue somewhere, and we can ignore all but the last one. + return events.lastOrNull { it.key == ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT }?.value?.let { trigger -> + logger.trace("Processing trigger scheduled at ${trigger.timestamp}") + // TODO - temporary query + // TODO - we must be able to specify additional filters so we can limit to selecting those sessions that are still open + // TODO - we must be able to limit by type of state + val checkpoints = stateManager.find( + SingleKeyFilter("session.expiry", Operation.LesserThan, now()) + ) + if (checkpoints.isEmpty()) { + logger.trace("No flows to time out") + emptyList() + } else { + // TODO - return an avro message (schema TBC) for each checkpoint + logger.info("Trigger cleanup of $checkpoints") + emptyList() + } + } ?: emptyList() + } +} \ No newline at end of file diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt new file mode 100644 index 00000000000..3d8ed1cb03e --- /dev/null +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt @@ -0,0 +1,77 @@ +package net.corda.flow.maintenance + +import net.corda.data.scheduler.ScheduledTaskTrigger +import net.corda.libs.statemanager.api.Metadata +import net.corda.libs.statemanager.api.State +import net.corda.libs.statemanager.api.StateManager +import net.corda.messaging.api.records.Record +import net.corda.schema.Schemas +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.api.Test +import org.mockito.internal.verification.Times +import org.mockito.kotlin.any +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import java.time.Instant + +class SessionTimeoutTaskProcessorTests { + private val state = State("foo", randomBytes(), 0, Metadata()) + private val states = mapOf( + "foo" to state + ) + private val stateManager = mock { + on { find(any()) } doReturn (states) + } + private val record1 = Record( + Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT, + Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT, + mock()) + private val now = Instant.now() + @Test + fun `when empty list do nothing`() { + val processor = SessionTimeoutTaskProcessor(stateManager) { now } + val output = processor.onNext(emptyList()) + assertThat(output).isEmpty() + verify(stateManager, never()).find(any()) + } + + @Test + fun `when multiple in list do only process one`() { + val processor = SessionTimeoutTaskProcessor(stateManager) { now } + processor.onNext(listOf(record1, record1.copy(value = mock()))) + verify(stateManager, Times(1)).find(any()) + } + + @Test + fun `filter out wrong key`() { + val processor = SessionTimeoutTaskProcessor(stateManager) { now } + val output = processor.onNext(listOf(record1.copy(key = "foo"))) + assertThat(output).isEmpty() + verify(stateManager, never()).find(any()) + } + + @Test + @Disabled + fun `when state found return`() { + whenever(stateManager.find(any())).doReturn(emptyMap()) + val processor = SessionTimeoutTaskProcessor(stateManager) { now } + val output = processor.onNext(listOf(record1)) + assertThat(output).isNotEmpty + } + + @Test + fun `when no states found return empty`() { + val processor = SessionTimeoutTaskProcessor(stateManager) { now } + val output = processor.onNext(listOf(record1)) + // TODO - better assertion when integrated + assertThat(output).isEmpty() + } + + private fun randomBytes(): ByteArray { + return (1..16).map { ('0'..'9').random() }.joinToString("").toByteArray() + } +} \ No newline at end of file From addec41f4712554e31e9cf76ddf7806c61d036be Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Sat, 30 Sep 2023 14:37:18 +0100 Subject: [PATCH 02/15] Flow maintenance service. --- .../corda/flow/maintenance/FlowMaintenance.kt | 9 +++ .../flow/maintenance/FlowMaintenanceImpl.kt | 76 +++++++++++++++++++ .../SessionTimeoutTaskProcessor.kt | 1 + .../net/corda/flow/service/FlowService.kt | 9 ++- .../maintenance/FlowMaintenanceImplTests.kt | 63 +++++++++++++++ .../net/corda/flow/service/FlowServiceTest.kt | 7 +- .../statemanager/api/StateManagerFactory.kt | 8 +- .../impl/factory/StateManagerFactoryImpl.kt | 24 +++--- 8 files changed, 177 insertions(+), 20 deletions(-) create mode 100644 components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenance.kt create mode 100644 components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt create mode 100644 components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenance.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenance.kt new file mode 100644 index 00000000000..3e19e9a6e01 --- /dev/null +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenance.kt @@ -0,0 +1,9 @@ +package net.corda.flow.maintenance + +import net.corda.libs.configuration.SmartConfig +import net.corda.lifecycle.Lifecycle + +interface FlowMaintenance : Lifecycle { + fun onConfigChange(config: Map) +} + diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt new file mode 100644 index 00000000000..6a2cb06a1a6 --- /dev/null +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt @@ -0,0 +1,76 @@ +package net.corda.flow.maintenance + +import net.corda.flow.service.FlowExecutor +import net.corda.libs.configuration.SmartConfig +import net.corda.libs.configuration.helper.getConfig +import net.corda.libs.statemanager.api.StateManagerFactory +import net.corda.lifecycle.LifecycleCoordinator +import net.corda.lifecycle.LifecycleCoordinatorFactory +import net.corda.lifecycle.LifecycleEvent +import net.corda.lifecycle.LifecycleStatus +import net.corda.lifecycle.StartEvent +import net.corda.lifecycle.StopEvent +import net.corda.lifecycle.createCoordinator +import net.corda.messaging.api.subscription.config.SubscriptionConfig +import net.corda.messaging.api.subscription.factory.SubscriptionFactory +import net.corda.schema.Schemas +import net.corda.schema.configuration.ConfigKeys +import net.corda.utilities.debug +import net.corda.utilities.trace +import org.osgi.service.component.annotations.Component +import org.osgi.service.component.annotations.Reference +import org.slf4j.LoggerFactory + +@Component(service = [FlowMaintenance::class]) +class FlowMaintenanceImpl constructor( + @Reference(service = LifecycleCoordinatorFactory::class) + coordinatorFactory: LifecycleCoordinatorFactory, + @Reference(service = SubscriptionFactory::class) + private val subscriptionFactory: SubscriptionFactory, + @Reference(service = StateManagerFactory::class) + private val stateManagerFactory: StateManagerFactory, +) : FlowMaintenance { + companion object { + private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + } + + private val coordinator = coordinatorFactory.createCoordinator(::eventHandler) + override fun onConfigChange(config: Map) { + val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) + // TODO - fix config key. The state manager has nothing to do with messaging. + val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) + coordinator.createManagedResource("FLOW_MAINTENANCE_SUBSCRIPTION") { + subscriptionFactory.createDurableSubscription( + SubscriptionConfig("flow.maintenance.tasks", Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR), + SessionTimeoutTaskProcessor(stateManagerFactory.create(stateManagerConfig)), + messagingConfig, + null + ) + }.start() + } + + override val isRunning: Boolean + get() = coordinator.isRunning + + override fun start() { + coordinator.start() + } + + override fun stop() { + coordinator.stop() + } + + private fun eventHandler(event: LifecycleEvent, coordinator: LifecycleCoordinator) { + logger.debug { "Flow maintenance event $event." } + + when (event) { + is StartEvent -> { + coordinator.updateStatus(LifecycleStatus.UP) + // TODO - this should register to follow the State Manager's lifecycle + } + is StopEvent -> { + logger.trace { "Flow maintenance is stopping..." } + } + } + } +} \ No newline at end of file diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt index a55044d6de2..7d170a8f16f 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt @@ -37,6 +37,7 @@ class SessionTimeoutTaskProcessor( emptyList() } else { // TODO - return an avro message (schema TBC) for each checkpoint + // TODO - define topic to publish message on logger.info("Trigger cleanup of $checkpoints") emptyList() } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowService.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowService.kt index 634214ee803..59b223f0c1c 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowService.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowService.kt @@ -4,6 +4,7 @@ import net.corda.configuration.read.ConfigChangedEvent import net.corda.configuration.read.ConfigurationReadService import net.corda.cpiinfo.read.CpiInfoReadService import net.corda.external.messaging.services.ExternalMessagingRoutingService +import net.corda.flow.maintenance.FlowMaintenance import net.corda.lifecycle.Lifecycle import net.corda.lifecycle.LifecycleCoordinator import net.corda.lifecycle.LifecycleCoordinatorFactory @@ -24,7 +25,6 @@ import net.corda.virtualnode.read.VirtualNodeInfoReadService import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference -import org.slf4j.LoggerFactory @Suppress("LongParameterList") @Component(service = [FlowService::class]) @@ -37,10 +37,11 @@ class FlowService @Activate constructor( private val flowExecutor: FlowExecutor, @Reference(service = ExternalMessagingRoutingService::class) private val externalMessagingRoutingService: ExternalMessagingRoutingService, + @Reference(service = FlowMaintenance::class) + private val flowMaintenance: FlowMaintenance, ) : Lifecycle { companion object { - private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) private val configSections = setOf(BOOT_CONFIG, MESSAGING_CONFIG, FLOW_CONFIG, UTXO_LEDGER_CONFIG) } @@ -60,8 +61,10 @@ class FlowService @Activate constructor( LifecycleCoordinatorName.forComponent(), LifecycleCoordinatorName.forComponent(), LifecycleCoordinatorName.forComponent(), + LifecycleCoordinatorName.forComponent(), ) ) + flowMaintenance.start() flowExecutor.start() } @@ -85,12 +88,14 @@ class FlowService @Activate constructor( * is configured before we configure the executor to prevent a race between receiving the first * state events and scheduler creating a publisher. */ + flowMaintenance.onConfigChange(config) flowExecutor.onConfigChange(config) externalMessagingRoutingService.onConfigChange(config) coordinator.updateStatus(LifecycleStatus.UP) } is StopEvent -> { + flowMaintenance.stop() flowExecutor.stop() registration?.close() registration = null diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt new file mode 100644 index 00000000000..80266cbc0ef --- /dev/null +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt @@ -0,0 +1,63 @@ +package net.corda.flow.maintenance + +import net.corda.data.scheduler.ScheduledTaskTrigger +import net.corda.libs.configuration.SmartConfig +import net.corda.libs.statemanager.api.StateManager +import net.corda.libs.statemanager.api.StateManagerFactory +import net.corda.lifecycle.LifecycleCoordinator +import net.corda.lifecycle.LifecycleCoordinatorFactory +import net.corda.messaging.api.subscription.Subscription +import net.corda.messaging.api.subscription.factory.SubscriptionFactory +import net.corda.schema.Schemas +import net.corda.schema.configuration.ConfigKeys +import org.junit.jupiter.api.Test +import org.mockito.kotlin.any +import org.mockito.kotlin.argThat +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.eq +import org.mockito.kotlin.isNull +import org.mockito.kotlin.mock +import org.mockito.kotlin.verify + +class FlowMaintenanceImplTests { + private val subscription = mock>() + private val lifecycleCoordinator = mock { + on { createManagedResource(any(), any<() -> Subscription>()) } doReturn (subscription) + } + private val lifecycleCoordinatorFactory = mock { + on { createCoordinator(any(), any()) } doReturn (lifecycleCoordinator) + } + private val subscriptionFactory = mock { + on { createDurableSubscription(any(), any(), any(), any()) } doReturn(subscription) + } + private val stateManager = mock() + private val stateManagerFactory = mock { + on { create(any()) } doReturn (stateManager) + } + private val messagingConfig = mock() + // TODO - fix this when state manager config is split up from messaging + private val stateManagerConfig = messagingConfig + private val config = mapOf( + ConfigKeys.MESSAGING_CONFIG to messagingConfig + ) + + @Test + fun `when config provided create subscription and start it`() { + val captor = argumentCaptor<() -> Subscription>() + val m = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory) + m.onConfigChange(config) + verify(lifecycleCoordinator).createManagedResource(any(), captor.capture()) + captor.firstValue() + verify(subscriptionFactory).createDurableSubscription( + argThat { it -> + it.eventTopic == Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR + }, + any(), + eq(messagingConfig), + isNull() + ) + verify(stateManagerFactory).create(stateManagerConfig) + verify(subscription).start() + } +} \ No newline at end of file diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt index b7901cf0a8b..1a913c77756 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt @@ -1,10 +1,10 @@ package net.corda.flow.service -import java.util.stream.Stream import net.corda.configuration.read.ConfigurationReadService import net.corda.cpiinfo.read.CpiInfoReadService import net.corda.external.messaging.services.ExternalMessagingRoutingService import net.corda.flow.MINIMUM_SMART_CONFIG +import net.corda.flow.maintenance.FlowMaintenance import net.corda.lifecycle.LifecycleCoordinatorName import net.corda.lifecycle.test.impl.LifecycleTest import net.corda.sandboxgroupcontext.service.SandboxGroupContextComponent @@ -19,6 +19,7 @@ import org.mockito.kotlin.eq import org.mockito.kotlin.mock import org.mockito.kotlin.times import org.mockito.kotlin.verify +import java.util.stream.Stream class FlowServiceTest { @@ -37,6 +38,7 @@ class FlowServiceTest { private val flowExecutor = mock() private val externalMessagingRoutingService = mock() + private val flowMaintenance = mock() private val exampleConfig = mapOf( ConfigKeys.BOOT_CONFIG to MINIMUM_SMART_CONFIG, @@ -150,7 +152,8 @@ class FlowServiceTest { coordinatorFactory, configReadService, flowExecutor, - externalMessagingRoutingService + externalMessagingRoutingService, + flowMaintenance ) } } diff --git a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/StateManagerFactory.kt b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/StateManagerFactory.kt index 658cbc70550..ef0d893b2d3 100644 --- a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/StateManagerFactory.kt +++ b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/StateManagerFactory.kt @@ -8,10 +8,10 @@ import net.corda.libs.configuration.SmartConfig interface StateManagerFactory { /** - * Create a state manager from the given [messagingConfig]. + * Create a state manager from the given [config]. * - * @param messagingConfig containing the state manager to connect to underlying storage mechanism. - * @return a state manager created from the given [messagingConfig]. + * @param config containing the state manager to connect to underlying storage mechanism. + * @return a state manager created from the given [config]. */ - fun create(messagingConfig: SmartConfig): StateManager + fun create(config: SmartConfig): StateManager } diff --git a/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/factory/StateManagerFactoryImpl.kt b/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/factory/StateManagerFactoryImpl.kt index afa12ae5b65..35e03a8ea43 100644 --- a/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/factory/StateManagerFactoryImpl.kt +++ b/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/factory/StateManagerFactoryImpl.kt @@ -34,18 +34,18 @@ class StateManagerFactoryImpl @Activate constructor( private val entityManagerFactoryFactory: EntityManagerFactoryFactory, ) : StateManagerFactory { - override fun create(messagingConfig: SmartConfig): StateManager { - val user = messagingConfig.getString(JDBC_USER) - val pass = messagingConfig.getString(JDBC_PASS) - val jdbcUrl = messagingConfig.getString(JDBC_URL) - val jdbcDiver = messagingConfig.getString(JDBC_DRIVER) - val persistenceUnitName = messagingConfig.getString(JDBC_PERSISTENCE_UNIT_NAME) - val maxPoolSize = messagingConfig.getInt(JDBC_POOL_MAX_SIZE) - val minPoolSize = messagingConfig.getIntOrDefault(JDBC_POOL_MIN_SIZE, maxPoolSize) - val idleTimeout = messagingConfig.getInt(JDBC_POOL_IDLE_TIMEOUT_SECONDS).toLong().run(Duration::ofSeconds) - val maxLifetime = messagingConfig.getInt(JDBC_POOL_MAX_LIFETIME_SECONDS).toLong().run(Duration::ofSeconds) - val keepAliveTime = messagingConfig.getInt(JDBC_POOL_KEEP_ALIVE_TIME_SECONDS).toLong().run(Duration::ofSeconds) - val validationTimeout = messagingConfig.getInt(JDBC_POOL_VALIDATION_TIMEOUT_SECONDS).toLong().run(Duration::ofSeconds) + override fun create(config: SmartConfig): StateManager { + val user = config.getString(JDBC_USER) + val pass = config.getString(JDBC_PASS) + val jdbcUrl = config.getString(JDBC_URL) + val jdbcDiver = config.getString(JDBC_DRIVER) + val persistenceUnitName = config.getString(JDBC_PERSISTENCE_UNIT_NAME) + val maxPoolSize = config.getInt(JDBC_POOL_MAX_SIZE) + val minPoolSize = config.getIntOrDefault(JDBC_POOL_MIN_SIZE, maxPoolSize) + val idleTimeout = config.getInt(JDBC_POOL_IDLE_TIMEOUT_SECONDS).toLong().run(Duration::ofSeconds) + val maxLifetime = config.getInt(JDBC_POOL_MAX_LIFETIME_SECONDS).toLong().run(Duration::ofSeconds) + val keepAliveTime = config.getInt(JDBC_POOL_KEEP_ALIVE_TIME_SECONDS).toLong().run(Duration::ofSeconds) + val validationTimeout = config.getInt(JDBC_POOL_VALIDATION_TIMEOUT_SECONDS).toLong().run(Duration::ofSeconds) val dataSource = HikariDataSourceFactory().create( username = user, From 6f0aa0f921b0b6781dec879dd98afb9950cb3902 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Sun, 1 Oct 2023 17:57:47 +0100 Subject: [PATCH 03/15] Missing annotation & package export --- .../main/java/net/corda/flow/maintenance/package-info.java | 4 ++++ .../kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 components/flow/flow-service/src/main/java/net/corda/flow/maintenance/package-info.java diff --git a/components/flow/flow-service/src/main/java/net/corda/flow/maintenance/package-info.java b/components/flow/flow-service/src/main/java/net/corda/flow/maintenance/package-info.java new file mode 100644 index 00000000000..4246a307488 --- /dev/null +++ b/components/flow/flow-service/src/main/java/net/corda/flow/maintenance/package-info.java @@ -0,0 +1,4 @@ +@Export +package net.corda.flow.maintenance; + +import org.osgi.annotation.bundle.Export; \ No newline at end of file diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt index 6a2cb06a1a6..cddd0c6df6f 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt @@ -17,12 +17,13 @@ import net.corda.schema.Schemas import net.corda.schema.configuration.ConfigKeys import net.corda.utilities.debug import net.corda.utilities.trace +import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference import org.slf4j.LoggerFactory @Component(service = [FlowMaintenance::class]) -class FlowMaintenanceImpl constructor( +class FlowMaintenanceImpl @Activate constructor( @Reference(service = LifecycleCoordinatorFactory::class) coordinatorFactory: LifecycleCoordinatorFactory, @Reference(service = SubscriptionFactory::class) From 80fd64070330d373a8b1133373ddf42edd0cb8bf Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 08:48:21 +0100 Subject: [PATCH 04/15] Ensure `StateManagerFactory` OSGi service is available to those OSGi tests that need it. --- components/flow/flow-service/build.gradle | 1 + components/ledger/ledger-common-flow/build.gradle | 1 + components/ledger/ledger-consensual-flow/build.gradle | 1 + components/ledger/ledger-utxo-flow/build.gradle | 1 + processors/flow-mapper-processor/build.gradle | 1 + processors/flow-processor/build.gradle | 1 + 6 files changed, 6 insertions(+) diff --git a/components/flow/flow-service/build.gradle b/components/flow/flow-service/build.gradle index 8b1351f968a..93cac3f5011 100644 --- a/components/flow/flow-service/build.gradle +++ b/components/flow/flow-service/build.gradle @@ -91,6 +91,7 @@ dependencies { integrationTestRuntimeOnly project(':libs:messaging:messaging-impl') integrationTestRuntimeOnly project(':libs:serialization:serialization-checkpoint-api') integrationTestRuntimeOnly project(':libs:serialization:serialization-kryo') + integrationTestRuntimeOnly project(":libs:state-manager:state-manager-db-impl") integrationTestRuntimeOnly project(':components:membership:membership-group-read-impl') integrationTestRuntimeOnly project(':components:virtual-node:cpk-read-service-impl') diff --git a/components/ledger/ledger-common-flow/build.gradle b/components/ledger/ledger-common-flow/build.gradle index 36770b7581e..a1ccbcf7405 100644 --- a/components/ledger/ledger-common-flow/build.gradle +++ b/components/ledger/ledger-common-flow/build.gradle @@ -50,6 +50,7 @@ dependencies { integrationTestRuntimeOnly project(':libs:messaging:db-message-bus-impl') integrationTestRuntimeOnly project(':libs:messaging:messaging-impl') integrationTestRuntimeOnly project(':libs:flows:session-manager-impl') + integrationTestRuntimeOnly project(":libs:state-manager:state-manager-db-impl") integrationTestRuntimeOnly "org.apache.aries.spifly:org.apache.aries.spifly.dynamic.framework.extension:$ariesDynamicFrameworkExtensionVersion" cpis project(path: ':testing:ledger:ledger-common-empty-app', configuration: 'cordaCPB') diff --git a/components/ledger/ledger-consensual-flow/build.gradle b/components/ledger/ledger-consensual-flow/build.gradle index 0f0ad0ea038..84a9996622d 100644 --- a/components/ledger/ledger-consensual-flow/build.gradle +++ b/components/ledger/ledger-consensual-flow/build.gradle @@ -52,6 +52,7 @@ dependencies { integrationTestRuntimeOnly project(':libs:messaging:messaging-impl') integrationTestRuntimeOnly project(':libs:lifecycle:lifecycle-impl') integrationTestRuntimeOnly project(':libs:flows:session-manager-impl') + integrationTestRuntimeOnly project(":libs:state-manager:state-manager-db-impl") cpis project(path: ':testing:ledger:ledger-consensual-state-app', configuration: 'cordaCPB') } diff --git a/components/ledger/ledger-utxo-flow/build.gradle b/components/ledger/ledger-utxo-flow/build.gradle index 8d7024c1db4..33e0f595e8c 100644 --- a/components/ledger/ledger-utxo-flow/build.gradle +++ b/components/ledger/ledger-utxo-flow/build.gradle @@ -73,6 +73,7 @@ dependencies { integrationTestRuntimeOnly project(':libs:lifecycle:lifecycle-impl') integrationTestRuntimeOnly project(':libs:flows:session-manager-impl') integrationTestRuntimeOnly project(':libs:membership:membership-impl') + integrationTestRuntimeOnly project(":libs:state-manager:state-manager-db-impl") cpis project(path: ':testing:ledger:ledger-utxo-state-app', configuration: 'cordaCPB') } diff --git a/processors/flow-mapper-processor/build.gradle b/processors/flow-mapper-processor/build.gradle index 59f952edfa1..4e55722062f 100644 --- a/processors/flow-mapper-processor/build.gradle +++ b/processors/flow-mapper-processor/build.gradle @@ -59,5 +59,6 @@ dependencies { runtimeOnly project(':libs:ledger:ledger-utxo-data') runtimeOnly project(":libs:lifecycle:lifecycle-impl") runtimeOnly project(":libs:schema-registry:schema-registry-impl") + runtimeOnly project(":libs:state-manager:state-manager-db-impl") } diff --git a/processors/flow-processor/build.gradle b/processors/flow-processor/build.gradle index 123c156cffe..8f099f4b84b 100644 --- a/processors/flow-processor/build.gradle +++ b/processors/flow-processor/build.gradle @@ -69,6 +69,7 @@ dependencies { runtimeOnly project(":libs:sandbox-internal") runtimeOnly project(":libs:schema-registry:schema-registry-impl") runtimeOnly project(":libs:serialization:serialization-kryo") + runtimeOnly project(":libs:state-manager:state-manager-db-impl") runtimeOnly project(":libs:web:web-impl") } From 63985fea06522c9b35024ff0288a437dabf5e48b Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 09:25:21 +0100 Subject: [PATCH 05/15] More OSGi nonsense. --- applications/examples/sandbox-app/build.gradle | 1 + libs/db/db-orm-impl/build.gradle | 5 +++++ processors/db-processor/build.gradle | 6 ------ processors/flow-mapper-processor/build.gradle | 1 + processors/flow-processor/build.gradle | 1 + 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/applications/examples/sandbox-app/build.gradle b/applications/examples/sandbox-app/build.gradle index 786cfd556f5..6c10a2c33f8 100644 --- a/applications/examples/sandbox-app/build.gradle +++ b/applications/examples/sandbox-app/build.gradle @@ -65,6 +65,7 @@ dependencies { runtimeOnly project(':libs:messaging:db-message-bus-impl') runtimeOnly project(':libs:serialization:serialization-checkpoint-api') runtimeOnly project(':libs:serialization:serialization-kryo') + runtimeOnly project(":libs:state-manager:state-manager-db-impl") runtimeOnly project(':testing:group-policy-test-common') runtimeOnly project(':libs:web:web-impl') diff --git a/libs/db/db-orm-impl/build.gradle b/libs/db/db-orm-impl/build.gradle index 04a8fdcb217..b657dfcca83 100644 --- a/libs/db/db-orm-impl/build.gradle +++ b/libs/db/db-orm-impl/build.gradle @@ -27,6 +27,11 @@ dependencies { exclude group: 'org.osgi' } runtimeOnly project(':libs:antlr') + runtimeOnly "com.sun.activation:javax.activation:$activationVersion" + runtimeOnly "org.apache.aries.spifly:org.apache.aries.spifly.dynamic.framework.extension:$ariesDynamicFrameworkExtensionVersion" + runtimeOnly "org.liquibase:liquibase-core:$liquibaseVersion" + // NOTE: this is needed by Liquibase but for some reason not picked up automatically. + runtimeOnly "commons-beanutils:commons-beanutils:$beanutilsVersion" implementation project(":libs:db:db-core") implementation project(":libs:db:db-orm") diff --git a/processors/db-processor/build.gradle b/processors/db-processor/build.gradle index 560398dce3f..5f87ac291a8 100644 --- a/processors/db-processor/build.gradle +++ b/processors/db-processor/build.gradle @@ -101,12 +101,6 @@ dependencies { runtimeOnly project(':libs:schema-registry:schema-registry-impl') runtimeOnly project(":libs:web:web-impl") - runtimeOnly "com.sun.activation:javax.activation:$activationVersion" - runtimeOnly "org.apache.aries.spifly:org.apache.aries.spifly.dynamic.framework.extension:$ariesDynamicFrameworkExtensionVersion" - runtimeOnly "org.liquibase:liquibase-core:$liquibaseVersion" - // NOTE: this is needed by Liquibase but for some reason not picked up automatically. - runtimeOnly "commons-beanutils:commons-beanutils:$beanutilsVersion" - testRuntimeOnly "org.postgresql:postgresql:$postgresDriverVersion" testImplementation "org.assertj:assertj-core:$assertjVersion" diff --git a/processors/flow-mapper-processor/build.gradle b/processors/flow-mapper-processor/build.gradle index 4e55722062f..1441efb4245 100644 --- a/processors/flow-mapper-processor/build.gradle +++ b/processors/flow-mapper-processor/build.gradle @@ -50,6 +50,7 @@ dependencies { runtimeOnly project(':libs:crypto:cipher-suite-impl') runtimeOnly project(":libs:crypto:crypto-serialization-impl") runtimeOnly project(':libs:crypto:merkle-impl') + runtimeOnly project(':libs:db:db-orm-impl') runtimeOnly project(":libs:flows:external-event-responses-impl") runtimeOnly project(":libs:flows:flow-api") runtimeOnly project(":libs:flows:session-manager-impl") diff --git a/processors/flow-processor/build.gradle b/processors/flow-processor/build.gradle index 8f099f4b84b..a463888f3d0 100644 --- a/processors/flow-processor/build.gradle +++ b/processors/flow-processor/build.gradle @@ -58,6 +58,7 @@ dependencies { runtimeOnly project(":components:virtual-node:cpk-read-service-impl") runtimeOnly project(":components:flow:flow-mapper-impl") runtimeOnly project(":libs:application:application-impl") + runtimeOnly project(':libs:db:db-orm-impl') runtimeOnly project(":libs:flows:external-event-responses-impl") runtimeOnly project(":libs:flows:flow-api") runtimeOnly project(":libs:flows:session-manager-impl") From 083efc22cfd5350b8b86b63f89506982a2129308 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 10:18:04 +0100 Subject: [PATCH 06/15] - Don't create if the correct config not passed in. - Fix FlowServiceTest --- .../flow/maintenance/FlowMaintenanceImpl.kt | 28 +++++++++++-------- .../maintenance/FlowMaintenanceImplTests.kt | 8 ++++++ .../net/corda/flow/service/FlowServiceTest.kt | 4 ++- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt index cddd0c6df6f..faeee069ac7 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt @@ -1,6 +1,5 @@ package net.corda.flow.maintenance -import net.corda.flow.service.FlowExecutor import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.helper.getConfig import net.corda.libs.statemanager.api.StateManagerFactory @@ -35,19 +34,24 @@ class FlowMaintenanceImpl @Activate constructor( private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } - private val coordinator = coordinatorFactory.createCoordinator(::eventHandler) + private val coordinator = coordinatorFactory.createCoordinator(::eventHandler) override fun onConfigChange(config: Map) { - val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) // TODO - fix config key. The state manager has nothing to do with messaging. - val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) - coordinator.createManagedResource("FLOW_MAINTENANCE_SUBSCRIPTION") { - subscriptionFactory.createDurableSubscription( - SubscriptionConfig("flow.maintenance.tasks", Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR), - SessionTimeoutTaskProcessor(stateManagerFactory.create(stateManagerConfig)), - messagingConfig, - null - ) - }.start() + if(config.containsKey(ConfigKeys.MESSAGING_CONFIG)) { + val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) + val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) + coordinator.createManagedResource("FLOW_MAINTENANCE_SUBSCRIPTION") { + subscriptionFactory.createDurableSubscription( + SubscriptionConfig( + "flow.maintenance.tasks", + Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR + ), + SessionTimeoutTaskProcessor(stateManagerFactory.create(stateManagerConfig)), + messagingConfig, + null + ) + }.start() + } } override val isRunning: Boolean diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt index 80266cbc0ef..33ed1326776 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt @@ -18,6 +18,7 @@ import org.mockito.kotlin.doReturn import org.mockito.kotlin.eq import org.mockito.kotlin.isNull import org.mockito.kotlin.mock +import org.mockito.kotlin.never import org.mockito.kotlin.verify class FlowMaintenanceImplTests { @@ -60,4 +61,11 @@ class FlowMaintenanceImplTests { verify(stateManagerFactory).create(stateManagerConfig) verify(subscription).start() } + + @Test + fun `do nothing when messaging config not sent`() { + val m = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory) + m.onConfigChange(mapOf("foo" to mock())) + verify(lifecycleCoordinator, never()).createManagedResource(any(), any<() -> Subscription>()) + } } \ No newline at end of file diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt index 1a913c77756..cd94b4c9fd5 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt @@ -31,7 +31,8 @@ class FlowServiceTest { Arguments.of(LifecycleCoordinatorName.forComponent()), Arguments.of(LifecycleCoordinatorName.forComponent()), Arguments.of(LifecycleCoordinatorName.forComponent()), - Arguments.of(LifecycleCoordinatorName.forComponent()) + Arguments.of(LifecycleCoordinatorName.forComponent()), + Arguments.of(LifecycleCoordinatorName.forComponent()), ) } } @@ -147,6 +148,7 @@ class FlowServiceTest { addDependency() addDependency() addDependency() + addDependency() FlowService( coordinatorFactory, From 297c5ba3294073bf9be7ebb70df4b6ff8ad9d6d8 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 16:18:14 +0100 Subject: [PATCH 07/15] - SessionTimeoutTaskProcessor integration with new topic and AVRO object. - Fix Metadata so it's an immutable map. - Add tests for Metadata checks. --- .../executor/ScheduledTaskHandlerTest.kt | 8 +--- .../flow/maintenance/FlowMaintenanceImpl.kt | 2 +- .../SessionTimeoutTaskProcessor.kt | 17 +++++-- .../SessionTimeoutTaskProcessorTests.kt | 28 ++++++++--- gradle.properties | 3 +- .../corda/libs/statemanager/api/Metadata.kt | 45 ++++++++---------- .../corda/libs/statemanager/api/Metadata.kt | 47 +++++++++++++++++++ 7 files changed, 106 insertions(+), 44 deletions(-) create mode 100644 libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt diff --git a/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/executor/ScheduledTaskHandlerTest.kt b/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/executor/ScheduledTaskHandlerTest.kt index c3869966351..2d47e0b068a 100644 --- a/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/executor/ScheduledTaskHandlerTest.kt +++ b/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/executor/ScheduledTaskHandlerTest.kt @@ -80,16 +80,12 @@ class ScheduledTaskHandlerTest { private fun createStateEntry( key: String, lastUpdated: Instant, - mapperState: FlowMapperStateType? + mapperState: FlowMapperStateType ): Pair { - val metadata = metadata() - mapperState?.let { - metadata.put(FLOW_MAPPER_STATUS, it.toString()) - } val state = State( key, byteArrayOf(), - metadata = metadata, + metadata = metadata(FLOW_MAPPER_STATUS to mapperState.toString()), modifiedTime = lastUpdated ) return Pair(key, state) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt index faeee069ac7..7b8725fb26d 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt @@ -36,7 +36,7 @@ class FlowMaintenanceImpl @Activate constructor( private val coordinator = coordinatorFactory.createCoordinator(::eventHandler) override fun onConfigChange(config: Map) { - // TODO - fix config key. The state manager has nothing to do with messaging. + // TODO - fix config key (CORE-17437). if(config.containsKey(ConfigKeys.MESSAGING_CONFIG)) { val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt index 7d170a8f16f..3ae30fb6c55 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt @@ -1,11 +1,13 @@ package net.corda.flow.maintenance +import net.corda.data.flow.FlowTimeout import net.corda.data.scheduler.ScheduledTaskTrigger import net.corda.libs.statemanager.api.Operation import net.corda.libs.statemanager.api.SingleKeyFilter import net.corda.libs.statemanager.api.StateManager import net.corda.messaging.api.processor.DurableProcessor import net.corda.messaging.api.records.Record +import net.corda.schema.Schemas.Flow.FLOW_TIMEOUT_TOPIC import net.corda.schema.Schemas.ScheduledTask import org.slf4j.LoggerFactory import java.time.Instant @@ -16,6 +18,8 @@ class SessionTimeoutTaskProcessor( ) : DurableProcessor { companion object { private val logger = LoggerFactory.getLogger(SessionTimeoutTaskProcessor::class.java) + // TODO - this may need to move out somewhere else. + const val STATE_META_SESSION_EXPIRY_KEY = "session.expiry" } override val keyClass: Class get() = String::class.java @@ -30,16 +34,21 @@ class SessionTimeoutTaskProcessor( // TODO - we must be able to specify additional filters so we can limit to selecting those sessions that are still open // TODO - we must be able to limit by type of state val checkpoints = stateManager.find( - SingleKeyFilter("session.expiry", Operation.LesserThan, now()) + SingleKeyFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond) ) if (checkpoints.isEmpty()) { logger.trace("No flows to time out") emptyList() } else { - // TODO - return an avro message (schema TBC) for each checkpoint - // TODO - define topic to publish message on + // TODO - take log message out when everything plumbed in. logger.info("Trigger cleanup of $checkpoints") - emptyList() + checkpoints.map { kvp -> + Record(FLOW_TIMEOUT_TOPIC, kvp.key, + FlowTimeout( + kvp.value.key, + Instant.ofEpochSecond(kvp.value.metadata[STATE_META_SESSION_EXPIRY_KEY] as Long)) + ) + } } } ?: emptyList() } diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt index 3d8ed1cb03e..2058eecc56b 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt @@ -1,13 +1,14 @@ package net.corda.flow.maintenance +import net.corda.data.flow.FlowTimeout import net.corda.data.scheduler.ScheduledTaskTrigger +import net.corda.flow.maintenance.SessionTimeoutTaskProcessor.Companion.STATE_META_SESSION_EXPIRY_KEY import net.corda.libs.statemanager.api.Metadata import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.messaging.api.records.Record import net.corda.schema.Schemas import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.mockito.internal.verification.Times import org.mockito.kotlin.any @@ -19,9 +20,15 @@ import org.mockito.kotlin.whenever import java.time.Instant class SessionTimeoutTaskProcessorTests { - private val state = State("foo", randomBytes(), 0, Metadata()) + private val now = Instant.now() + private val state1 = + State( + "foo", + randomBytes(), + 0, + Metadata(mapOf(STATE_META_SESSION_EXPIRY_KEY to now.minusSeconds(1).epochSecond))) private val states = mapOf( - "foo" to state + state1.key to state1, ) private val stateManager = mock { on { find(any()) } doReturn (states) @@ -30,7 +37,7 @@ class SessionTimeoutTaskProcessorTests { Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT, Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT, mock()) - private val now = Instant.now() + @Test fun `when empty list do nothing`() { val processor = SessionTimeoutTaskProcessor(stateManager) { now } @@ -55,16 +62,23 @@ class SessionTimeoutTaskProcessorTests { } @Test - @Disabled fun `when state found return`() { - whenever(stateManager.find(any())).doReturn(emptyMap()) val processor = SessionTimeoutTaskProcessor(stateManager) { now } val output = processor.onNext(listOf(record1)) - assertThat(output).isNotEmpty + assertThat(output).containsExactly( + Record( + Schemas.Flow.FLOW_TIMEOUT_TOPIC, + state1.key, + FlowTimeout( + state1.key, + Instant.ofEpochSecond(state1.metadata[STATE_META_SESSION_EXPIRY_KEY] as Long)) + ) + ) } @Test fun `when no states found return empty`() { + whenever(stateManager.find(any())).doReturn(emptyMap()) val processor = SessionTimeoutTaskProcessor(stateManager) { now } val output = processor.onNext(listOf(record1)) // TODO - better assertion when integrated diff --git a/gradle.properties b/gradle.properties index 54f03b14601..a21e985c8b1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,7 +46,8 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.73 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.0.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.1.0.26-beta+ +#cordaApiVersion=5.1.0.26-beta+ +cordaApiVersion=5.1.0.26-alpha-1696254653687 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt index aa66c4a57e4..5099f2bf0fb 100644 --- a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt +++ b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt @@ -4,28 +4,26 @@ package net.corda.libs.statemanager.api * Mutable map that allows only primitive types to be used as values. */ class Metadata( - private val map: MutableMap = mutableMapOf() -) : MutableMap by map { - - private val supportedType = listOf( - String::class.java, - java.lang.String::class.java, - Number::class.java, - java.lang.Number::class.java, - Boolean::class.java, - java.lang.Boolean::class.java, - ) - - private fun isPrimitiveOrBoxedValue(value: Any): Boolean { - return supportedType.any { it.isAssignableFrom(value.javaClass) } + private val map: Map = emptyMap() +) : Map by map { + companion object { + private val supportedType = listOf( + String::class.java, + java.lang.String::class.java, + Number::class.java, + java.lang.Number::class.java, + Boolean::class.java, + java.lang.Boolean::class.java, + ) + private fun isPrimitiveOrBoxedValue(value: Any): Boolean { + return supportedType.any { it.isAssignableFrom(value.javaClass) } + } } - - override fun put(key: String, value: Any): Any? { - if (!isPrimitiveOrBoxedValue(value)) { - throw IllegalArgumentException("Type not supported: ${value::class}") + init { + map.filter { kvp -> !isPrimitiveOrBoxedValue(kvp.value) }.takeIf { it.isNotEmpty() }?.also { kvp -> + val invalidPairs = kvp.entries.joinToString { "${it.key}/${it.value::class.java.name}" } + throw IllegalArgumentException("Type(s) not supported: $invalidPairs") } - - return map.put(key, value) } override fun equals(other: Any?): Boolean { @@ -35,18 +33,15 @@ class Metadata( other as Metadata if (map != other.map) return false - if (supportedType != other.supportedType) return false return true } override fun hashCode(): Int { - var result = map.hashCode() - result = 31 * result + supportedType.hashCode() - return result + return map.hashCode() } } fun metadata(): Metadata = Metadata() -fun metadata(vararg pairs: Pair): Metadata = Metadata(mutableMapOf(*pairs)) +fun metadata(vararg pairs: Pair): Metadata = Metadata(mapOf(*pairs)) diff --git a/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt new file mode 100644 index 00000000000..8f07513fe3c --- /dev/null +++ b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt @@ -0,0 +1,47 @@ +package net.corda.libs.statemanager.api + +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import java.util.stream.Stream + +class MetadataTests { + companion object { + @JvmStatic + private fun acceptedTypes(): Stream = + Stream.of( + "foo", + 123, + true + ) + } + @ParameterizedTest + @MethodSource("acceptedTypes") + fun `accept primitive types`(value: Any) { + assertDoesNotThrow { + Metadata(mapOf("foo" to value)) + } + } + + @Test + fun `fail all non-primitive types`() { + val list = listOf("Na Na Na Na Na Na Na Na", "Batman") + val ex = assertThrows { + Metadata(mapOf("joker" to Superman(1000), "batman" to list)) + } + assertThat(ex).hasMessageContainingAll("joker", "batman", Superman::class.java.name, list.javaClass.name) + } + + @Test + fun `equals works as expected with map`() { + val meta1 = Metadata(mapOf("foo" to "bar")) + val meta2 = Metadata(mapOf("foo" to "bar")) + assertThat(meta2).isEqualTo(meta1) + assertThat(meta2).isNotSameAs(meta1) + } + + data class Superman(val kudos: Int) +} \ No newline at end of file From 1e472098b4a805c062a60aa26cf8a509eb062df3 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 17:03:48 +0100 Subject: [PATCH 08/15] - Revert api version - detekt fix --- gradle.properties | 3 +-- .../libs/statemanager/api/{Metadata.kt => MetadataTests.kt} | 0 2 files changed, 1 insertion(+), 2 deletions(-) rename libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/{Metadata.kt => MetadataTests.kt} (100%) diff --git a/gradle.properties b/gradle.properties index a21e985c8b1..54f03b14601 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,8 +46,7 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.73 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.0.0.xx-SNAPSHOT to pick up maven local published copy -#cordaApiVersion=5.1.0.26-beta+ -cordaApiVersion=5.1.0.26-alpha-1696254653687 +cordaApiVersion=5.1.0.26-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt similarity index 100% rename from libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt rename to libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt From c7979daa46ce759fc3082ccf1d7bfe584dafdcbc Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 17:16:28 +0100 Subject: [PATCH 09/15] - Add postgres driver --- .../workers/release/flow-worker/build.gradle | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/applications/workers/release/flow-worker/build.gradle b/applications/workers/release/flow-worker/build.gradle index e18aa05dc35..114dab31af7 100644 --- a/applications/workers/release/flow-worker/build.gradle +++ b/applications/workers/release/flow-worker/build.gradle @@ -51,4 +51,19 @@ dependencies { exclude group: 'org.apache.felix' exclude group: 'org.osgi' } + + // This puts the jdbc driver into the docker image in the /opt/jdbc-driver folder + // this folder can contain many jdbc drivers (and DataSourceFactory provider bundles). + // Postgres doesn't need a DataSourceFactory provider bundle (e.g. pax-jdbc), because + // the postgres devs have written their own and it's in this jar (PGDataSourceFactory). + dockerImageJdbc "org.postgresql:postgresql:$postgresDriverVersion" + + // If we were to do this for a different database that is *not natively an OSGi bundle* + // we would need the wrapped OSGi bundle version and the pax-jdbc loader, i.e. + // + // dockerImageJdbc "org.ops4j.pax.jdbc:pax-jdbc-VENDOR:1.5.3" + // dockerImageJdbc "com.VENDOR.database.jdbc:vendor-jdbc-WRAPPED-AS-A-BUNDLE:$vendorVersion" + // + // NOTE: PLEASE MAKE SURE NOT TO PUBLISH A DOCKER IMAGE PUBLICLY WITH THESE WRAPPED DRIVERS, + // UNLESS ABSOLUTELY SURE WE CAN DISTRIBUTE IT!! } From 943995f8f4f0534a573a3bcebd80850ded04e097 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 18:19:32 +0100 Subject: [PATCH 10/15] Fix json deserialiser & add test --- .../libs/statemanager/impl/StateManagerImpl.kt | 10 +++++----- .../statemanager/impl/StateManagerImplTest.kt | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/StateManagerImpl.kt b/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/StateManagerImpl.kt index f299b7e796e..a76350bb1d3 100644 --- a/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/StateManagerImpl.kt +++ b/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/StateManagerImpl.kt @@ -1,7 +1,7 @@ package net.corda.libs.statemanager.impl -import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue import net.corda.libs.statemanager.api.IntervalFilter import net.corda.libs.statemanager.api.Metadata import net.corda.libs.statemanager.api.SingleKeyFilter @@ -30,10 +30,7 @@ class StateManagerImpl( StateEntity(key, value, objectMapper.writeValueAsString(metadata), version, modifiedTime) private fun StateEntity.fromPersistentEntity() = - State(key, value, version, metadata.toMetadataMap(), modifiedTime) - - private fun String.toMetadataMap() = - objectMapper.readValue(this, object : TypeReference() {}) + State(key, value, version, objectMapper.convertToMetadata(metadata), modifiedTime) internal fun checkVersionAndPrepareEntitiesForPersistence( states: Collection, @@ -153,3 +150,6 @@ class StateManagerImpl( entityManagerFactory.close() } } + +fun ObjectMapper.convertToMetadata(json: String) = + Metadata(this.readValue(json)) \ No newline at end of file diff --git a/libs/state-manager/state-manager-db-impl/src/test/kotlin/net/corda/libs/statemanager/impl/StateManagerImplTest.kt b/libs/state-manager/state-manager-db-impl/src/test/kotlin/net/corda/libs/statemanager/impl/StateManagerImplTest.kt index 2c42c4d2ee7..a089b04f655 100644 --- a/libs/state-manager/state-manager-db-impl/src/test/kotlin/net/corda/libs/statemanager/impl/StateManagerImplTest.kt +++ b/libs/state-manager/state-manager-db-impl/src/test/kotlin/net/corda/libs/statemanager/impl/StateManagerImplTest.kt @@ -1,5 +1,6 @@ package net.corda.libs.statemanager.impl +import com.fasterxml.jackson.databind.ObjectMapper import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.metadata import net.corda.libs.statemanager.impl.model.v1.StateEntity @@ -144,4 +145,18 @@ class StateManagerImplTest { verify(stateRepository).delete(entityManager, listOf(persistentStateOne.key, persistentStateFour.key)) verifyNoMoreInteractions(stateRepository) } + + @Test + fun convertJson() { + val str = """ + { + "foo": "bar", + "hello": 123 + } + """.trimIndent() + + val meta = ObjectMapper().convertToMetadata(str) + assertThat(meta["foo"]).isEqualTo("bar") + assertThat(meta["hello"]).isEqualTo(123) + } } From 5171e1c5cfcef8b157f6c4fd8a9f3a73738d50a6 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 18:37:32 +0100 Subject: [PATCH 11/15] Fix json deserialiser & add test --- .../statemanager/impl/tests/StateManagerIntegrationTest.kt | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt b/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt index 048386e618e..1f6ec9eaacb 100644 --- a/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt +++ b/libs/state-manager/state-manager-db-impl/src/integrationTest/kotlin/net/corda/libs/statemanager/impl/tests/StateManagerIntegrationTest.kt @@ -1,6 +1,5 @@ package net.corda.libs.statemanager.impl.tests -import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import net.corda.db.admin.impl.ClassloaderChangeLog import net.corda.db.admin.impl.LiquibaseSchemaMigratorImpl @@ -14,6 +13,7 @@ import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.libs.statemanager.api.metadata import net.corda.libs.statemanager.impl.StateManagerImpl +import net.corda.libs.statemanager.impl.convertToMetadata import net.corda.libs.statemanager.impl.model.v1.StateEntity import net.corda.libs.statemanager.impl.model.v1.StateManagerEntities import net.corda.libs.statemanager.impl.repository.impl.KEY_PARAMETER_NAME @@ -76,9 +76,6 @@ class StateManagerIntegrationTest { private val stateManager: StateManager = StateManagerImpl(StateRepositoryImpl(queryProvider), entityManagerFactoryFactory) - private fun ObjectMapper.toMetadata(metadata: String) = - this.readValue(metadata, object : TypeReference() {}) - private fun cleanStates() = entityManagerFactoryFactory.createEntityManager().transaction { it.createNativeQuery("DELETE FROM state s WHERE s.key LIKE '%$testUniqueId%'").executeUpdate() it.flush() @@ -129,7 +126,7 @@ class StateManagerIntegrationTest { it.assertThat(loadedEntity.modifiedTime).isNotNull it.assertThat(loadedEntity.version).isEqualTo(version(i, key)) it.assertThat(loadedEntity.value).isEqualTo((stateContent(i, key).toByteArray())) - it.assertThat(objectMapper.toMetadata(loadedEntity.metadata)) + it.assertThat(objectMapper.convertToMetadata(loadedEntity.metadata)) .containsExactlyInAnyOrderEntriesOf(metadataContent(i, key)) } } From 8fcf4c48b49a8e6396e40dd89d3a0206f2d5f26b Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Tue, 3 Oct 2023 10:01:17 +0100 Subject: [PATCH 12/15] PR feedback on use of `assertThrows` vs `assertThatThrownBy` --- .../net/corda/libs/statemanager/api/MetadataTests.kt | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt index 8f07513fe3c..337b3130483 100644 --- a/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt +++ b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt @@ -1,9 +1,9 @@ package net.corda.libs.statemanager.api import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource import java.util.stream.Stream @@ -29,10 +29,9 @@ class MetadataTests { @Test fun `fail all non-primitive types`() { val list = listOf("Na Na Na Na Na Na Na Na", "Batman") - val ex = assertThrows { - Metadata(mapOf("joker" to Superman(1000), "batman" to list)) - } - assertThat(ex).hasMessageContainingAll("joker", "batman", Superman::class.java.name, list.javaClass.name) + assertThatThrownBy { Metadata(mapOf("joker" to Superman(1000), "batman" to list)) } + .isExactlyInstanceOf(IllegalArgumentException::class.java) + .hasMessageContainingAll("joker", "batman", Superman::class.java.name, list.javaClass.name) } @Test From 9516cbb2f86791f74f2835e363cbe12ae7373519 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Tue, 3 Oct 2023 12:15:36 +0100 Subject: [PATCH 13/15] PR feedback --- .../examples/sandbox-app/build.gradle | 2 +- components/flow/flow-service/build.gradle | 2 +- .../ledger/ledger-common-flow/build.gradle | 2 +- .../ledger-consensual-flow/build.gradle | 2 +- .../ledger/ledger-utxo-flow/build.gradle | 2 +- .../corda/libs/statemanager/api/Metadata.kt | 2 +- .../libs/statemanager/api/MetadataTests.kt | 7 ++ processors/flow-mapper-processor/build.gradle | 54 ++++++++-------- processors/flow-processor/build.gradle | 64 +++++++++---------- 9 files changed, 72 insertions(+), 65 deletions(-) diff --git a/applications/examples/sandbox-app/build.gradle b/applications/examples/sandbox-app/build.gradle index 6c10a2c33f8..bb2edcab25c 100644 --- a/applications/examples/sandbox-app/build.gradle +++ b/applications/examples/sandbox-app/build.gradle @@ -65,7 +65,7 @@ dependencies { runtimeOnly project(':libs:messaging:db-message-bus-impl') runtimeOnly project(':libs:serialization:serialization-checkpoint-api') runtimeOnly project(':libs:serialization:serialization-kryo') - runtimeOnly project(":libs:state-manager:state-manager-db-impl") + runtimeOnly project(':libs:state-manager:state-manager-db-impl') runtimeOnly project(':testing:group-policy-test-common') runtimeOnly project(':libs:web:web-impl') diff --git a/components/flow/flow-service/build.gradle b/components/flow/flow-service/build.gradle index 93cac3f5011..e1f2bc3c398 100644 --- a/components/flow/flow-service/build.gradle +++ b/components/flow/flow-service/build.gradle @@ -91,7 +91,7 @@ dependencies { integrationTestRuntimeOnly project(':libs:messaging:messaging-impl') integrationTestRuntimeOnly project(':libs:serialization:serialization-checkpoint-api') integrationTestRuntimeOnly project(':libs:serialization:serialization-kryo') - integrationTestRuntimeOnly project(":libs:state-manager:state-manager-db-impl") + integrationTestRuntimeOnly project(':libs:state-manager:state-manager-db-impl') integrationTestRuntimeOnly project(':components:membership:membership-group-read-impl') integrationTestRuntimeOnly project(':components:virtual-node:cpk-read-service-impl') diff --git a/components/ledger/ledger-common-flow/build.gradle b/components/ledger/ledger-common-flow/build.gradle index a1ccbcf7405..870d35a2e3d 100644 --- a/components/ledger/ledger-common-flow/build.gradle +++ b/components/ledger/ledger-common-flow/build.gradle @@ -50,7 +50,7 @@ dependencies { integrationTestRuntimeOnly project(':libs:messaging:db-message-bus-impl') integrationTestRuntimeOnly project(':libs:messaging:messaging-impl') integrationTestRuntimeOnly project(':libs:flows:session-manager-impl') - integrationTestRuntimeOnly project(":libs:state-manager:state-manager-db-impl") + integrationTestRuntimeOnly project(':libs:state-manager:state-manager-db-impl') integrationTestRuntimeOnly "org.apache.aries.spifly:org.apache.aries.spifly.dynamic.framework.extension:$ariesDynamicFrameworkExtensionVersion" cpis project(path: ':testing:ledger:ledger-common-empty-app', configuration: 'cordaCPB') diff --git a/components/ledger/ledger-consensual-flow/build.gradle b/components/ledger/ledger-consensual-flow/build.gradle index 84a9996622d..1d5e368bd22 100644 --- a/components/ledger/ledger-consensual-flow/build.gradle +++ b/components/ledger/ledger-consensual-flow/build.gradle @@ -52,7 +52,7 @@ dependencies { integrationTestRuntimeOnly project(':libs:messaging:messaging-impl') integrationTestRuntimeOnly project(':libs:lifecycle:lifecycle-impl') integrationTestRuntimeOnly project(':libs:flows:session-manager-impl') - integrationTestRuntimeOnly project(":libs:state-manager:state-manager-db-impl") + integrationTestRuntimeOnly project(':libs:state-manager:state-manager-db-impl') cpis project(path: ':testing:ledger:ledger-consensual-state-app', configuration: 'cordaCPB') } diff --git a/components/ledger/ledger-utxo-flow/build.gradle b/components/ledger/ledger-utxo-flow/build.gradle index 33e0f595e8c..c7b937da4cf 100644 --- a/components/ledger/ledger-utxo-flow/build.gradle +++ b/components/ledger/ledger-utxo-flow/build.gradle @@ -73,7 +73,7 @@ dependencies { integrationTestRuntimeOnly project(':libs:lifecycle:lifecycle-impl') integrationTestRuntimeOnly project(':libs:flows:session-manager-impl') integrationTestRuntimeOnly project(':libs:membership:membership-impl') - integrationTestRuntimeOnly project(":libs:state-manager:state-manager-db-impl") + integrationTestRuntimeOnly project(':libs:state-manager:state-manager-db-impl') cpis project(path: ':testing:ledger:ledger-utxo-state-app', configuration: 'cordaCPB') } diff --git a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt index 5099f2bf0fb..70746c6eb33 100644 --- a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt +++ b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt @@ -1,7 +1,7 @@ package net.corda.libs.statemanager.api /** - * Mutable map that allows only primitive types to be used as values. + * Map that allows only primitive types to be used as values. */ class Metadata( private val map: Map = emptyMap() diff --git a/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt index 337b3130483..c1215394581 100644 --- a/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt +++ b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/MetadataTests.kt @@ -42,5 +42,12 @@ class MetadataTests { assertThat(meta2).isNotSameAs(meta1) } + @Test + fun `new meta with additional elements`() { + val meta1 = Metadata(mapOf("foo" to "bar")) + assertThat(meta1.plus("batman" to "joker")) + .containsExactlyInAnyOrderEntriesOf(mapOf("foo" to "bar", "batman" to "joker")) + } + data class Superman(val kudos: Int) } \ No newline at end of file diff --git a/processors/flow-mapper-processor/build.gradle b/processors/flow-mapper-processor/build.gradle index 1441efb4245..c683d9cb6b9 100644 --- a/processors/flow-mapper-processor/build.gradle +++ b/processors/flow-mapper-processor/build.gradle @@ -17,49 +17,49 @@ dependencies { implementation 'net.corda:corda-ledger-utxo' implementation 'net.corda:corda-notary-plugin' - implementation project(":components:configuration:configuration-read-service") - implementation project(":components:flow:flow-mapper-service") - implementation project(":components:flow:flow-p2p-filter-service") - implementation project(":components:membership:locally-hosted-identities-service") + implementation project(':components:configuration:configuration-read-service') + implementation project(':components:flow:flow-mapper-service') + implementation project(':components:flow:flow-p2p-filter-service') + implementation project(':components:membership:locally-hosted-identities-service') implementation project(':components:membership:group-policy') - implementation project(":components:membership:membership-group-read") - implementation project(":components:membership:membership-persistence-client") - implementation project(":components:virtual-node:cpi-info-read-service") - implementation project(":components:virtual-node:virtual-node-info-read-service") - implementation project(":libs:lifecycle:lifecycle") + implementation project(':components:membership:membership-group-read') + implementation project(':components:membership:membership-persistence-client') + implementation project(':components:virtual-node:cpi-info-read-service') + implementation project(':components:virtual-node:virtual-node-info-read-service') + implementation project(':libs:lifecycle:lifecycle') implementation project(":libs:messaging:messaging") implementation project(':libs:utilities') - implementation project(":notary-plugins:notary-plugin-common") + implementation project(':notary-plugins:notary-plugin-common') runtimeOnly project(":components:configuration:configuration-read-service-impl") runtimeOnly project(':libs:ledger:ledger-common-data') runtimeOnly project(":components:ledger:notary-worker-selection-impl") runtimeOnly project(':libs:ledger:ledger-consensual-data') runtimeOnly project(':libs:ledger:ledger-utxo-data') - runtimeOnly project(":libs:crypto:crypto-serialization-impl") + runtimeOnly project(':libs:crypto:crypto-serialization-impl') runtimeOnly project(':libs:crypto:merkle-impl') - runtimeOnly project(":components:configuration:configuration-read-service-impl") - runtimeOnly project(":components:flow:flow-mapper-impl") - runtimeOnly project(":components:membership:locally-hosted-identities-service-impl") - runtimeOnly project(":components:membership:membership-group-read-impl") - runtimeOnly project(":components:membership:membership-persistence-client-impl") - runtimeOnly project(":components:membership:group-policy-impl") - runtimeOnly project(":components:uniqueness:uniqueness-checker-client-service-impl") - runtimeOnly project(":components:virtual-node:cpi-info-read-service-impl") - runtimeOnly project(":libs:application:application-impl") + runtimeOnly project(':components:configuration:configuration-read-service-impl') + runtimeOnly project(':components:flow:flow-mapper-impl') + runtimeOnly project(':components:membership:locally-hosted-identities-service-impl') + runtimeOnly project(':components:membership:membership-group-read-impl') + runtimeOnly project(':components:membership:membership-persistence-client-impl') + runtimeOnly project(':components:membership:group-policy-impl') + runtimeOnly project(':components:uniqueness:uniqueness-checker-client-service-impl') + runtimeOnly project(':components:virtual-node:cpi-info-read-service-impl') + runtimeOnly project(':libs:application:application-impl') runtimeOnly project(':libs:crypto:cipher-suite-impl') runtimeOnly project(":libs:crypto:crypto-serialization-impl") runtimeOnly project(':libs:crypto:merkle-impl') runtimeOnly project(':libs:db:db-orm-impl') - runtimeOnly project(":libs:flows:external-event-responses-impl") - runtimeOnly project(":libs:flows:flow-api") - runtimeOnly project(":libs:flows:session-manager-impl") - runtimeOnly project(":libs:messaging:messaging-impl") + runtimeOnly project(':libs:flows:external-event-responses-impl') + runtimeOnly project(':libs:flows:flow-api') + runtimeOnly project(':libs:flows:session-manager-impl') + runtimeOnly project(':libs:messaging:messaging-impl') runtimeOnly project(':libs:ledger:ledger-common-data') runtimeOnly project(':libs:ledger:ledger-consensual-data') runtimeOnly project(':libs:ledger:ledger-utxo-data') - runtimeOnly project(":libs:lifecycle:lifecycle-impl") - runtimeOnly project(":libs:schema-registry:schema-registry-impl") - runtimeOnly project(":libs:state-manager:state-manager-db-impl") + runtimeOnly project(':libs:lifecycle:lifecycle-impl') + runtimeOnly project(':libs:schema-registry:schema-registry-impl') + runtimeOnly project(':libs:state-manager:state-manager-db-impl') } diff --git a/processors/flow-processor/build.gradle b/processors/flow-processor/build.gradle index a463888f3d0..770df86e505 100644 --- a/processors/flow-processor/build.gradle +++ b/processors/flow-processor/build.gradle @@ -17,22 +17,22 @@ dependencies { implementation 'net.corda:corda-ledger-utxo' implementation 'net.corda:corda-notary-plugin' - implementation project(":components:configuration:configuration-read-service") - implementation project(":components:flow:flow-service") + implementation project(':components:configuration:configuration-read-service') + implementation project(':components:flow:flow-service') implementation project(':components:membership:group-policy') implementation project(':components:membership:locally-hosted-identities-service') - implementation project(":components:membership:membership-group-read") - implementation project(":components:membership:membership-persistence-client") - implementation project(":components:ledger:ledger-utxo-token-cache") - implementation project(":components:virtual-node:cpi-info-read-service") - implementation project(":components:virtual-node:cpk-read-service") - implementation project(":components:virtual-node:sandbox-group-context-service") - implementation project(":components:virtual-node:virtual-node-info-read-service") + implementation project(':components:membership:membership-group-read') + implementation project(':components:membership:membership-persistence-client') + implementation project(':components:ledger:ledger-utxo-token-cache') + implementation project(':components:virtual-node:cpi-info-read-service') + implementation project(':components:virtual-node:cpk-read-service') + implementation project(':components:virtual-node:sandbox-group-context-service') + implementation project(':components:virtual-node:virtual-node-info-read-service') implementation project(":libs:lifecycle:lifecycle") implementation project(":libs:messaging:messaging") implementation project(':libs:utilities') implementation project(':libs:virtual-node:sandbox-group-context') - implementation project(":notary-plugins:notary-plugin-common") + implementation project(':notary-plugins:notary-plugin-common') runtimeOnly project(":components:configuration:configuration-read-service-impl") runtimeOnly project(":components:flow:flow-service") @@ -40,37 +40,37 @@ dependencies { runtimeOnly project(':libs:ledger:ledger-common-data') runtimeOnly project(':components:ledger:ledger-consensual-flow') runtimeOnly project(':components:ledger:ledger-utxo-flow') - runtimeOnly project(":components:ledger:notary-worker-selection-impl") + runtimeOnly project(':components:ledger:notary-worker-selection-impl') runtimeOnly project(':libs:ledger:ledger-consensual-data') runtimeOnly project(':components:ledger:ledger-utxo-flow') runtimeOnly project(':libs:ledger:ledger-utxo-data') - runtimeOnly project(":libs:crypto:crypto-serialization-impl") + runtimeOnly project(':libs:crypto:crypto-serialization-impl') runtimeOnly project(':libs:crypto:merkle-impl') runtimeOnly project(':libs:crypto:cipher-suite-impl') - runtimeOnly project(":components:configuration:configuration-read-service-impl") - runtimeOnly project(":components:uniqueness:uniqueness-checker-client-service-impl") - runtimeOnly project(":components:membership:membership-group-read-impl") - runtimeOnly project(":components:membership:membership-persistence-client-impl") - runtimeOnly project(":components:membership:group-policy-impl") - runtimeOnly project(":components:membership:locally-hosted-identities-service-impl") - runtimeOnly project(":components:uniqueness:uniqueness-checker-client-service-impl") - runtimeOnly project(":components:virtual-node:cpi-info-read-service-impl") - runtimeOnly project(":components:virtual-node:cpk-read-service-impl") - runtimeOnly project(":components:flow:flow-mapper-impl") - runtimeOnly project(":libs:application:application-impl") + runtimeOnly project(':components:configuration:configuration-read-service-impl') + runtimeOnly project(':components:uniqueness:uniqueness-checker-client-service-impl') + runtimeOnly project(':components:membership:membership-group-read-impl') + runtimeOnly project(':components:membership:membership-persistence-client-impl') + runtimeOnly project(':components:membership:group-policy-impl') + runtimeOnly project(':components:membership:locally-hosted-identities-service-impl') + runtimeOnly project(':components:uniqueness:uniqueness-checker-client-service-impl') + runtimeOnly project(':components:virtual-node:cpi-info-read-service-impl') + runtimeOnly project(':components:virtual-node:cpk-read-service-impl') + runtimeOnly project(':components:flow:flow-mapper-impl') + runtimeOnly project(':libs:application:application-impl') runtimeOnly project(':libs:db:db-orm-impl') - runtimeOnly project(":libs:flows:external-event-responses-impl") - runtimeOnly project(":libs:flows:flow-api") - runtimeOnly project(":libs:flows:session-manager-impl") - runtimeOnly project(":libs:messaging:messaging-impl") + runtimeOnly project(':libs:flows:external-event-responses-impl') + runtimeOnly project(':libs:flows:flow-api') + runtimeOnly project(':libs:flows:session-manager-impl') + runtimeOnly project(':libs:messaging:messaging-impl') runtimeOnly project(':libs:ledger:ledger-common-data') runtimeOnly project(':libs:ledger:ledger-consensual-data') runtimeOnly project(':libs:ledger:ledger-utxo-data') - runtimeOnly project(":libs:lifecycle:lifecycle-impl") - runtimeOnly project(":libs:sandbox-internal") - runtimeOnly project(":libs:schema-registry:schema-registry-impl") - runtimeOnly project(":libs:serialization:serialization-kryo") - runtimeOnly project(":libs:state-manager:state-manager-db-impl") + runtimeOnly project(':libs:lifecycle:lifecycle-impl') + runtimeOnly project(':libs:sandbox-internal') + runtimeOnly project(':libs:schema-registry:schema-registry-impl') + runtimeOnly project(':libs:serialization:serialization-kryo') + runtimeOnly project(':libs:state-manager:state-manager-db-impl') runtimeOnly project(":libs:web:web-impl") } From cc9d374382e0a8fb58c5be9bb45cd987d56b9959 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Tue, 3 Oct 2023 13:25:50 +0100 Subject: [PATCH 14/15] PR feedback --- processors/flow-mapper-processor/build.gradle | 8 ++++---- processors/flow-processor/build.gradle | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/processors/flow-mapper-processor/build.gradle b/processors/flow-mapper-processor/build.gradle index c683d9cb6b9..c2431a56ad1 100644 --- a/processors/flow-mapper-processor/build.gradle +++ b/processors/flow-mapper-processor/build.gradle @@ -27,13 +27,13 @@ dependencies { implementation project(':components:virtual-node:cpi-info-read-service') implementation project(':components:virtual-node:virtual-node-info-read-service') implementation project(':libs:lifecycle:lifecycle') - implementation project(":libs:messaging:messaging") + implementation project(':libs:messaging:messaging') implementation project(':libs:utilities') implementation project(':notary-plugins:notary-plugin-common') - runtimeOnly project(":components:configuration:configuration-read-service-impl") + runtimeOnly project(':components:configuration:configuration-read-service-impl') runtimeOnly project(':libs:ledger:ledger-common-data') - runtimeOnly project(":components:ledger:notary-worker-selection-impl") + runtimeOnly project(':components:ledger:notary-worker-selection-impl') runtimeOnly project(':libs:ledger:ledger-consensual-data') runtimeOnly project(':libs:ledger:ledger-utxo-data') runtimeOnly project(':libs:crypto:crypto-serialization-impl') @@ -48,7 +48,7 @@ dependencies { runtimeOnly project(':components:virtual-node:cpi-info-read-service-impl') runtimeOnly project(':libs:application:application-impl') runtimeOnly project(':libs:crypto:cipher-suite-impl') - runtimeOnly project(":libs:crypto:crypto-serialization-impl") + runtimeOnly project(':libs:crypto:crypto-serialization-impl') runtimeOnly project(':libs:crypto:merkle-impl') runtimeOnly project(':libs:db:db-orm-impl') runtimeOnly project(':libs:flows:external-event-responses-impl') diff --git a/processors/flow-processor/build.gradle b/processors/flow-processor/build.gradle index 770df86e505..9582eff8cfc 100644 --- a/processors/flow-processor/build.gradle +++ b/processors/flow-processor/build.gradle @@ -28,14 +28,14 @@ dependencies { implementation project(':components:virtual-node:cpk-read-service') implementation project(':components:virtual-node:sandbox-group-context-service') implementation project(':components:virtual-node:virtual-node-info-read-service') - implementation project(":libs:lifecycle:lifecycle") - implementation project(":libs:messaging:messaging") + implementation project(':libs:lifecycle:lifecycle') + implementation project(':libs:messaging:messaging') implementation project(':libs:utilities') implementation project(':libs:virtual-node:sandbox-group-context') implementation project(':notary-plugins:notary-plugin-common') - runtimeOnly project(":components:configuration:configuration-read-service-impl") - runtimeOnly project(":components:flow:flow-service") + runtimeOnly project(':components:configuration:configuration-read-service-impl') + runtimeOnly project(':components:flow:flow-service') runtimeOnly project(':components:ledger:ledger-common-flow') runtimeOnly project(':libs:ledger:ledger-common-data') runtimeOnly project(':components:ledger:ledger-consensual-flow') @@ -71,6 +71,6 @@ dependencies { runtimeOnly project(':libs:schema-registry:schema-registry-impl') runtimeOnly project(':libs:serialization:serialization-kryo') runtimeOnly project(':libs:state-manager:state-manager-db-impl') - runtimeOnly project(":libs:web:web-impl") + runtimeOnly project(':libs:web:web-impl') } From 3e8193653388b781f145f8eede0b5a0fcc31ce55 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Tue, 3 Oct 2023 15:55:38 +0100 Subject: [PATCH 15/15] Fix Liquibase dependency --- libs/db/db-admin-impl/build.gradle | 6 ++++++ libs/db/db-orm-impl/build.gradle | 4 ---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/libs/db/db-admin-impl/build.gradle b/libs/db/db-admin-impl/build.gradle index b1037e2ffac..690e5138f88 100644 --- a/libs/db/db-admin-impl/build.gradle +++ b/libs/db/db-admin-impl/build.gradle @@ -21,6 +21,12 @@ dependencies { api project(":libs:db:db-admin") + runtimeOnly "org.apache.aries.spifly:org.apache.aries.spifly.dynamic.framework.extension:$ariesDynamicFrameworkExtensionVersion" + runtimeOnly "org.liquibase:liquibase-core:$liquibaseVersion" + // NOTE: this is needed by Liquibase but for some reason not picked up automatically. + runtimeOnly "commons-beanutils:commons-beanutils:$beanutilsVersion" + + testImplementation "org.assertj:assertj-core:$assertjVersion" testImplementation "org.mockito:mockito-core:$mockitoVersion" testImplementation "org.mockito.kotlin:mockito-kotlin:$mockitoKotlinVersion" diff --git a/libs/db/db-orm-impl/build.gradle b/libs/db/db-orm-impl/build.gradle index b657dfcca83..792a3ff4be9 100644 --- a/libs/db/db-orm-impl/build.gradle +++ b/libs/db/db-orm-impl/build.gradle @@ -28,10 +28,6 @@ dependencies { } runtimeOnly project(':libs:antlr') runtimeOnly "com.sun.activation:javax.activation:$activationVersion" - runtimeOnly "org.apache.aries.spifly:org.apache.aries.spifly.dynamic.framework.extension:$ariesDynamicFrameworkExtensionVersion" - runtimeOnly "org.liquibase:liquibase-core:$liquibaseVersion" - // NOTE: this is needed by Liquibase but for some reason not picked up automatically. - runtimeOnly "commons-beanutils:commons-beanutils:$beanutilsVersion" implementation project(":libs:db:db-core") implementation project(":libs:db:db-orm")