diff --git a/applications/workers/release/combined-worker/src/main/kotlin/net/corda/applications/workers/combined/CombinedWorker.kt b/applications/workers/release/combined-worker/src/main/kotlin/net/corda/applications/workers/combined/CombinedWorker.kt index 49f238b482b..5bdbeea2fec 100644 --- a/applications/workers/release/combined-worker/src/main/kotlin/net/corda/applications/workers/combined/CombinedWorker.kt +++ b/applications/workers/release/combined-worker/src/main/kotlin/net/corda/applications/workers/combined/CombinedWorker.kt @@ -45,8 +45,8 @@ import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_DB_USER import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_JDBC_URL import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_TYPE import net.corda.schema.configuration.DatabaseConfig -import net.corda.schema.configuration.MessagingConfig import net.corda.schema.configuration.MessagingConfig.Bus.BUS_TYPE +import net.corda.schema.configuration.StateManagerConfig import net.corda.tracing.configureTracing import net.corda.tracing.shutdownTracing import net.corda.web.api.WebServer @@ -224,17 +224,16 @@ class CombinedWorker @Activate constructor( */ private fun prepareStateManagerConfig(stateManagerConfig: Config): Config { val defaultConfig = ConfigFactory.empty() - .withValue(MessagingConfig.StateManager.JDBC_DRIVER, fromAnyRef("org.postgresql.Driver")) - .withValue(MessagingConfig.StateManager.JDBC_PERSISTENCE_UNIT_NAME, fromAnyRef("corda-state-manager")) - .withValue(MessagingConfig.StateManager.JDBC_POOL_MIN_SIZE, fromAnyRef(1)) - .withValue(MessagingConfig.StateManager.JDBC_POOL_MAX_SIZE, fromAnyRef(5)) - .withValue(MessagingConfig.StateManager.JDBC_POOL_IDLE_TIMEOUT_SECONDS, fromAnyRef(Duration.ofMinutes(2).toSeconds())) - .withValue(MessagingConfig.StateManager.JDBC_POOL_MAX_LIFETIME_SECONDS, fromAnyRef(Duration.ofMinutes(30).toSeconds())) - .withValue(MessagingConfig.StateManager.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS, fromAnyRef(Duration.ZERO.toSeconds())) - .withValue( - MessagingConfig.StateManager.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS, fromAnyRef(Duration.ofSeconds(5).toSeconds()) - ) - val stateManagerConfigWithFallback = stateManagerConfig.withFallback(defaultConfig) + .withValue(StateManagerConfig.Database.JDBC_DRIVER, fromAnyRef("org.postgresql.Driver")) + .withValue(StateManagerConfig.Database.JDBC_POOL_MIN_SIZE, fromAnyRef(1)) + .withValue(StateManagerConfig.Database.JDBC_POOL_MAX_SIZE, fromAnyRef(5)) + .withValue(StateManagerConfig.Database.JDBC_POOL_IDLE_TIMEOUT_SECONDS, fromAnyRef(Duration.ofMinutes(2).toSeconds())) + .withValue(StateManagerConfig.Database.JDBC_POOL_MAX_LIFETIME_SECONDS, fromAnyRef(Duration.ofMinutes(30).toSeconds())) + .withValue(StateManagerConfig.Database.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS, fromAnyRef(Duration.ZERO.toSeconds())) + .withValue(StateManagerConfig.Database.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS, fromAnyRef(Duration.ofSeconds(5).toSeconds())) + val stateManagerConfigWithFallback = stateManagerConfig.withFallback( + ConfigFactory.empty().withValue(StateManagerConfig.STATE_MANAGER, defaultConfig.root()) + ) // add the state manager schema to the JDBC URL. return stateManagerConfigWithFallback.withValue( BOOT_STATE_MANAGER_JDBC_URL, diff --git a/applications/workers/worker-common/src/main/kotlin/net/corda/applications/workers/workercommon/WorkerHelpers.kt b/applications/workers/worker-common/src/main/kotlin/net/corda/applications/workers/workercommon/WorkerHelpers.kt index 1dae746c254..265c8f03324 100644 --- a/applications/workers/worker-common/src/main/kotlin/net/corda/applications/workers/workercommon/WorkerHelpers.kt +++ b/applications/workers/worker-common/src/main/kotlin/net/corda/applications/workers/workercommon/WorkerHelpers.kt @@ -2,7 +2,6 @@ package net.corda.applications.workers.workercommon import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigValueFactory.fromAnyRef import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory import net.corda.libs.configuration.secret.SecretsServiceFactoryResolver @@ -12,7 +11,6 @@ import net.corda.osgi.api.Shutdown import net.corda.schema.configuration.BootConfig import net.corda.schema.configuration.ConfigDefaults import net.corda.schema.configuration.ConfigKeys -import net.corda.schema.configuration.MessagingConfig import net.corda.schema.configuration.MessagingConfig.Bus.BUS_TYPE import net.corda.schema.configuration.MessagingConfig.MAX_ALLOWED_MSG_SIZE import org.osgi.framework.FrameworkUtil @@ -40,18 +38,6 @@ class WorkerHelpers { "-msasl.jaas.config" ) - /** - * Define the paths of any boot config that must be treated as integers for boot config json validation. - */ - private val BOOT_CONFIG_INTEGER_PATHS = setOf( - MessagingConfig.StateManager.JDBC_POOL_MAX_SIZE, - MessagingConfig.StateManager.JDBC_POOL_MIN_SIZE, - MessagingConfig.StateManager.JDBC_POOL_IDLE_TIMEOUT_SECONDS, - MessagingConfig.StateManager.JDBC_POOL_MAX_LIFETIME_SECONDS, - MessagingConfig.StateManager.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS, - MessagingConfig.StateManager.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS, - ) - /** * Parses the [args] into the [params]. * @@ -110,31 +96,6 @@ class WorkerHelpers { }.withFallback(baseConfig) } - /** - * Converts configuration parameters that should be [Integer] from their [String] representations to actual Integers - * before performing boot configuration validation. PicoCLI casts parameters in maps to strings, so this function - * helps ensure that specific configuration paths are treated as integers. - * - * For example, when passing the command-line argument: - * - * ``` - * --stateManager database.pool.maxSize=1 - * ``` - * - * The corresponding map will be `["database.pool.maxSize" to "1"]`. This function checks if the specified configuration - * paths (defined in [BOOT_CONFIG_INTEGER_PATHS]) exist in the given `bootConfig` and, if found, converts them to integer values. - * - * @param bootConfig The SmartConfig containing the configuration parameters. - * @return A new SmartConfig with specified integer configuration paths converted to actual integers. - */ - private fun prepareIntegerConfigPaths(bootConfig: SmartConfig): SmartConfig { - var updatedConfig = bootConfig - BOOT_CONFIG_INTEGER_PATHS.forEach { path -> - if(bootConfig.hasPath(path)) updatedConfig = updatedConfig.withValue(path, fromAnyRef(bootConfig.getInt(path))) - } - return updatedConfig - } - /** * Return a SmartConfig object for the top level of the bootstrap configuration. * @@ -200,10 +161,7 @@ class WorkerHelpers { configWithFiles.getConfig(BootConfig.BOOT_SECRETS).atPath(BootConfig.BOOT_SECRETS), secretsServiceFactoryResolver.findAll()) - val unvalidatedBootConfig = smartConfigFactory.create(configWithFiles.withoutPath(BootConfig.BOOT_SECRETS)) - - val bootConfig = prepareIntegerConfigPaths(unvalidatedBootConfig) - + val bootConfig = smartConfigFactory.create(configWithFiles.withoutPath(BootConfig.BOOT_SECRETS)) validator.validate(ConfigKeys.BOOT_CONFIG, bootConfig, loadResource(BOOT_CONFIG_PATH), true) // we now know bootConfig has: @@ -289,4 +247,4 @@ class WorkerHelpers { info("VM ${mxBeanInfo.vmName} ${mxBeanInfo.vmVendor} ${mxBeanInfo.vmVersion}") } } -} \ No newline at end of file +} diff --git a/applications/workers/worker-common/src/main/resources/net/corda/applications/workers/workercommon/boot/corda.boot.json b/applications/workers/worker-common/src/main/resources/net/corda/applications/workers/workercommon/boot/corda.boot.json index 6108f20f0e2..60ace61c1fa 100644 --- a/applications/workers/worker-common/src/main/resources/net/corda/applications/workers/workercommon/boot/corda.boot.json +++ b/applications/workers/worker-common/src/main/resources/net/corda/applications/workers/workercommon/boot/corda.boot.json @@ -229,13 +229,6 @@ "null" ] }, - "persistenceUnitName": { - "description": "The persistence unit name to use by the state manager when using database as the persistent storage", - "type": [ - "string", - "null" - ] - }, "additionalProperties": false }, "additionalProperties": false diff --git a/applications/workers/worker-common/src/test/kotlin/net/corda/applications/workers/workercommon/internal/BootstrapConfigTest.kt b/applications/workers/worker-common/src/test/kotlin/net/corda/applications/workers/workercommon/internal/BootstrapConfigTest.kt index 9f9b8044698..f7190cc9839 100644 --- a/applications/workers/worker-common/src/test/kotlin/net/corda/applications/workers/workercommon/internal/BootstrapConfigTest.kt +++ b/applications/workers/worker-common/src/test/kotlin/net/corda/applications/workers/workercommon/internal/BootstrapConfigTest.kt @@ -9,7 +9,6 @@ import net.corda.libs.configuration.validation.ConfigurationValidator import net.corda.schema.configuration.BootConfig import net.corda.schema.configuration.ConfigDefaults import net.corda.schema.configuration.ConfigKeys -import net.corda.schema.configuration.MessagingConfig import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.SoftAssertions.assertSoftly import org.junit.jupiter.api.Test @@ -75,7 +74,6 @@ class BootstrapConfigTest { @Test fun `full config can be provided in file (json)`() { - val config = WorkerHelpers.getBootstrapConfig( mockSecretsServiceFactoryResolver, DefaultWorkerParams(1234).also { @@ -184,31 +182,4 @@ class BootstrapConfigTest { } } - - @Test - fun `getBootstrapConfig converts integers to strings at predefined paths`() { - defaultWorkerParams.stateManagerParams = mapOf( - "database.pool.maxSize" to "111", - "database.pool.minSize" to "222", - "database.pool.idleTimeoutSeconds" to "333", - "database.pool.maxLifetimeSeconds" to "444", - "database.pool.keepAliveTimeSeconds" to "555", - "database.pool.validationTimeoutSeconds" to "666", - ) - val config = WorkerHelpers.getBootstrapConfig( - mockSecretsServiceFactoryResolver, - defaultWorkerParams, - mockConfigurationValidator - ) - - assertSoftly { softly -> - softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_MAX_SIZE)).isEqualTo(111) - softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_MIN_SIZE)).isEqualTo(222) - softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_IDLE_TIMEOUT_SECONDS)).isEqualTo(333) - softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_MAX_LIFETIME_SECONDS)).isEqualTo(444) - softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS)).isEqualTo(555) - softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS)).isEqualTo(666) - } - - } -} \ No newline at end of file +} diff --git a/charts/corda-lib/templates/_worker.tpl b/charts/corda-lib/templates/_worker.tpl index 2196b9a7f55..fa4a365af1c 100644 --- a/charts/corda-lib/templates/_worker.tpl +++ b/charts/corda-lib/templates/_worker.tpl @@ -309,8 +309,6 @@ spec: - "--stateManager" - "database.jdbc.driver=org.postgresql.Driver" - "--stateManager" - - "database.jdbc.persistenceUnitName=corda-state-manager" - - "--stateManager" - "database.pool.maxSize={{ .stateManagerDbConnectionPool.maxSize }}" {{- if .stateManagerDbConnectionPool.minSize }} - "--stateManager" diff --git a/components/configuration/configuration-read-service-impl/src/integrationTest/kotlin/net/corda/configuration/read/impl/ConfigurationReadServiceImplTest.kt b/components/configuration/configuration-read-service-impl/src/integrationTest/kotlin/net/corda/configuration/read/impl/ConfigurationReadServiceImplTest.kt index 808d83b4b70..80d7f3c3777 100644 --- a/components/configuration/configuration-read-service-impl/src/integrationTest/kotlin/net/corda/configuration/read/impl/ConfigurationReadServiceImplTest.kt +++ b/components/configuration/configuration-read-service-impl/src/integrationTest/kotlin/net/corda/configuration/read/impl/ConfigurationReadServiceImplTest.kt @@ -16,12 +16,16 @@ import net.corda.messaging.api.records.Record import net.corda.schema.Schemas.Config.CONFIG_TOPIC import net.corda.schema.configuration.BootConfig.BOOT_JDBC_URL import net.corda.schema.configuration.BootConfig.BOOT_MAX_ALLOWED_MSG_SIZE +import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_JDBC_URL +import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_TYPE import net.corda.schema.configuration.BootConfig.INSTANCE_ID import net.corda.schema.configuration.ConfigKeys.BOOT_CONFIG import net.corda.schema.configuration.ConfigKeys.DB_CONFIG import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG +import net.corda.schema.configuration.ConfigKeys.STATE_MANAGER_CONFIG import net.corda.schema.configuration.DatabaseConfig.JDBC_URL import net.corda.schema.configuration.MessagingConfig.Bus.BUS_TYPE +import net.corda.schema.configuration.StateManagerConfig import net.corda.test.util.eventually import net.corda.utilities.seconds import org.junit.jupiter.api.Assertions.assertEquals @@ -40,17 +44,25 @@ class ConfigurationReadServiceImplTest { companion object { private const val JDBC_URL_DATA = "testDataToTriggerBootDBParamLogic" + private const val STATE_MANAGER_JDBC_URL_DATA = "testDataToTriggerBootStateManagerDBParamLogic" private const val BOOT_CONFIG_STRING = """ $INSTANCE_ID = 1 $BUS_TYPE = DATABASE $BOOT_JDBC_URL = $JDBC_URL_DATA $BOOT_MAX_ALLOWED_MSG_SIZE = 1000000000 + $BOOT_STATE_MANAGER_TYPE = DATABASE + $BOOT_STATE_MANAGER_JDBC_URL = $STATE_MANAGER_JDBC_URL_DATA """ private const val DB_CONFIG_STRING = """ $JDBC_URL = $JDBC_URL_DATA """ + private const val STATE_MANAGER_CONFIG_STRING = """ + ${StateManagerConfig.TYPE} = "DATABASE" + ${StateManagerConfig.Database.JDBC_URL} = $STATE_MANAGER_JDBC_URL_DATA + """ + private const val TIMEOUT = 10000L } @@ -107,10 +119,16 @@ class ConfigurationReadServiceImplTest { // Register a new client and verify everything gets delivered val expectedDBConfig = smartConfigFactory.create(ConfigFactory.parseString(DB_CONFIG_STRING)) - val expectedKeys = mutableSetOf(BOOT_CONFIG, FLOW_CONFIG, DB_CONFIG) + val expectedStateManagerConfig = smartConfigFactory.create(ConfigFactory.parseString(STATE_MANAGER_CONFIG_STRING)) + + val expectedKeys = mutableSetOf(BOOT_CONFIG, FLOW_CONFIG, DB_CONFIG, STATE_MANAGER_CONFIG) val expectedConfig = mutableMapOf( - BOOT_CONFIG to bootConfig, FLOW_CONFIG to flowConfig, DB_CONFIG to expectedDBConfig + BOOT_CONFIG to bootConfig, + FLOW_CONFIG to flowConfig, + DB_CONFIG to expectedDBConfig, + STATE_MANAGER_CONFIG to expectedStateManagerConfig ) + val latch = CountDownLatch(1) val reg2 = configurationReadService.registerForUpdates { keys, config -> receivedKeys.addAll(keys) @@ -118,10 +136,12 @@ class ConfigurationReadServiceImplTest { latch.countDown() } latch.await(TIMEOUT, TimeUnit.MILLISECONDS) + assertEquals(expectedKeys, receivedKeys, "Incorrect keys") assertEquals(expectedConfig[BOOT_CONFIG], receivedConfig[BOOT_CONFIG], "Incorrect config for key $BOOT_CONFIG") assertEquals(expectedConfig[FLOW_CONFIG], receivedConfig[FLOW_CONFIG], "Incorrect config for key $FLOW_CONFIG") assertEquals(expectedConfig[DB_CONFIG], receivedConfig[DB_CONFIG], "Incorrect config for key $DB_CONFIG") + assertEquals(expectedConfig[STATE_MANAGER_CONFIG], receivedConfig[STATE_MANAGER_CONFIG], "Incorrect config for key $STATE_MANAGER_CONFIG") // Cleanup reg.close() diff --git a/components/configuration/configuration-read-service-impl/src/main/kotlin/net/corda/configuration/read/impl/ConfigProcessor.kt b/components/configuration/configuration-read-service-impl/src/main/kotlin/net/corda/configuration/read/impl/ConfigProcessor.kt index f6612bb734c..e16b2004277 100644 --- a/components/configuration/configuration-read-service-impl/src/main/kotlin/net/corda/configuration/read/impl/ConfigProcessor.kt +++ b/components/configuration/configuration-read-service-impl/src/main/kotlin/net/corda/configuration/read/impl/ConfigProcessor.kt @@ -11,8 +11,11 @@ import net.corda.lifecycle.LifecycleCoordinator import net.corda.messaging.api.processor.CompactedProcessor import net.corda.messaging.api.records.Record import net.corda.reconciliation.VersionedRecord +import net.corda.schema.configuration.BootConfig.BOOT_DB +import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER import net.corda.schema.configuration.ConfigKeys.DB_CONFIG import net.corda.schema.configuration.ConfigKeys.MESSAGING_CONFIG +import net.corda.schema.configuration.ConfigKeys.STATE_MANAGER_CONFIG import net.corda.utilities.debug import org.slf4j.LoggerFactory import java.util.concurrent.ConcurrentHashMap @@ -94,11 +97,17 @@ internal class ConfigProcessor( if (currentData.containsKey(MESSAGING_CONFIG)) { config[MESSAGING_CONFIG] = configMerger.getMessagingConfig(bootConfig, config[MESSAGING_CONFIG]) } - val dbConfig = configMerger.getDbConfig(bootConfig, config[DB_CONFIG]) + + val dbConfig = configMerger.getConfig(bootConfig, BOOT_DB, config[DB_CONFIG]) if (!dbConfig.isEmpty) { config[DB_CONFIG] = dbConfig } + val stateManagerConfig = configMerger.getConfig(bootConfig, BOOT_STATE_MANAGER, config[STATE_MANAGER_CONFIG]) + if (!stateManagerConfig.isEmpty) { + config[STATE_MANAGER_CONFIG] = stateManagerConfig + } + // at this point config is fully populated and verified return config } @@ -124,4 +133,4 @@ internal class ConfigProcessor( } internal fun getAllVersionedRecords() = configCache.values.stream() -} \ No newline at end of file +} diff --git a/components/configuration/configuration-read-service-impl/src/test/kotlin/net/corda/configuration/read/impl/ConfigProcessorTest.kt b/components/configuration/configuration-read-service-impl/src/test/kotlin/net/corda/configuration/read/impl/ConfigProcessorTest.kt index 83958fb2ea9..e0a2633af42 100644 --- a/components/configuration/configuration-read-service-impl/src/test/kotlin/net/corda/configuration/read/impl/ConfigProcessorTest.kt +++ b/components/configuration/configuration-read-service-impl/src/test/kotlin/net/corda/configuration/read/impl/ConfigProcessorTest.kt @@ -34,7 +34,7 @@ class ConfigProcessorTest { private val smartConfigFactory = SmartConfigFactory.createWithoutSecurityServices() private val configMerger: ConfigMerger = mock { on { getMessagingConfig(any(), any()) } doAnswer { it.arguments[1] as SmartConfig } - on { getDbConfig(any(), anyOrNull()) } doAnswer { SmartConfigImpl.empty() } + on { getConfig(any(), any(), anyOrNull()) } doAnswer { SmartConfigImpl.empty() } } companion object { @@ -75,24 +75,24 @@ class ConfigProcessorTest { @Test fun `No config is forwarded if the snapshot is empty and db boot config is empty`() { val coordinator = mock() - val bootconfig = BOOT_CONFIG_STRING.toSmartConfig() - val configProcessor = ConfigProcessor(coordinator, smartConfigFactory, bootconfig, configMerger) + val bootConfig = BOOT_CONFIG_STRING.toSmartConfig() + val configProcessor = ConfigProcessor(coordinator, smartConfigFactory, bootConfig, configMerger) configProcessor.onSnapshot(mapOf()) verify(coordinator, times(0)).postEvent(capture(eventCaptor)) - verify(configMerger, times(0)).getMessagingConfig(bootconfig, null) + verify(configMerger, times(0)).getMessagingConfig(bootConfig, null) } @Test fun `config is forwarded if the snapshot is empty but db boot config is set`() { val coordinator = mock() val dbConfig = DB_CONFIG_STRING.toSmartConfig() - whenever(configMerger.getDbConfig(any(), anyOrNull())).thenReturn(dbConfig) - val bootconfig = BOOT_CONFIG_STRING.toSmartConfig() - val configProcessor = ConfigProcessor(coordinator, smartConfigFactory, bootconfig, configMerger) + whenever(configMerger.getConfig(any(), any(), anyOrNull())).thenReturn(dbConfig) + val bootConfig = BOOT_CONFIG_STRING.toSmartConfig() + val configProcessor = ConfigProcessor(coordinator, smartConfigFactory, bootConfig, configMerger) configProcessor.onSnapshot(mapOf()) verify(coordinator, times(1)).postEvent(capture(eventCaptor)) - verify(configMerger, times(0)).getMessagingConfig(bootconfig, null) - verify(configMerger, times(1)).getDbConfig(any(), anyOrNull()) + verify(configMerger, times(0)).getMessagingConfig(bootConfig, null) + verify(configMerger, times(2)).getConfig(any(), any(), anyOrNull()) } @Test diff --git a/components/configuration/configuration-read-service-impl/src/test/kotlin/net/corda/configuration/read/impl/ConfigReadServiceEventHandlerTest.kt b/components/configuration/configuration-read-service-impl/src/test/kotlin/net/corda/configuration/read/impl/ConfigReadServiceEventHandlerTest.kt index d2f4fb8b8a5..6370aede5d9 100644 --- a/components/configuration/configuration-read-service-impl/src/test/kotlin/net/corda/configuration/read/impl/ConfigReadServiceEventHandlerTest.kt +++ b/components/configuration/configuration-read-service-impl/src/test/kotlin/net/corda/configuration/read/impl/ConfigReadServiceEventHandlerTest.kt @@ -104,7 +104,7 @@ internal class ConfigReadServiceEventHandlerTest { messagingConfig = mock() configMerger = mock { on { getMessagingConfig(bootConfig, null) } doAnswer { messagingConfig } - on { getDbConfig(any(), any()) } doAnswer { it.arguments[1] as SmartConfig } + on { getConfig(any(), any(), any()) } doAnswer { it.arguments[1] as SmartConfig } } avroSchemaRegistry = mock() publisher = mock() diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt index 09c7b951072..3e135e3b334 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt @@ -40,17 +40,16 @@ class FlowMaintenanceImpl @Activate constructor( private var stateManager: StateManager? = null override fun onConfigChange(config: Map) { - // TODO - fix config key (CORE-17437). - if(config.containsKey(ConfigKeys.MESSAGING_CONFIG)) { + // Top level component is using ConfigurationReadService#registerComponentForUpdates, so either both or none of the keys + // should be present. + if (config.containsKey(ConfigKeys.STATE_MANAGER_CONFIG) && config.containsKey(ConfigKeys.MESSAGING_CONFIG)) { val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) - val newStateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) - // Only re-configure the state manager if the config has changes. - // This should not be needed anymore once it's a separate key. - if(newStateManagerConfig != stateManagerConfig) { - stateManager?.close() - stateManager = stateManagerFactory.create(newStateManagerConfig) - stateManagerConfig = newStateManagerConfig - } + val newStateManagerConfig = config.getConfig(ConfigKeys.STATE_MANAGER_CONFIG) + + stateManager?.close() + stateManagerConfig = newStateManagerConfig + stateManager = stateManagerFactory.create(newStateManagerConfig) + coordinator.createManagedResource("FLOW_MAINTENANCE_SUBSCRIPTION") { subscriptionFactory.createDurableSubscription( SubscriptionConfig( @@ -86,9 +85,10 @@ class FlowMaintenanceImpl @Activate constructor( coordinator.updateStatus(LifecycleStatus.UP) // TODO - this should register to follow the State Manager's lifecycle } + is StopEvent -> { logger.trace { "Flow maintenance is stopping..." } } } } -} \ No newline at end of file +} diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowService.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowService.kt index 59b223f0c1c..08aed82af94 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowService.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/service/FlowService.kt @@ -20,6 +20,7 @@ import net.corda.sandboxgroupcontext.service.SandboxGroupContextComponent import net.corda.schema.configuration.ConfigKeys.BOOT_CONFIG import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG import net.corda.schema.configuration.ConfigKeys.MESSAGING_CONFIG +import net.corda.schema.configuration.ConfigKeys.STATE_MANAGER_CONFIG import net.corda.schema.configuration.ConfigKeys.UTXO_LEDGER_CONFIG import net.corda.virtualnode.read.VirtualNodeInfoReadService import org.osgi.service.component.annotations.Activate @@ -38,11 +39,17 @@ class FlowService @Activate constructor( @Reference(service = ExternalMessagingRoutingService::class) private val externalMessagingRoutingService: ExternalMessagingRoutingService, @Reference(service = FlowMaintenance::class) - private val flowMaintenance: FlowMaintenance, - ) : Lifecycle { + private val flowMaintenance: FlowMaintenance +) : Lifecycle { companion object { - private val configSections = setOf(BOOT_CONFIG, MESSAGING_CONFIG, FLOW_CONFIG, UTXO_LEDGER_CONFIG) + private val configSections = setOf( + FLOW_CONFIG, + BOOT_CONFIG, + MESSAGING_CONFIG, + UTXO_LEDGER_CONFIG, + STATE_MANAGER_CONFIG + ) } private var registration: RegistrationHandle? = null @@ -75,7 +82,7 @@ class FlowService @Activate constructor( coordinator, configSections ) - }else { + } else { coordinator.updateStatus(event.status) } } diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt index 3f814865e12..f825584dc31 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/FlowMaintenanceImplTests.kt @@ -37,18 +37,21 @@ class FlowMaintenanceImplTests { private val stateManagerFactory = mock { on { create(any()) } doReturn (stateManager) } + + private val flowMaintenance = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory) private val messagingConfig = mock() - // TODO - fix this when state manager config is split up from messaging - private val stateManagerConfig = messagingConfig + private val stateManagerConfig = mock() + private val config = mapOf( - ConfigKeys.MESSAGING_CONFIG to messagingConfig + ConfigKeys.MESSAGING_CONFIG to messagingConfig, + ConfigKeys.STATE_MANAGER_CONFIG to stateManagerConfig ) @Test fun `when config provided create subscription and start it`() { val captor = argumentCaptor<() -> Subscription>() - val m = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory) - m.onConfigChange(config) + + flowMaintenance.onConfigChange(config) verify(lifecycleCoordinator).createManagedResource(any(), captor.capture()) captor.firstValue() verify(subscriptionFactory).createDurableSubscription( @@ -64,35 +67,48 @@ class FlowMaintenanceImplTests { } @Test - fun `when same state manager config pushed do not create another StateManager`() { - val m = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory) - m.onConfigChange(config) - m.onConfigChange(config) + fun `when no state manager config pushed and messaging config pushed do not create another StateManager`() { + flowMaintenance.onConfigChange(config) + flowMaintenance.onConfigChange(mapOf(ConfigKeys.MESSAGING_CONFIG to messagingConfig)) + + verify(stateManagerFactory, Times(1)).create(stateManagerConfig) + } + + @Test + fun `when no messaging config pushed and no state manager config pushed do not create another StateManager`() { + flowMaintenance.onConfigChange(config) + flowMaintenance.onConfigChange(mapOf(ConfigKeys.STATE_MANAGER_CONFIG to mock())) + verify(stateManagerFactory, Times(1)).create(stateManagerConfig) } @Test fun `when new state manager config pushed create another StateManager and close old`() { - val m = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory) - m.onConfigChange(config) + flowMaintenance.onConfigChange(config) val newConfig = mock() - m.onConfigChange(mapOf(ConfigKeys.MESSAGING_CONFIG to newConfig)) + flowMaintenance.onConfigChange( + mapOf( + ConfigKeys.MESSAGING_CONFIG to newConfig, + ConfigKeys.STATE_MANAGER_CONFIG to newConfig, + ) + ) + verify(stateManagerFactory).create(newConfig) verify(stateManager).close() } @Test fun `when stop close StateManager`() { - val m = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory) - m.onConfigChange(config) - m.stop() + flowMaintenance.onConfigChange(config) + flowMaintenance.stop() + verify(stateManager).close() } @Test fun `do nothing when messaging config not sent`() { - val m = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory) - m.onConfigChange(mapOf("foo" to mock())) + flowMaintenance.onConfigChange(mapOf("foo" to mock())) + verify(lifecycleCoordinator, never()).createManagedResource(any(), any<() -> Subscription>()) } -} \ No newline at end of file +} diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt index cd94b4c9fd5..3edd56f88be 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/service/FlowServiceTest.kt @@ -38,14 +38,15 @@ class FlowServiceTest { } private val flowExecutor = mock() - private val externalMessagingRoutingService = mock() private val flowMaintenance = mock() + private val externalMessagingRoutingService = mock() private val exampleConfig = mapOf( ConfigKeys.BOOT_CONFIG to MINIMUM_SMART_CONFIG, ConfigKeys.MESSAGING_CONFIG to MINIMUM_SMART_CONFIG, ConfigKeys.FLOW_CONFIG to MINIMUM_SMART_CONFIG, - ConfigKeys.UTXO_LEDGER_CONFIG to MINIMUM_SMART_CONFIG + ConfigKeys.UTXO_LEDGER_CONFIG to MINIMUM_SMART_CONFIG, + ConfigKeys.STATE_MANAGER_CONFIG to MINIMUM_SMART_CONFIG ) @Test @@ -69,7 +70,15 @@ class FlowServiceTest { verify(this.configReadService).registerComponentForUpdates( eq(flowServiceCoordinator), - eq(setOf(ConfigKeys.BOOT_CONFIG, ConfigKeys.MESSAGING_CONFIG, ConfigKeys.FLOW_CONFIG, ConfigKeys.UTXO_LEDGER_CONFIG)) + eq( + setOf( + ConfigKeys.BOOT_CONFIG, + ConfigKeys.MESSAGING_CONFIG, + ConfigKeys.FLOW_CONFIG, + ConfigKeys.UTXO_LEDGER_CONFIG, + ConfigKeys.STATE_MANAGER_CONFIG + ) + ) ) } } diff --git a/gradle.properties b/gradle.properties index 8c8c220eff7..56f0bd53c9f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -47,7 +47,7 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.73 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.1.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.1.0.30-beta+ +cordaApiVersion=5.1.0.31-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/configuration/configuration-merger/src/main/kotlin/net/corda/libs/configuration/merger/ConfigMerger.kt b/libs/configuration/configuration-merger/src/main/kotlin/net/corda/libs/configuration/merger/ConfigMerger.kt index bf17b0d9474..599ce4c88e1 100644 --- a/libs/configuration/configuration-merger/src/main/kotlin/net/corda/libs/configuration/merger/ConfigMerger.kt +++ b/libs/configuration/configuration-merger/src/main/kotlin/net/corda/libs/configuration/merger/ConfigMerger.kt @@ -3,25 +3,27 @@ package net.corda.libs.configuration.merger import net.corda.libs.configuration.SmartConfig /** - * Handles merging of 2 or more SmartConfigs into each other. e.g merging boot config into messaging config. + * Handles merging of 2 or more SmartConfigs into each other, e.g. merging boot config into messaging config. */ interface ConfigMerger { /** - * Merge values from the [bootConfig] into the [messagingConfig] received from the config topic and return the resulting messaging - * config. + * Merge values from configuration section under [configKey] path within [bootConfig] into the [existingConfig] + * received from the configuration topic, and return the merged configuration. + * * @param bootConfig boot config created on startup - * @param messagingConfig messaging config taken from the topic - * @return Messaging config with boot config values merged into it. + * @param configKey path for the boot configuration section to merge + * @param existingConfig existing configuration taken from the topic + * @return configuration with boot config values under the specified section merged into it */ - fun getMessagingConfig(bootConfig: SmartConfig, messagingConfig: SmartConfig? = null) : SmartConfig + fun getConfig(bootConfig: SmartConfig, configKey: String, existingConfig: SmartConfig?): SmartConfig /** - * Merge values from the [bootConfig] into the [dbConfig] received from the config topic and return the resulting db + * Merge values from the [bootConfig] into the [messagingConfig] received from the config topic and return the resulting messaging * config. * @param bootConfig boot config created on startup - * @param dbConfig db config taken from the topic - * @return DB config with boot config values merged into it. + * @param messagingConfig messaging config taken from the topic + * @return Messaging config with boot config values merged into it. */ - fun getDbConfig(bootConfig: SmartConfig, dbConfig: SmartConfig?): SmartConfig + fun getMessagingConfig(bootConfig: SmartConfig, messagingConfig: SmartConfig? = null) : SmartConfig } diff --git a/libs/configuration/configuration-merger/src/main/kotlin/net/corda/libs/configuration/merger/impl/ConfigMergerImpl.kt b/libs/configuration/configuration-merger/src/main/kotlin/net/corda/libs/configuration/merger/impl/ConfigMergerImpl.kt index 164d7b9d93b..cd3a7ce7d19 100644 --- a/libs/configuration/configuration-merger/src/main/kotlin/net/corda/libs/configuration/merger/impl/ConfigMergerImpl.kt +++ b/libs/configuration/configuration-merger/src/main/kotlin/net/corda/libs/configuration/merger/impl/ConfigMergerImpl.kt @@ -5,7 +5,6 @@ import net.corda.libs.configuration.SmartConfigImpl import net.corda.libs.configuration.merger.ConfigMerger import net.corda.messagebus.api.configuration.BusConfigMerger import net.corda.messagebus.api.configuration.getConfigOrEmpty -import net.corda.schema.configuration.BootConfig.BOOT_DB import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference @@ -16,15 +15,14 @@ class ConfigMergerImpl @Activate constructor( private val busConfigMerger: BusConfigMerger ) : ConfigMerger { - override fun getMessagingConfig(bootConfig: SmartConfig, messagingConfig: SmartConfig?): SmartConfig { - return busConfigMerger.getMessagingConfig(bootConfig, messagingConfig) + override fun getConfig(bootConfig: SmartConfig, configKey: String, existingConfig: SmartConfig?): SmartConfig { + val updatedConfig = existingConfig ?: SmartConfigImpl.empty() + val bootConfiguration = bootConfig.getConfigOrEmpty(configKey) + + return bootConfiguration.withFallback(updatedConfig) } - override fun getDbConfig(bootConfig: SmartConfig, dbConfig: SmartConfig?): SmartConfig { - //TODO - Boot params for db connection details currently passed in via BOOT_DB.*. Db config logic needs to be - // migrated to use the defined boot schema values. When that this done they can be merged properly from boot db config here. - val updatedDbConfig = dbConfig ?: SmartConfigImpl.empty() - val bootDBParamsConfig = bootConfig.getConfigOrEmpty(BOOT_DB) - return bootDBParamsConfig.withFallback(updatedDbConfig) + override fun getMessagingConfig(bootConfig: SmartConfig, messagingConfig: SmartConfig?): SmartConfig { + return busConfigMerger.getMessagingConfig(bootConfig, messagingConfig) } -} \ No newline at end of file +} diff --git a/libs/configuration/configuration-merger/src/test/kotlin/net/corda/libs/configuration/merger/impl/ConfigMergerImplTest.kt b/libs/configuration/configuration-merger/src/test/kotlin/net/corda/libs/configuration/merger/impl/ConfigMergerImplTest.kt index d3cd146e6a8..3d43be555e7 100644 --- a/libs/configuration/configuration-merger/src/test/kotlin/net/corda/libs/configuration/merger/impl/ConfigMergerImplTest.kt +++ b/libs/configuration/configuration-merger/src/test/kotlin/net/corda/libs/configuration/merger/impl/ConfigMergerImplTest.kt @@ -12,10 +12,9 @@ import org.mockito.kotlin.whenever class ConfigMergerImplTest { private val busConfigMerger = mock() + private val configMerger = ConfigMergerImpl(busConfigMerger) private val smartConfigFactory = SmartConfigFactory.createWithoutSecurityServices() - private val merger = ConfigMergerImpl(busConfigMerger) - @Test fun `merger correctly merges messaging config with boot config using messaging as fallback`() { val messagingConfig = SmartConfigImpl.empty() @@ -27,13 +26,36 @@ class ConfigMergerImplTest { ) ) ) - val mergedMessagingConfig = bootConfig.withFallback(messagingConfig) + val mergedMessagingConfig = bootConfig.withFallback(messagingConfig) whenever(busConfigMerger.getMessagingConfig(eq(bootConfig), eq(messagingConfig))).thenReturn(mergedMessagingConfig) - val result = merger.getMessagingConfig(bootConfig, messagingConfig) + val result = configMerger.getMessagingConfig(bootConfig, messagingConfig) assertThat(result.getString("boot.param.a")).isEqualTo("111") assertThat(result.getString("boot.param.b")).isEqualTo("222") } -} \ No newline at end of file + + @Test + fun `merger correctly merges boot config using existing config as fallback`() { + val bootConfig = smartConfigFactory.create( + ConfigFactory.parseMap( + mapOf( + "section.key.param.a" to "X", + ) + ) + ) + + val messagingConfig = smartConfigFactory.create( + ConfigFactory.parseMap( + mapOf( + "key.param.b" to "222", + ) + ) + ) + + val result = configMerger.getConfig(bootConfig, "section", messagingConfig) + assertThat(result.getString("key.param.a")).isEqualTo("X") + assertThat(result.getString("key.param.b")).isEqualTo("222") + } +} diff --git a/libs/configuration/configuration-validation/src/main/kotlin/net/corda/libs/configuration/validation/impl/ConfigurationValidatorImpl.kt b/libs/configuration/configuration-validation/src/main/kotlin/net/corda/libs/configuration/validation/impl/ConfigurationValidatorImpl.kt index f36f89ae3c0..1c4d19b68c3 100644 --- a/libs/configuration/configuration-validation/src/main/kotlin/net/corda/libs/configuration/validation/impl/ConfigurationValidatorImpl.kt +++ b/libs/configuration/configuration-validation/src/main/kotlin/net/corda/libs/configuration/validation/impl/ConfigurationValidatorImpl.kt @@ -117,6 +117,8 @@ internal class ConfigurationValidatorImpl(private val schemaProvider: SchemaProv private fun getSchema(schemaInput: InputStream, applyDefaults: Boolean): JsonSchema { val schemaValidatorsConfig = SchemaValidatorsConfig().apply { + // Try to convert string to declared type when validating the schema. + isTypeLoose = true applyDefaultsStrategy = ApplyDefaultsStrategy(applyDefaults, false, false) } return schemaFactory.getSchema(schemaInput, schemaValidatorsConfig) diff --git a/libs/configuration/configuration-validation/src/test/kotlin/net/corda/libs/configuration/validation/impl/ConfigurationValidatorImplTest.kt b/libs/configuration/configuration-validation/src/test/kotlin/net/corda/libs/configuration/validation/impl/ConfigurationValidatorImplTest.kt index 9fe4a65e389..5907c6842d9 100644 --- a/libs/configuration/configuration-validation/src/test/kotlin/net/corda/libs/configuration/validation/impl/ConfigurationValidatorImplTest.kt +++ b/libs/configuration/configuration-validation/src/test/kotlin/net/corda/libs/configuration/validation/impl/ConfigurationValidatorImplTest.kt @@ -33,7 +33,6 @@ class ConfigurationValidatorImplTest { private const val NO_FILE_EXISTS = "no-file-here" private const val VALID_DATA = "data/valid.conf" - private const val VALID_MISSING_REFERENCE_DATA = "data/valid-missing-reference.conf" private const val INVALID_DATA = "data/invalid.conf" private val TEST_VERSION = Version.fromString("1.0") @@ -56,6 +55,15 @@ class ConfigurationValidatorImplTest { assertThat(outputConfig.getInt("testInteger")).isEqualTo(7) } + @Test + fun `valid document against test schema, applies defaults and cast types when possible`() { + val validator = createSchemaValidator() + val smartConfig = loadData(VALID_DATA) + val outputConfig = validator.validate(TEST_SCHEMA, TEST_VERSION, smartConfig, true) + assertThat(smartConfig).isNotEqualTo(outputConfig) + assertThat(outputConfig.getInt("anotherTestInteger")).isEqualTo(20) + } + @Test fun `valid document against test schema with missing testReference fields, applies defaults`() { val validator = createSchemaValidator() diff --git a/libs/configuration/configuration-validation/src/test/resources/data/valid.conf b/libs/configuration/configuration-validation/src/test/resources/data/valid.conf index dad37319321..1674d9394e8 100644 --- a/libs/configuration/configuration-validation/src/test/resources/data/valid.conf +++ b/libs/configuration/configuration-validation/src/test/resources/data/valid.conf @@ -1,8 +1,9 @@ "testString": "hello", +"anotherTestInteger": "20", "testReference": { "foo": [1, 2, 3.14], "bar": false } "testObject": { "testPropertyB": {} -} \ No newline at end of file +} diff --git a/libs/configuration/configuration-validation/src/test/resources/schema/valid/test-schema.json b/libs/configuration/configuration-validation/src/test/resources/schema/valid/test-schema.json index 0c4bf195a23..fb3004ddbb5 100644 --- a/libs/configuration/configuration-validation/src/test/resources/schema/valid/test-schema.json +++ b/libs/configuration/configuration-validation/src/test/resources/schema/valid/test-schema.json @@ -10,6 +10,11 @@ "maximum": 500, "default": 7 }, + "anotherTestInteger": { + "type": "integer", + "minimum": 0, + "maximum": 500 + }, "testString": { "type": "string" }, diff --git a/libs/messaging/db-message-bus-impl/src/main/kotlin/net/corda/messagebus/db/configuration/DbBusConfigMergerImpl.kt b/libs/messaging/db-message-bus-impl/src/main/kotlin/net/corda/messagebus/db/configuration/DbBusConfigMergerImpl.kt index df4ea25e56d..d10fc15c9c3 100644 --- a/libs/messaging/db-message-bus-impl/src/main/kotlin/net/corda/messagebus/db/configuration/DbBusConfigMergerImpl.kt +++ b/libs/messaging/db-message-bus-impl/src/main/kotlin/net/corda/messagebus/db/configuration/DbBusConfigMergerImpl.kt @@ -4,13 +4,11 @@ import com.typesafe.config.ConfigValueFactory.fromAnyRef import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigImpl import net.corda.messagebus.api.configuration.BusConfigMerger -import net.corda.messagebus.api.configuration.getConfigOrEmpty import net.corda.messagebus.api.configuration.getStringOrDefault import net.corda.messagebus.api.configuration.getStringOrNull import net.corda.schema.configuration.BootConfig import net.corda.schema.configuration.BootConfig.INSTANCE_ID import net.corda.schema.configuration.BootConfig.TOPIC_PREFIX -import net.corda.schema.configuration.MessagingConfig import net.corda.schema.configuration.MessagingConfig.Bus import net.corda.schema.configuration.MessagingConfig.MAX_ALLOWED_MSG_SIZE import org.osgi.service.component.annotations.Component @@ -19,14 +17,7 @@ import org.osgi.service.component.annotations.Component class DbBusConfigMergerImpl : BusConfigMerger { override fun getMessagingConfig(bootConfig: SmartConfig, messagingConfig: SmartConfig?): SmartConfig { - var updatedMessagingConfig = messagingConfig ?: SmartConfigImpl.empty() - - bootConfig.getConfigOrEmpty(BootConfig.BOOT_STATE_MANAGER).entrySet().forEach { entry -> - updatedMessagingConfig = updatedMessagingConfig.withValue( - "${MessagingConfig.StateManager.STATE_MANAGER}.${entry.key}", - fromAnyRef(bootConfig.getString("${BootConfig.BOOT_STATE_MANAGER}.${entry.key}")) - ) - } + val updatedMessagingConfig = messagingConfig ?: SmartConfigImpl.empty() return updatedMessagingConfig .withValue(INSTANCE_ID, fromAnyRef(bootConfig.getString(INSTANCE_ID))) diff --git a/libs/messaging/db-message-bus-impl/src/test/kotlin/net/corda/messagebus/db/configuration/DbBusConfigMergerImplTest.kt b/libs/messaging/db-message-bus-impl/src/test/kotlin/net/corda/messagebus/db/configuration/DbBusConfigMergerImplTest.kt index 155971698cf..887b0a5fdc8 100644 --- a/libs/messaging/db-message-bus-impl/src/test/kotlin/net/corda/messagebus/db/configuration/DbBusConfigMergerImplTest.kt +++ b/libs/messaging/db-message-bus-impl/src/test/kotlin/net/corda/messagebus/db/configuration/DbBusConfigMergerImplTest.kt @@ -1,5 +1,6 @@ package net.corda.messagebus.db.configuration +import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory @@ -11,61 +12,39 @@ class DbBusConfigMergerImplTest { private const val TEST_BOOT_CONFIG = "test_boot.conf" } - private val smartConfigFactory = SmartConfigFactory.createWithoutSecurityServices() private val merger = DbBusConfigMergerImpl() + private val smartConfigFactory = SmartConfigFactory.createWithoutSecurityServices() + + private fun assertResultingConfig(result: Config) { + assertThat(result.getString("bus.busType")).isEqualTo("DATABASE") + assertThat(result.getString("bus.dbProperties.user")).isEqualTo("user") + assertThat(result.getString("bus.dbProperties.pass")).isEqualTo("password") + assertThat(result.getString("bus.dbProperties.jdbcUrl")).isEqualTo("sampleurlmessagebus") + } @Test fun `empty messaging config can be merged with boot config`(){ - val bootConfig = loadTestConfig(TEST_BOOT_CONFIG) + val bootConfig = loadTestConfig() val messagingConfig = smartConfigFactory.create(ConfigFactory.empty()) val result = merger.getMessagingConfig(bootConfig, messagingConfig) - - assertThat(result.getString("bus.busType")).isEqualTo("DATABASE") - assertThat(result.getString("bus.dbProperties.user")).isEqualTo("user") - assertThat(result.getString("bus.dbProperties.pass")).isEqualTo("password") - assertThat(result.getString("bus.dbProperties.jdbcUrl")).isEqualTo("sampleurlmessagebus") - assertThat(result.getString("stateManager.type")).isEqualTo("DATABASE") - assertThat(result.getString("stateManager.database.user")).isEqualTo("sampleuser") - assertThat(result.getString("stateManager.database.pass")).isEqualTo("samplepass") - assertThat(result.getString("stateManager.database.jdbc.url")).isEqualTo("samplestatemanager") - assertThat(result.getInt("stateManager.database.pool.idleTimeoutSeconds")).isEqualTo(120) - assertThat(result.getInt("stateManager.database.pool.keepAliveTimeSeconds")).isEqualTo(0) - assertThat(result.getInt("stateManager.database.pool.maxLifetimeSeconds")).isEqualTo(1800) - assertThat(result.getInt("stateManager.database.pool.maxSize")).isEqualTo(5) - assertThat(result.getInt("stateManager.database.pool.minSize")).isEqualTo(1) - assertThat(result.getInt("stateManager.database.pool.validationTimeoutSeconds")).isEqualTo(5) + assertResultingConfig(result) } @Test fun `existing messaging config can be merged with boot config`(){ - val bootConfig = loadTestConfig(TEST_BOOT_CONFIG) + val bootConfig = loadTestConfig() val messagingConfig = smartConfigFactory.create(ConfigFactory.parseMap(mapOf( - "stateManager.type" to "UNKNOWN", "db.bus.busType" to "UNKNOWN" ))) val result = merger.getMessagingConfig(bootConfig, messagingConfig) - - assertThat(result.getString("bus.busType")).isEqualTo("DATABASE") - assertThat(result.getString("bus.dbProperties.user")).isEqualTo("user") - assertThat(result.getString("bus.dbProperties.pass")).isEqualTo("password") - assertThat(result.getString("bus.dbProperties.jdbcUrl")).isEqualTo("sampleurlmessagebus") - assertThat(result.getString("stateManager.type")).isEqualTo("DATABASE") - assertThat(result.getString("stateManager.database.user")).isEqualTo("sampleuser") - assertThat(result.getString("stateManager.database.pass")).isEqualTo("samplepass") - assertThat(result.getString("stateManager.database.jdbc.url")).isEqualTo("samplestatemanager") - assertThat(result.getInt("stateManager.database.pool.idleTimeoutSeconds")).isEqualTo(120) - assertThat(result.getInt("stateManager.database.pool.keepAliveTimeSeconds")).isEqualTo(0) - assertThat(result.getInt("stateManager.database.pool.maxLifetimeSeconds")).isEqualTo(1800) - assertThat(result.getInt("stateManager.database.pool.maxSize")).isEqualTo(5) - assertThat(result.getInt("stateManager.database.pool.minSize")).isEqualTo(1) - assertThat(result.getInt("stateManager.database.pool.validationTimeoutSeconds")).isEqualTo(5) + assertResultingConfig(result) } - private fun loadTestConfig(resource: String): SmartConfig { - val url = this::class.java.classLoader.getResource(resource) - ?: throw IllegalArgumentException("Failed to find $resource") + private fun loadTestConfig(): SmartConfig { + val url = this::class.java.classLoader.getResource(TEST_BOOT_CONFIG) + ?: throw IllegalArgumentException("Failed to find $TEST_BOOT_CONFIG") val configString = url.openStream().bufferedReader().use { it.readText() } diff --git a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/KafkaConfigMergerImpl.kt b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/KafkaConfigMergerImpl.kt index d3012802472..b20d87f2dc7 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/KafkaConfigMergerImpl.kt +++ b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/config/KafkaConfigMergerImpl.kt @@ -4,11 +4,8 @@ import com.typesafe.config.ConfigValueFactory import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigImpl import net.corda.messagebus.api.configuration.BusConfigMerger -import net.corda.messagebus.api.configuration.getConfigOrEmpty import net.corda.schema.configuration.BootConfig import net.corda.schema.configuration.BootConfig.BOOT_KAFKA_COMMON -import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER -import net.corda.schema.configuration.MessagingConfig import net.corda.schema.configuration.MessagingConfig.Bus.BUS_TYPE import net.corda.schema.configuration.MessagingConfig.Bus.KAFKA_PROPERTIES_COMMON import net.corda.schema.configuration.MessagingConfig.MAX_ALLOWED_MSG_SIZE @@ -40,16 +37,6 @@ class KafkaConfigMergerImpl : BusConfigMerger { ) } - logger.debug { "Looping through State Manager Boot Configuration" } - val stateManagerBootConfig = bootConfig.getConfigOrEmpty(BOOT_STATE_MANAGER).entrySet() - stateManagerBootConfig.forEach { entry -> - logger.debug { "Entry key: ${entry.key}" } - updatedMessagingConfig = updatedMessagingConfig.withValue( - "${MessagingConfig.StateManager.STATE_MANAGER}.${entry.key}", - ConfigValueFactory.fromAnyRef(bootConfig.getString("$BOOT_STATE_MANAGER.${entry.key}")) - ) - } - return updatedMessagingConfig } diff --git a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/KafkaConfigMergerImplTest.kt b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/KafkaConfigMergerImplTest.kt index e31aa0aa693..c0c379d1b28 100644 --- a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/KafkaConfigMergerImplTest.kt +++ b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/config/KafkaConfigMergerImplTest.kt @@ -1,5 +1,6 @@ package net.corda.messaging.kafka.subscription.net.corda.messagebus.kafka.config +import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import net.corda.libs.configuration.SmartConfig import net.corda.libs.configuration.SmartConfigFactory @@ -15,54 +16,34 @@ class KafkaConfigMergerImplTest { private val smartConfigFactory = SmartConfigFactory.createWithoutSecurityServices() private val merger = KafkaConfigMergerImpl() + private fun assertResultingConfig(result: Config) { + assertThat(result.getString("bus.busType")).isEqualTo("KAFKA") + assertThat(result.getString("bus.kafkaProperties.common.bootstrap.servers")).isEqualTo("localhost:9092") + } + @Test fun `empty messaging config can be merged with boot config`(){ - val bootConfig = loadTestConfig(TEST_BOOT_CONFIG) + val bootConfig = loadTestConfig() val messagingConfig = smartConfigFactory.create(ConfigFactory.empty()) val result = merger.getMessagingConfig(bootConfig, messagingConfig) - - assertThat(result.getString("bus.busType")).isEqualTo("KAFKA") - assertThat(result.getString("bus.kafkaProperties.common.bootstrap.servers")).isEqualTo("localhost:9092") - assertThat(result.getString("stateManager.type")).isEqualTo("DATABASE") - assertThat(result.getString("stateManager.database.user")).isEqualTo("sampleuser") - assertThat(result.getString("stateManager.database.pass")).isEqualTo("samplepass") - assertThat(result.getString("stateManager.database.jdbc.url")).isEqualTo("samplestatemanager") - assertThat(result.getInt("stateManager.database.pool.idleTimeoutSeconds")).isEqualTo(120) - assertThat(result.getInt("stateManager.database.pool.keepAliveTimeSeconds")).isEqualTo(0) - assertThat(result.getInt("stateManager.database.pool.maxLifetimeSeconds")).isEqualTo(1800) - assertThat(result.getInt("stateManager.database.pool.maxSize")).isEqualTo(5) - assertThat(result.getInt("stateManager.database.pool.minSize")).isEqualTo(1) - assertThat(result.getInt("stateManager.database.pool.validationTimeoutSeconds")).isEqualTo(5) + assertResultingConfig(result) } @Test fun `existing messaging config can be merged with boot config with boot config taking precendence`(){ - val bootConfig = loadTestConfig(TEST_BOOT_CONFIG) + val bootConfig = loadTestConfig() val messagingConfig = smartConfigFactory.create(ConfigFactory.parseMap(mapOf( - "stateManager.type" to "UNKNOWN", "kafka.bus.busType" to "UNKNOWN" ))) val result = merger.getMessagingConfig(bootConfig, messagingConfig) - - assertThat(result.getString("bus.busType")).isEqualTo("KAFKA") - assertThat(result.getString("bus.kafkaProperties.common.bootstrap.servers")).isEqualTo("localhost:9092") - assertThat(result.getString("stateManager.type")).isEqualTo("DATABASE") - assertThat(result.getString("stateManager.database.user")).isEqualTo("sampleuser") - assertThat(result.getString("stateManager.database.pass")).isEqualTo("samplepass") - assertThat(result.getString("stateManager.database.jdbc.url")).isEqualTo("samplestatemanager") - assertThat(result.getInt("stateManager.database.pool.idleTimeoutSeconds")).isEqualTo(120) - assertThat(result.getInt("stateManager.database.pool.keepAliveTimeSeconds")).isEqualTo(0) - assertThat(result.getInt("stateManager.database.pool.maxLifetimeSeconds")).isEqualTo(1800) - assertThat(result.getInt("stateManager.database.pool.maxSize")).isEqualTo(5) - assertThat(result.getInt("stateManager.database.pool.minSize")).isEqualTo(1) - assertThat(result.getInt("stateManager.database.pool.validationTimeoutSeconds")).isEqualTo(5) + assertResultingConfig(result) } - private fun loadTestConfig(resource: String): SmartConfig { - val url = this::class.java.classLoader.getResource(resource) - ?: throw IllegalArgumentException("Failed to find $resource") + private fun loadTestConfig(): SmartConfig { + val url = this::class.java.classLoader.getResource(TEST_BOOT_CONFIG) + ?: throw IllegalArgumentException("Failed to find $TEST_BOOT_CONFIG") val configString = url.openStream().bufferedReader().use { it.readText() } diff --git a/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/factory/StateManagerFactoryImpl.kt b/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/factory/StateManagerFactoryImpl.kt index 35e03a8ea43..9ea443a5616 100644 --- a/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/factory/StateManagerFactoryImpl.kt +++ b/libs/state-manager/state-manager-db-impl/src/main/kotlin/net/corda/libs/statemanager/impl/factory/StateManagerFactoryImpl.kt @@ -12,17 +12,16 @@ import net.corda.libs.statemanager.impl.repository.impl.QueryProvider import net.corda.libs.statemanager.impl.repository.impl.StateRepositoryImpl import net.corda.orm.DbEntityManagerConfiguration import net.corda.orm.EntityManagerFactoryFactory -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_DRIVER -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_PASS -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_PERSISTENCE_UNIT_NAME -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_POOL_IDLE_TIMEOUT_SECONDS -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_POOL_MAX_LIFETIME_SECONDS -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_POOL_MAX_SIZE -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_POOL_MIN_SIZE -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_URL -import net.corda.schema.configuration.MessagingConfig.StateManager.JDBC_USER +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_DRIVER +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_PASS +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_POOL_IDLE_TIMEOUT_SECONDS +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_POOL_MAX_LIFETIME_SECONDS +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_POOL_MAX_SIZE +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_POOL_MIN_SIZE +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_URL +import net.corda.schema.configuration.StateManagerConfig.Database.JDBC_USER import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference @@ -39,7 +38,6 @@ class StateManagerFactoryImpl @Activate constructor( val pass = config.getString(JDBC_PASS) val jdbcUrl = config.getString(JDBC_URL) val jdbcDiver = config.getString(JDBC_DRIVER) - val persistenceUnitName = config.getString(JDBC_PERSISTENCE_UNIT_NAME) val maxPoolSize = config.getInt(JDBC_POOL_MAX_SIZE) val minPoolSize = config.getIntOrDefault(JDBC_POOL_MIN_SIZE, maxPoolSize) val idleTimeout = config.getInt(JDBC_POOL_IDLE_TIMEOUT_SECONDS).toLong().run(Duration::ofSeconds) @@ -61,7 +59,8 @@ class StateManagerFactoryImpl @Activate constructor( ) val entityManagerFactory = entityManagerFactoryFactory.create( - persistenceUnitName, + // TODO-[CORE-CORE-17025]: persistent unit name will not be required after removing Hibernate. + "corda-state-manager", StateManagerEntities.classes, DbEntityManagerConfiguration(dataSource) )