Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-18320 Support for a flow specific timeout on Flow Message API #1394

Merged
merged 25 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3be95e2
CORE-18320 Support for a flow specific timeout on Flow Message API. C…
mbrkic-r3 Dec 12, 2023
98f3c00
CORE-18320 Increased API version
mbrkic-r3 Dec 12, 2023
1cded1a
CORE-18320 Set default value for SessionState.sessionTimeout
mbrkic-r3 Dec 13, 2023
89d6bbe
Merge branch 'release/os/5.2' into mbrkic-r3/CORE-18320/flow-timeout
mbrkic-r3 Dec 13, 2023
c887c98
Merge branch 'release/os/5.2' into mbrkic-r3/CORE-18320/flow-timeout
mbrkic-r3 Dec 14, 2023
42ccb7e
CORE-18320 Using builder to create session configuration.
mbrkic-r3 Dec 14, 2023
72dc4e5
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-18320/flow-timeou…
mbrkic-r3 Dec 14, 2023
18f2352
CORE-18320 Removed "requireClose" and "sessionTimeout" from SessionSt…
mbrkic-r3 Dec 14, 2023
75122e1
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 15, 2023
82d9704
CORE-18320 Increased APi version
mbrkic-r3 Dec 15, 2023
602a075
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 15, 2023
fb4e495
Merge branch 'release/os/5.2' into mbrkic-r3/CORE-18320/flow-timeout
mbrkic-r3 Dec 15, 2023
dfdf2a1
CORE-18320 Increased APi version
mbrkic-r3 Dec 15, 2023
47d57ec
Merge branch 'release/os/5.2' into mbrkic-r3/CORE-18320/flow-timeout
mbrkic-r3 Dec 18, 2023
1b54ea7
CORE-18320 Increased APi version
mbrkic-r3 Dec 18, 2023
95d8f7f
CORE-18320 Deprecated FlowMessaging functions that use argument requi…
mbrkic-r3 Dec 18, 2023
b80aa88
CORE-18320 Deprecated FlowMessaging functions that use argument requi…
mbrkic-r3 Dec 18, 2023
e3ac9df
CORE-18320 ./gradlew cementApi
mbrkic-r3 Dec 18, 2023
2dd4186
CORE-18320 Removed deprecations (this change will be in a separate PR)
mbrkic-r3 Dec 19, 2023
30778bc
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 19, 2023
061c57c
CORE-18320 Class FlowSessionConfiguration and related Builder made final
mbrkic-r3 Dec 19, 2023
52b6a1b
CORE-18320 Increased API version
mbrkic-r3 Dec 20, 2023
ad4863b
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 20, 2023
5f5034c
CORE-18320 Increased API version
mbrkic-r3 Dec 20, 2023
40e1fe0
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,25 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose);
mbrkic-r3 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a communication session with a counterparty's {@link ResponderFlow}. Subsequently, you may send/receive using
* this session object. Note that this function does not communicate in itself. The counter-flow will be kicked off
* by the first send/receive.
* <p>
* Initiated flows are initiated with context based on the context of the initiating flow at the point in time this
* method is called. The context of the initiating flow is snapshotted by the returned session. Altering the flow
* context has no effect on the context of the session after this point, and therefore it has no effect on the
* context of the initiated flow either.
*
* @param x500Name The X500 name of the member to communicate with.
* @param sessionConfiguration Session configuration (see {@link FlowSessionConfiguration}).
*
* @return The session.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, @NotNull FlowSessionConfiguration sessionConfiguration);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
Expand Down Expand Up @@ -189,6 +208,53 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
* send/receive.
* <p>
* This overload takes a builder of context properties. Any properties set or modified against the context passed to
* this builder will be propagated to initiated flows and all that flow's initiated flows and sub flows down the
* stack. The properties passed to the builder are pre-populated with the current flow context properties, see
* {@link FlowContextProperties}. Altering the current flow context has no effect on the context of the session after the
* builder is applied and the session returned by this method, and therefore it has no effect on the context of the
* initiated flow either.
* <p>
* Example of use in Kotlin.
* ```Kotlin
* val sessionConfig = FlowSessionConfiguration.Builder().requireClose(false).build()
* val flowSession = flowMessaging.initiateFlow(virtualNodeName, sessionConfig) { flowContextProperties ->
* flowContextProperties["key"] = "value"
* }
* ```
* Example of use in Java.
* ```Java
* FlowSessionConfiguration sessionConfig = new FlowSessionConfiguration.Builder().requireClose(false).build();
* FlowSession flowSession = flowMessaging.initiateFlow(virtualNodeName, sessionConfig, (flowContextProperties) -> {
* flowContextProperties.put("key", "value");
* });
* ```
*
* @param x500Name The X500 name of the member to communicate with.
* @param sessionConfiguration Session configuration (see {@link FlowSessionConfiguration}).
* @param flowContextPropertiesBuilder A builder of context properties.
*
* @return The session.
*
* @throws IllegalArgumentException if the builder tries to set a property for which a platform property already
* exists or if the key is prefixed by {@link FlowContextProperties#CORDA_RESERVED_PREFIX}. See also
* {@link FlowContextProperties}. Any other
* exception thrown by the builder will also be thrown through here and should be avoided in the provided
* implementation, see {@link FlowContextPropertiesBuilder}.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(
@NotNull MemberX500Name x500Name,
@NotNull FlowSessionConfiguration sessionConfiguration,
@NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder
);

/**
* Suspends until a message has been received for each session in the specified {@code sessions}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package net.corda.v5.application.messaging;

import java.time.Duration;

/**
* Flow session configuration. Instances should be created using {@link Builder}.
*/
public class FlowSessionConfiguration {
mbrkic-r3 marked this conversation as resolved.
Show resolved Hide resolved
private final boolean requireClose;
private final Duration timeout;

private FlowSessionConfiguration(Builder builder) {
this.requireClose = builder.requireClose;
this.timeout = builder.timeout;
}

public boolean isRequireClose() {
return requireClose;
}

public Duration getTimeout() {
return timeout;
}

public static class Builder {
private boolean requireClose = true;
private Duration timeout;

/**
* When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
* Default value is true.
* @param requireClose Flag that indicates whether close message is required.
* @return {@link Builder}.
*/
public Builder requireClose(boolean requireClose) {
this.requireClose = requireClose;
return this;
}

/**
* The duration that Corda waits when no message has been received from a counterparty before
* causing the session to error. If set to null, value set in Corda Configuration will be used.
* Default value is null.
* @param timeout Session timeout.
* @return {@link Builder}.
*/
public Builder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

/**
* Builds a new instance of {@link FlowSessionConfiguration}.
* @return a new instance of {@link FlowSessionConfiguration}.
*/
public FlowSessionConfiguration build() {
return new FlowSessionConfiguration(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void initiateFlowParty() {
@Test
public void initiateFlowPartyWithBuilder() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), any())).thenReturn(flowSession);
when(flowMessaging.initiateFlow(eq(counterparty), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, (contextProperties) -> contextProperties.put("key", "value"));

Expand All @@ -39,7 +39,7 @@ public void initiateFlowPartyWithBuilder() {
@Test
public void initiateFlowPartyWithBuilderRequireCloseTrue() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any())).thenReturn(flowSession);
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, true, (contextProperties) -> contextProperties.put("key", "value"));

Expand All @@ -50,7 +50,7 @@ public void initiateFlowPartyWithBuilderRequireCloseTrue() {
@Test
public void initiateFlowPartyWithBuilderRequireCloseFalse() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any())).thenReturn(flowSession);
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, false, (contextProperties) -> contextProperties.put("key", "value"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@
"type": "net.corda.data.identity.HoldingIdentity",
"doc": "Identity of the counterparty in the session."
},
{
"name": "requireClose",
"type": "boolean",
"doc": "True if the user has set requireClose to be true when calling initiate flow. False otherwise."
},
{
"name": "receivedEventsState",
"type": "net.corda.data.flow.state.session.SessionProcessState",
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 16
cordaApiRevision = 17

# Main
kotlin.stdlib.default.dependency = false
Expand Down