Skip to content

Commit

Permalink
CORE-17360: Add topics for mapper scheduled cleanup and update cleanu…
Browse files Browse the repository at this point in the history
…p Avro records (#1265)

Introduces new topics for scheduled cleanup in the flow mapper, and updates the execute cleanup record.

Two new topics are introduced. One is used for publishing scheduled task triggers to the flow mapper. The other is used to publish execute cleanup events to.

Additionally, the execute cleanup Avro record has been updated to allow multiple state IDs to be provided. This allows cleanups to be performed in batches if required.
  • Loading branch information
JamesHR3 authored Sep 28, 2023
1 parent 8132b94 commit 4c8fcdf
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,13 @@
"docs": "When this event is processed the flow mapper state should be set to null",
"namespace": "net.corda.data.flow.event.mapper",
"fields": [
{
"name": "ids",
"docs": "A list of IDs of mapper states that should be deleted",
"type": {
"type": "array",
"items": "string"
}
}
]
}
2 changes: 2 additions & 0 deletions data/topic-schema/src/main/java/net/corda/schema/Schemas.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private Flow() {
public static final String FLOW_MAPPER_EVENT_TOPIC = "flow.mapper.event";
public static final String FLOW_MAPPER_EVENT_STATE_TOPIC = getStateAndEventStateTopic(FLOW_MAPPER_EVENT_TOPIC);
public static final String FLOW_MAPPER_EVENT_DLQ_TOPIC = getDLQTopic(FLOW_MAPPER_EVENT_TOPIC);
public static final String FLOW_MAPPER_CLEANUP_TOPIC = "flow.mapper.cleanup";
}

/**
Expand Down Expand Up @@ -267,5 +268,6 @@ public static final class ScheduledTask {
private ScheduledTask() {}

public static final String SCHEDULED_TASK_DB_PROCESSOR = "scheduled.task.db.processor";
public static final String SCHEDULED_TASK_MAPPER_PROCESSOR = "scheduled.task.mapper.processor";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,10 @@ topics:
min.compaction.lag.ms: 60000
max.compaction.lag.ms: 604800000
min.cleanable.dirty.ratio: 0.5
FlowMapperCleanupTopic:
name: flow.mapper.cleanup
consumers:
- flowMapper
producers:
- flowMapper
config:
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ topics:
- db
producers:
- db
ScheduledTaskFlowMapperProcessorTopic:
name: scheduled.task.mapper.processor
consumers:
- flowMapper
producers:
- db
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ cordaProductVersion = 5.1.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 24
cordaApiRevision = 25

# Main
kotlinVersion = 1.8.21
Expand Down

0 comments on commit 4c8fcdf

Please sign in to comment.