Skip to content

Commit

Permalink
CORE-19256 A wrapper for all mediator state, to allow async outputs t…
Browse files Browse the repository at this point in the history
…o be stored along with the messaging clients state (#1442)

Store a list of bus bound output events per consumer input event along with the client supplied state object. MediatorState will be used by the mediator message pattern for all states.

A hash of ConsumerRecords (e.g Key + FlowEvent) will be used as the key to store bus bound output records of the message processor. e.g SessionEvents/FlowStatus,

Storing output events will allow them to be replayed when consumer input is replayed.
  • Loading branch information
LWogan authored Jan 15, 2024
1 parent 1601480 commit 450e3d4
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"type": "record",
"name": "MediatorReplayOutputEvent",
"namespace": "net.corda.data.messaging.mediator",
"doc": "Record the details of output events to be replayed if necessary.",
"fields": [
{
"name": "topic",
"type": "string",
"doc": "The topic the event should be sent to."
},
{
"name": "key",
"type": "bytes",
"doc": "Avro serialized bytes of the event key."
},
{
"name": "value",
"type": ["null", "bytes"],
"doc": "Avro serialized bytes of the event value."
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"type": "record",
"name": "MediatorReplayOutputEvents",
"namespace": "net.corda.data.messaging.mediator",
"doc": "Record the output events for a given input event, to be replayed by the mediator if necessary.",
"fields": [
{
"name": "inputEventHash",
"type": "bytes",
"doc": "Hash of the input event. Event key and value bytes are used to generated the hash. This hash will act as a unique key for consumer input events."
},
{
"name": "outputEvents",
"type": {
"type": "array",
"items": "net.corda.data.messaging.mediator.MediatorReplayOutputEvent"
},
"doc": "The list of output events for the given input events."
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"type": "record",
"name": "MediatorState",
"namespace": "net.corda.data.messaging.mediator",
"doc": "Mediator state wrapper to store replayed events",
"fields": [
{
"name": "state",
"type": ["null", "bytes"],
"doc": "The messaging library clients state."
},
{
"name": "outputEvents",
"type": {
"type": "array",
"items": "net.corda.data.messaging.mediator.MediatorReplayOutputEvents"
},
"doc": "Records output events to be sent to the message bus for each input event processed for this state."
}
]
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 27
cordaApiRevision = 28

# Main
kotlin.stdlib.default.dependency = false
Expand Down

0 comments on commit 450e3d4

Please sign in to comment.