Skip to content

Commit

Permalink
CORE-18320 Support for a flow specific timeout on Flow Message API (#…
Browse files Browse the repository at this point in the history
…1394)

Support for a flow specific timeout on Flow Message API. Client can specify a flow session specific timeout on Flow Message API. If set, it will be passed to the initiated party to also use it. If the client does not provide a timeout value then the value from Corda Configuration is used.

A new class FlowSessionConfiguration is introduced to store session configuration. Methods that used configuration parameter requireClose will be deprecated with another PR.

Avro schema for SessionState was modified, field requireClose was removed as this value will be stored in sessionProperties.
  • Loading branch information
mbrkic-r3 authored Dec 20, 2023
1 parent b53f225 commit c632389
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 12 deletions.
54 changes: 51 additions & 3 deletions application/scans/corda-application-5.2.0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1328,9 +1328,10 @@ net.corda.v5.application.messaging.FlowMessaging:
annotation:
- NotNull
type: net.corda.v5.base.types.MemberX500Name
requireClose:
annotation: []
type: boolean
sessionConfiguration:
annotation:
- NotNull
type: net.corda.v5.application.messaging.FlowSessionConfiguration
flowContextPropertiesBuilder:
annotation:
- NotNull
Expand Down Expand Up @@ -1460,6 +1461,53 @@ net.corda.v5.application.messaging.FlowSession:
annotation:
- NotNull
type: Object
net.corda.v5.application.messaging.FlowSessionConfiguration:
annotations: []
type: public class
extends: null
implements: []
interface: false
methods:
getTimeout:
annotations: []
default: false
type: public
returnType: java.time.Duration
isRequireClose:
annotations: []
default: false
type: public
returnType: boolean
net.corda.v5.application.messaging.FlowSessionConfiguration$Builder:
annotations: []
type: public static class
extends: null
implements: []
interface: false
methods:
build:
annotations: []
default: false
type: public
returnType: net.corda.v5.application.messaging.FlowSessionConfiguration
requireClose:
annotations: []
default: false
type: public
returnType: net.corda.v5.application.messaging.FlowSessionConfiguration$Builder
params:
requireClose:
annotation: []
type: boolean
timeout:
annotations: []
default: false
type: public
returnType: net.corda.v5.application.messaging.FlowSessionConfiguration$Builder
params:
timeout:
annotation: []
type: java.time.Duration
net.corda.v5.application.persistence.CordaPersistenceException:
annotations: []
type: public final class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,25 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose);

/**
* Creates a communication session with a counterparty's {@link ResponderFlow}. Subsequently, you may send/receive using
* this session object. Note that this function does not communicate in itself. The counter-flow will be kicked off
* by the first send/receive.
* <p>
* Initiated flows are initiated with context based on the context of the initiating flow at the point in time this
* method is called. The context of the initiating flow is snapshotted by the returned session. Altering the flow
* context has no effect on the context of the session after this point, and therefore it has no effect on the
* context of the initiated flow either.
*
* @param x500Name The X500 name of the member to communicate with.
* @param sessionConfiguration Session configuration (see {@link FlowSessionConfiguration}).
*
* @return The session.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, @NotNull FlowSessionConfiguration sessionConfiguration);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
Expand Down Expand Up @@ -189,6 +208,53 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
* send/receive.
* <p>
* This overload takes a builder of context properties. Any properties set or modified against the context passed to
* this builder will be propagated to initiated flows and all that flow's initiated flows and sub flows down the
* stack. The properties passed to the builder are pre-populated with the current flow context properties, see
* {@link FlowContextProperties}. Altering the current flow context has no effect on the context of the session after the
* builder is applied and the session returned by this method, and therefore it has no effect on the context of the
* initiated flow either.
* <p>
* Example of use in Kotlin.
* ```Kotlin
* val sessionConfig = FlowSessionConfiguration.Builder().requireClose(false).build()
* val flowSession = flowMessaging.initiateFlow(virtualNodeName, sessionConfig) { flowContextProperties ->
* flowContextProperties["key"] = "value"
* }
* ```
* Example of use in Java.
* ```Java
* FlowSessionConfiguration sessionConfig = new FlowSessionConfiguration.Builder().requireClose(false).build();
* FlowSession flowSession = flowMessaging.initiateFlow(virtualNodeName, sessionConfig, (flowContextProperties) -> {
* flowContextProperties.put("key", "value");
* });
* ```
*
* @param x500Name The X500 name of the member to communicate with.
* @param sessionConfiguration Session configuration (see {@link FlowSessionConfiguration}).
* @param flowContextPropertiesBuilder A builder of context properties.
*
* @return The session.
*
* @throws IllegalArgumentException if the builder tries to set a property for which a platform property already
* exists or if the key is prefixed by {@link FlowContextProperties#CORDA_RESERVED_PREFIX}. See also
* {@link FlowContextProperties}. Any other
* exception thrown by the builder will also be thrown through here and should be avoided in the provided
* implementation, see {@link FlowContextPropertiesBuilder}.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(
@NotNull MemberX500Name x500Name,
@NotNull FlowSessionConfiguration sessionConfiguration,
@NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder
);

/**
* Suspends until a message has been received for each session in the specified {@code sessions}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package net.corda.v5.application.messaging;

import java.time.Duration;

/**
* Flow session configuration. Instances should be created using {@link Builder}.
*/
public final class FlowSessionConfiguration {
private final boolean requireClose;
private final Duration timeout;

private FlowSessionConfiguration(Builder builder) {
this.requireClose = builder.requireClose;
this.timeout = builder.timeout;
}

public boolean isRequireClose() {
return requireClose;
}

public Duration getTimeout() {
return timeout;
}

public static final class Builder {
private boolean requireClose = true;
private Duration timeout;

/**
* When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
* Default value is true.
* @param requireClose Flag that indicates whether close message is required.
* @return {@link Builder}.
*/
public Builder requireClose(boolean requireClose) {
this.requireClose = requireClose;
return this;
}

/**
* The duration that Corda waits when no message has been received from a counterparty before
* causing the session to error. If set to null, value set in Corda Configuration will be used.
* Default value is null.
* @param timeout Session timeout.
* @return {@link Builder}.
*/
public Builder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

/**
* Builds a new instance of {@link FlowSessionConfiguration}.
* @return a new instance of {@link FlowSessionConfiguration}.
*/
public FlowSessionConfiguration build() {
return new FlowSessionConfiguration(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void initiateFlowParty() {
@Test
public void initiateFlowPartyWithBuilder() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), any())).thenReturn(flowSession);
when(flowMessaging.initiateFlow(eq(counterparty), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, (contextProperties) -> contextProperties.put("key", "value"));

Expand All @@ -39,7 +39,7 @@ public void initiateFlowPartyWithBuilder() {
@Test
public void initiateFlowPartyWithBuilderRequireCloseTrue() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any())).thenReturn(flowSession);
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, true, (contextProperties) -> contextProperties.put("key", "value"));

Expand All @@ -50,7 +50,7 @@ public void initiateFlowPartyWithBuilderRequireCloseTrue() {
@Test
public void initiateFlowPartyWithBuilderRequireCloseFalse() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any())).thenReturn(flowSession);
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, false, (contextProperties) -> contextProperties.put("key", "value"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@
"type": "net.corda.data.identity.HoldingIdentity",
"doc": "Identity of the counterparty in the session."
},
{
"name": "requireClose",
"type": "boolean",
"doc": "True if the user has set requireClose to be true when calling initiate flow. False otherwise."
},
{
"name": "receivedEventsState",
"type": "net.corda.data.flow.state.session.SessionProcessState",
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 19
cordaApiRevision = 20

# Main
kotlin.stdlib.default.dependency = false
Expand Down

0 comments on commit c632389

Please sign in to comment.