Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-18320 Support for a flow specific timeout on Flow Message API #1394

Merged
merged 25 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3be95e2
CORE-18320 Support for a flow specific timeout on Flow Message API. C…
mbrkic-r3 Dec 12, 2023
98f3c00
CORE-18320 Increased API version
mbrkic-r3 Dec 12, 2023
1cded1a
CORE-18320 Set default value for SessionState.sessionTimeout
mbrkic-r3 Dec 13, 2023
89d6bbe
Merge branch 'release/os/5.2' into mbrkic-r3/CORE-18320/flow-timeout
mbrkic-r3 Dec 13, 2023
c887c98
Merge branch 'release/os/5.2' into mbrkic-r3/CORE-18320/flow-timeout
mbrkic-r3 Dec 14, 2023
42ccb7e
CORE-18320 Using builder to create session configuration.
mbrkic-r3 Dec 14, 2023
72dc4e5
Merge remote-tracking branch 'origin/mbrkic-r3/CORE-18320/flow-timeou…
mbrkic-r3 Dec 14, 2023
18f2352
CORE-18320 Removed "requireClose" and "sessionTimeout" from SessionSt…
mbrkic-r3 Dec 14, 2023
75122e1
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 15, 2023
82d9704
CORE-18320 Increased APi version
mbrkic-r3 Dec 15, 2023
602a075
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 15, 2023
fb4e495
Merge branch 'release/os/5.2' into mbrkic-r3/CORE-18320/flow-timeout
mbrkic-r3 Dec 15, 2023
dfdf2a1
CORE-18320 Increased APi version
mbrkic-r3 Dec 15, 2023
47d57ec
Merge branch 'release/os/5.2' into mbrkic-r3/CORE-18320/flow-timeout
mbrkic-r3 Dec 18, 2023
1b54ea7
CORE-18320 Increased APi version
mbrkic-r3 Dec 18, 2023
95d8f7f
CORE-18320 Deprecated FlowMessaging functions that use argument requi…
mbrkic-r3 Dec 18, 2023
b80aa88
CORE-18320 Deprecated FlowMessaging functions that use argument requi…
mbrkic-r3 Dec 18, 2023
e3ac9df
CORE-18320 ./gradlew cementApi
mbrkic-r3 Dec 18, 2023
2dd4186
CORE-18320 Removed deprecations (this change will be in a separate PR)
mbrkic-r3 Dec 19, 2023
30778bc
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 19, 2023
061c57c
CORE-18320 Class FlowSessionConfiguration and related Builder made final
mbrkic-r3 Dec 19, 2023
52b6a1b
CORE-18320 Increased API version
mbrkic-r3 Dec 20, 2023
ad4863b
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 20, 2023
5f5034c
CORE-18320 Increased API version
mbrkic-r3 Dec 20, 2023
40e1fe0
Merge remote-tracking branch 'origin/release/os/5.2' into mbrkic-r3/C…
mbrkic-r3 Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import net.corda.v5.base.types.MemberX500Name;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -106,6 +107,29 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose);
mbrkic-r3 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a communication session with a counterparty's {@link ResponderFlow}. Subsequently, you may send/receive using
* this session object. Note that this function does not communicate in itself. The counter-flow will be kicked off
* by the first send/receive.
* <p>
* Initiated flows are initiated with context based on the context of the initiating flow at the point in time this
* method is called. The context of the initiating flow is snapshotted by the returned session. Altering the flow
* context has no effect on the context of the session after this point, and therefore it has no effect on the
* context of the initiated flow either.
*
* @param x500Name The X500 name of the member to communicate with.
* @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
* @param sessionTimeout The duration that Corda waits when no message has been received from a counterparty before
* causing the session to error. If set to `null`, value set in Corda Configuration will be used.
*
* @return The session.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, Duration sessionTimeout);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
Expand Down Expand Up @@ -189,6 +213,57 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
* send/receive.
* <p>
* This overload takes a builder of context properties. Any properties set or modified against the context passed to
* this builder will be propagated to initiated flows and all that flow's initiated flows and sub flows down the
* stack. The properties passed to the builder are pre-populated with the current flow context properties, see
* {@link FlowContextProperties}. Altering the current flow context has no effect on the context of the session after the
* builder is applied and the session returned by this method, and therefore it has no effect on the context of the
* initiated flow either.
* <p>
* Example of use in Kotlin.
* ```Kotlin
* val flowSession = flowMessaging.initiateFlow(virtualNodeName) { flowContextProperties ->
* flowContextProperties["key"] = "value"
* }
* ```
* Example of use in Java.
* ```Java
* FlowSession flowSession = flowMessaging.initiateFlow(virtualNodeName, (flowContextProperties) -> {
* flowContextProperties.put("key", "value");
* });
* ```
*
* @param x500Name The X500 name of the member to communicate with.
* @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
* @param sessionTimeout The duration that Corda waits when no message has been received from a counterparty before
* causing the session to error. If set to `null`, value set in Corda Configuration will
* be used.
* @param flowContextPropertiesBuilder A builder of context properties.
*
* @return The session.
*
* @throws IllegalArgumentException if the builder tries to set a property for which a platform property already
* exists or if the key is prefixed by {@link FlowContextProperties#CORDA_RESERVED_PREFIX}. See also
* {@link FlowContextProperties}. Any other
* exception thrown by the builder will also be thrown through here and should be avoided in the provided
* implementation, see {@link FlowContextPropertiesBuilder}.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(
@NotNull MemberX500Name x500Name,
boolean requireClose,
Duration sessionTimeout,
@NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder
);

/**
* Suspends until a message has been received for each session in the specified {@code sessions}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void initiateFlowPartyWithBuilder() {
@Test
public void initiateFlowPartyWithBuilderRequireCloseTrue() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any())).thenReturn(flowSession);
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, true, (contextProperties) -> contextProperties.put("key", "value"));

Expand All @@ -50,7 +50,7 @@ public void initiateFlowPartyWithBuilderRequireCloseTrue() {
@Test
public void initiateFlowPartyWithBuilderRequireCloseFalse() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any())).thenReturn(flowSession);
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any(FlowContextPropertiesBuilder.class))).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, false, (contextProperties) -> contextProperties.put("key", "value"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@
"type": "boolean",
"doc": "True if the user has set requireClose to be true when calling initiate flow. False otherwise."
},
{
"name": "sessionTimeout",
mbrkic-r3 marked this conversation as resolved.
Show resolved Hide resolved
"type": [
"null",
"int"
],
"default": null,
"doc": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error. If not set, value from configuration is used."
},
{
"name": "receivedEventsState",
"type": "net.corda.data.flow.state.session.SessionProcessState",
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 12
cordaApiRevision = 13

# Main
kotlinVersion = 1.8.21
Expand Down