Skip to content

Commit

Permalink
CORE-17623 - move-token-selection-processor (#4835)
Browse files Browse the repository at this point in the history
Move the Token Selection Processor in the stand-alone worker.
  • Loading branch information
driessamyn committed Oct 13, 2023
1 parent 4a03916 commit c544155
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 16 deletions.
1 change: 0 additions & 1 deletion applications/workers/release/db-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ dependencies {
// Processor list must be kept in sync with workersForProcessor in net.corda.cli.plugins.topicconfig.Create
implementation project(':processors:db-processor')
implementation project(':processors:scheduler-processor')
implementation project(':processors:token-cache-processor')
implementation 'org.jetbrains.kotlin:kotlin-osgi-bundle'
implementation "info.picocli:picocli:$picocliVersion"
implementation 'net.corda:corda-base'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import net.corda.osgi.api.Application
import net.corda.osgi.api.Shutdown
import net.corda.processors.db.DBProcessor
import net.corda.processors.scheduler.SchedulerProcessor
import net.corda.processors.token.cache.TokenCacheProcessor
import net.corda.schema.configuration.BootConfig.BOOT_DB
import net.corda.tracing.configureTracing
import net.corda.tracing.shutdownTracing
Expand All @@ -36,8 +35,6 @@ import picocli.CommandLine.Option
class DBWorker @Activate constructor(
@Reference(service = DBProcessor::class)
private val processor: DBProcessor,
@Reference(service = TokenCacheProcessor::class)
private val tokenCacheProcessor: TokenCacheProcessor,
@Reference(service = SchedulerProcessor::class)
private val schedulerProcessor: SchedulerProcessor,
@Reference(service = Shutdown::class)
Expand Down Expand Up @@ -69,7 +66,6 @@ class DBWorker @Activate constructor(

JavaSerialisationFilter.install()


val params = getParams(args, DBWorkerParams())

if (printHelpOrVersion(params.defaultParams, DBWorker::class.java, shutDownService)) return
Expand All @@ -86,15 +82,13 @@ class DBWorker @Activate constructor(
)
webServer.start(params.defaultParams.workerServerPort)
processor.start(config)
tokenCacheProcessor.start(config)
schedulerProcessor.start(config)
}

override fun shutdown() {
logger.info("DB worker stopping.")
processor.stop()
webServer.stop()
tokenCacheProcessor.stop()
schedulerProcessor.stop()
shutdownTracing()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class ConfigTests {
val dbWorker = DBWorker(
dbProcessor,
mock(),
mock(),
DummyShutdown(),
DummyLifecycleRegistry(),
DummyWebServer(),
Expand Down Expand Up @@ -99,7 +98,6 @@ class ConfigTests {
val dbWorker = DBWorker(
dbProcessor,
mock(),
mock(),
DummyShutdown(),
DummyLifecycleRegistry(),
DummyWebServer(),
Expand Down Expand Up @@ -133,7 +131,6 @@ class ConfigTests {
val dbWorker = DBWorker(
dbProcessor,
mock(),
mock(),
DummyShutdown(),
DummyLifecycleRegistry(),
DummyWebServer(),
Expand Down Expand Up @@ -166,7 +163,6 @@ class ConfigTests {
val dbWorker = DBWorker(
dbProcessor,
mock(),
mock(),
DummyShutdown(),
DummyLifecycleRegistry(),
DummyWebServer(),
Expand All @@ -193,7 +189,6 @@ class ConfigTests {
val dbWorker = DBWorker(
dbProcessor,
mock(),
mock(),
DummyShutdown(),
DummyLifecycleRegistry(),
DummyWebServer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
implementation project(':libs:platform-info')
implementation project(':libs:tracing')
implementation project(':libs:web:web')
implementation project(':processors:token-cache-processor')
implementation 'org.jetbrains.kotlin:kotlin-osgi-bundle'
implementation "info.picocli:picocli:$picocliVersion"
implementation 'net.corda:corda-base'
Expand All @@ -36,6 +37,7 @@ dependencies {
runtimeOnly "org.osgi:org.osgi.util.function:$osgiUtilFunctionVersion"
runtimeOnly "org.osgi:org.osgi.util.promise:$osgiUtilPromiseVersion"

runtimeOnly project(':libs:messaging:kafka-message-bus-impl')
runtimeOnly project(':libs:tracing-impl')

testImplementation 'org.osgi:osgi.core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import net.corda.applications.workers.workercommon.DefaultWorkerParams
import net.corda.applications.workers.workercommon.Health
import net.corda.applications.workers.workercommon.JavaSerialisationFilter
import net.corda.applications.workers.workercommon.Metrics
import net.corda.applications.workers.workercommon.WorkerHelpers
import net.corda.applications.workers.workercommon.WorkerHelpers.Companion.getParams
import net.corda.applications.workers.workercommon.WorkerHelpers.Companion.loggerStartupInfo
import net.corda.applications.workers.workercommon.WorkerHelpers.Companion.printHelpOrVersion
Expand All @@ -14,6 +15,7 @@ import net.corda.libs.platform.PlatformInfoProvider
import net.corda.lifecycle.registry.LifecycleRegistry
import net.corda.osgi.api.Application
import net.corda.osgi.api.Shutdown
import net.corda.processors.token.cache.TokenCacheProcessor
import net.corda.schema.configuration.BootConfig
import net.corda.tracing.configureTracing
import net.corda.tracing.shutdownTracing
Expand Down Expand Up @@ -42,6 +44,8 @@ class TokenSelectionWorker @Activate constructor(
private val configurationValidatorFactory: ConfigurationValidatorFactory,
@Reference(service = SecretsServiceFactoryResolver::class)
val secretsServiceFactoryResolver: SecretsServiceFactoryResolver,
@Reference(service = TokenCacheProcessor::class)
private val tokenCacheProcessor: TokenCacheProcessor,
) : Application {

private companion object {
Expand All @@ -64,14 +68,21 @@ class TokenSelectionWorker @Activate constructor(

configureTracing("Token selection Worker", params.defaultParams.zipkinTraceUrl, params.defaultParams.traceSamplesPerSecond)

webServer.start(params.defaultParams.workerServerPort)
val config = WorkerHelpers.getBootstrapConfig(
secretsServiceFactoryResolver,
params.defaultParams,
configurationValidatorFactory.createConfigValidator(),
listOf(WorkerHelpers.createConfigFromParams(BootConfig.BOOT_DB, params.databaseParams))
)

// This is a placeholder worker the processor is still todo
webServer.start(params.defaultParams.workerServerPort)
tokenCacheProcessor.start(config)
}

override fun shutdown() {
logger.info("Token selection worker stopping.")
webServer.stop()
tokenCacheProcessor.stop()
shutdownTracing()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import net.corda.messaging.api.processor.StateAndEventProcessor
import net.corda.messaging.api.processor.StateAndEventProcessor.State
import net.corda.messaging.api.records.Record
import net.corda.tracing.traceStateAndEventExecution
import net.corda.utilities.debug
import org.slf4j.LoggerFactory

@Suppress("LongParameterList")
Expand Down Expand Up @@ -52,6 +53,8 @@ class TokenCacheEventProcessor(
markForDLQ = true)
}

log.debug { "Token event received: $tokenEvent" }

return traceStateAndEventExecution(event, "Token Event - ${tokenEvent.javaClass.simpleName}") {
try {
tokenSelectionMetrics.recordProcessingTime(tokenEvent) {
Expand Down Expand Up @@ -85,6 +88,8 @@ class TokenCacheEventProcessor(

val result = handler.handle(tokenCache, poolCacheState, tokenEvent)

log.debug { "sending token response: $result" }

if (result == null) {
StateAndEventProcessor.Response(
State(poolCacheState.toAvro(), metadata = state?.metadata),
Expand Down
2 changes: 1 addition & 1 deletion processors/db-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ dependencies {
runtimeOnly project(':components:configuration:configuration-write-service-impl')
runtimeOnly project(':components:configuration:configuration-read-service-impl')
runtimeOnly project(':components:db:db-connection-manager-impl')
runtimeOnly project(':libs:flows:external-event-responses-impl')
runtimeOnly project(':components:reconciliation:reconciliation-impl')
runtimeOnly project(':components:uniqueness:uniqueness-checker-impl')
runtimeOnly project(':components:virtual-node:cpi-info-read-service-impl')
Expand All @@ -88,6 +87,7 @@ dependencies {
runtimeOnly project(":libs:application:application-impl")
runtimeOnly project(':libs:crypto:cipher-suite-impl')
runtimeOnly project(':libs:db:db-orm-impl')
runtimeOnly project(':libs:flows:external-event-responses-impl')
runtimeOnly project(':libs:lifecycle:lifecycle-impl')
runtimeOnly project(':libs:membership:membership-impl')
runtimeOnly project(':libs:messaging:messaging-impl')
Expand Down
17 changes: 17 additions & 0 deletions processors/token-cache-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,28 @@ dependencies {
implementation 'org.jetbrains.kotlin:kotlin-osgi-bundle'
implementation 'org.slf4j:slf4j-api'

implementation 'net.corda:corda-config-schema'
implementation 'net.corda:corda-db-schema'

implementation project(':components:ledger:ledger-utxo-token-cache')
implementation project(":components:configuration:configuration-read-service")
implementation project(':components:db:db-connection-manager')
implementation project(':components:virtual-node:virtual-node-info-read-service')
implementation project(':libs:configuration:configuration-datamodel')
implementation project(":libs:lifecycle:lifecycle")
implementation project(':libs:utilities')

runtimeOnly project(":components:configuration:configuration-read-service-impl")
runtimeOnly project(":libs:web:web-impl")

runtimeOnly project(':components:configuration:configuration-read-service-impl')
runtimeOnly project(':components:db:db-connection-manager-impl')
runtimeOnly project(":libs:application:application-impl")
runtimeOnly project(':libs:crypto:cipher-suite-impl')
runtimeOnly project(':libs:db:db-orm-impl')
runtimeOnly project(':libs:flows:external-event-responses-impl')
runtimeOnly project(':libs:lifecycle:lifecycle-impl')
runtimeOnly project(':libs:messaging:messaging-impl')
runtimeOnly project(':libs:schema-registry:schema-registry-impl')
runtimeOnly project(":libs:web:web-impl")
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package net.corda.processors.token.cache.internal

import net.corda.configuration.read.ConfigurationReadService
import net.corda.db.connection.manager.DbConnectionManager
import net.corda.db.schema.CordaDb
import net.corda.ledger.utxo.token.cache.factories.TokenCacheComponentFactory
import net.corda.ledger.utxo.token.cache.services.TokenCacheComponent
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.configuration.datamodel.ConfigurationEntities
import net.corda.lifecycle.DependentComponents
import net.corda.lifecycle.LifecycleCoordinator
import net.corda.lifecycle.LifecycleCoordinatorFactory
Expand All @@ -12,8 +15,11 @@ import net.corda.lifecycle.RegistrationStatusChangeEvent
import net.corda.lifecycle.StartEvent
import net.corda.lifecycle.StopEvent
import net.corda.lifecycle.createCoordinator
import net.corda.orm.JpaEntitiesRegistry
import net.corda.processors.token.cache.TokenCacheProcessor
import net.corda.schema.configuration.BootConfig.BOOT_DB
import net.corda.utilities.debug
import net.corda.virtualnode.read.VirtualNodeInfoReadService
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
Expand All @@ -28,15 +34,36 @@ class TokenCacheProcessorImpl @Activate constructor(
@Reference(service = ConfigurationReadService::class)
private val configurationReadService: ConfigurationReadService,
@Reference(service = TokenCacheComponentFactory::class)
private val tokenCacheComponentFactory: TokenCacheComponentFactory
private val tokenCacheComponentFactory: TokenCacheComponentFactory,
@Reference(service = DbConnectionManager::class)
private val dbConnectionManager: DbConnectionManager,
@Reference(service = VirtualNodeInfoReadService::class)
private val virtualNodeInfoReadService: VirtualNodeInfoReadService,
@Reference(service = JpaEntitiesRegistry::class)
private val entitiesRegistry: JpaEntitiesRegistry,
) : TokenCacheProcessor {
init {
// define the different DB Entity Sets
// entities can be in different packages, but all JPA classes must be passed in.
entitiesRegistry.register(
CordaDb.CordaCluster.persistenceUnitName,
ConfigurationEntities.classes
)
entitiesRegistry.register(
CordaDb.Vault.persistenceUnitName,
// Token selection uses native queries, so no JPA entities to register.
emptySet()
)
}

private companion object {
val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val dependentComponents = DependentComponents.of(
::configurationReadService,
::dbConnectionManager,
::virtualNodeInfoReadService,
).with(tokenCacheComponentFactory.create(), TokenCacheComponent::class.java)

private val lifecycleCoordinator =
Expand Down Expand Up @@ -65,6 +92,10 @@ class TokenCacheProcessorImpl @Activate constructor(
coordinator.updateStatus(event.status)
}
is BootConfigEvent -> {
val bootstrapConfig = event.config
log.info("Bootstrapping DB connection Manager")
dbConnectionManager.bootstrap(bootstrapConfig.getConfig(BOOT_DB))
log.info("Bootstrapping config read service")
configurationReadService.bootstrapConfig(event.config)
}
is StopEvent -> {
Expand Down

0 comments on commit c544155

Please sign in to comment.