Skip to content

Commit

Permalink
- Don't create if the correct config not passed in.
Browse files Browse the repository at this point in the history
- Fix FlowServiceTest
  • Loading branch information
driessamyn committed Oct 2, 2023
1 parent 63985fe commit 083efc2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package net.corda.flow.maintenance

import net.corda.flow.service.FlowExecutor
import net.corda.libs.configuration.SmartConfig
import net.corda.libs.configuration.helper.getConfig
import net.corda.libs.statemanager.api.StateManagerFactory
Expand Down Expand Up @@ -35,19 +34,24 @@ class FlowMaintenanceImpl @Activate constructor(
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

private val coordinator = coordinatorFactory.createCoordinator<FlowExecutor>(::eventHandler)
private val coordinator = coordinatorFactory.createCoordinator<FlowMaintenance>(::eventHandler)
override fun onConfigChange(config: Map<String, SmartConfig>) {
val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG)
// TODO - fix config key. The state manager has nothing to do with messaging.
val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG)
coordinator.createManagedResource("FLOW_MAINTENANCE_SUBSCRIPTION") {
subscriptionFactory.createDurableSubscription(
SubscriptionConfig("flow.maintenance.tasks", Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR),
SessionTimeoutTaskProcessor(stateManagerFactory.create(stateManagerConfig)),
messagingConfig,
null
)
}.start()
if(config.containsKey(ConfigKeys.MESSAGING_CONFIG)) {
val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG)
val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG)
coordinator.createManagedResource("FLOW_MAINTENANCE_SUBSCRIPTION") {
subscriptionFactory.createDurableSubscription(
SubscriptionConfig(
"flow.maintenance.tasks",
Schemas.ScheduledTask.SCHEDULED_TASK_TOPIC_FLOW_PROCESSOR
),
SessionTimeoutTaskProcessor(stateManagerFactory.create(stateManagerConfig)),
messagingConfig,
null
)
}.start()
}
}

override val isRunning: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.mockito.kotlin.doReturn
import org.mockito.kotlin.eq
import org.mockito.kotlin.isNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.verify

class FlowMaintenanceImplTests {
Expand Down Expand Up @@ -60,4 +61,11 @@ class FlowMaintenanceImplTests {
verify(stateManagerFactory).create(stateManagerConfig)
verify(subscription).start()
}

@Test
fun `do nothing when messaging config not sent`() {
val m = FlowMaintenanceImpl(lifecycleCoordinatorFactory, subscriptionFactory, stateManagerFactory)
m.onConfigChange(mapOf("foo" to mock()))
verify(lifecycleCoordinator, never()).createManagedResource(any(), any<() -> Subscription<String, ScheduledTaskTrigger>>())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class FlowServiceTest {
Arguments.of(LifecycleCoordinatorName.forComponent<SandboxGroupContextComponent>()),
Arguments.of(LifecycleCoordinatorName.forComponent<VirtualNodeInfoReadService>()),
Arguments.of(LifecycleCoordinatorName.forComponent<CpiInfoReadService>()),
Arguments.of(LifecycleCoordinatorName.forComponent<FlowExecutor>())
Arguments.of(LifecycleCoordinatorName.forComponent<FlowExecutor>()),
Arguments.of(LifecycleCoordinatorName.forComponent<FlowMaintenance>()),
)
}
}
Expand Down Expand Up @@ -147,6 +148,7 @@ class FlowServiceTest {
addDependency<VirtualNodeInfoReadService>()
addDependency<CpiInfoReadService>()
addDependency<FlowExecutor>()
addDependency<FlowMaintenance>()

FlowService(
coordinatorFactory,
Expand Down

0 comments on commit 083efc2

Please sign in to comment.