Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-17437: State Manager Configuration Section #4780

Merged
merged 13 commits into from
Oct 10, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_DB_USER
import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_JDBC_URL
import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_TYPE
import net.corda.schema.configuration.DatabaseConfig
import net.corda.schema.configuration.MessagingConfig
import net.corda.schema.configuration.MessagingConfig.Bus.BUS_TYPE
import net.corda.schema.configuration.StateManagerConfig
import net.corda.tracing.configureTracing
import net.corda.tracing.shutdownTracing
import net.corda.web.api.WebServer
Expand Down Expand Up @@ -224,17 +224,16 @@ class CombinedWorker @Activate constructor(
*/
private fun prepareStateManagerConfig(stateManagerConfig: Config): Config {
val defaultConfig = ConfigFactory.empty()
.withValue(MessagingConfig.StateManager.JDBC_DRIVER, fromAnyRef("org.postgresql.Driver"))
.withValue(MessagingConfig.StateManager.JDBC_PERSISTENCE_UNIT_NAME, fromAnyRef("corda-state-manager"))
.withValue(MessagingConfig.StateManager.JDBC_POOL_MIN_SIZE, fromAnyRef(1))
.withValue(MessagingConfig.StateManager.JDBC_POOL_MAX_SIZE, fromAnyRef(5))
.withValue(MessagingConfig.StateManager.JDBC_POOL_IDLE_TIMEOUT_SECONDS, fromAnyRef(Duration.ofMinutes(2).toSeconds()))
.withValue(MessagingConfig.StateManager.JDBC_POOL_MAX_LIFETIME_SECONDS, fromAnyRef(Duration.ofMinutes(30).toSeconds()))
.withValue(MessagingConfig.StateManager.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS, fromAnyRef(Duration.ZERO.toSeconds()))
.withValue(
MessagingConfig.StateManager.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS, fromAnyRef(Duration.ofSeconds(5).toSeconds())
)
val stateManagerConfigWithFallback = stateManagerConfig.withFallback(defaultConfig)
.withValue(StateManagerConfig.Database.JDBC_DRIVER, fromAnyRef("org.postgresql.Driver"))
.withValue(StateManagerConfig.Database.JDBC_POOL_MIN_SIZE, fromAnyRef(1))
.withValue(StateManagerConfig.Database.JDBC_POOL_MAX_SIZE, fromAnyRef(5))
.withValue(StateManagerConfig.Database.JDBC_POOL_IDLE_TIMEOUT_SECONDS, fromAnyRef(Duration.ofMinutes(2).toSeconds()))
.withValue(StateManagerConfig.Database.JDBC_POOL_MAX_LIFETIME_SECONDS, fromAnyRef(Duration.ofMinutes(30).toSeconds()))
.withValue(StateManagerConfig.Database.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS, fromAnyRef(Duration.ZERO.toSeconds()))
.withValue(StateManagerConfig.Database.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS, fromAnyRef(Duration.ofSeconds(5).toSeconds()))
val stateManagerConfigWithFallback = stateManagerConfig.withFallback(
ConfigFactory.empty().withValue(StateManagerConfig.STATE_MANAGER, defaultConfig.root())
)
// add the state manager schema to the JDBC URL.
return stateManagerConfigWithFallback.withValue(
BOOT_STATE_MANAGER_JDBC_URL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package net.corda.applications.workers.workercommon

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory.fromAnyRef
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.configuration.SmartConfigFactory
import net.corda.libs.configuration.secret.SecretsServiceFactoryResolver
Expand All @@ -12,7 +11,6 @@ import net.corda.osgi.api.Shutdown
import net.corda.schema.configuration.BootConfig
import net.corda.schema.configuration.ConfigDefaults
import net.corda.schema.configuration.ConfigKeys
import net.corda.schema.configuration.MessagingConfig
import net.corda.schema.configuration.MessagingConfig.Bus.BUS_TYPE
import net.corda.schema.configuration.MessagingConfig.MAX_ALLOWED_MSG_SIZE
import org.osgi.framework.FrameworkUtil
Expand Down Expand Up @@ -40,18 +38,6 @@ class WorkerHelpers {
"-msasl.jaas.config"
)

/**
* Define the paths of any boot config that must be treated as integers for boot config json validation.
*/
private val BOOT_CONFIG_INTEGER_PATHS = setOf(
MessagingConfig.StateManager.JDBC_POOL_MAX_SIZE,
MessagingConfig.StateManager.JDBC_POOL_MIN_SIZE,
MessagingConfig.StateManager.JDBC_POOL_IDLE_TIMEOUT_SECONDS,
MessagingConfig.StateManager.JDBC_POOL_MAX_LIFETIME_SECONDS,
MessagingConfig.StateManager.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS,
MessagingConfig.StateManager.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS,
)

/**
* Parses the [args] into the [params].
*
Expand Down Expand Up @@ -110,31 +96,6 @@ class WorkerHelpers {
}.withFallback(baseConfig)
}

/**
* Converts configuration parameters that should be [Integer] from their [String] representations to actual Integers
* before performing boot configuration validation. PicoCLI casts parameters in maps to strings, so this function
* helps ensure that specific configuration paths are treated as integers.
*
* For example, when passing the command-line argument:
*
* ```
* --stateManager database.pool.maxSize=1
* ```
*
* The corresponding map will be `["database.pool.maxSize" to "1"]`. This function checks if the specified configuration
* paths (defined in [BOOT_CONFIG_INTEGER_PATHS]) exist in the given `bootConfig` and, if found, converts them to integer values.
*
* @param bootConfig The SmartConfig containing the configuration parameters.
* @return A new SmartConfig with specified integer configuration paths converted to actual integers.
*/
private fun prepareIntegerConfigPaths(bootConfig: SmartConfig): SmartConfig {
var updatedConfig = bootConfig
BOOT_CONFIG_INTEGER_PATHS.forEach { path ->
if(bootConfig.hasPath(path)) updatedConfig = updatedConfig.withValue(path, fromAnyRef(bootConfig.getInt(path)))
}
return updatedConfig
}

/**
* Return a SmartConfig object for the top level of the bootstrap configuration.
*
Expand Down Expand Up @@ -200,10 +161,7 @@ class WorkerHelpers {
configWithFiles.getConfig(BootConfig.BOOT_SECRETS).atPath(BootConfig.BOOT_SECRETS),
secretsServiceFactoryResolver.findAll())

val unvalidatedBootConfig = smartConfigFactory.create(configWithFiles.withoutPath(BootConfig.BOOT_SECRETS))

val bootConfig = prepareIntegerConfigPaths(unvalidatedBootConfig)

val bootConfig = smartConfigFactory.create(configWithFiles.withoutPath(BootConfig.BOOT_SECRETS))
validator.validate(ConfigKeys.BOOT_CONFIG, bootConfig, loadResource(BOOT_CONFIG_PATH), true)

// we now know bootConfig has:
Expand Down Expand Up @@ -289,4 +247,4 @@ class WorkerHelpers {
info("VM ${mxBeanInfo.vmName} ${mxBeanInfo.vmVendor} ${mxBeanInfo.vmVersion}")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,6 @@
"null"
]
},
"persistenceUnitName": {
"description": "The persistence unit name to use by the state manager when using database as the persistent storage",
"type": [
"string",
"null"
]
},
"additionalProperties": false
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import net.corda.libs.configuration.validation.ConfigurationValidator
import net.corda.schema.configuration.BootConfig
import net.corda.schema.configuration.ConfigDefaults
import net.corda.schema.configuration.ConfigKeys
import net.corda.schema.configuration.MessagingConfig
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.SoftAssertions.assertSoftly
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -75,7 +74,6 @@ class BootstrapConfigTest {

@Test
fun `full config can be provided in file (json)`() {

val config = WorkerHelpers.getBootstrapConfig(
mockSecretsServiceFactoryResolver,
DefaultWorkerParams(1234).also {
Expand Down Expand Up @@ -184,31 +182,4 @@ class BootstrapConfigTest {
}

}

@Test
fun `getBootstrapConfig converts integers to strings at predefined paths`() {
defaultWorkerParams.stateManagerParams = mapOf(
"database.pool.maxSize" to "111",
"database.pool.minSize" to "222",
"database.pool.idleTimeoutSeconds" to "333",
"database.pool.maxLifetimeSeconds" to "444",
"database.pool.keepAliveTimeSeconds" to "555",
"database.pool.validationTimeoutSeconds" to "666",
)
val config = WorkerHelpers.getBootstrapConfig(
mockSecretsServiceFactoryResolver,
defaultWorkerParams,
mockConfigurationValidator
)

assertSoftly { softly ->
softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_MAX_SIZE)).isEqualTo(111)
softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_MIN_SIZE)).isEqualTo(222)
softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_IDLE_TIMEOUT_SECONDS)).isEqualTo(333)
softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_MAX_LIFETIME_SECONDS)).isEqualTo(444)
softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_KEEP_ALIVE_TIME_SECONDS)).isEqualTo(555)
softly.assertThat(config.getInt(MessagingConfig.StateManager.JDBC_POOL_VALIDATION_TIMEOUT_SECONDS)).isEqualTo(666)
}

}
jujoramos marked this conversation as resolved.
Show resolved Hide resolved
}
}
2 changes: 0 additions & 2 deletions charts/corda-lib/templates/_worker.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,6 @@ spec:
- "--stateManager"
- "database.jdbc.driver=org.postgresql.Driver"
- "--stateManager"
- "database.jdbc.persistenceUnitName=corda-state-manager"
- "--stateManager"
- "database.pool.maxSize={{ .stateManagerDbConnectionPool.maxSize }}"
{{- if .stateManagerDbConnectionPool.minSize }}
- "--stateManager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ import net.corda.messaging.api.records.Record
import net.corda.schema.Schemas.Config.CONFIG_TOPIC
import net.corda.schema.configuration.BootConfig.BOOT_JDBC_URL
import net.corda.schema.configuration.BootConfig.BOOT_MAX_ALLOWED_MSG_SIZE
import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_JDBC_URL
import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER_TYPE
import net.corda.schema.configuration.BootConfig.INSTANCE_ID
import net.corda.schema.configuration.ConfigKeys.BOOT_CONFIG
import net.corda.schema.configuration.ConfigKeys.DB_CONFIG
import net.corda.schema.configuration.ConfigKeys.FLOW_CONFIG
import net.corda.schema.configuration.ConfigKeys.STATE_MANAGER_CONFIG
import net.corda.schema.configuration.DatabaseConfig.JDBC_URL
import net.corda.schema.configuration.MessagingConfig.Bus.BUS_TYPE
import net.corda.schema.configuration.StateManagerConfig
import net.corda.test.util.eventually
import net.corda.utilities.seconds
import org.junit.jupiter.api.Assertions.assertEquals
Expand All @@ -40,17 +44,25 @@ class ConfigurationReadServiceImplTest {

companion object {
private const val JDBC_URL_DATA = "testDataToTriggerBootDBParamLogic"
private const val STATE_MANAGER_JDBC_URL_DATA = "testDataToTriggerBootStateManagerDBParamLogic"
private const val BOOT_CONFIG_STRING = """
$INSTANCE_ID = 1
$BUS_TYPE = DATABASE
$BOOT_JDBC_URL = $JDBC_URL_DATA
$BOOT_MAX_ALLOWED_MSG_SIZE = 1000000000
$BOOT_STATE_MANAGER_TYPE = DATABASE
$BOOT_STATE_MANAGER_JDBC_URL = $STATE_MANAGER_JDBC_URL_DATA
"""

private const val DB_CONFIG_STRING = """
$JDBC_URL = $JDBC_URL_DATA
"""

private const val STATE_MANAGER_CONFIG_STRING = """
${StateManagerConfig.TYPE} = "DATABASE"
${StateManagerConfig.Database.JDBC_URL} = $STATE_MANAGER_JDBC_URL_DATA
"""

private const val TIMEOUT = 10000L
}

Expand Down Expand Up @@ -107,21 +119,29 @@ class ConfigurationReadServiceImplTest {

// Register a new client and verify everything gets delivered
val expectedDBConfig = smartConfigFactory.create(ConfigFactory.parseString(DB_CONFIG_STRING))
val expectedKeys = mutableSetOf(BOOT_CONFIG, FLOW_CONFIG, DB_CONFIG)
val expectedStateManagerConfig = smartConfigFactory.create(ConfigFactory.parseString(STATE_MANAGER_CONFIG_STRING))

val expectedKeys = mutableSetOf(BOOT_CONFIG, FLOW_CONFIG, DB_CONFIG, STATE_MANAGER_CONFIG)
val expectedConfig = mutableMapOf(
BOOT_CONFIG to bootConfig, FLOW_CONFIG to flowConfig, DB_CONFIG to expectedDBConfig
BOOT_CONFIG to bootConfig,
FLOW_CONFIG to flowConfig,
DB_CONFIG to expectedDBConfig,
STATE_MANAGER_CONFIG to expectedStateManagerConfig
)

val latch = CountDownLatch(1)
val reg2 = configurationReadService.registerForUpdates { keys, config ->
receivedKeys.addAll(keys)
receivedConfig = config
latch.countDown()
}
latch.await(TIMEOUT, TimeUnit.MILLISECONDS)

assertEquals(expectedKeys, receivedKeys, "Incorrect keys")
assertEquals(expectedConfig[BOOT_CONFIG], receivedConfig[BOOT_CONFIG], "Incorrect config for key $BOOT_CONFIG")
assertEquals(expectedConfig[FLOW_CONFIG], receivedConfig[FLOW_CONFIG], "Incorrect config for key $FLOW_CONFIG")
assertEquals(expectedConfig[DB_CONFIG], receivedConfig[DB_CONFIG], "Incorrect config for key $DB_CONFIG")
assertEquals(expectedConfig[STATE_MANAGER_CONFIG], receivedConfig[STATE_MANAGER_CONFIG], "Incorrect config for key $STATE_MANAGER_CONFIG")

// Cleanup
reg.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import net.corda.lifecycle.LifecycleCoordinator
import net.corda.messaging.api.processor.CompactedProcessor
import net.corda.messaging.api.records.Record
import net.corda.reconciliation.VersionedRecord
import net.corda.schema.configuration.BootConfig.BOOT_DB
import net.corda.schema.configuration.BootConfig.BOOT_STATE_MANAGER
import net.corda.schema.configuration.ConfigKeys.DB_CONFIG
import net.corda.schema.configuration.ConfigKeys.MESSAGING_CONFIG
import net.corda.schema.configuration.ConfigKeys.STATE_MANAGER_CONFIG
import net.corda.utilities.debug
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -94,11 +97,17 @@ internal class ConfigProcessor(
if (currentData.containsKey(MESSAGING_CONFIG)) {
config[MESSAGING_CONFIG] = configMerger.getMessagingConfig(bootConfig, config[MESSAGING_CONFIG])
}
val dbConfig = configMerger.getDbConfig(bootConfig, config[DB_CONFIG])

val dbConfig = configMerger.getConfig(bootConfig, BOOT_DB, config[DB_CONFIG])
if (!dbConfig.isEmpty) {
config[DB_CONFIG] = dbConfig
}

val stateManagerConfig = configMerger.getConfig(bootConfig, BOOT_STATE_MANAGER, config[STATE_MANAGER_CONFIG])
if (!stateManagerConfig.isEmpty) {
config[STATE_MANAGER_CONFIG] = stateManagerConfig
}

// at this point config is fully populated and verified
return config
}
Expand All @@ -124,4 +133,4 @@ internal class ConfigProcessor(
}

internal fun getAllVersionedRecords() = configCache.values.stream()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ConfigProcessorTest {
private val smartConfigFactory = SmartConfigFactory.createWithoutSecurityServices()
private val configMerger: ConfigMerger = mock {
on { getMessagingConfig(any(), any()) } doAnswer { it.arguments[1] as SmartConfig }
on { getDbConfig(any(), anyOrNull()) } doAnswer { SmartConfigImpl.empty() }
on { getConfig(any(), any(), anyOrNull()) } doAnswer { SmartConfigImpl.empty() }
}

companion object {
Expand Down Expand Up @@ -75,24 +75,24 @@ class ConfigProcessorTest {
@Test
fun `No config is forwarded if the snapshot is empty and db boot config is empty`() {
val coordinator = mock<LifecycleCoordinator>()
val bootconfig = BOOT_CONFIG_STRING.toSmartConfig()
val configProcessor = ConfigProcessor(coordinator, smartConfigFactory, bootconfig, configMerger)
val bootConfig = BOOT_CONFIG_STRING.toSmartConfig()
val configProcessor = ConfigProcessor(coordinator, smartConfigFactory, bootConfig, configMerger)
configProcessor.onSnapshot(mapOf())
verify(coordinator, times(0)).postEvent(capture(eventCaptor))
verify(configMerger, times(0)).getMessagingConfig(bootconfig, null)
verify(configMerger, times(0)).getMessagingConfig(bootConfig, null)
}

@Test
fun `config is forwarded if the snapshot is empty but db boot config is set`() {
val coordinator = mock<LifecycleCoordinator>()
val dbConfig = DB_CONFIG_STRING.toSmartConfig()
whenever(configMerger.getDbConfig(any(), anyOrNull())).thenReturn(dbConfig)
val bootconfig = BOOT_CONFIG_STRING.toSmartConfig()
val configProcessor = ConfigProcessor(coordinator, smartConfigFactory, bootconfig, configMerger)
whenever(configMerger.getConfig(any(), any(), anyOrNull())).thenReturn(dbConfig)
val bootConfig = BOOT_CONFIG_STRING.toSmartConfig()
val configProcessor = ConfigProcessor(coordinator, smartConfigFactory, bootConfig, configMerger)
configProcessor.onSnapshot(mapOf())
verify(coordinator, times(1)).postEvent(capture(eventCaptor))
verify(configMerger, times(0)).getMessagingConfig(bootconfig, null)
verify(configMerger, times(1)).getDbConfig(any(), anyOrNull())
verify(configMerger, times(0)).getMessagingConfig(bootConfig, null)
verify(configMerger, times(2)).getConfig(any(), any(), anyOrNull())
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ internal class ConfigReadServiceEventHandlerTest {
messagingConfig = mock()
configMerger = mock {
on { getMessagingConfig(bootConfig, null) } doAnswer { messagingConfig }
on { getDbConfig(any(), any()) } doAnswer { it.arguments[1] as SmartConfig }
on { getConfig(any(), any(), any()) } doAnswer { it.arguments[1] as SmartConfig }
}
avroSchemaRegistry = mock()
publisher = mock()
Expand Down
Loading