From 4c8fcdf52039a6dcfdd7ad4f4ff8de16cb892218 Mon Sep 17 00:00:00 2001 From: James Higgs <45565019+JamesHR3@users.noreply.github.com> Date: Thu, 28 Sep 2023 12:07:18 +0100 Subject: [PATCH] CORE-17360: Add topics for mapper scheduled cleanup and update cleanup Avro records (#1265) Introduces new topics for scheduled cleanup in the flow mapper, and updates the execute cleanup record. Two new topics are introduced. One is used for publishing scheduled task triggers to the flow mapper. The other is used to publish execute cleanup events to. Additionally, the execute cleanup Avro record has been updated to allow multiple state IDs to be provided. This allows cleanups to be performed in batches if required. --- .../net/corda/data/flow/event/mapper/ExecuteCleanup.avsc | 8 ++++++++ .../src/main/java/net/corda/schema/Schemas.java | 2 ++ .../src/main/resources/net/corda/schema/Flow.yaml | 7 +++++++ .../main/resources/net/corda/schema/ScheduledTask.yaml | 6 ++++++ gradle.properties | 2 +- 5 files changed, 24 insertions(+), 1 deletion(-) diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/mapper/ExecuteCleanup.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/mapper/ExecuteCleanup.avsc index c04b43197d..e5e1595715 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/mapper/ExecuteCleanup.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/mapper/ExecuteCleanup.avsc @@ -4,5 +4,13 @@ "docs": "When this event is processed the flow mapper state should be set to null", "namespace": "net.corda.data.flow.event.mapper", "fields": [ + { + "name": "ids", + "docs": "A list of IDs of mapper states that should be deleted", + "type": { + "type": "array", + "items": "string" + } + } ] } diff --git a/data/topic-schema/src/main/java/net/corda/schema/Schemas.java b/data/topic-schema/src/main/java/net/corda/schema/Schemas.java index 342ae10cdc..33d6590895 100644 --- a/data/topic-schema/src/main/java/net/corda/schema/Schemas.java +++ b/data/topic-schema/src/main/java/net/corda/schema/Schemas.java @@ -103,6 +103,7 @@ private Flow() { public static final String FLOW_MAPPER_EVENT_TOPIC = "flow.mapper.event"; public static final String FLOW_MAPPER_EVENT_STATE_TOPIC = getStateAndEventStateTopic(FLOW_MAPPER_EVENT_TOPIC); public static final String FLOW_MAPPER_EVENT_DLQ_TOPIC = getDLQTopic(FLOW_MAPPER_EVENT_TOPIC); + public static final String FLOW_MAPPER_CLEANUP_TOPIC = "flow.mapper.cleanup"; } /** @@ -267,5 +268,6 @@ public static final class ScheduledTask { private ScheduledTask() {} public static final String SCHEDULED_TASK_DB_PROCESSOR = "scheduled.task.db.processor"; + public static final String SCHEDULED_TASK_MAPPER_PROCESSOR = "scheduled.task.mapper.processor"; } } diff --git a/data/topic-schema/src/main/resources/net/corda/schema/Flow.yaml b/data/topic-schema/src/main/resources/net/corda/schema/Flow.yaml index 04a0e5236c..0b29c6fce0 100644 --- a/data/topic-schema/src/main/resources/net/corda/schema/Flow.yaml +++ b/data/topic-schema/src/main/resources/net/corda/schema/Flow.yaml @@ -80,3 +80,10 @@ topics: min.compaction.lag.ms: 60000 max.compaction.lag.ms: 604800000 min.cleanable.dirty.ratio: 0.5 + FlowMapperCleanupTopic: + name: flow.mapper.cleanup + consumers: + - flowMapper + producers: + - flowMapper + config: diff --git a/data/topic-schema/src/main/resources/net/corda/schema/ScheduledTask.yaml b/data/topic-schema/src/main/resources/net/corda/schema/ScheduledTask.yaml index e0a19daa79..fa45e6401c 100644 --- a/data/topic-schema/src/main/resources/net/corda/schema/ScheduledTask.yaml +++ b/data/topic-schema/src/main/resources/net/corda/schema/ScheduledTask.yaml @@ -5,3 +5,9 @@ topics: - db producers: - db + ScheduledTaskFlowMapperProcessorTopic: + name: scheduled.task.mapper.processor + consumers: + - flowMapper + producers: + - db diff --git a/gradle.properties b/gradle.properties index 10fdad95d7..d2908e1b60 100644 --- a/gradle.properties +++ b/gradle.properties @@ -9,7 +9,7 @@ cordaProductVersion = 5.1.0 # NOTE: update this each time this module contains a breaking change ## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to ## a per module property in which case module versions can change independently. -cordaApiRevision = 24 +cordaApiRevision = 25 # Main kotlinVersion = 1.8.21