From c6323891e49d867253ed57cc6cad2e17e4dd0c0d Mon Sep 17 00:00:00 2001 From: Miljenko Brkic <97448832+mbrkic-r3@users.noreply.github.com> Date: Wed, 20 Dec 2023 10:07:28 +0000 Subject: [PATCH] CORE-18320 Support for a flow specific timeout on Flow Message API (#1394) Support for a flow specific timeout on Flow Message API. Client can specify a flow session specific timeout on Flow Message API. If set, it will be passed to the initiated party to also use it. If the client does not provide a timeout value then the value from Corda Configuration is used. A new class FlowSessionConfiguration is introduced to store session configuration. Methods that used configuration parameter requireClose will be deprecated with another PR. Avro schema for SessionState was modified, field requireClose was removed as this value will be stored in sessionProperties. --- .../scans/corda-application-5.2.0.yaml | 54 ++++++++++++++- .../application/messaging/FlowMessaging.java | 66 +++++++++++++++++++ .../messaging/FlowSessionConfiguration.java | 62 +++++++++++++++++ .../messaging/FlowMessagingJavaApiTest.java | 6 +- .../data/flow/state/session/SessionState.avsc | 5 -- gradle.properties | 2 +- 6 files changed, 183 insertions(+), 12 deletions(-) create mode 100644 application/src/main/java/net/corda/v5/application/messaging/FlowSessionConfiguration.java diff --git a/application/scans/corda-application-5.2.0.yaml b/application/scans/corda-application-5.2.0.yaml index 8c86ca41b3..28f677c51f 100644 --- a/application/scans/corda-application-5.2.0.yaml +++ b/application/scans/corda-application-5.2.0.yaml @@ -1328,9 +1328,10 @@ net.corda.v5.application.messaging.FlowMessaging: annotation: - NotNull type: net.corda.v5.base.types.MemberX500Name - requireClose: - annotation: [] - type: boolean + sessionConfiguration: + annotation: + - NotNull + type: net.corda.v5.application.messaging.FlowSessionConfiguration flowContextPropertiesBuilder: annotation: - NotNull @@ -1460,6 +1461,53 @@ net.corda.v5.application.messaging.FlowSession: annotation: - NotNull type: Object +net.corda.v5.application.messaging.FlowSessionConfiguration: + annotations: [] + type: public class + extends: null + implements: [] + interface: false + methods: + getTimeout: + annotations: [] + default: false + type: public + returnType: java.time.Duration + isRequireClose: + annotations: [] + default: false + type: public + returnType: boolean +net.corda.v5.application.messaging.FlowSessionConfiguration$Builder: + annotations: [] + type: public static class + extends: null + implements: [] + interface: false + methods: + build: + annotations: [] + default: false + type: public + returnType: net.corda.v5.application.messaging.FlowSessionConfiguration + requireClose: + annotations: [] + default: false + type: public + returnType: net.corda.v5.application.messaging.FlowSessionConfiguration$Builder + params: + requireClose: + annotation: [] + type: boolean + timeout: + annotations: [] + default: false + type: public + returnType: net.corda.v5.application.messaging.FlowSessionConfiguration$Builder + params: + timeout: + annotation: [] + type: java.time.Duration net.corda.v5.application.persistence.CordaPersistenceException: annotations: [] type: public final class diff --git a/application/src/main/java/net/corda/v5/application/messaging/FlowMessaging.java b/application/src/main/java/net/corda/v5/application/messaging/FlowMessaging.java index b83aa6a93d..8dd2ceedbb 100644 --- a/application/src/main/java/net/corda/v5/application/messaging/FlowMessaging.java +++ b/application/src/main/java/net/corda/v5/application/messaging/FlowMessaging.java @@ -106,6 +106,25 @@ public interface FlowMessaging { @NotNull FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose); + /** + * Creates a communication session with a counterparty's {@link ResponderFlow}. Subsequently, you may send/receive using + * this session object. Note that this function does not communicate in itself. The counter-flow will be kicked off + * by the first send/receive. + *

+ * Initiated flows are initiated with context based on the context of the initiating flow at the point in time this + * method is called. The context of the initiating flow is snapshotted by the returned session. Altering the flow + * context has no effect on the context of the session after this point, and therefore it has no effect on the + * context of the initiated flow either. + * + * @param x500Name The X500 name of the member to communicate with. + * @param sessionConfiguration Session configuration (see {@link FlowSessionConfiguration}). + * + * @return The session. + */ + @Suspendable + @NotNull + FlowSession initiateFlow(@NotNull MemberX500Name x500Name, @NotNull FlowSessionConfiguration sessionConfiguration); + /** * Creates a communication session with another member. Subsequently, you may send/receive using this session object. * Note that this function does not communicate in itself. The counter-flow will be kicked off by the first @@ -189,6 +208,53 @@ public interface FlowMessaging { @NotNull FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder); + /** + * Creates a communication session with another member. Subsequently, you may send/receive using this session object. + * Note that this function does not communicate in itself. The counter-flow will be kicked off by the first + * send/receive. + *

+ * This overload takes a builder of context properties. Any properties set or modified against the context passed to + * this builder will be propagated to initiated flows and all that flow's initiated flows and sub flows down the + * stack. The properties passed to the builder are pre-populated with the current flow context properties, see + * {@link FlowContextProperties}. Altering the current flow context has no effect on the context of the session after the + * builder is applied and the session returned by this method, and therefore it has no effect on the context of the + * initiated flow either. + *

+ * Example of use in Kotlin. + * ```Kotlin + * val sessionConfig = FlowSessionConfiguration.Builder().requireClose(false).build() + * val flowSession = flowMessaging.initiateFlow(virtualNodeName, sessionConfig) { flowContextProperties -> + * flowContextProperties["key"] = "value" + * } + * ``` + * Example of use in Java. + * ```Java + * FlowSessionConfiguration sessionConfig = new FlowSessionConfiguration.Builder().requireClose(false).build(); + * FlowSession flowSession = flowMessaging.initiateFlow(virtualNodeName, sessionConfig, (flowContextProperties) -> { + * flowContextProperties.put("key", "value"); + * }); + * ``` + * + * @param x500Name The X500 name of the member to communicate with. + * @param sessionConfiguration Session configuration (see {@link FlowSessionConfiguration}). + * @param flowContextPropertiesBuilder A builder of context properties. + * + * @return The session. + * + * @throws IllegalArgumentException if the builder tries to set a property for which a platform property already + * exists or if the key is prefixed by {@link FlowContextProperties#CORDA_RESERVED_PREFIX}. See also + * {@link FlowContextProperties}. Any other + * exception thrown by the builder will also be thrown through here and should be avoided in the provided + * implementation, see {@link FlowContextPropertiesBuilder}. + */ + @Suspendable + @NotNull + FlowSession initiateFlow( + @NotNull MemberX500Name x500Name, + @NotNull FlowSessionConfiguration sessionConfiguration, + @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder + ); + /** * Suspends until a message has been received for each session in the specified {@code sessions}. *

diff --git a/application/src/main/java/net/corda/v5/application/messaging/FlowSessionConfiguration.java b/application/src/main/java/net/corda/v5/application/messaging/FlowSessionConfiguration.java new file mode 100644 index 0000000000..8c832ec410 --- /dev/null +++ b/application/src/main/java/net/corda/v5/application/messaging/FlowSessionConfiguration.java @@ -0,0 +1,62 @@ +package net.corda.v5.application.messaging; + +import java.time.Duration; + +/** + * Flow session configuration. Instances should be created using {@link Builder}. + */ +public final class FlowSessionConfiguration { + private final boolean requireClose; + private final Duration timeout; + + private FlowSessionConfiguration(Builder builder) { + this.requireClose = builder.requireClose; + this.timeout = builder.timeout; + } + + public boolean isRequireClose() { + return requireClose; + } + + public Duration getTimeout() { + return timeout; + } + + public static final class Builder { + private boolean requireClose = true; + private Duration timeout; + + /** + * When set to true, the initiated party will send a close message after calling FlowSession.close() + * and the initiating party will suspend and wait to receive the message when they call FlowSession.close(). + * When set to false the session is marked as terminated immediately when close() is called. + * Default value is true. + * @param requireClose Flag that indicates whether close message is required. + * @return {@link Builder}. + */ + public Builder requireClose(boolean requireClose) { + this.requireClose = requireClose; + return this; + } + + /** + * The duration that Corda waits when no message has been received from a counterparty before + * causing the session to error. If set to null, value set in Corda Configuration will be used. + * Default value is null. + * @param timeout Session timeout. + * @return {@link Builder}. + */ + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + /** + * Builds a new instance of {@link FlowSessionConfiguration}. + * @return a new instance of {@link FlowSessionConfiguration}. + */ + public FlowSessionConfiguration build() { + return new FlowSessionConfiguration(this); + } + } +} diff --git a/application/src/test/java/net/corda/v5/application/messaging/FlowMessagingJavaApiTest.java b/application/src/test/java/net/corda/v5/application/messaging/FlowMessagingJavaApiTest.java index 20caf32c3b..0a5f3e8ad0 100644 --- a/application/src/test/java/net/corda/v5/application/messaging/FlowMessagingJavaApiTest.java +++ b/application/src/test/java/net/corda/v5/application/messaging/FlowMessagingJavaApiTest.java @@ -28,7 +28,7 @@ public void initiateFlowParty() { @Test public void initiateFlowPartyWithBuilder() { final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB"); - when(flowMessaging.initiateFlow(eq(counterparty), any())).thenReturn(flowSession); + when(flowMessaging.initiateFlow(eq(counterparty), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession); FlowSession result = flowMessaging.initiateFlow(counterparty, (contextProperties) -> contextProperties.put("key", "value")); @@ -39,7 +39,7 @@ public void initiateFlowPartyWithBuilder() { @Test public void initiateFlowPartyWithBuilderRequireCloseTrue() { final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB"); - when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any())).thenReturn(flowSession); + when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession); FlowSession result = flowMessaging.initiateFlow(counterparty, true, (contextProperties) -> contextProperties.put("key", "value")); @@ -50,7 +50,7 @@ public void initiateFlowPartyWithBuilderRequireCloseTrue() { @Test public void initiateFlowPartyWithBuilderRequireCloseFalse() { final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB"); - when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any())).thenReturn(flowSession); + when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession); FlowSession result = flowMessaging.initiateFlow(counterparty, false, (contextProperties) -> contextProperties.put("key", "value")); diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/session/SessionState.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/session/SessionState.avsc index dea2cbcfaa..605f2ac894 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/session/SessionState.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/session/SessionState.avsc @@ -29,11 +29,6 @@ "type": "net.corda.data.identity.HoldingIdentity", "doc": "Identity of the counterparty in the session." }, - { - "name": "requireClose", - "type": "boolean", - "doc": "True if the user has set requireClose to be true when calling initiate flow. False otherwise." - }, { "name": "receivedEventsState", "type": "net.corda.data.flow.state.session.SessionProcessState", diff --git a/gradle.properties b/gradle.properties index ecc8d115d6..95e765185c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.0 # NOTE: update this each time this module contains a breaking change ## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to ## a per module property in which case module versions can change independently. -cordaApiRevision = 19 +cordaApiRevision = 20 # Main kotlin.stdlib.default.dependency = false