Skip to content

Commit

Permalink
- SessionTimeoutTaskProcessor integration with new topic and AVRO obj…
Browse files Browse the repository at this point in the history
…ect.

- Fix Metadata so it's an immutable map.
- Add tests for Metadata checks.
  • Loading branch information
driessamyn committed Oct 2, 2023
1 parent 083efc2 commit 297c5ba
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,12 @@ class ScheduledTaskHandlerTest {
private fun createStateEntry(
key: String,
lastUpdated: Instant,
mapperState: FlowMapperStateType?
mapperState: FlowMapperStateType
): Pair<String, State> {
val metadata = metadata()
mapperState?.let {
metadata.put(FLOW_MAPPER_STATUS, it.toString())
}
val state = State(
key,
byteArrayOf(),
metadata = metadata,
metadata = metadata(FLOW_MAPPER_STATUS to mapperState.toString()),
modifiedTime = lastUpdated
)
return Pair(key, state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class FlowMaintenanceImpl @Activate constructor(

private val coordinator = coordinatorFactory.createCoordinator<FlowMaintenance>(::eventHandler)
override fun onConfigChange(config: Map<String, SmartConfig>) {
// TODO - fix config key. The state manager has nothing to do with messaging.
// TODO - fix config key (CORE-17437).
if(config.containsKey(ConfigKeys.MESSAGING_CONFIG)) {
val messagingConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG)
val stateManagerConfig = config.getConfig(ConfigKeys.MESSAGING_CONFIG)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package net.corda.flow.maintenance

import net.corda.data.flow.FlowTimeout
import net.corda.data.scheduler.ScheduledTaskTrigger
import net.corda.libs.statemanager.api.Operation
import net.corda.libs.statemanager.api.SingleKeyFilter
import net.corda.libs.statemanager.api.StateManager
import net.corda.messaging.api.processor.DurableProcessor
import net.corda.messaging.api.records.Record
import net.corda.schema.Schemas.Flow.FLOW_TIMEOUT_TOPIC
import net.corda.schema.Schemas.ScheduledTask
import org.slf4j.LoggerFactory
import java.time.Instant
Expand All @@ -16,6 +18,8 @@ class SessionTimeoutTaskProcessor(
) : DurableProcessor<String, ScheduledTaskTrigger> {
companion object {
private val logger = LoggerFactory.getLogger(SessionTimeoutTaskProcessor::class.java)
// TODO - this may need to move out somewhere else.
const val STATE_META_SESSION_EXPIRY_KEY = "session.expiry"
}
override val keyClass: Class<String>
get() = String::class.java
Expand All @@ -30,16 +34,21 @@ class SessionTimeoutTaskProcessor(
// TODO - we must be able to specify additional filters so we can limit to selecting those sessions that are still open
// TODO - we must be able to limit by type of state
val checkpoints = stateManager.find(
SingleKeyFilter("session.expiry", Operation.LesserThan, now())
SingleKeyFilter(STATE_META_SESSION_EXPIRY_KEY, Operation.LesserThan, now().epochSecond)
)
if (checkpoints.isEmpty()) {
logger.trace("No flows to time out")
emptyList()
} else {
// TODO - return an avro message (schema TBC) for each checkpoint
// TODO - define topic to publish message on
// TODO - take log message out when everything plumbed in.
logger.info("Trigger cleanup of $checkpoints")
emptyList()
checkpoints.map { kvp ->
Record(FLOW_TIMEOUT_TOPIC, kvp.key,
FlowTimeout(
kvp.value.key,
Instant.ofEpochSecond(kvp.value.metadata[STATE_META_SESSION_EXPIRY_KEY] as Long))
)
}
}
} ?: emptyList()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package net.corda.flow.maintenance

import net.corda.data.flow.FlowTimeout
import net.corda.data.scheduler.ScheduledTaskTrigger
import net.corda.flow.maintenance.SessionTimeoutTaskProcessor.Companion.STATE_META_SESSION_EXPIRY_KEY
import net.corda.libs.statemanager.api.Metadata
import net.corda.libs.statemanager.api.State
import net.corda.libs.statemanager.api.StateManager
import net.corda.messaging.api.records.Record
import net.corda.schema.Schemas
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.mockito.internal.verification.Times
import org.mockito.kotlin.any
Expand All @@ -19,9 +20,15 @@ import org.mockito.kotlin.whenever
import java.time.Instant

class SessionTimeoutTaskProcessorTests {
private val state = State("foo", randomBytes(), 0, Metadata())
private val now = Instant.now()
private val state1 =
State(
"foo",
randomBytes(),
0,
Metadata(mapOf(STATE_META_SESSION_EXPIRY_KEY to now.minusSeconds(1).epochSecond)))
private val states = mapOf(
"foo" to state
state1.key to state1,
)
private val stateManager = mock<StateManager> {
on { find(any()) } doReturn (states)
Expand All @@ -30,7 +37,7 @@ class SessionTimeoutTaskProcessorTests {
Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT,
Schemas.ScheduledTask.SCHEDULED_TASK_NAME_SESSION_TIMEOUT,
mock())
private val now = Instant.now()

@Test
fun `when empty list do nothing`() {
val processor = SessionTimeoutTaskProcessor(stateManager) { now }
Expand All @@ -55,16 +62,23 @@ class SessionTimeoutTaskProcessorTests {
}

@Test
@Disabled
fun `when state found return`() {
whenever(stateManager.find(any())).doReturn(emptyMap())
val processor = SessionTimeoutTaskProcessor(stateManager) { now }
val output = processor.onNext(listOf(record1))
assertThat(output).isNotEmpty
assertThat(output).containsExactly(
Record(
Schemas.Flow.FLOW_TIMEOUT_TOPIC,
state1.key,
FlowTimeout(
state1.key,
Instant.ofEpochSecond(state1.metadata[STATE_META_SESSION_EXPIRY_KEY] as Long))
)
)
}

@Test
fun `when no states found return empty`() {
whenever(stateManager.find(any())).doReturn(emptyMap())
val processor = SessionTimeoutTaskProcessor(stateManager) { now }
val output = processor.onNext(listOf(record1))
// TODO - better assertion when integrated
Expand Down
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ commonsTextVersion = 1.10.0
bouncycastleVersion=1.73
# Corda API libs revision (change in 4th digit indicates a breaking change)
# Change to 5.0.0.xx-SNAPSHOT to pick up maven local published copy
cordaApiVersion=5.1.0.26-beta+
#cordaApiVersion=5.1.0.26-beta+
cordaApiVersion=5.1.0.26-alpha-1696254653687

disruptorVersion=3.4.4
felixConfigAdminVersion=1.9.26
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,26 @@ package net.corda.libs.statemanager.api
* Mutable map that allows only primitive types to be used as values.
*/
class Metadata(
private val map: MutableMap<String, Any> = mutableMapOf()
) : MutableMap<String, Any> by map {

private val supportedType = listOf(
String::class.java,
java.lang.String::class.java,
Number::class.java,
java.lang.Number::class.java,
Boolean::class.java,
java.lang.Boolean::class.java,
)

private fun isPrimitiveOrBoxedValue(value: Any): Boolean {
return supportedType.any { it.isAssignableFrom(value.javaClass) }
private val map: Map<String, Any> = emptyMap()
) : Map<String, Any> by map {
companion object {
private val supportedType = listOf(
String::class.java,
java.lang.String::class.java,
Number::class.java,
java.lang.Number::class.java,
Boolean::class.java,
java.lang.Boolean::class.java,
)
private fun isPrimitiveOrBoxedValue(value: Any): Boolean {
return supportedType.any { it.isAssignableFrom(value.javaClass) }
}
}

override fun put(key: String, value: Any): Any? {
if (!isPrimitiveOrBoxedValue(value)) {
throw IllegalArgumentException("Type not supported: ${value::class}")
init {
map.filter { kvp -> !isPrimitiveOrBoxedValue(kvp.value) }.takeIf { it.isNotEmpty() }?.also { kvp ->
val invalidPairs = kvp.entries.joinToString { "${it.key}/${it.value::class.java.name}" }
throw IllegalArgumentException("Type(s) not supported: $invalidPairs")
}

return map.put(key, value)
}

override fun equals(other: Any?): Boolean {
Expand All @@ -35,18 +33,15 @@ class Metadata(
other as Metadata

if (map != other.map) return false
if (supportedType != other.supportedType) return false

return true
}

override fun hashCode(): Int {
var result = map.hashCode()
result = 31 * result + supportedType.hashCode()
return result
return map.hashCode()
}
}

fun metadata(): Metadata = Metadata()

fun metadata(vararg pairs: Pair<String, Any>): Metadata = Metadata(mutableMapOf(*pairs))
fun metadata(vararg pairs: Pair<String, Any>): Metadata = Metadata(mapOf(*pairs))
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package net.corda.libs.statemanager.api

import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import java.util.stream.Stream

class MetadataTests {
companion object {
@JvmStatic
private fun acceptedTypes(): Stream<Any> =
Stream.of(
"foo",
123,
true
)
}
@ParameterizedTest
@MethodSource("acceptedTypes")
fun `accept primitive types`(value: Any) {
assertDoesNotThrow {
Metadata(mapOf("foo" to value))
}
}

@Test
fun `fail all non-primitive types`() {
val list = listOf("Na Na Na Na Na Na Na Na", "Batman")
val ex = assertThrows<IllegalArgumentException> {
Metadata(mapOf("joker" to Superman(1000), "batman" to list))
}
assertThat(ex).hasMessageContainingAll("joker", "batman", Superman::class.java.name, list.javaClass.name)
}

@Test
fun `equals works as expected with map`() {
val meta1 = Metadata(mapOf("foo" to "bar"))
val meta2 = Metadata(mapOf("foo" to "bar"))
assertThat(meta2).isEqualTo(meta1)
assertThat(meta2).isNotSameAs(meta1)
}

data class Superman(val kudos: Int)
}

0 comments on commit 297c5ba

Please sign in to comment.