From 297c5ba3294073bf9be7ebb70df4b6ff8ad9d6d8 Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Mon, 2 Oct 2023 16:18:14 +0100 Subject: [PATCH] - SessionTimeoutTaskProcessor integration with new topic and AVRO object. - Fix Metadata so it's an immutable map. - Add tests for Metadata checks. --- .../executor/ScheduledTaskHandlerTest.kt | 8 +--- .../flow/maintenance/FlowMaintenanceImpl.kt | 2 +- .../SessionTimeoutTaskProcessor.kt | 17 +++++-- .../SessionTimeoutTaskProcessorTests.kt | 28 ++++++++--- gradle.properties | 3 +- .../corda/libs/statemanager/api/Metadata.kt | 45 ++++++++---------- .../corda/libs/statemanager/api/Metadata.kt | 47 +++++++++++++++++++ 7 files changed, 106 insertions(+), 44 deletions(-) create mode 100644 libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt diff --git a/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/executor/ScheduledTaskHandlerTest.kt b/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/executor/ScheduledTaskHandlerTest.kt index c3869966351..2d47e0b068a 100644 --- a/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/executor/ScheduledTaskHandlerTest.kt +++ b/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/executor/ScheduledTaskHandlerTest.kt @@ -80,16 +80,12 @@ class ScheduledTaskHandlerTest { private fun createStateEntry( key: String, lastUpdated: Instant, - mapperState: FlowMapperStateType? + mapperState: FlowMapperStateType ): Pair { - val metadata = metadata() - mapperState?.let { - metadata.put(FLOW_MAPPER_STATUS, it.toString()) - } val state = State( key, byteArrayOf(), - metadata = metadata, + metadata = metadata(FLOW_MAPPER_STATUS to mapperState.toString()), modifiedTime = lastUpdated ) return Pair(key, state) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt index faeee069ac7..7b8725fb26d 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/FlowMaintenanceImpl.kt @@ -36,7 +36,7 @@ class FlowMaintenanceImpl @Activate constructor( private val coordinator = coordinatorFactory.createCoordinator(::eventHandler) override fun onConfigChange(config: Map) { - // TODO - fix config key. The state manager has nothing to do with messaging. + // TODO - fix config key (CORE-17437). if(config.containsKey(ConfigKeys.MESSAGING_CONFIG)) { val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG) diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt index 7d170a8f16f..3ae30fb6c55 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessor.kt @@ -1,11 +1,13 @@ package net.corda.flow.maintenance +import net.corda.data.flow.FlowTimeout import net.corda.data.scheduler.ScheduledTaskTrigger import net.corda.libs.statemanager.api.Operation import net.corda.libs.statemanager.api.SingleKeyFilter import net.corda.libs.statemanager.api.StateManager import net.corda.messaging.api.processor.DurableProcessor import net.corda.messaging.api.records.Record +import net.corda.schema.Schemas.Flow.FLOW_TIMEOUT_TOPIC import net.corda.schema.Schemas.ScheduledTask import org.slf4j.LoggerFactory import java.time.Instant @@ -16,6 +18,8 @@ class SessionTimeoutTaskProcessor( ) : DurableProcessor { companion object { private val logger = LoggerFactory.getLogger(SessionTimeoutTaskProcessor::class.java) + // TODO - this may need to move out somewhere else. + const val STATE_META_SESSION_EXPIRY_KEY = "session.expiry" } override val keyClass: Class get() = String::class.java @@ -30,16 +34,21 @@ class SessionTimeoutTaskProcessor( // TODO - we must be able to specify additional filters so we can limit to selecting those sessions that are still open // TODO - we must be able to limit by type of state val checkpoints = stateManager.find( - SingleKeyFilter("session.expiry", Operation.LesserThan, now()) + SingleKeyFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond) ) if (checkpoints.isEmpty()) { logger.trace("No flows to time out") emptyList() } else { - // TODO - return an avro message (schema TBC) for each checkpoint - // TODO - define topic to publish message on + // TODO - take log message out when everything plumbed in. logger.info("Trigger cleanup of $checkpoints") - emptyList() + checkpoints.map { kvp -> + Record(FLOW_TIMEOUT_TOPIC, kvp.key, + FlowTimeout( + kvp.value.key, + Instant.ofEpochSecond(kvp.value.metadata[STATE_META_SESSION_EXPIRY_KEY] as Long)) + ) + } } } ?: emptyList() } diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt index 3d8ed1cb03e..2058eecc56b 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/maintenance/SessionTimeoutTaskProcessorTests.kt @@ -1,13 +1,14 @@ package net.corda.flow.maintenance +import net.corda.data.flow.FlowTimeout import net.corda.data.scheduler.ScheduledTaskTrigger +import net.corda.flow.maintenance.SessionTimeoutTaskProcessor.Companion.STATE_META_SESSION_EXPIRY_KEY import net.corda.libs.statemanager.api.Metadata import net.corda.libs.statemanager.api.State import net.corda.libs.statemanager.api.StateManager import net.corda.messaging.api.records.Record import net.corda.schema.Schemas import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.mockito.internal.verification.Times import org.mockito.kotlin.any @@ -19,9 +20,15 @@ import org.mockito.kotlin.whenever import java.time.Instant class SessionTimeoutTaskProcessorTests { - private val state = State("foo", randomBytes(), 0, Metadata()) + private val now = Instant.now() + private val state1 = + State( + "foo", + randomBytes(), + 0, + Metadata(mapOf(STATE_META_SESSION_EXPIRY_KEY to now.minusSeconds(1).epochSecond))) private val states = mapOf( - "foo" to state + state1.key to state1, ) private val stateManager = mock { on { find(any()) } doReturn (states) @@ -30,7 +37,7 @@ class SessionTimeoutTaskProcessorTests { Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT, Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT, mock()) - private val now = Instant.now() + @Test fun `when empty list do nothing`() { val processor = SessionTimeoutTaskProcessor(stateManager) { now } @@ -55,16 +62,23 @@ class SessionTimeoutTaskProcessorTests { } @Test - @Disabled fun `when state found return`() { - whenever(stateManager.find(any())).doReturn(emptyMap()) val processor = SessionTimeoutTaskProcessor(stateManager) { now } val output = processor.onNext(listOf(record1)) - assertThat(output).isNotEmpty + assertThat(output).containsExactly( + Record( + Schemas.Flow.FLOW_TIMEOUT_TOPIC, + state1.key, + FlowTimeout( + state1.key, + Instant.ofEpochSecond(state1.metadata[STATE_META_SESSION_EXPIRY_KEY] as Long)) + ) + ) } @Test fun `when no states found return empty`() { + whenever(stateManager.find(any())).doReturn(emptyMap()) val processor = SessionTimeoutTaskProcessor(stateManager) { now } val output = processor.onNext(listOf(record1)) // TODO - better assertion when integrated diff --git a/gradle.properties b/gradle.properties index 54f03b14601..a21e985c8b1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,7 +46,8 @@ commonsTextVersion = 1.10.0 bouncycastleVersion=1.73 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.0.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.1.0.26-beta+ +#cordaApiVersion=5.1.0.26-beta+ +cordaApiVersion=5.1.0.26-alpha-1696254653687 disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt index aa66c4a57e4..5099f2bf0fb 100644 --- a/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt +++ b/libs/state-manager/state-manager-api/src/main/kotlin/net/corda/libs/statemanager/api/Metadata.kt @@ -4,28 +4,26 @@ package net.corda.libs.statemanager.api * Mutable map that allows only primitive types to be used as values. */ class Metadata( - private val map: MutableMap = mutableMapOf() -) : MutableMap by map { - - private val supportedType = listOf( - String::class.java, - java.lang.String::class.java, - Number::class.java, - java.lang.Number::class.java, - Boolean::class.java, - java.lang.Boolean::class.java, - ) - - private fun isPrimitiveOrBoxedValue(value: Any): Boolean { - return supportedType.any { it.isAssignableFrom(value.javaClass) } + private val map: Map = emptyMap() +) : Map by map { + companion object { + private val supportedType = listOf( + String::class.java, + java.lang.String::class.java, + Number::class.java, + java.lang.Number::class.java, + Boolean::class.java, + java.lang.Boolean::class.java, + ) + private fun isPrimitiveOrBoxedValue(value: Any): Boolean { + return supportedType.any { it.isAssignableFrom(value.javaClass) } + } } - - override fun put(key: String, value: Any): Any? { - if (!isPrimitiveOrBoxedValue(value)) { - throw IllegalArgumentException("Type not supported: ${value::class}") + init { + map.filter { kvp -> !isPrimitiveOrBoxedValue(kvp.value) }.takeIf { it.isNotEmpty() }?.also { kvp -> + val invalidPairs = kvp.entries.joinToString { "${it.key}/${it.value::class.java.name}" } + throw IllegalArgumentException("Type(s) not supported: $invalidPairs") } - - return map.put(key, value) } override fun equals(other: Any?): Boolean { @@ -35,18 +33,15 @@ class Metadata( other as Metadata if (map != other.map) return false - if (supportedType != other.supportedType) return false return true } override fun hashCode(): Int { - var result = map.hashCode() - result = 31 * result + supportedType.hashCode() - return result + return map.hashCode() } } fun metadata(): Metadata = Metadata() -fun metadata(vararg pairs: Pair): Metadata = Metadata(mutableMapOf(*pairs)) +fun metadata(vararg pairs: Pair): Metadata = Metadata(mapOf(*pairs)) diff --git a/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt new file mode 100644 index 00000000000..8f07513fe3c --- /dev/null +++ b/libs/state-manager/state-manager-api/src/test/kotlin/net/corda/libs/statemanager/api/Metadata.kt @@ -0,0 +1,47 @@ +package net.corda.libs.statemanager.api + +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import java.util.stream.Stream + +class MetadataTests { + companion object { + @JvmStatic + private fun acceptedTypes(): Stream = + Stream.of( + "foo", + 123, + true + ) + } + @ParameterizedTest + @MethodSource("acceptedTypes") + fun `accept primitive types`(value: Any) { + assertDoesNotThrow { + Metadata(mapOf("foo" to value)) + } + } + + @Test + fun `fail all non-primitive types`() { + val list = listOf("Na Na Na Na Na Na Na Na", "Batman") + val ex = assertThrows { + Metadata(mapOf("joker" to Superman(1000), "batman" to list)) + } + assertThat(ex).hasMessageContainingAll("joker", "batman", Superman::class.java.name, list.javaClass.name) + } + + @Test + fun `equals works as expected with map`() { + val meta1 = Metadata(mapOf("foo" to "bar")) + val meta2 = Metadata(mapOf("foo" to "bar")) + assertThat(meta2).isEqualTo(meta1) + assertThat(meta2).isNotSameAs(meta1) + } + + data class Superman(val kudos: Int) +} \ No newline at end of file