Skip to content

Commit

Permalink
CORE-20867 add a flow retry topic to handle retries of transient RPC …
Browse files Browse the repository at this point in the history
…calls (#1710) (#1712)

Add new FlowEvent payload type to capture the details required to generates an external event retry request for transient errors.

Co-authored-by: Lorcan Wogan <[email protected]>
  • Loading branch information
corda-jenkins-ci02[bot] and LWogan authored Dec 2, 2024
1 parent 8cf7d72 commit f2d9a46
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"net.corda.data.flow.event.StartFlow",
"net.corda.data.flow.output.FlowStatus",
"net.corda.data.flow.event.SessionEvent",
"net.corda.data.flow.event.external.ExternalEventResponse"
"net.corda.data.flow.event.external.ExternalEventResponse",
"net.corda.data.flow.event.external.ExternalEventRetryRequest"
]
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"type": "record",
"name": "ExternalEventRetryRequest",
"namespace": "net.corda.data.flow.event.external",
"doc": "This event captures the details of the external event request to allow it to be polled from the bus and retried.",
"fields": [
{
"name": "requestId",
"type": "string",
"doc": "The requestId of the external event request to retry."
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "Time ([Instant]) in milliseconds when the request was created. Ensures each request is unique for replay logic."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ private Bus() {
public static final String KAFKA_BOOTSTRAP_SERVERS = KAFKA_PROPERTIES_COMMON + ".bootstrap.servers";
public static final String KAFKA_PROPERTIES_CONSUMER = KAFKA_PROPERTIES + ".consumer";
public static final String KAFKA_CONSUMER_MAX_POLL_INTERVAL = KAFKA_PROPERTIES_CONSUMER + ".max.poll.interval.ms";
public static final String KAFKA_CONSUMER_MAX_POLL_RECORDS = KAFKA_PROPERTIES_CONSUMER + ".max.poll.records";
public static final String KAFKA_PROPERTIES_PRODUCER = KAFKA_PROPERTIES + ".producer";
public static final String KAFKA_PRODUCER_CLIENT_ID = KAFKA_PROPERTIES_PRODUCER + ".client.id";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
"default": 60000
},
"maxRetries": {
"description": "The maximum number of times Corda retries a request before returning an exception.",
"description": "The maximum number of times Corda retries a request before returning an exception for error cases other than transient errors. Transient errors will be retied until flows are cleaned up when the maxIdleTime expires.",
"type": "integer",
"minimum": 0,
"maximum": 2147483647,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,3 @@ topics:
producers:
- flow
config:

2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cordaProductVersion = 5.3.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 16
cordaApiRevision = 17

# Main
kotlin.stdlib.default.dependency = false
Expand Down

0 comments on commit f2d9a46

Please sign in to comment.