From c23d34c40ad1ccc73cf7014ece4e463ad3d3b913 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Jos=C3=A9=20Ramos?= Date: Fri, 5 Jan 2024 13:53:17 +0000 Subject: [PATCH] CORE-18533: Remove Checkpoint Retry (#1429) - Remove "RetryState" avro schema. - Move "FlowStates" enumeration to its own avro schema definition file. - Remove "RETRYING" and set "FAILED" as the default value (for schema evolution, unmatched values do not throw an error but are resolved to the default instead). - Remove "retryState" and "maxFlowSleepDuration" fields from the "PipelineState" avro schema. - Remove "CheckpointSchemaCompatibilityTest" (we only support evolving types one minor version at a time). - Add schema compatibility tests for "FlowStates" and "PipelineState". --- .../corda/data/flow/output/FlowStates.avsc | 14 + .../corda/data/flow/output/FlowStatus.avsc | 15 +- .../flow/state/checkpoint/PipelineState.avsc | 11 - .../flow/state/checkpoint/RetryState.avsc | 39 - .../src/test/java/checkpoint/RetryState.java | 731 ++++++++++++++++++ .../CheckpointSchemaCompatibilityTest.kt | 73 -- .../FlowStatusSchemaCompatibilityTest.kt | 113 +++ .../PipelineStateSchemaCompatibilityTest.kt | 70 ++ .../configuration/flow/1.0/corda.flow.json | 2 +- gradle.properties | 2 +- 10 files changed, 932 insertions(+), 138 deletions(-) create mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStates.avsc delete mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/checkpoint/RetryState.avsc create mode 100644 data/avro-schema/src/test/java/checkpoint/RetryState.java delete mode 100644 data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/CheckpointSchemaCompatibilityTest.kt create mode 100644 data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/FlowStatusSchemaCompatibilityTest.kt create mode 100644 data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/PipelineStateSchemaCompatibilityTest.kt diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStates.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStates.avsc new file mode 100644 index 0000000000..ba81901ff2 --- /dev/null +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStates.avsc @@ -0,0 +1,14 @@ +{ + "type": "enum", + "name": "FlowStates", + "namespace": "net.corda.data.flow.output", + "doc": "The current processing status of a flow" , + "symbols": [ + "START_REQUESTED", + "RUNNING", + "COMPLETED", + "FAILED", + "KILLED" + ], + "default": "FAILED" +} diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStatus.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStatus.avsc index 65c7af89cc..0029793cf4 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStatus.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/output/FlowStatus.avsc @@ -26,19 +26,8 @@ }, { "name": "flowStatus", - "doc": "The current processing status of a flow" , - "type": { - "name": "FlowStates", - "type": "enum", - "symbols": [ - "START_REQUESTED", - "RUNNING", - "RETRYING", - "COMPLETED", - "FAILED", - "KILLED" - ] - } + "type": "net.corda.data.flow.output.FlowStates", + "doc": "The current processing status of a flow" }, { "name": "result", diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/checkpoint/PipelineState.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/checkpoint/PipelineState.avsc index f13bf12899..e4f060eaa5 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/checkpoint/PipelineState.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/checkpoint/PipelineState.avsc @@ -4,17 +4,6 @@ "namespace": "net.corda.data.flow.state.checkpoint", "doc": "State used by the flow engine to track pipeline details and provide diagnostics.", "fields": [ - { - "name": "retryState", - "type": ["null", "net.corda.data.flow.state.checkpoint.RetryState"], - "default": null, - "doc": "Optional retry information for a failed flow event. Setting this field marks the flow as retrying." - }, - { - "name": "maxFlowSleepDuration", - "type": "int", - "doc": "The maximum time a flow can sleep, before a Wakeup event is generated (milliseconds)" - }, { "name": "pendingPlatformError", "type": ["null", "net.corda.data.ExceptionEnvelope"], diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/checkpoint/RetryState.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/checkpoint/RetryState.avsc deleted file mode 100644 index 2e1b32aa36..0000000000 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/checkpoint/RetryState.avsc +++ /dev/null @@ -1,39 +0,0 @@ -{ - "type": "record", - "name": "RetryState", - "namespace": "net.corda.data.flow.state.checkpoint", - "doc": "The Retry State records the need to retry a failed event on the flow Checkpoint", - "fields": [ - { - "name": "retryCount", - "type": "int", - "doc": "The current retry count, set to 0 for the initial failure" - }, - { - "name": "failedEvent", - "type": "net.corda.data.flow.event.FlowEvent", - "doc": "Copy of the event that caused the failure" - }, - { - "name": "error", - "type": "net.corda.data.ExceptionEnvelope", - "doc": "The original error that caused the retry" - }, - { - "name": "firstFailureTimestamp", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - }, - "doc": "The timestamp of when the first exception occurred that triggered a retry" - }, - { - "name": "lastFailureTimestamp", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - }, - "doc": "The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure" - } - ] -} \ No newline at end of file diff --git a/data/avro-schema/src/test/java/checkpoint/RetryState.java b/data/avro-schema/src/test/java/checkpoint/RetryState.java new file mode 100644 index 0000000000..70d50fb459 --- /dev/null +++ b/data/avro-schema/src/test/java/checkpoint/RetryState.java @@ -0,0 +1,731 @@ +/** + * Autogenerated by Avro + * DO NOT EDIT DIRECTLY + * + * The schema has been deleted as part of the work to simplify the flow retry logic, so the class is not automatically + * generated by Avro and doesn't exist in the code base anymore. + * The old generated version of the class has been manually copied and should be kept here in order to verify backward + * compatibility through {@link net.corda.data.flow.state.checkpoint.PipelineStateSchemaCompatibilityTest}. + */ +package checkpoint; + +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.SchemaStore; +import org.apache.avro.specific.SpecificData; + +/** The Retry State records the need to retry a failed event on the flow Checkpoint */ +@org.apache.avro.specific.AvroGenerated +public class RetryState extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 8311123872064692403L; + + + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"RetryState\",\"namespace\":\"net.corda.data.flow.state.checkpoint\",\"doc\":\"The Retry State records the need to retry a failed event on the flow Checkpoint\",\"fields\":[{\"name\":\"retryCount\",\"type\":\"int\",\"doc\":\"The current retry count, set to 0 for the initial failure\"},{\"name\":\"failedEvent\",\"type\":{\"type\":\"record\",\"name\":\"FlowEvent\",\"namespace\":\"net.corda.data.flow.event\",\"fields\":[{\"name\":\"flowId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"payload\",\"type\":[{\"type\":\"record\",\"name\":\"StartFlow\",\"doc\":\"The Start Flow event represents the information needed to initiate a flow.\",\"fields\":[{\"name\":\"startContext\",\"type\":{\"type\":\"record\",\"name\":\"FlowStartContext\",\"namespace\":\"net.corda.data.flow\",\"doc\":\"The Start Flow event represents the information needed to initiate a flow.\",\"fields\":[{\"name\":\"statusKey\",\"type\":{\"type\":\"record\",\"name\":\"FlowKey\",\"doc\":\"Represents of a unique key for a flow instance.\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Unique flow id for the given Holding Identity\"},{\"name\":\"identity\",\"type\":{\"type\":\"record\",\"name\":\"HoldingIdentity\",\"namespace\":\"net.corda.data.identity\",\"fields\":[{\"name\":\"x500Name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"groupId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},\"doc\":\"Identity of the party executing the flow.\"}]},\"doc\":\"The unique ID for tracking the flow status\"},{\"name\":\"initiatorType\",\"type\":{\"type\":\"enum\",\"name\":\"FlowInitiatorType\",\"doc\":\"Represents the type flow initiator.\",\"symbols\":[\"RPC\",\"P2P\"]},\"doc\":\"The type of initiator that started the flow\"},{\"name\":\"requestId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"The request ID assigned by the client or session that created the flow.\"},{\"name\":\"identity\",\"type\":\"net.corda.data.identity.HoldingIdentity\",\"doc\":\"The identity of the party executing the flow.\"},{\"name\":\"cpiId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"The CPI ID for the package containing the flow.\"},{\"name\":\"initiatedBy\",\"type\":\"net.corda.data.identity.HoldingIdentity\",\"doc\":\"The identity of the party that initiated the flow.\"},{\"name\":\"flowClassName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"The fully qualified class name of the flow.\"},{\"name\":\"startArgs\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"The body of the request provided when the flow was started if this flow was started via RPC. Null otherwise.\",\"default\":null},{\"name\":\"contextPlatformProperties\",\"type\":{\"type\":\"record\",\"name\":\"KeyValuePairList\",\"namespace\":\"net.corda.data\",\"doc\":\"Avro representation of the List> format.\",\"fields\":[{\"name\":\"items\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"KeyValuePair\",\"doc\":\"Key-value pair of strings.\",\"fields\":[{\"name\":\"key\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"value\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]}]}},\"doc\":\"List of the Pair items.\"}]},\"doc\":\"A map of platform context properties made available to the flow which will also be propagated to sub flows, initiated flows and services, associating the entire end to end flow execution path with a notion of a context. Only populated if this flow was started via RPC. Empty otherwise.\"},{\"name\":\"createdTimestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},\"doc\":\"The date and time the flow was created.\"}]},\"doc\":\"The request ID assigned by the client that created the flow.\"},{\"name\":\"flowStartArgs\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"A client supplied string, passed to the flow when it is started. This is used by the client to pass data to a flow.\"}]},{\"type\":\"record\",\"name\":\"FlowStatus\",\"namespace\":\"net.corda.data.flow.output\",\"doc\":\"The Flow Status represents the current processing state of a flow at a given point in time\",\"fields\":[{\"name\":\"key\",\"type\":\"net.corda.data.flow.FlowKey\",\"doc\":\"The unique ID for the status\"},{\"name\":\"initiatorType\",\"type\":\"net.corda.data.flow.FlowInitiatorType\",\"doc\":\"The type of initiator that started the flow\"},{\"name\":\"flowId\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"The unique flow ID\"},{\"name\":\"flowClassName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"The fully qualified class name of the flow\"},{\"name\":\"flowStatus\",\"type\":{\"type\":\"enum\",\"name\":\"FlowStates\",\"symbols\":[\"START_REQUESTED\",\"RUNNING\",\"RETRYING\",\"COMPLETED\",\"FAILED\",\"KILLED\"]},\"doc\":\"The current processing status of a flow\"},{\"name\":\"result\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"Optional result, this contains the result from the flow will only be set if the flow status is 'COMPLETED'\"},{\"name\":\"error\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ExceptionEnvelope\",\"namespace\":\"net.corda.data\",\"doc\":\"Exception envelope for transmitting exceptions\",\"fields\":[{\"name\":\"errorType\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Error type in string format\"},{\"name\":\"errorMessage\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"The error message\"}]}],\"doc\":\"Optional error message, this will be set if the flow status is 'FAILED'\"},{\"name\":\"processingTerminatedReason\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"Optional message indicating reasoning why processing a flow has been terminated.\"},{\"name\":\"createdTimestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},\"doc\":\"The date the flow was created.\"},{\"name\":\"lastUpdateTimestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},\"doc\":\"The date and time this status update was published.\"}]},{\"type\":\"record\",\"name\":\"SessionEvent\",\"doc\":\"Events sent to counterparties as part of a session\",\"fields\":[{\"name\":\"messageDirection\",\"type\":{\"type\":\"enum\",\"name\":\"MessageDirection\",\"symbols\":[\"INBOUND\",\"OUTBOUND\"]},\"doc\":\"INBOUND for messages originating from a counterparty. OUTBOUND for messages to be sent to a counterparty.\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},\"doc\":\"Time ([Instant]) in milliseconds when the session event was created\"},{\"name\":\"sessionId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"UUID for the session. The initiated party sessionID will have a suffix of `-INITIATED`\"},{\"name\":\"sequenceNum\",\"type\":[\"null\",\"int\"],\"doc\":\"Unique id for a message sent to a party. sequenceNum is not bidirectional. Each party will track outbound/inbound sequence numbers separately.\"},{\"name\":\"initiatingIdentity\",\"type\":\"net.corda.data.identity.HoldingIdentity\",\"doc\":\"Identity of party who started a session.\"},{\"name\":\"initiatedIdentity\",\"type\":\"net.corda.data.identity.HoldingIdentity\",\"doc\":\"Identity of party in the session who was initiated.\"},{\"name\":\"payload\",\"type\":[{\"type\":\"record\",\"name\":\"SessionCounterpartyInfoRequest\",\"namespace\":\"net.corda.data.flow.event.session\",\"doc\":\"Request counterparties flow session information. This includes the flow protocol version that they are running.\",\"fields\":[{\"name\":\"sessionInit\",\"type\":{\"type\":\"record\",\"name\":\"SessionInit\",\"doc\":\"Message sent to a counterparty to initiate a session\",\"fields\":[{\"name\":\"cpiId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"flowId\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"Flow ID of the flow initiating the session. This is only required to inform the Flow Mapper. It should be null when sent to counterparties.\"},{\"name\":\"contextUserProperties\",\"type\":\"net.corda.data.KeyValuePairList\",\"doc\":\"A map of context user properties made available to the flow which will also be propagated to sub flows, initiated flows and services\"},{\"name\":\"contextPlatformProperties\",\"type\":\"net.corda.data.KeyValuePairList\",\"doc\":\"A map of context platform properties made available to the flow which will also be propagated to sub flows, initiated flows and services\"}]},\"doc\":\"Contains information that can be used to start an initiated flow. Will be null for messages sent to the initiator. Will be null when initiated party is confirmed to be present to ensure out of order messages that arrive first contain this info.\"}]","},{\"type\":\"record\",\"name\":\"SessionCounterpartyInfoResponse\",\"namespace\":\"net.corda.data.flow.event.session\",\"doc\":\"Sent from initiated to initiating party to inform them which protocol version of the flow they are running.\",\"fields\":[]},{\"type\":\"record\",\"name\":\"SessionData\",\"namespace\":\"net.corda.data.flow.event.session\",\"doc\":\"Data message sent between parties containing a serialized payload\",\"fields\":[{\"name\":\"payload\",\"type\":[{\"type\":\"record\",\"name\":\"Chunk\",\"namespace\":\"net.corda.data.chunking\",\"doc\":\"Binary chunk of a larger binary artifact\",\"fields\":[{\"name\":\"requestId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"some unique identifier that indicates the group this chunk belongs with\"},{\"name\":\"checksum\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SecureHash\",\"namespace\":\"net.corda.data.crypto\",\"fields\":[{\"name\":\"algorithm\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"bytes\",\"type\":\"bytes\"}]}],\"doc\":\"checksum of assembled chunks\"},{\"name\":\"partNumber\",\"type\":\"int\",\"doc\":\"number of chunk\"},{\"name\":\"offset\",\"type\":\"long\",\"doc\":\"offset of this chunk from beginning of complete binary artifact\"},{\"name\":\"data\",\"type\":\"bytes\",\"doc\":\"the binary data fop the chunk. Zero data has a special meaning, signaling the last chunk in the sequence\"},{\"name\":\"properties\",\"type\":[\"null\",\"net.corda.data.KeyValuePairList\"],\"doc\":\"Optional list of chunk properties.\"}]},\"bytes\"]},{\"name\":\"sessionInit\",\"type\":[\"null\",\"SessionInit\"],\"doc\":\"Contains information that can be used to start an initiated flow, piggybacked on initial data messages. Will be null for messages sent to the initiator. Will be null when initiated party is confirmed to be present to ensure out of order messages that arrive first contain this info.\"}]},{\"type\":\"record\",\"name\":\"SessionClose\",\"namespace\":\"net.corda.data.flow.event.session\",\"doc\":\"Sent to a counterparty to indicate that this party is ready to close the session.\",\"fields\":[]},{\"type\":\"record\",\"name\":\"SessionError\",\"namespace\":\"net.corda.data.flow.event.session\",\"doc\":\"Error message sent to counterparty to indicate a fatal error has occurred and both parties should abort the session\",\"fields\":[{\"name\":\"errorMessage\",\"type\":\"net.corda.data.ExceptionEnvelope\"}]}]},{\"name\":\"contextSessionProperties\",\"type\":[\"null\",\"net.corda.data.KeyValuePairList\"],\"doc\":\"A map of context properties received from a counterparty related to this flow session. This is static data and will be set to null when previously sent.\"}]},{\"type\":\"record\",\"name\":\"ExternalEventResponse\",\"namespace\":\"net.corda.data.flow.event.external\",\"doc\":\"Events from workers that are sent back to the flow worker to be consumed by flows\",\"fields\":[{\"name\":\"requestId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"The request id of the external event response. Used for deduplication when processing responses.\"},{\"name\":\"payload\",\"type\":[\"null\",\"bytes\"],\"doc\":\"Avro serialized representation of the underlying payload that can contain domain specific information that is not serialized into the {@link ExternalEventResponse#data} property. `null` if the response represents an error and therefore does not have a payload to respond with.\"},{\"name\":\"error\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ExternalEventResponseError\",\"doc\":\"Included in {@link ExternalEventResponse}s that have failed to be processed. The failure type may be further categorised by the {@link Error} enumeration.\",\"fields\":[{\"name\":\"errorType\",\"type\":{\"type\":\"enum\",\"name\":\"ExternalEventResponseErrorType\",\"doc\":\"TRANSIENT: A transient error.\\nPLATFORM: An error that should be propagated to the calling flow.\\nFATAL: A fatal error.\",\"symbols\":[\"TRANSIENT\",\"PLATFORM\",\"FATAL\"]},\"doc\":\"Error type\"},{\"name\":\"exception\",\"type\":\"net.corda.data.ExceptionEnvelope\",\"doc\":\"Exception information\"}]}],\"doc\":\"The {@link ExternalEventResponseError} representing an error that occurred when processing the incoming external event request. `null` if the request was successfully processed and there is no error to respond with.\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},\"doc\":\"Time ([Instant]) in milliseconds when the response was created.\"}]}]}]},\"doc\":\"Copy of the event that caused the failure\"},{\"name\":\"error\",\"type\":\"net.corda.data.ExceptionEnvelope\",\"doc\":\"The original error that caused the retry\"},{\"name\":\"firstFailureTimestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},\"doc\":\"The timestamp of when the first exception occurred that triggered a retry\"},{\"name\":\"lastFailureTimestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},\"doc\":\"The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static final SpecificData MODEL$ = new SpecificData(); + static { + MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimestampMillisConversion()); + } + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this RetryState to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a RetryState from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a RetryState instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static RetryState fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + /** The current retry count, set to 0 for the initial failure */ + private int retryCount; + /** Copy of the event that caused the failure */ + private net.corda.data.flow.event.FlowEvent failedEvent; + /** The original error that caused the retry */ + private net.corda.data.ExceptionEnvelope error; + /** The timestamp of when the first exception occurred that triggered a retry */ + private java.time.Instant firstFailureTimestamp; + /** The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure */ + private java.time.Instant lastFailureTimestamp; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public RetryState() {} + + /** + * All-args constructor. + * @param retryCount The current retry count, set to 0 for the initial failure + * @param failedEvent Copy of the event that caused the failure + * @param error The original error that caused the retry + * @param firstFailureTimestamp The timestamp of when the first exception occurred that triggered a retry + * @param lastFailureTimestamp The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure + */ + public RetryState(Integer retryCount, net.corda.data.flow.event.FlowEvent failedEvent, net.corda.data.ExceptionEnvelope error, java.time.Instant firstFailureTimestamp, java.time.Instant lastFailureTimestamp) { + this.retryCount = retryCount; + this.failedEvent = failedEvent; + this.error = error; + this.firstFailureTimestamp = firstFailureTimestamp.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + this.lastFailureTimestamp = lastFailureTimestamp.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + } + + public SpecificData getSpecificData() { return MODEL$; } + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public Object get(int field$) { + switch (field$) { + case 0: return retryCount; + case 1: return failedEvent; + case 2: return error; + case 3: return firstFailureTimestamp; + case 4: return lastFailureTimestamp; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + private static final org.apache.avro.Conversion[] conversions = + new org.apache.avro.Conversion[] { + null, + null, + null, + new org.apache.avro.data.TimeConversions.TimestampMillisConversion(), + new org.apache.avro.data.TimeConversions.TimestampMillisConversion(), + null + }; + + @Override + public org.apache.avro.Conversion getConversion(int field) { + return conversions[field]; + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, Object value$) { + switch (field$) { + case 0: retryCount = (Integer)value$; break; + case 1: failedEvent = (net.corda.data.flow.event.FlowEvent)value$; break; + case 2: error = (net.corda.data.ExceptionEnvelope)value$; break; + case 3: firstFailureTimestamp = (java.time.Instant)value$; break; + case 4: lastFailureTimestamp = (java.time.Instant)value$; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'retryCount' field. + * @return The current retry count, set to 0 for the initial failure + */ + public int getRetryCount() { + return retryCount; + } + + + /** + * Sets the value of the 'retryCount' field. + * The current retry count, set to 0 for the initial failure + * @param value the value to set. + */ + public void setRetryCount(int value) { + this.retryCount = value; + } + + /** + * Gets the value of the 'failedEvent' field. + * @return Copy of the event that caused the failure + */ + public net.corda.data.flow.event.FlowEvent getFailedEvent() { + return failedEvent; + } + + + /** + * Sets the value of the 'failedEvent' field. + * Copy of the event that caused the failure + * @param value the value to set. + */ + public void setFailedEvent(net.corda.data.flow.event.FlowEvent value) { + this.failedEvent = value; + } + + /** + * Gets the value of the 'error' field. + * @return The original error that caused the retry + */ + public net.corda.data.ExceptionEnvelope getError() { + return error; + } + + + /** + * Sets the value of the 'error' field. + * The original error that caused the retry + * @param value the value to set. + */ + public void setError(net.corda.data.ExceptionEnvelope value) { + this.error = value; + } + + /** + * Gets the value of the 'firstFailureTimestamp' field. + * @return The timestamp of when the first exception occurred that triggered a retry + */ + public java.time.Instant getFirstFailureTimestamp() { + return firstFailureTimestamp; + } + + + /** + * Sets the value of the 'firstFailureTimestamp' field. + * The timestamp of when the first exception occurred that triggered a retry + * @param value the value to set. + */ + public void setFirstFailureTimestamp(java.time.Instant value) { + this.firstFailureTimestamp = value.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + } + + /** + * Gets the value of the 'lastFailureTimestamp' field. + * @return The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure + */ + public java.time.Instant getLastFailureTimestamp() { + return lastFailureTimestamp; + } + + + /** + * Sets the value of the 'lastFailureTimestamp' field. + * The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure + * @param value the value to set. + */ + public void setLastFailureTimestamp(java.time.Instant value) { + this.lastFailureTimestamp = value.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + } + + /** + * Creates a new RetryState RecordBuilder. + * @return A new RetryState RecordBuilder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Creates a new RetryState RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new RetryState RecordBuilder + */ + public static Builder newBuilder(Builder other) { + if (other == null) { + return new Builder(); + } else { + return new Builder(other); + } + } + + /** + * Creates a new RetryState RecordBuilder by copying an existing RetryState instance. + * @param other The existing instance to copy. + * @return A new RetryState RecordBuilder + */ + public static Builder newBuilder(RetryState other) { + if (other == null) { + return new Builder(); + } else { + return new Builder(other); + } + } + + /** + * RecordBuilder for RetryState instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + /** The current retry count, set to 0 for the initial failure */ + private int retryCount; + /** Copy of the event that caused the failure */ + private net.corda.data.flow.event.FlowEvent failedEvent; + private net.corda.data.flow.event.FlowEvent.Builder failedEventBuilder; + /** The original error that caused the retry */ + private net.corda.data.ExceptionEnvelope error; + private net.corda.data.ExceptionEnvelope.Builder errorBuilder; + /** The timestamp of when the first exception occurred that triggered a retry */ + private java.time.Instant firstFailureTimestamp; + /** The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure */ + private java.time.Instant lastFailureTimestamp; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(Builder other) { + super(other); + if (isValidValue(fields()[0], other.retryCount)) { + this.retryCount = data().deepCopy(fields()[0].schema(), other.retryCount); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.failedEvent)) { + this.failedEvent = data().deepCopy(fields()[1].schema(), other.failedEvent); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + if (other.hasFailedEventBuilder()) { + this.failedEventBuilder = net.corda.data.flow.event.FlowEvent.newBuilder(other.getFailedEventBuilder()); + } + if (isValidValue(fields()[2], other.error)) { + this.error = data().deepCopy(fields()[2].schema(), other.error); + fieldSetFlags()[2] = other.fieldSetFlags()[2]; + } + if (other.hasErrorBuilder()) { + this.errorBuilder = net.corda.data.ExceptionEnvelope.newBuilder(other.getErrorBuilder()); + } + if (isValidValue(fields()[3], other.firstFailureTimestamp)) { + this.firstFailureTimestamp = data().deepCopy(fields()[3].schema(), other.firstFailureTimestamp); + fieldSetFlags()[3] = other.fieldSetFlags()[3]; + } + if (isValidValue(fields()[4], other.lastFailureTimestamp)) { + this.lastFailureTimestamp = data().deepCopy(fields()[4].schema(), other.lastFailureTimestamp); + fieldSetFlags()[4] = other.fieldSetFlags()[4]; + } + } + + /** + * Creates a Builder by copying an existing RetryState instance + * @param other The existing instance to copy. + */ + private Builder(RetryState other) { + super(SCHEMA$, MODEL$); + if (isValidValue(fields()[0], other.retryCount)) { + this.retryCount = data().deepCopy(fields()[0].schema(), other.retryCount); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.failedEvent)) { + this.failedEvent = data().deepCopy(fields()[1].schema(), other.failedEvent); + fieldSetFlags()[1] = true; + } + this.failedEventBuilder = null; + if (isValidValue(fields()[2], other.error)) { + this.error = data().deepCopy(fields()[2].schema(), other.error); + fieldSetFlags()[2] = true; + } + this.errorBuilder = null; + if (isValidValue(fields()[3], other.firstFailureTimestamp)) { + this.firstFailureTimestamp = data().deepCopy(fields()[3].schema(), other.firstFailureTimestamp); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.lastFailureTimestamp)) { + this.lastFailureTimestamp = data().deepCopy(fields()[4].schema(), other.lastFailureTimestamp); + fieldSetFlags()[4] = true; + } + } + + /** + * Gets the value of the 'retryCount' field. + * The current retry count, set to 0 for the initial failure + * @return The value. + */ + public int getRetryCount() { + return retryCount; + } + + + /** + * Sets the value of the 'retryCount' field. + * The current retry count, set to 0 for the initial failure + * @param value The value of 'retryCount'. + * @return This builder. + */ + public Builder setRetryCount(int value) { + validate(fields()[0], value); + this.retryCount = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'retryCount' field has been set. + * The current retry count, set to 0 for the initial failure + * @return True if the 'retryCount' field has been set, false otherwise. + */ + public boolean hasRetryCount() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'retryCount' field. + * The current retry count, set to 0 for the initial failure + * @return This builder. + */ + public Builder clearRetryCount() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'failedEvent' field. + * Copy of the event that caused the failure + * @return The value. + */ + public net.corda.data.flow.event.FlowEvent getFailedEvent() { + return failedEvent; + } + + + /** + * Sets the value of the 'failedEvent' field. + * Copy of the event that caused the failure + * @param value The value of 'failedEvent'. + * @return This builder. + */ + public Builder setFailedEvent(net.corda.data.flow.event.FlowEvent value) { + validate(fields()[1], value); + this.failedEventBuilder = null; + this.failedEvent = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'failedEvent' field has been set. + * Copy of the event that caused the failure + * @return True if the 'failedEvent' field has been set, false otherwise. + */ + public boolean hasFailedEvent() { + return fieldSetFlags()[1]; + } + + /** + * Gets the Builder instance for the 'failedEvent' field and creates one if it doesn't exist yet. + * Copy of the event that caused the failure + * @return This builder. + */ + public net.corda.data.flow.event.FlowEvent.Builder getFailedEventBuilder() { + if (failedEventBuilder == null) { + if (hasFailedEvent()) { + setFailedEventBuilder(net.corda.data.flow.event.FlowEvent.newBuilder(failedEvent)); + } else { + setFailedEventBuilder(net.corda.data.flow.event.FlowEvent.newBuilder()); + } + } + return failedEventBuilder; + } + + /** + * Sets the Builder instance for the 'failedEvent' field + * Copy of the event that caused the failure + * @param value The builder instance that must be set. + * @return This builder. + */ + + public Builder setFailedEventBuilder(net.corda.data.flow.event.FlowEvent.Builder value) { + clearFailedEvent(); + failedEventBuilder = value; + return this; + } + + /** + * Checks whether the 'failedEvent' field has an active Builder instance + * Copy of the event that caused the failure + * @return True if the 'failedEvent' field has an active Builder instance + */ + public boolean hasFailedEventBuilder() { + return failedEventBuilder != null; + } + + /** + * Clears the value of the 'failedEvent' field. + * Copy of the event that caused the failure + * @return This builder. + */ + public Builder clearFailedEvent() { + failedEvent = null; + failedEventBuilder = null; + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'error' field. + * The original error that caused the retry + * @return The value. + */ + public net.corda.data.ExceptionEnvelope getError() { + return error; + } + + + /** + * Sets the value of the 'error' field. + * The original error that caused the retry + * @param value The value of 'error'. + * @return This builder. + */ + public Builder setError(net.corda.data.ExceptionEnvelope value) { + validate(fields()[2], value); + this.errorBuilder = null; + this.error = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'error' field has been set. + * The original error that caused the retry + * @return True if the 'error' field has been set, false otherwise. + */ + public boolean hasError() { + return fieldSetFlags()[2]; + } + + /** + * Gets the Builder instance for the 'error' field and creates one if it doesn't exist yet. + * The original error that caused the retry + * @return This builder. + */ + public net.corda.data.ExceptionEnvelope.Builder getErrorBuilder() { + if (errorBuilder == null) { + if (hasError()) { + setErrorBuilder(net.corda.data.ExceptionEnvelope.newBuilder(error)); + } else { + setErrorBuilder(net.corda.data.ExceptionEnvelope.newBuilder()); + } + } + return errorBuilder; + } + + /** + * Sets the Builder instance for the 'error' field + * The original error that caused the retry + * @param value The builder instance that must be set. + * @return This builder. + */ + + public Builder setErrorBuilder(net.corda.data.ExceptionEnvelope.Builder value) { + clearError(); + errorBuilder = value; + return this; + } + + /** + * Checks whether the 'error' field has an active Builder instance + * The original error that caused the retry + * @return True if the 'error' field has an active Builder instance + */ + public boolean hasErrorBuilder() { + return errorBuilder != null; + } + + /** + * Clears the value of the 'error' field. + * The original error that caused the retry + * @return This builder. + */ + public Builder clearError() { + error = null; + errorBuilder = null; + fieldSetFlags()[2] = false; + return this; + } + + /** + * Gets the value of the 'firstFailureTimestamp' field. + * The timestamp of when the first exception occurred that triggered a retry + * @return The value. + */ + public java.time.Instant getFirstFailureTimestamp() { + return firstFailureTimestamp; + } + + + /** + * Sets the value of the 'firstFailureTimestamp' field. + * The timestamp of when the first exception occurred that triggered a retry + * @param value The value of 'firstFailureTimestamp'. + * @return This builder. + */ + public Builder setFirstFailureTimestamp(java.time.Instant value) { + validate(fields()[3], value); + this.firstFailureTimestamp = value.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + fieldSetFlags()[3] = true; + return this; + } + + /** + * Checks whether the 'firstFailureTimestamp' field has been set. + * The timestamp of when the first exception occurred that triggered a retry + * @return True if the 'firstFailureTimestamp' field has been set, false otherwise. + */ + public boolean hasFirstFailureTimestamp() { + return fieldSetFlags()[3]; + } + + + /** + * Clears the value of the 'firstFailureTimestamp' field. + * The timestamp of when the first exception occurred that triggered a retry + * @return This builder. + */ + public Builder clearFirstFailureTimestamp() { + fieldSetFlags()[3] = false; + return this; + } + + /** + * Gets the value of the 'lastFailureTimestamp' field. + * The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure + * @return The value. + */ + public java.time.Instant getLastFailureTimestamp() { + return lastFailureTimestamp; + } + + + /** + * Sets the value of the 'lastFailureTimestamp' field. + * The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure + * @param value The value of 'lastFailureTimestamp'. + * @return This builder. + */ + public Builder setLastFailureTimestamp(java.time.Instant value) { + validate(fields()[4], value); + this.lastFailureTimestamp = value.truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + fieldSetFlags()[4] = true; + return this; + } + + /** + * Checks whether the 'lastFailureTimestamp' field has been set. + * The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure + * @return True if the 'lastFailureTimestamp' field has been set, false otherwise. + */ + public boolean hasLastFailureTimestamp() { + return fieldSetFlags()[4]; + } + + + /** + * Clears the value of the 'lastFailureTimestamp' field. + * The timestamp of when the last exception occurred that triggered a retry (this will be the same as firstRetryTimestamp for a first time failure + * @return This builder. + */ + public Builder clearLastFailureTimestamp() { + fieldSetFlags()[4] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public RetryState build() { + try { + RetryState record = new RetryState(); + record.retryCount = fieldSetFlags()[0] ? this.retryCount : (Integer) defaultValue(fields()[0]); + if (failedEventBuilder != null) { + try { + record.failedEvent = this.failedEventBuilder.build(); + } catch (org.apache.avro.AvroMissingFieldException e) { + e.addParentField(record.getSchema().getField("failedEvent")); + throw e; + } + } else { + record.failedEvent = fieldSetFlags()[1] ? this.failedEvent : (net.corda.data.flow.event.FlowEvent) defaultValue(fields()[1]); + } + if (errorBuilder != null) { + try { + record.error = this.errorBuilder.build(); + } catch (org.apache.avro.AvroMissingFieldException e) { + e.addParentField(record.getSchema().getField("error")); + throw e; + } + } else { + record.error = fieldSetFlags()[2] ? this.error : (net.corda.data.ExceptionEnvelope) defaultValue(fields()[2]); + } + record.firstFailureTimestamp = fieldSetFlags()[3] ? this.firstFailureTimestamp : (java.time.Instant) defaultValue(fields()[3]); + record.lastFailureTimestamp = fieldSetFlags()[4] ? this.lastFailureTimestamp : (java.time.Instant) defaultValue(fields()[4]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + +} + + + + + + + + + + diff --git a/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/CheckpointSchemaCompatibilityTest.kt b/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/CheckpointSchemaCompatibilityTest.kt deleted file mode 100644 index 9eda116029..0000000000 --- a/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/CheckpointSchemaCompatibilityTest.kt +++ /dev/null @@ -1,73 +0,0 @@ -package net.corda.data.flow.state.checkpoint - -import net.corda.data.KeyValuePairList -import org.apache.avro.Schema -import org.apache.avro.SchemaCompatibility -import org.junit.jupiter.api.Assertions -import org.junit.jupiter.api.Test - -class CheckpointSchemaCompatibilityTest { - - @Test - fun `Flow checkpoint schema changes between Corda 5_0 and 5_1 are compatible`() { - val oldSchemaJson = """ - { - "type": "record", - "name": "Checkpoint", - "namespace": "net.corda.data.flow.state.checkpoint", - "doc": "Represents the current state of a flow, plus information required to operate the flow engine.", - "fields": [ - { - "name": "flowId", - "type": "string", - "doc": "Internal, globally unique key for a flow instance." - }, - { - "name": "initialPlatformVersion", - "type": "int", - "doc": "The platform version at the time the flow was started." - }, - { - "name": "pipelineState", - "type": "net.corda.data.flow.state.checkpoint.PipelineState", - "doc": "State required by the pipeline, e.g. to support retries." - }, - { - "name": "flowState", - "type": [ - "null", - "net.corda.data.flow.state.checkpoint.FlowState" - ], - "doc": "Current flow execution state. Null if the flow has not yet been started, for example in the face of a retry-able error." - }, - { - "name": "flowMetricsState", - "type": "string", - "default": "{}", - "doc": "Internal storage for recording flow metrics" - } - ] - } - """.trimIndent() - - val oldSchema = Schema.Parser() - .addTypes( - mapOf( - PipelineState::class.java.name to PipelineState.`SCHEMA$`, - FlowState::class.java.name to FlowState.`SCHEMA$`, - KeyValuePairList::class.java.name to KeyValuePairList.`SCHEMA$` - ) - ) - .parse(oldSchemaJson) - - val newSchema = Checkpoint.`SCHEMA$` - - val compatibility = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema) - - Assertions.assertEquals( - compatibility.type, - SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE, - "Failed due to incompatible change. ${compatibility.description}" - ) - } -} diff --git a/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/FlowStatusSchemaCompatibilityTest.kt b/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/FlowStatusSchemaCompatibilityTest.kt new file mode 100644 index 0000000000..acdf8e4b8d --- /dev/null +++ b/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/FlowStatusSchemaCompatibilityTest.kt @@ -0,0 +1,113 @@ +package net.corda.data.flow.state.checkpoint + +import net.corda.data.ExceptionEnvelope +import net.corda.data.flow.FlowInitiatorType +import net.corda.data.flow.FlowKey +import net.corda.data.flow.output.FlowStatus +import org.apache.avro.Schema +import org.apache.avro.SchemaCompatibility +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class FlowStatusSchemaCompatibilityTest { + + @Test + fun `Flow status schema changes between Corda 5_1 and 5_2 are compatible`() { + val oldSchemaJson = """ + { + "type": "record", + "name": "FlowStatus", + "namespace": "net.corda.data.flow.output", + "doc": "The Flow Status represents the current processing state of a flow at a given point in time", + "fields": [ + { + "name": "key", + "type": "net.corda.data.flow.FlowKey", + "doc": "The unique ID for the status" + }, + { + "name": "initiatorType", + "type": "net.corda.data.flow.FlowInitiatorType", + "doc": "The type of initiator that started the flow" + }, + { + "name": "flowId", + "type": ["null", "string"], + "doc": "The unique flow ID" + }, + { + "name": "flowClassName", + "type": "string", + "doc": "The fully qualified class name of the flow" + }, + { + "name": "flowStatus", + "doc": "The current processing status of a flow" , + "type": { + "name": "FlowStates", + "type": "enum", + "symbols": [ + "START_REQUESTED", + "RUNNING", + "RETRYING", + "COMPLETED", + "FAILED", + "KILLED" + ] + } + }, + { + "name": "result", + "type": ["null", "string"], + "doc": "Optional result, this contains the result from the flow will only be set if the flow status is 'COMPLETED'" + }, + { + "name": "error", + "type": ["null", "net.corda.data.ExceptionEnvelope"], + "doc": "Optional error message, this will be set if the flow status is 'FAILED'" + }, + { + "name": "processingTerminatedReason", + "type": ["null", "string"], + "doc": "Optional message indicating reasoning why processing a flow has been terminated." + }, + { + "name": "createdTimestamp", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + }, + "doc": "The date the flow was created." + }, + { + "name": "lastUpdateTimestamp", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + }, + "doc": "The date and time this status update was published." + } + ] + } + """.trimIndent() + + val oldSchema = Schema.Parser() + .addTypes( + mapOf( + FlowKey::class.java.name to FlowKey.`SCHEMA$`, + FlowInitiatorType::class.java.name to FlowInitiatorType.`SCHEMA$`, + ExceptionEnvelope::class.java.name to ExceptionEnvelope.`SCHEMA$`, + ) + ) + .parse(oldSchemaJson) + + val newSchema = FlowStatus.`SCHEMA$` + val compatibility = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema) + + Assertions.assertEquals( + compatibility.type, + SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE, + "Failed due to incompatible change. ${compatibility.description}" + ) + } +} diff --git a/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/PipelineStateSchemaCompatibilityTest.kt b/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/PipelineStateSchemaCompatibilityTest.kt new file mode 100644 index 0000000000..f63872827c --- /dev/null +++ b/data/avro-schema/src/test/kotlin/net/corda/data/flow/state/checkpoint/PipelineStateSchemaCompatibilityTest.kt @@ -0,0 +1,70 @@ +package net.corda.data.flow.state.checkpoint + +import checkpoint.RetryState +import net.corda.data.ExceptionEnvelope +import net.corda.data.crypto.SecureHash +import org.apache.avro.Schema +import org.apache.avro.SchemaCompatibility +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class PipelineStateSchemaCompatibilityTest { + + @Test + fun `Pipeline state schema changes between Corda 5_1 and 5_2 are compatible`() { + val oldSchemaJson = """ + { + "type": "record", + "name": "PipelineState", + "namespace": "net.corda.data.flow.state.checkpoint", + "doc": "State used by the flow engine to track pipeline details and provide diagnostics.", + "fields": [ + { + "name": "retryState", + "type": ["null", "net.corda.data.flow.state.checkpoint.RetryState"], + "default": null, + "doc": "Optional retry information for a failed flow event. Setting this field marks the flow as retrying." + }, + { + "name": "maxFlowSleepDuration", + "type": "int", + "doc": "The maximum time a flow can sleep, before a Wakeup event is generated (milliseconds)" + }, + { + "name": "pendingPlatformError", + "type": ["null", "net.corda.data.ExceptionEnvelope"], + "default": null, + "doc": "Used for platform generated errors reported back to user code." + }, + { + "name": "cpkFileHashes", + "type": { + "type": "array", + "items": "net.corda.data.crypto.SecureHash" + }, + "doc": "Array of stored cpkFileHashes from the Virtual Node." + } + ] + } + """.trimIndent() + + val oldSchema = Schema.Parser() + .addTypes( + mapOf( + SecureHash::class.java.name to SecureHash.`SCHEMA$`, + RetryState::class.java.name to RetryState.`SCHEMA$`, + ExceptionEnvelope::class.java.name to ExceptionEnvelope.`SCHEMA$`, + ) + ) + .parse(oldSchemaJson) + + val newSchema = PipelineState.`SCHEMA$` + val compatibility = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema) + + Assertions.assertEquals( + compatibility.type, + SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE, + "Failed due to incompatible change. ${compatibility.description}" + ) + } +} diff --git a/data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json b/data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json index baa488793f..36f1df75da 100644 --- a/data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json +++ b/data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json @@ -33,7 +33,7 @@ "default": 16000 }, "maxFlowSleepDuration": { - "description": "The maximum delay in milliseconds before Corda schedules a periodic wake-up.", + "description": "Deprecated. The maximum delay in milliseconds before Corda schedules a periodic wake-up.", "type": "integer", "minimum": 1000, "maximum": 2147483647, diff --git a/gradle.properties b/gradle.properties index ea7a8a5723..5d8f6d2127 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.0 # NOTE: update this each time this module contains a breaking change ## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to ## a per module property in which case module versions can change independently. -cordaApiRevision = 25 +cordaApiRevision = 26 # Main kotlin.stdlib.default.dependency = false