From 450e3d406416937e318f630fac352f7a98f469d7 Mon Sep 17 00:00:00 2001 From: Lorcan Wogan <69468264+LWogan@users.noreply.github.com> Date: Mon, 15 Jan 2024 16:02:32 +0000 Subject: [PATCH] CORE-19256 A wrapper for all mediator state, to allow async outputs to be stored along with the messaging clients state (#1442) Store a list of bus bound output events per consumer input event along with the client supplied state object. MediatorState will be used by the mediator message pattern for all states. A hash of ConsumerRecords (e.g Key + FlowEvent) will be used as the key to store bus bound output records of the message processor. e.g SessionEvents/FlowStatus, Storing output events will allow them to be replayed when consumer input is replayed. --- .../mediator/MediatorReplayOutputEvent.avsc | 23 +++++++++++++++++++ .../mediator/MediatorReplayOutputEvents.avsc | 21 +++++++++++++++++ .../messaging/mediator/MediatorState.avsc | 21 +++++++++++++++++ gradle.properties | 2 +- 4 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorReplayOutputEvent.avsc create mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorReplayOutputEvents.avsc create mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorState.avsc diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorReplayOutputEvent.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorReplayOutputEvent.avsc new file mode 100644 index 0000000000..6627ac4c47 --- /dev/null +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorReplayOutputEvent.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "MediatorReplayOutputEvent", + "namespace": "net.corda.data.messaging.mediator", + "doc": "Record the details of output events to be replayed if necessary.", + "fields": [ + { + "name": "topic", + "type": "string", + "doc": "The topic the event should be sent to." + }, + { + "name": "key", + "type": "bytes", + "doc": "Avro serialized bytes of the event key." + }, + { + "name": "value", + "type": ["null", "bytes"], + "doc": "Avro serialized bytes of the event value." + } + ] +} \ No newline at end of file diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorReplayOutputEvents.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorReplayOutputEvents.avsc new file mode 100644 index 0000000000..52d2b0875c --- /dev/null +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorReplayOutputEvents.avsc @@ -0,0 +1,21 @@ +{ + "type": "record", + "name": "MediatorReplayOutputEvents", + "namespace": "net.corda.data.messaging.mediator", + "doc": "Record the output events for a given input event, to be replayed by the mediator if necessary.", + "fields": [ + { + "name": "inputEventHash", + "type": "bytes", + "doc": "Hash of the input event. Event key and value bytes are used to generated the hash. This hash will act as a unique key for consumer input events." + }, + { + "name": "outputEvents", + "type": { + "type": "array", + "items": "net.corda.data.messaging.mediator.MediatorReplayOutputEvent" + }, + "doc": "The list of output events for the given input events." + } + ] +} \ No newline at end of file diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorState.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorState.avsc new file mode 100644 index 0000000000..fcb7befe16 --- /dev/null +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/messaging/mediator/MediatorState.avsc @@ -0,0 +1,21 @@ +{ + "type": "record", + "name": "MediatorState", + "namespace": "net.corda.data.messaging.mediator", + "doc": "Mediator state wrapper to store replayed events", + "fields": [ + { + "name": "state", + "type": ["null", "bytes"], + "doc": "The messaging library clients state." + }, + { + "name": "outputEvents", + "type": { + "type": "array", + "items": "net.corda.data.messaging.mediator.MediatorReplayOutputEvents" + }, + "doc": "Records output events to be sent to the message bus for each input event processed for this state." + } + ] +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index b3c5e874a8..115c77a2f4 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.0 # NOTE: update this each time this module contains a breaking change ## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to ## a per module property in which case module versions can change independently. -cordaApiRevision = 27 +cordaApiRevision = 28 # Main kotlin.stdlib.default.dependency = false