Skip to content

Commit

Permalink
Merge branch 'release/os/5.1' into alex/merge-from-release-os-5.1
Browse files Browse the repository at this point in the history
  • Loading branch information
ac101m committed Sep 7, 2023
2 parents 2cb81f3 + 3b92307 commit b60edda
Show file tree
Hide file tree
Showing 23 changed files with 149 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,28 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name);


/**
* Creates a communication session with a counterparty's {@link ResponderFlow}. Subsequently, you may send/receive using
* this session object. Note that this function does not communicate in itself. The counter-flow will be kicked off
* by the first send/receive.
* <p>
* Initiated flows are initiated with context based on the context of the initiating flow at the point in time this
* method is called. The context of the initiating flow is snapshotted by the returned session. Altering the flow
* context has no effect on the context of the session after this point, and therefore it has no effect on the
* context of the initiated flow either.
*
* @param x500Name The X500 name of the member to communicate with.
* @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
*
* @return The session.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
Expand Down Expand Up @@ -146,10 +168,49 @@ String callFacade(
@NotNull String payload);

/**
* Suspends until a message has been received for each session in the specified [sessions].
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
* send/receive.
* <p>
* This overload takes a builder of context properties. Any properties set or modified against the context passed to
* this builder will be propagated to initiated flows and all that flow's initiated flows and sub flows down the
* stack. The properties passed to the builder are pre-populated with the current flow context properties, see
* {@link FlowContextProperties}. Altering the current flow context has no effect on the context of the session after the
* builder is applied and the session returned by this method, and therefore it has no effect on the context of the
* initiated flow either.
* <p>
* Example of use in Kotlin.
* ```Kotlin
* val flowSession = flowMessaging.initiateFlow(virtualNodeName) { flowContextProperties ->
* flowContextProperties["key"] = "value"
* }
* ```
* Example of use in Java.
* ```Java
* FlowSession flowSession = flowMessaging.initiateFlow(virtualNodeName, (flowContextProperties) -> {
* flowContextProperties.put("key", "value");
* });
* ```
*
* @param x500Name The X500 name of the member to communicate with.
* @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
* @param flowContextPropertiesBuilder A builder of context properties.
*
* @return The session.
*
* Consider [receiveAllMap(sessions: Map<FlowSession, Class<out Any>>): Map<FlowSession, Any>] when sessions are
* expected to receive different types.
* @throws IllegalArgumentException if the builder tries to set a property for which a platform property already
* exists or if the key is prefixed by {@link FlowContextProperties#CORDA_RESERVED_PREFIX}. See also
* {@link FlowContextProperties}. Any other
* exception thrown by the builder will also be thrown through here and should be avoided in the provided
* implementation, see {@link FlowContextPropertiesBuilder}.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder);

/**
* Suspends until a message has been received for each session in the specified {@code sessions}.
* <p>
* Consider {@link #receiveAllMap(Map)} when sessions are expected to receive different types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,26 @@ public void initiateFlowPartyWithBuilder() {
Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}

@Test
public void initiateFlowPartyWithBuilderRequireCloseTrue() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any())).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, true, (contextProperties) -> contextProperties.put("key", "value"));

Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}

@Test
public void initiateFlowPartyWithBuilderRequireCloseFalse() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any())).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, false, (contextProperties) -> contextProperties.put("key", "value"));

Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}
}
2 changes: 1 addition & 1 deletion data/avro-schema/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Avro Code Source Files

All Arvo definitions should go under `src/main/resources/avro`. While technically
All Avro definitions should go under `src/main/resources/avro`. While technically
not necessary, it would also be good to put them under the same directory structure as
the package (or `namespace` in Avro-speak) that the record will belong.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,20 @@
"type": "net.corda.data.identity.HoldingIdentity",
"doc": "Identity of party in the session who was initiated."
},
{
"name": "receivedSequenceNum",
"type": "int",
"doc": "Sequence number of the last contiguous message received from a counterparty. 0 if no messages received."
},
{
"name": "outOfOrderSequenceNums",
"type": {
"type": "array",
"items": "int"
},
"doc": "The sequence numbers of events received with a value greater than the last contiguous event received. i.e out of order messages received with a value greater than the receivedSequenceNum."
},
{
"name": "payload",
"type": [
"net.corda.data.flow.event.session.SessionInit",
"net.corda.data.flow.event.session.SessionConfirm",
"net.corda.data.flow.event.session.SessionData",
"net.corda.data.flow.event.session.SessionClose",
"net.corda.data.flow.event.session.SessionAck",
"net.corda.data.flow.event.session.SessionError"
]
},
{
"name": "contextSessionProperties",
"type": ["null", "net.corda.data.KeyValuePairList"],
"doc": "A map of context properties received from a counterparty related to this flow session. This is static data and will be set to null when previously sent."
}
]
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,5 @@
"name": "SessionConfirm",
"doc" : "Acknowledge to counterparty that the session has been confirmed and is ready to send and receive messages.",
"namespace": "net.corda.data.flow.event.session",
"fields": [
{
"name": "contextSessionProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context properties received from a counterparty related to this flow session. This contains information such as protocol name and version that this flow is running."
}
]
"fields": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
"net.corda.data.chunking.Chunk",
"bytes"
]
},
{
"name": "sessionInit",
"type": ["null", "net.corda.data.flow.event.session.SessionInit"],
"doc": "Contains information that can be used to start an initiated flow, piggybacked on initial data messages. Will be null for messages sent to the initiator. Will be null when initiated party is confirmed to be present to ensure out of order messages that arrive first contain this info."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,6 @@
"name": "contextPlatformProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context platform properties made available to the flow which will also be propagated to sub flows, initiated flows and services"
},
{
"name": "contextSessionProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context properties to send to a counterparty related to this flow session. This contains information such as protocol name and versions supported."
},
{
"name": "payload",
"type": [
"null",
"bytes"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,15 @@
},
"doc": "Time ([Instant]) in milliseconds when the last session event was received from a counterparty"
},
{
"name": "lastSentMessageTime",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "Time ([Instant]) in milliseconds of the last message sent to a counterparty"
},
{
"name": "counterpartyIdentity",
"type": "net.corda.data.identity.HoldingIdentity",
"doc": "Identity of the counterparty in the session."
},
{
"name": "sendAck",
"name": "requireClose",
"type": "boolean",
"doc": "True if there are messages to ack to a counterparty. False if there are no messages received that have not been acked."
"doc": "True if the user has set requireClose to be true when calling initiate flow. False otherwise."
},
{
"name": "receivedEventsState",
Expand All @@ -56,7 +48,7 @@
{
"name": "sendEventsState",
"type": "net.corda.data.flow.state.session.SessionProcessState",
"doc": "Record the sequence number of the last event sent to the counterparty. Record all events sent but not yet acknowledged by the counterparty as well as any messages to send to the counterparty."
"doc": "Record the sequence number of the last event sent to the counterparty. Record all events to be sent to the counterparty."
},
{
"name": "status",
Expand All @@ -67,7 +59,6 @@
"CREATED",
"CONFIRMED",
"CLOSING",
"WAIT_FOR_FINAL_ACK",
"CLOSED",
"ERROR"
]
Expand All @@ -80,12 +71,12 @@
"doc": "Whether the session state has already scheduled a cleanup event with the flow mapper."
},
{
"name": "counterpartySessionProperties",
"name": "sessionProperties",
"type": [
"null",
"net.corda.data.KeyValuePairList"
],
"doc": "A map of context properties received from a counterparty related to this flow session. This contains information such as protocol name and the version running."
"doc": "A map of context properties related to this flow session. This contains information such as protocol name and the version running."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
},
{
"name": "uniquenessDmlConnectionId",
"type": "string",
"type": ["null", "string"],
"doc": "ID of virtual node Uniqueness DB connection for DML operations."
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ private FlowConfig() {

public static final String EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW = "event.messageResendWindow";
public static final String EXTERNAL_EVENT_MAX_RETRIES = "event.maxRetries";
public static final String SESSION_MESSAGE_RESEND_WINDOW = "session.messageResendWindow";
public static final String SESSION_HEARTBEAT_TIMEOUT_WINDOW = "session.heartbeatTimeout";
public static final String SESSION_MISSING_COUNTERPARTY_TIMEOUT_WINDOW = "session.missingCounterpartyTimeout";
public static final String SESSION_TIMEOUT_WINDOW = "session.timeout";
public static final String SESSION_P2P_TTL = "session.p2pTTL";
public static final String SESSION_FLOW_CLEANUP_TIME = "session.cleanupTime";
public static final String PROCESSING_MAX_RETRY_ATTEMPTS = "processing.maxRetryAttempts";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,12 @@
"type": "object",
"default": {},
"properties": {
"messageResendWindow": {
"description": "The length of time in milliseconds that Corda waits before resending unacknowledged flow session messages.",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 120000
},
"heartbeatTimeout": {
"description": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error. This should be set at least 2 times larger than session.messageResendWindow.",
"timeout": {
"description": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error.",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 1800000
},
"missingCounterpartyTimeout": {
"description": "The length of time in milliseconds to wait when the counterparty can't be found in a member lookup before causing the session to error",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 300000
},
"p2pTTL": {
"description": "The TTL set in milliseconds. This is added to the current time and set on messages passed to the P2P layer to send to a counterparty. Messages received with a TTL timestamp set in the past will be discarded.",
Expand Down
2 changes: 1 addition & 1 deletion data/topic-schema/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ The schemas for each topic a virtual node uses will be defined here.
These schemas should be updated here alongside their implementation in an effort to keep this document up-to-date.

### Kafka Auto-topic creation
**We must ensure to avoid setting auto-creation of topics is Kafka!**
**We must ensure we do not enable auto-creation of topics in Kafka!**

This can lead to unnoticed errors in production. We should adhere to the ethos that if a topic
we expect doesn't exist then that signifies a setup problem and should immediately cause an error.
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@ topics:
- crypto
- db
- flow
- flowMapper
- verification
- membership
- gateway
- link-manager
- persistence
- rest
- uniqueness
producers:
- crypto
- db
- flow
- flowMapper
- verification
- membership
- gateway
- link-manager
- persistence
- rest
- uniqueness
config:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ topics:
- crypto
- db
- flow
- flowMapper
- verification
- membership
- gateway
- link-manager
- rest
- persistence
- uniqueness
producers:
- db
Expand Down
Loading

0 comments on commit b60edda

Please sign in to comment.