Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-17387 - flow checkpoint maintenance events #4746

1 change: 1 addition & 0 deletions applications/examples/sandbox-app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ dependencies {
runtimeOnly project(':libs:messaging:db-message-bus-impl')
runtimeOnly project(':libs:serialization:serialization-checkpoint-api')
runtimeOnly project(':libs:serialization:serialization-kryo')
runtimeOnly project(':libs:state-manager:state-manager-db-impl')
runtimeOnly project(':testing:group-policy-test-common')
runtimeOnly project(':libs:web:web-impl')

Expand Down
15 changes: 15 additions & 0 deletions applications/workers/release/flow-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,19 @@ dependencies {
exclude group: 'org.apache.felix'
exclude group: 'org.osgi'
}

// This puts the jdbc driver into the docker image in the /opt/jdbc-driver folder
// this folder can contain many jdbc drivers (and DataSourceFactory provider bundles).
// Postgres doesn't need a DataSourceFactory provider bundle (e.g. pax-jdbc), because
// the postgres devs have written their own and it's in this jar (PGDataSourceFactory).
dockerImageJdbc "org.postgresql:postgresql:$postgresDriverVersion"

// If we were to do this for a different database that is *not natively an OSGi bundle*
// we would need the wrapped OSGi bundle version and the pax-jdbc loader, i.e.
//
// dockerImageJdbc "org.ops4j.pax.jdbc:pax-jdbc-VENDOR:1.5.3"
// dockerImageJdbc "com.VENDOR.database.jdbc:vendor-jdbc-WRAPPED-AS-A-BUNDLE:$vendorVersion"
//
// NOTE: PLEASE MAKE SURE NOT TO PUBLISH A DOCKER IMAGE PUBLICLY WITH THESE WRAPPED DRIVERS,
// UNLESS ABSOLUTELY SURE WE CAN DISTRIBUTE IT!!
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,12 @@ class ScheduledTaskHandlerTest {
private fun createStateEntry(
key: String,
lastUpdated: Instant,
mapperState: FlowMapperStateType?
mapperState: FlowMapperStateType
): Pair<String, State> {
val metadata = metadata()
mapperState?.let {
metadata.put(FLOW_MAPPER_STATUS, it.toString())
}
val state = State(
key,
byteArrayOf(),
metadata = metadata,
metadata = metadata(FLOW_MAPPER_STATUS to mapperState.toString()),
modifiedTime = lastUpdated
)
return Pair(key, state)
Expand Down
74 changes: 38 additions & 36 deletions components/flow/flow-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,52 @@ dependencies {
compileOnly 'org.osgi:osgi.annotation'
compileOnly "co.paralleluniverse:quasar-osgi-annotations:$quasarVersion"

implementation project(":components:configuration:configuration-read-service")
implementation project(":components:external-messaging-services")
implementation project(":components:membership:membership-group-read")
implementation project(':components:configuration:configuration-read-service')
implementation project(':components:external-messaging-services')
implementation project(':components:membership:membership-group-read')
implementation project(':components:membership:group-policy')
implementation project(":components:virtual-node:virtual-node-info-read-service")
implementation project(":components:virtual-node:cpi-info-read-service")
implementation project(":components:virtual-node:cpk-read-service")
implementation project(":components:virtual-node:sandbox-group-context-service")
implementation project(':components:virtual-node:virtual-node-info-read-service')
implementation project(':components:virtual-node:cpi-info-read-service')
implementation project(':components:virtual-node:cpk-read-service')
implementation project(':components:virtual-node:sandbox-group-context-service')
runtimeOnly project(':components:virtual-node:sandbox-amqp')
runtimeOnly project(':components:virtual-node:sandbox-json')

implementation project(":libs:cache:cache-caffeine")
implementation project(":libs:configuration:configuration-core")
implementation project(":libs:crypto:crypto-core")
implementation project(":libs:crypto:crypto-flow")
implementation project(":libs:external-messaging")
implementation project(':libs:cache:cache-caffeine')
implementation project(':libs:configuration:configuration-core')
implementation project(':libs:crypto:crypto-core')
implementation project(':libs:crypto:crypto-flow')
implementation project(':libs:external-messaging')
implementation project(':libs:flows:flow-api')
implementation project(':libs:flows:session-manager')
implementation project(":libs:flows:flow-utils")
implementation project(":libs:lifecycle:lifecycle")
implementation project(":libs:membership:membership-common")
implementation project(":libs:metrics")
implementation project(':libs:lifecycle:lifecycle')
implementation project(':libs:membership:membership-common')
implementation project(':libs:metrics')
implementation project(":libs:messaging:messaging")
implementation project(':libs:platform-info')
implementation project(":libs:sandbox")
implementation project(':libs:sandbox')
implementation project(':libs:serialization:serialization-amqp')
implementation project(":libs:serialization:serialization-checkpoint-api")
implementation project(":libs:utilities")
implementation project(":libs:virtual-node:sandbox-group-context")
implementation project(':libs:serialization:serialization-checkpoint-api')
implementation project(':libs:state-manager:state-manager-api')
implementation project(':libs:utilities')
implementation project(':libs:virtual-node:sandbox-group-context')
implementation project(':libs:virtual-node:virtual-node-info')
implementation project(':libs:platform-info')
implementation project(":libs:serialization:serialization-avro")
implementation project(":libs:tracing")
implementation project(':libs:serialization:serialization-avro')
implementation project(':libs:tracing')

implementation platform("net.corda:corda-api:$cordaApiVersion")

implementation "com.typesafe:config:$typeSafeConfigVersion"
implementation "net.corda:corda-application"
implementation "net.corda:corda-ledger-utxo"
implementation "net.corda:corda-avro-schema"
implementation "net.corda:corda-base"
implementation "net.corda:corda-config-schema"
implementation 'net.corda:corda-application'
implementation 'net.corda:corda-ledger-utxo'
implementation 'net.corda:corda-avro-schema'
implementation 'net.corda:corda-base'
implementation 'net.corda:corda-config-schema'
implementation 'net.corda:corda-ledger-common'
implementation project(":libs:packaging:packaging")
implementation "net.corda:corda-topic-schema"
implementation project(':libs:packaging:packaging')
implementation 'net.corda:corda-topic-schema'
implementation 'org.jetbrains.kotlin:kotlin-osgi-bundle'
implementation "org.slf4j:slf4j-api:$slf4jVersion"

Expand All @@ -69,11 +70,11 @@ dependencies {
testImplementation "org.apache.felix:org.apache.felix.framework:$felixVersion"
testImplementation "com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion"

testImplementation project(":libs:flows:session-manager-impl")
testImplementation project(":libs:lifecycle:lifecycle-test-impl")
testImplementation project(":libs:lifecycle:lifecycle-impl")
testImplementation project(":libs:lifecycle:registry")
testImplementation project(":testing:flow:flow-utilities")
testImplementation project(':libs:flows:session-manager-impl')
testImplementation project(':libs:lifecycle:lifecycle-test-impl')
testImplementation project(':libs:lifecycle:lifecycle-impl')
testImplementation project(':libs:lifecycle:registry')
testImplementation project(':testing:flow:flow-utilities')
testImplementation project(':testing:test-utilities')

testRuntimeOnly "org.slf4j:slf4j-simple:$slf4jVersion"
Expand All @@ -85,13 +86,14 @@ dependencies {

integrationTestRuntimeOnly project(':libs:application:application-impl')
integrationTestRuntimeOnly project(':libs:flows:session-manager-impl')
integrationTestRuntimeOnly project(":libs:lifecycle:lifecycle-impl")
integrationTestRuntimeOnly project(':libs:lifecycle:lifecycle-impl')
integrationTestRuntimeOnly project(':libs:messaging:db-message-bus-impl')
integrationTestRuntimeOnly project(':libs:messaging:messaging-impl')
integrationTestRuntimeOnly project(':libs:serialization:serialization-checkpoint-api')
integrationTestRuntimeOnly project(':libs:serialization:serialization-kryo')
integrationTestRuntimeOnly project(":components:membership:membership-group-read-impl")
integrationTestRuntimeOnly project(":components:virtual-node:cpk-read-service-impl")
integrationTestRuntimeOnly project(':libs:state-manager:state-manager-db-impl')
integrationTestRuntimeOnly project(':components:membership:membership-group-read-impl')
integrationTestRuntimeOnly project(':components:virtual-node:cpk-read-service-impl')

integrationTestRuntimeOnly "org.apache.aries.spifly:org.apache.aries.spifly.dynamic.framework.extension:$ariesDynamicFrameworkExtensionVersion"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
@Export
package net.corda.flow.maintenance;

import org.osgi.annotation.bundle.Export;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package net.corda.flow.maintenance

import net.corda.libs.configuration.SmartConfig
import net.corda.lifecycle.Lifecycle

interface FlowMaintenance : Lifecycle {
fun onConfigChange(config: Map<String, SmartConfig>)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package net.corda.flow.maintenance

import net.corda.libs.configuration.SmartConfig
import net.corda.libs.configuration.helper.getConfig
import net.corda.libs.statemanager.api.StateManagerFactory
import net.corda.lifecycle.LifecycleCoordinator
import net.corda.lifecycle.LifecycleCoordinatorFactory
import net.corda.lifecycle.LifecycleEvent
import net.corda.lifecycle.LifecycleStatus
import net.corda.lifecycle.StartEvent
import net.corda.lifecycle.StopEvent
import net.corda.lifecycle.createCoordinator
import net.corda.messaging.api.subscription.config.SubscriptionConfig
import net.corda.messaging.api.subscription.factory.SubscriptionFactory
import net.corda.schema.Schemas
import net.corda.schema.configuration.ConfigKeys
import net.corda.utilities.debug
import net.corda.utilities.trace
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.slf4j.LoggerFactory

@Component(service = [FlowMaintenance::class])
class FlowMaintenanceImpl @Activate constructor(
@Reference(service = LifecycleCoordinatorFactory::class)
coordinatorFactory: LifecycleCoordinatorFactory,
@Reference(service = SubscriptionFactory::class)
private val subscriptionFactory: SubscriptionFactory,
@Reference(service = StateManagerFactory::class)
private val stateManagerFactory: StateManagerFactory,
) : FlowMaintenance {
companion object {
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val coordinator = coordinatorFactory.createCoordinator<FlowMaintenance>(::eventHandler)
override fun onConfigChange(config: Map<String, SmartConfig>) {
// TODO - fix config key (CORE-17437).
if(config.containsKey(ConfigKeys.MESSAGING_CONFIG)) {
val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG)
val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG)
coordinator.createManagedResource("FLOW_MAINTENANCE_SUBSCRIPTION") {
subscriptionFactory.createDurableSubscription(
SubscriptionConfig(
"flow.maintenance.tasks",
Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR
),
SessionTimeoutTaskProcessor(stateManagerFactory.create(stateManagerConfig)),
messagingConfig,
null
)
}.start()
}
}

override val isRunning: Boolean
get() = coordinator.isRunning

override fun start() {
coordinator.start()
}

override fun stop() {
coordinator.stop()
}

private fun eventHandler(event: LifecycleEvent, coordinator: LifecycleCoordinator) {
logger.debug { "Flow maintenance event $event." }

when (event) {
is StartEvent -> {
coordinator.updateStatus(LifecycleStatus.UP)
// TODO - this should register to follow the State Manager's lifecycle
}
is StopEvent -> {
logger.trace { "Flow maintenance is stopping..." }
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package net.corda.flow.maintenance

import net.corda.data.flow.FlowTimeout
import net.corda.data.scheduler.ScheduledTaskTrigger
import net.corda.libs.statemanager.api.Operation
import net.corda.libs.statemanager.api.SingleKeyFilter
import net.corda.libs.statemanager.api.StateManager
import net.corda.messaging.api.processor.DurableProcessor
import net.corda.messaging.api.records.Record
import net.corda.schema.Schemas.Flow.FLOW_TIMEOUT_TOPIC
import net.corda.schema.Schemas.ScheduledTask
import org.slf4j.LoggerFactory
import java.time.Instant

class SessionTimeoutTaskProcessor(
private val stateManager: StateManager,
private val now: () -> Instant = Instant::now
) : DurableProcessor<String, ScheduledTaskTrigger> {
companion object {
private val logger = LoggerFactory.getLogger(SessionTimeoutTaskProcessor::class.java)
// TODO - this may need to move out somewhere else.
const val STATE_META_SESSION_EXPIRY_KEY = "session.expiry"
}
override val keyClass: Class<String>
get() = String::class.java
override val valueClass: Class<ScheduledTaskTrigger>
get() = ScheduledTaskTrigger::class.java

override fun onNext(events: List<Record<String, ScheduledTaskTrigger>>): List<Record<*, *>> {
// If we receive multiple, there's probably an issue somewhere, and we can ignore all but the last one.
return events.lastOrNull { it.key == ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT }?.value?.let { trigger ->
logger.trace("Processing trigger scheduled at ${trigger.timestamp}")
// TODO - temporary query
// TODO - we must be able to specify additional filters so we can limit to selecting those sessions that are still open
// TODO - we must be able to limit by type of state
Comment on lines +34 to +35
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jujoramos , @conalsmith-r3 , please advice how this can be implemented.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't implemented custom filters yet, but there's already a ticket created for it.

val checkpoints = stateManager.find(
SingleKeyFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond)
)
if (checkpoints.isEmpty()) {
logger.trace("No flows to time out")
emptyList()
} else {
// TODO - take log message out when everything plumbed in.
logger.info("Trigger cleanup of $checkpoints")
checkpoints.map { kvp ->
Record(FLOW_TIMEOUT_TOPIC, kvp.key,
FlowTimeout(
kvp.value.key,
Instant.ofEpochSecond(kvp.value.metadata[STATE_META_SESSION_EXPIRY_KEY] as Long))
)
}
}
} ?: emptyList()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import net.corda.configuration.read.ConfigChangedEvent
import net.corda.configuration.read.ConfigurationReadService
import net.corda.cpiinfo.read.CpiInfoReadService
import net.corda.external.messaging.services.ExternalMessagingRoutingService
import net.corda.flow.maintenance.FlowMaintenance
import net.corda.lifecycle.Lifecycle
import net.corda.lifecycle.LifecycleCoordinator
import net.corda.lifecycle.LifecycleCoordinatorFactory
Expand All @@ -24,7 +25,6 @@ import net.corda.virtualnode.read.VirtualNodeInfoReadService
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.slf4j.LoggerFactory

@Suppress("LongParameterList")
@Component(service = [FlowService::class])
Expand All @@ -37,10 +37,11 @@ class FlowService @Activate constructor(
private val flowExecutor: FlowExecutor,
@Reference(service = ExternalMessagingRoutingService::class)
private val externalMessagingRoutingService: ExternalMessagingRoutingService,
@Reference(service = FlowMaintenance::class)
private val flowMaintenance: FlowMaintenance,
) : Lifecycle {

companion object {
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
private val configSections = setOf(BOOT_CONFIG, MESSAGING_CONFIG, FLOW_CONFIG, UTXO_LEDGER_CONFIG)
}

Expand All @@ -60,8 +61,10 @@ class FlowService @Activate constructor(
LifecycleCoordinatorName.forComponent<VirtualNodeInfoReadService>(),
LifecycleCoordinatorName.forComponent<CpiInfoReadService>(),
LifecycleCoordinatorName.forComponent<FlowExecutor>(),
LifecycleCoordinatorName.forComponent<FlowMaintenance>(),
)
)
flowMaintenance.start()
flowExecutor.start()
}

Expand All @@ -85,12 +88,14 @@ class FlowService @Activate constructor(
* is configured before we configure the executor to prevent a race between receiving the first
* state events and scheduler creating a publisher.
*/
flowMaintenance.onConfigChange(config)
flowExecutor.onConfigChange(config)
externalMessagingRoutingService.onConfigChange(config)
coordinator.updateStatus(LifecycleStatus.UP)
}

is StopEvent -> {
flowMaintenance.stop()
flowExecutor.stop()
registration?.close()
registration = null
Expand Down
Loading