Skip to content

Commit

Permalink
Merge branch 'release/os/5.1' into saurabh/sync-syntax
Browse files Browse the repository at this point in the history
# Conflicts:
#	gradle.properties
  • Loading branch information
SaurabhR3 committed Sep 12, 2023
2 parents 5f42c93 + 17bb735 commit 521f1c3
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@
"type": "string",
"default": "{}",
"doc": "Internal storage for recording flow metrics"
},
{
"name": "customState",
"type": [
"null",
"net.corda.data.KeyValuePairList"
],
"default": null,
"doc": "Internal storage for pipeline extensions."
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"type": "record",
"name": "ScheduledTaskTrigger",
"namespace": "net.corda.data.scheduler",
"fields": [
{
"name": "name",
"type": "string",
"doc": "Name of the scheduled task."
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "Timestamp for the trigger."
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package net.corda.data.flow.state.checkpoint

import net.corda.data.KeyValuePairList
import org.apache.avro.Schema
import org.apache.avro.SchemaCompatibility
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class CheckpointSchemaCompatibilityTest {

@Test
fun `Flow checkpoint schema changes between Corda 5_0 and 5_1 are compatible`() {
val oldSchemaJson = """
{
"type": "record",
"name": "Checkpoint",
"namespace": "net.corda.data.flow.state.checkpoint",
"doc": "Represents the current state of a flow, plus information required to operate the flow engine.",
"fields": [
{
"name": "flowId",
"type": "string",
"doc": "Internal, globally unique key for a flow instance."
},
{
"name": "initialPlatformVersion",
"type": "int",
"doc": "The platform version at the time the flow was started."
},
{
"name": "pipelineState",
"type": "net.corda.data.flow.state.checkpoint.PipelineState",
"doc": "State required by the pipeline, e.g. to support retries."
},
{
"name": "flowState",
"type": [
"null",
"net.corda.data.flow.state.checkpoint.FlowState"
],
"doc": "Current flow execution state. Null if the flow has not yet been started, for example in the face of a retry-able error."
},
{
"name": "flowMetricsState",
"type": "string",
"default": "{}",
"doc": "Internal storage for recording flow metrics"
}
]
}
""".trimIndent()

val oldSchema = Schema.Parser()
.addTypes(
mapOf(
PipelineState::class.java.name to PipelineState.`SCHEMA$`,
FlowState::class.java.name to FlowState.`SCHEMA$`,
KeyValuePairList::class.java.name to KeyValuePairList.`SCHEMA$`
)
)
.parse(oldSchemaJson)

val newSchema = Checkpoint.`SCHEMA$`

val compatibility = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema)

Assertions.assertEquals(
compatibility.type,
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
"Failed due to incompatible change. ${compatibility.description}"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ private FlowConfig() {
public static final String SESSION_P2P_TTL = "session.p2pTTL";
public static final String SESSION_FLOW_CLEANUP_TIME = "session.cleanupTime";
public static final String PROCESSING_MAX_RETRY_ATTEMPTS = "processing.maxRetryAttempts";
public static final String PROCESSING_MAX_RETRY_WINDOW_DURATION = "processing.maxRetryWindowDuration";
public static final String PROCESSING_MAX_RETRY_DELAY = "processing.maxRetryDelay";
public static final String PROCESSING_MAX_FLOW_SLEEP_DURATION = "processing.maxFlowSleepDuration";
public static final String PROCESSING_FLOW_CLEANUP_TIME = "processing.cleanupTime";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,31 @@ private Publisher() {
* producers to stay under this limit when publishing messages.
*/
public static final String MAX_ALLOWED_MSG_SIZE = "maxAllowedMessageSize";

/**
* State Manager Configuration for connecting to the underlying persistent storage.
*/
public static final class StateManager {
private StateManager() {
}

public static final String STATE_MANAGER = "stateManager";
public static final String TYPE = STATE_MANAGER + ".type";

// Database Values
public static final String DB_PROPERTIES = STATE_MANAGER + ".database";
public static final String JDBC_USER = DB_PROPERTIES + ".user";
public static final String JDBC_PASS = DB_PROPERTIES + ".pass";

public static final String JDBC_URL = DB_PROPERTIES + ".jdbc.url";
public static final String JDBC_DRIVER = DB_PROPERTIES + "jdbc.driver";
public static final String JDBC_DRIVER_DIRECTORY = DB_PROPERTIES + ".jdbc.directory";
public static final String JDBC_PERSISTENCE_UNIT_NAME = DB_PROPERTIES + ".jdbc.persistenceUnitName";
public static final String JDBC_POOL_MAX_SIZE = DB_PROPERTIES + ".pool.maxSize";
public static final String JDBC_POOL_MIN_SIZE = DB_PROPERTIES + ".pool.minSize";
public static final String JDBC_POOL_IDLE_TIMEOUT_SECONDS = DB_PROPERTIES + ".pool.idleTimeoutSeconds";
public static final String JDBC_POOL_MAX_LIFETIME_SECONDS = DB_PROPERTIES + ".pool.maxLifetimeSeconds";
public static final String JDBC_POOL_KEEP_ALIVE_TIME_SECONDS = DB_PROPERTIES + ".pool.keepAliveTimeSeconds";
public static final String JDBC_POOL_VALIDATION_TIMEOUT_SECONDS = DB_PROPERTIES + ".pool.validationTimeoutSeconds";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
"maximum": 1000,
"default": 5
},
"maxRetryWindowDuration": {
"description": "The duration in milliseconds after a transient error that Corda retries a flow before failing it. A value of zero disables retries.",
"type": "integer",
"minimum": 0,
"maximum": 2147483647,
"default": 300000
},
"maxRetryDelay": {
"description": "The maximum delay in milliseconds before Corda schedules a retry.",
"type": "integer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,37 @@
"default": 972800,
"minimum": 512000,
"maximum": 8388608
},
"stateManager": {
"description": "Connection details for the underlying persistent storage used by the out of process State Manager.",
"type": "object",
"properties": {
"type": {
"description": "The type of state manager implementation.",
"enum": [
"DATABASE"
]
},
"additionalProperties": false
},
"$comment": "Polymorphic state manager storage connection configuration. The valid section depends on which state manager implementation is in use.",
"allOf": [
{
"if": {
"properties": {"type": {"const": "DATABASE"}},
"required": ["type"]
},
"then": {
"properties": {
"databaseProperties": {
"description": "Settings to connect to the state manager database.",
"$ref": "state-manager-db-properties.json"
}
},
"required": ["type","databaseProperties"]
}
}
]
}
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "https://corda.r3.com/net/corda/schema/configuration/db/1.0/corda.db.json",
"title": "State Manager Database Configuration Schema",
"description": "Configuration schema for the database section. Note that this configuration cannot be updated dynamically through the REST endpoint.",
"type": "object",
"properties": {
"database": {
"description": "Database settings.",
"type": "object",
"default": {},
"properties": {
"user": {
"description": "The database user.",
"type": "string"
},
"pass": {
"description": "The database password.",
"type": "string"
},
"jdbc": {
"description": "JDBC settings.",
"type": "object",
"default": {},
"properties": {
"driver": {
"description": "The JDBC driver.",
"type": "string",
"default": "org.postgresql.Driver"
},
"directory": {
"description": "The directory that contains the JDBC drivers.",
"type": "string",
"default": "/opt/jdbc-driver"
},
"url": {
"description": "The JDBC URL.",
"type": "string",
"default": "jdbc:postgresql://state-manager-db:5432/state_manager"
},
"persistenceUnitName": {
"description": "The persistent unit name.",
"type": "string",
"default": "corda-state-manager"
}
},
"additionalProperties": false
},
"pool": {
"description": "Database pool settings.",
"type": "object",
"default": {},
"properties": {
"max_size": {
"description": "The maximum database pool size.",
"type": "integer",
"minimum": 1,
"default": 10
},
"min_size": {
"description": "The minimum database pool size. If left null will default to the `max_size` value.",
"default": null,
"anyOf": [
{
"type": "integer",
"minimum": 0
},
{
"type": "null"
}
]
},
"idleTimeoutSeconds": {
"description": "The maximum time (in seconds) a connection can stay idle in the pool. A value of 0 means that idle connections are never removed from the pool.",
"type": "integer",
"minimum": 0,
"default": 120
},
"maxLifetimeSeconds": {
"description": "The maximum time (in seconds) a connection can stay in the pool, regardless if it has been idle or has been recently used. If a connection is in-use and has reached \"maxLifetime\" timeout, it will be removed from the pool only when it becomes idle.",
"type": "integer",
"minimum": 1,
"default": 1800
},
"keepAliveTimeSeconds": {
"description": "The interval time (in seconds) in which connections will be tested for aliveness. Connections which are no longer alive are removed from the pool. A value of 0 means this check is disabled.",
"type": "integer",
"minimum": 0,
"default": 0
},
"validationTimeoutSeconds": {
"description": "The maximum time (in seconds) that the pool will wait for a connection to be validated as alive.",
"type": "integer",
"minimum": 1,
"default": 5
}
},
"additionalProperties": false
}
},
"additionalProperties": false
},
"default": {}
},
"additionalProperties": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ private DbSchema() {
public static final String UNIQUENESS_STATE_DETAILS_TABLE = "uniqueness_state_details";
public static final String UNIQUENESS_TX_DETAILS_TABLE = "uniqueness_tx_details";
public static final String UNIQUENESS_REJECTED_TX_TABLE = "uniqueness_rejected_txs";

public static final String STATE_MANAGER_TABLE = "state";
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
<include file="net/corda/db/schema/config/migration/static-network-creation-v1.0.xml"/>

<include file="net/corda/db/schema/config/migration/config-creation-v5.1.xml"/>
<include file="net/corda/db/schema/config/migration/scheduler-creation-v5.1.xml"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">

<property name="now" value="now()" dbms="hsqldb"/>
<property name="now" value="now()" dbms="postgresql"/>

<changeSet author="R3.Corda" id="scheduler-creation-v5.1">

<createTable tableName="task_scheduler_log">
<column name="task_name" type="VARCHAR(255)">
<constraints nullable="false"/>
</column>
<column name="last_scheduled" type="TIMESTAMP WITH TIME ZONE" defaultValueDate="${now}">
<constraints nullable="false"/>
</column>
<column name="scheduler_id" type="VARCHAR(255)">
<constraints nullable="false"/>
</column>
</createTable>

<addPrimaryKey columnNames="task_name" constraintName="task_scheduler_log_task_name_pk" tableName="task_scheduler_log"/>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<include file="net/corda/db/schema/statemanager/migration/state-manager-migration-v5.1.xml"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">

<property name="now" value="now()" dbms="postgresql"/>
<property name="json.column.type" value="JSONB" dbms="postgresql"/>
<property name="state.column.type" value="bytea" dbms="postgresql"/>

<!-- Please note that HSQLDB schema is supported for integration test purposes only. -->
<property name="now" value="now()" dbms="hsqldb"/>
<property name="json.column.type" value="CLOB" dbms="hsqldb"/>
<property name="state.column.type" value="binary" dbms="hsqldb"/>

<changeSet author="R3.Corda" id="state-manager-migration-v5.1">
<createTable tableName="state">
<column name="key" type="varchar(255)">
<constraints nullable="false"/>
</column>
<column name="value" type="${state.column.type}">
<constraints nullable="false"/>
</column>
<column name="version" type="INT">
<constraints nullable="false"/>
</column>
<column name="metadata" type="${json.column.type}">
<constraints nullable="false"/>
</column>
<column name="modified_time" type="TIMESTAMP" defaultValueComputed="CURRENT_TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="key" constraintName="state_key" tableName="state"/>
</changeSet>
</databaseChangeLog>
Loading

0 comments on commit 521f1c3

Please sign in to comment.