Skip to content

Commit

Permalink
CORE-18533: Remove Checkpoint Retry (#1429)
Browse files Browse the repository at this point in the history
- Remove "RetryState" avro schema.
- Move "FlowStates" enumeration to its own avro schema definition file.
- Remove "RETRYING" and set "FAILED" as the default value (for schema
  evolution, unmatched values do not throw an error but are resolved 
  to the default instead).
- Remove "retryState" and "maxFlowSleepDuration" fields from the
  "PipelineState" avro schema.
- Remove "CheckpointSchemaCompatibilityTest" (we only support evolving
  types one minor version at a time).
- Add schema compatibility tests for "FlowStates" and "PipelineState".
  • Loading branch information
jujoramos authored Jan 5, 2024
1 parent 2590fcd commit c23d34c
Show file tree
Hide file tree
Showing 10 changed files with 932 additions and 138 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "enum",
"name": "FlowStates",
"namespace": "net.corda.data.flow.output",
"doc": "The current processing status of a flow" ,
"symbols": [
"START_REQUESTED",
"RUNNING",
"COMPLETED",
"FAILED",
"KILLED"
],
"default": "FAILED"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,8 @@
},
{
"name": "flowStatus",
"doc": "The current processing status of a flow" ,
"type": {
"name": "FlowStates",
"type": "enum",
"symbols": [
"START_REQUESTED",
"RUNNING",
"RETRYING",
"COMPLETED",
"FAILED",
"KILLED"
]
}
"type": "net.corda.data.flow.output.FlowStates",
"doc": "The current processing status of a flow"
},
{
"name": "result",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,6 @@
"namespace": "net.corda.data.flow.state.checkpoint",
"doc": "State used by the flow engine to track pipeline details and provide diagnostics.",
"fields": [
{
"name": "retryState",
"type": ["null", "net.corda.data.flow.state.checkpoint.RetryState"],
"default": null,
"doc": "Optional retry information for a failed flow event. Setting this field marks the flow as retrying."
},
{
"name": "maxFlowSleepDuration",
"type": "int",
"doc": "The maximum time a flow can sleep, before a Wakeup event is generated (milliseconds)"
},
{
"name": "pendingPlatformError",
"type": ["null", "net.corda.data.ExceptionEnvelope"],
Expand Down

This file was deleted.

731 changes: 731 additions & 0 deletions data/avro-schema/src/test/java/checkpoint/RetryState.java

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package net.corda.data.flow.state.checkpoint

import net.corda.data.ExceptionEnvelope
import net.corda.data.flow.FlowInitiatorType
import net.corda.data.flow.FlowKey
import net.corda.data.flow.output.FlowStatus
import org.apache.avro.Schema
import org.apache.avro.SchemaCompatibility
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class FlowStatusSchemaCompatibilityTest {

@Test
fun `Flow status schema changes between Corda 5_1 and 5_2 are compatible`() {
val oldSchemaJson = """
{
"type": "record",
"name": "FlowStatus",
"namespace": "net.corda.data.flow.output",
"doc": "The Flow Status represents the current processing state of a flow at a given point in time",
"fields": [
{
"name": "key",
"type": "net.corda.data.flow.FlowKey",
"doc": "The unique ID for the status"
},
{
"name": "initiatorType",
"type": "net.corda.data.flow.FlowInitiatorType",
"doc": "The type of initiator that started the flow"
},
{
"name": "flowId",
"type": ["null", "string"],
"doc": "The unique flow ID"
},
{
"name": "flowClassName",
"type": "string",
"doc": "The fully qualified class name of the flow"
},
{
"name": "flowStatus",
"doc": "The current processing status of a flow" ,
"type": {
"name": "FlowStates",
"type": "enum",
"symbols": [
"START_REQUESTED",
"RUNNING",
"RETRYING",
"COMPLETED",
"FAILED",
"KILLED"
]
}
},
{
"name": "result",
"type": ["null", "string"],
"doc": "Optional result, this contains the result from the flow will only be set if the flow status is 'COMPLETED'"
},
{
"name": "error",
"type": ["null", "net.corda.data.ExceptionEnvelope"],
"doc": "Optional error message, this will be set if the flow status is 'FAILED'"
},
{
"name": "processingTerminatedReason",
"type": ["null", "string"],
"doc": "Optional message indicating reasoning why processing a flow has been terminated."
},
{
"name": "createdTimestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "The date the flow was created."
},
{
"name": "lastUpdateTimestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "The date and time this status update was published."
}
]
}
""".trimIndent()

val oldSchema = Schema.Parser()
.addTypes(
mapOf(
FlowKey::class.java.name to FlowKey.`SCHEMA$`,
FlowInitiatorType::class.java.name to FlowInitiatorType.`SCHEMA$`,
ExceptionEnvelope::class.java.name to ExceptionEnvelope.`SCHEMA$`,
)
)
.parse(oldSchemaJson)

val newSchema = FlowStatus.`SCHEMA$`
val compatibility = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema)

Assertions.assertEquals(
compatibility.type,
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
"Failed due to incompatible change. ${compatibility.description}"
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package net.corda.data.flow.state.checkpoint

import checkpoint.RetryState
import net.corda.data.ExceptionEnvelope
import net.corda.data.crypto.SecureHash
import org.apache.avro.Schema
import org.apache.avro.SchemaCompatibility
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class PipelineStateSchemaCompatibilityTest {

@Test
fun `Pipeline state schema changes between Corda 5_1 and 5_2 are compatible`() {
val oldSchemaJson = """
{
"type": "record",
"name": "PipelineState",
"namespace": "net.corda.data.flow.state.checkpoint",
"doc": "State used by the flow engine to track pipeline details and provide diagnostics.",
"fields": [
{
"name": "retryState",
"type": ["null", "net.corda.data.flow.state.checkpoint.RetryState"],
"default": null,
"doc": "Optional retry information for a failed flow event. Setting this field marks the flow as retrying."
},
{
"name": "maxFlowSleepDuration",
"type": "int",
"doc": "The maximum time a flow can sleep, before a Wakeup event is generated (milliseconds)"
},
{
"name": "pendingPlatformError",
"type": ["null", "net.corda.data.ExceptionEnvelope"],
"default": null,
"doc": "Used for platform generated errors reported back to user code."
},
{
"name": "cpkFileHashes",
"type": {
"type": "array",
"items": "net.corda.data.crypto.SecureHash"
},
"doc": "Array of stored cpkFileHashes from the Virtual Node."
}
]
}
""".trimIndent()

val oldSchema = Schema.Parser()
.addTypes(
mapOf(
SecureHash::class.java.name to SecureHash.`SCHEMA$`,
RetryState::class.java.name to RetryState.`SCHEMA$`,
ExceptionEnvelope::class.java.name to ExceptionEnvelope.`SCHEMA$`,
)
)
.parse(oldSchemaJson)

val newSchema = PipelineState.`SCHEMA$`
val compatibility = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema)

Assertions.assertEquals(
compatibility.type,
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
"Failed due to incompatible change. ${compatibility.description}"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"default": 16000
},
"maxFlowSleepDuration": {
"description": "The maximum delay in milliseconds before Corda schedules a periodic wake-up.",
"description": "Deprecated. The maximum delay in milliseconds before Corda schedules a periodic wake-up.",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 25
cordaApiRevision = 26

# Main
kotlin.stdlib.default.dependency = false
Expand Down

0 comments on commit c23d34c

Please sign in to comment.