diff --git a/config/config.ts b/config/config.ts index 98a63883f..ec3432a14 100644 --- a/config/config.ts +++ b/config/config.ts @@ -66,6 +66,10 @@ import { getOraDecompressionManagerStackProps } from './stacks/oraDecompressionP import { getPgDDProps } from './stacks/pgDD'; import { getDataMigrateStackProps } from './stacks/dataMigrate'; import { getHtsgetProps } from './stacks/htsget'; +import { + getWorkflowTaskTokenManagerStackProps, + getWorkflowTaskTokenManagerTableStackProps, +} from './stacks/workflowTaskTokenManager'; interface EnvironmentConfig { name: string; @@ -106,6 +110,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null pierianDxPipelineTableStackProps: getPierianDxPipelineTableStackProps(), oncoanalyserPipelineTableStackProps: getOncoanalyserPipelineTableStackProps(), sashPipelineTableStackProps: getSashPipelineTableStackProps(), + workflowTaskTokenTableStackProps: getWorkflowTaskTokenManagerTableStackProps(), }, statelessConfig: { metadataManagerStackProps: getMetadataManagerStackProps(stage), @@ -136,6 +141,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null dataMigrateProps: getDataMigrateStackProps(stage), htsgetProps: getHtsgetProps(stage), pgDDProps: getPgDDProps(stage), + workflowTaskTokenManagerStackProps: getWorkflowTaskTokenManagerStackProps(), }, }; diff --git a/config/constants.ts b/config/constants.ts index 23de60031..f00e7b133 100644 --- a/config/constants.ts +++ b/config/constants.ts @@ -886,3 +886,7 @@ export const oraDecompressionIcav2ReadyEventSource = 'orcabus.workflowmanager'; export const oraDecompressionIcav2EventSource = 'orcabus.oradecompression'; export const oraDecompressionIcav2EventDetailType = 'FastqListRowDecompressed'; export const oraDecompressionStateMachinePrefix = 'oraDecompressionSfn'; +/* +Workflow Task Token Manager stack +*/ +export const workflowTaskTokenManagerDynamodbTableName = 'workflowTaskTokenManagerDynamoDBTable'; diff --git a/config/stacks/workflowTaskTokenManager.ts b/config/stacks/workflowTaskTokenManager.ts new file mode 100644 index 000000000..b09c494b4 --- /dev/null +++ b/config/stacks/workflowTaskTokenManager.ts @@ -0,0 +1,19 @@ +import { AppStage, eventBusName, workflowTaskTokenManagerDynamodbTableName } from '../constants'; +import { WorkflowTaskTokenManagerConfig } from '../../lib/workload/stateless/stacks/workflow-task-token-manager/deploy'; +import { WorkflowTaskTokenTableConfig } from '../../lib/workload/stateful/stacks/workflow-task-token-manager-dynamo-db/deploy'; + +// Stateful +export const getWorkflowTaskTokenManagerTableStackProps = (): WorkflowTaskTokenTableConfig => { + return { + dynamodbTableName: workflowTaskTokenManagerDynamodbTableName, + }; +}; + +// Stateless +export const getWorkflowTaskTokenManagerStackProps = (): WorkflowTaskTokenManagerConfig => { + return { + dynamodbTableName: workflowTaskTokenManagerDynamodbTableName, + eventBusName: eventBusName, + stateMachinePrefix: 'workflow-task-token', + }; +}; diff --git a/lib/workload/stateful/stacks/workflow-task-token-manager-dynamo-db/deploy/index.ts b/lib/workload/stateful/stacks/workflow-task-token-manager-dynamo-db/deploy/index.ts new file mode 100644 index 000000000..dfb37c97d --- /dev/null +++ b/lib/workload/stateful/stacks/workflow-task-token-manager-dynamo-db/deploy/index.ts @@ -0,0 +1,22 @@ +import * as cdk from 'aws-cdk-lib'; +import { Construct } from 'constructs'; +import { DynamodbPartitionedPipelineConstruct } from '../../../../components/dynamodb-partitioned-table'; + +export interface WorkflowTaskTokenTableConfig { + dynamodbTableName: string; +} + +export type WorkflowTaskTokenTableStackProps = WorkflowTaskTokenTableConfig & cdk.StackProps; + +export class WorkflowTaskTokenTable extends cdk.Stack { + constructor(scope: Construct, id: string, props: WorkflowTaskTokenTableStackProps) { + super(scope, id, props); + + /* + Initialise dynamodb table with id and sort keys + */ + new DynamodbPartitionedPipelineConstruct(this, props.dynamodbTableName, { + tableName: props.dynamodbTableName, + }); + } +} diff --git a/lib/workload/stateful/statefulStackCollectionClass.ts b/lib/workload/stateful/statefulStackCollectionClass.ts index a47b3dcf6..752ed4bb3 100644 --- a/lib/workload/stateful/statefulStackCollectionClass.ts +++ b/lib/workload/stateful/statefulStackCollectionClass.ts @@ -65,6 +65,10 @@ import { OraCompressionIcav2PipelineTable, OraCompressionIcav2PipelineTableStackProps, } from './stacks/ora-decompression-dynamodb/deploy/stack'; +import { + WorkflowTaskTokenTable, + WorkflowTaskTokenTableStackProps, +} from './stacks/workflow-task-token-manager-dynamo-db/deploy'; export interface StatefulStackCollectionProps { dataBucketStackProps: DataBucketStackProps; @@ -86,6 +90,7 @@ export interface StatefulStackCollectionProps { pierianDxPipelineTableStackProps: PierianDxPipelineTableStackProps; oncoanalyserPipelineTableStackProps: OncoanalyserNfPipelineTableStackProps; sashPipelineTableStackProps: SashNfPipelineTableStackProps; + workflowTaskTokenTableStackProps: WorkflowTaskTokenTableStackProps; } export class StatefulStackCollection { @@ -110,6 +115,7 @@ export class StatefulStackCollection { readonly pierianDxPipelineTableStack: Stack; readonly oncoanalyserPipelineTableStack: Stack; readonly sashPipelineTableStack: Stack; + readonly workflowTaskTokenTableStack: Stack; constructor( scope: Construct, @@ -258,6 +264,15 @@ export class StatefulStackCollection { ...this.createTemplateProps(env, 'SashNfPipelineTableStack'), ...statefulConfiguration.sashPipelineTableStackProps, }); + + this.workflowTaskTokenTableStack = new WorkflowTaskTokenTable( + scope, + 'WorkflowTaskTokenTableStack', + { + ...this.createTemplateProps(env, 'WorkflowTaskTokenTableStack'), + ...statefulConfiguration.workflowTaskTokenTableStackProps, + } + ); } /** diff --git a/lib/workload/stateless/stacks/workflow-task-token-manager/Readme.md b/lib/workload/stateless/stacks/workflow-task-token-manager/Readme.md new file mode 100644 index 000000000..1833dfe6f --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-task-token-manager/Readme.md @@ -0,0 +1,16 @@ +# Workflow Task Token Manager + +This is a simple set of step functions: + +## Service 1: TaskTokenManager + +1. Listens to any 'WorkflowRunStateChangeSync' event from the OrcaBus +2. Splits the task token out from the event details and stores it in a DynamoDB table indexed by the portal run id +3. Publishes a WorkflowRunStateChange event, with the task token removed + +## Service 2: Send Task Token Success Events + +1. Listens to any 'WorkflowRunStateChange' event with a terminal `Status` from the OrcaBus +2. Looks up the portal run id in the DynamoDB table to get the task token (if it exists) +3. Sends either a `TaskTokenSuccess` or `TaskTokenFailure` event + diff --git a/lib/workload/stateless/stacks/workflow-task-token-manager/deploy/index.ts b/lib/workload/stateless/stacks/workflow-task-token-manager/deploy/index.ts new file mode 100644 index 000000000..e6639dff4 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-task-token-manager/deploy/index.ts @@ -0,0 +1,205 @@ +/* +Workflow Step Functions Sync Manager + +Allow a step function to wait while a workflow completes + +*/ + +import * as cdk from 'aws-cdk-lib'; +import { Construct } from 'constructs'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; +import * as events from 'aws-cdk-lib/aws-events'; +import path from 'path'; +import { IEventBus } from 'aws-cdk-lib/aws-events'; +import { StateMachine } from 'aws-cdk-lib/aws-stepfunctions'; +import * as eventsTargets from 'aws-cdk-lib/aws-events-targets'; +import * as events_targets from 'aws-cdk-lib/aws-events-targets'; + +export interface WorkflowTaskTokenManagerConfig { + /* + Tables + */ + dynamodbTableName: string; + + /* + Event handling + */ + eventBusName: string; + + /* + Names for statemachines + */ + stateMachinePrefix: string; +} + +export type WorkflowTaskTokenManagerStackProps = WorkflowTaskTokenManagerConfig & cdk.StackProps; + +export class WorkflowTaskTokenManagerStack extends cdk.Stack { + private globals = { + source: 'orcabus.workflowsync', + triggerDetailType: 'WorkflowRunStateChangeSync', + outputDetailType: 'WorkflowRunStateChange', + tablePartitionName: 'portal_run_id_task_token', + }; + + private createLaunchWorkflowRunStateChangeStepFunction( + eventBusObj: IEventBus, + tableObj: dynamodb.ITableV2, + stateMachineName: string + ): StateMachine { + /* + Build the state machine + */ + const stateMachineObj = new sfn.StateMachine(this, stateMachineName, { + stateMachineName: stateMachineName, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + '../step_functions_templates/launch_workflow_run_state_change_event.asl.json' + ) + ), + definitionSubstitutions: { + /* Table */ + __table_name__: tableObj.tableName, + __portal_run_id_table_partition_name__: this.globals.tablePartitionName, + /* Events */ + __detail_type__: this.globals.triggerDetailType, + __event_bus_name__: eventBusObj.eventBusName, + __source__: this.globals.source, + }, + }); + + // Give state machine permissions to read/write to db + tableObj.grantReadWriteData(stateMachineObj); + + // Give state machine permissions to put events to event bus + eventBusObj.grantPutEventsTo(stateMachineObj); + + // Return state machine + return stateMachineObj; + } + + private createSendTaskTokenStepFunction( + eventBusObj: IEventBus, + tableObj: dynamodb.ITableV2, + stateMachineName: string + ): StateMachine { + const stateMachineObj = new sfn.StateMachine(this, stateMachineName, { + stateMachineName: stateMachineName, + definitionBody: sfn.DefinitionBody.fromFile( + path.join(__dirname, '../step_functions_templates/send_task_token.asl.json') + ), + definitionSubstitutions: { + /* Table */ + __table_name__: tableObj.tableName, + __portal_run_id_table_partition_name__: this.globals.tablePartitionName, + /* Events */ + __detail_type__: this.globals.triggerDetailType, + __event_bus_name__: eventBusObj.eventBusName, + __source__: this.globals.source, + }, + }); + + // Grant permissions to read/write data + tableObj.grantReadWriteData(stateMachineObj); + + // Return the state machine object + return stateMachineObj; + } + + private createRuleForWorkflowRunStateChangeSyncEvents( + eventBusObj: events.IEventBus, + ruleName: string + ): events.Rule { + /* + Can't use $or over detailType and detail so use two rules instead + */ + return new events.Rule(this, ruleName, { + eventBus: eventBusObj, + ruleName: ruleName, + eventPattern: { + detailType: [this.globals.triggerDetailType], + detail: { + portalRunId: { exists: true }, + taskToken: { exists: true }, + }, + }, + }); + } + + private createRuleForWorkflowRunStateChangeEvents( + eventBusObj: events.IEventBus, + ruleName: string + ): events.Rule { + /* + Can't use $or over detailType and detail so use two rules instead + */ + return new events.Rule(this, ruleName, { + eventBus: eventBusObj, + ruleName: ruleName, + eventPattern: { + detailType: [this.globals.outputDetailType], + detail: { + portalRunId: { exists: true }, + // One of SUCCEEDED, ABORTED, FAILED, DEPRECATED + status: [ + { 'equals-ignore-case': 'SUCCEEDED' }, + { 'equals-ignore-case': 'FAILED' }, + { 'equals-ignore-case': 'ABORTED' }, + { 'equals-ignore-case': 'DEPRECATED' }, + ], + }, + }, + }); + } + + constructor(scope: Construct, id: string, props: WorkflowTaskTokenManagerStackProps) { + super(scope, id, props); + + // Get dynamodb table for construct + const tableObj = dynamodb.TableV2.fromTableName(this, 'tableObj', props.dynamodbTableName); + + // Get the event bus object + const eventBusObj = events.EventBus.fromEventBusName(this, 'event_bus', props.eventBusName); + + // Add launch workflow run state change step function object + const launchWorkflowRunStateChangeSfnObj = this.createLaunchWorkflowRunStateChangeStepFunction( + eventBusObj, + tableObj, + `${props.stateMachinePrefix}-launch-state-machine-sfn` + ); + + // Add send task token state change step function object + const sendTaskTokenStateChangeSfnObj = this.createSendTaskTokenStepFunction( + eventBusObj, + tableObj, + `${props.stateMachinePrefix}-send-task-token-sfn` + ); + + // Rules + const workflowRunStateChangeLaunchRule = this.createRuleForWorkflowRunStateChangeSyncEvents( + eventBusObj, + 'workflow-sync-launch-wrsc-rule' + ); + + const sendTaskTokenLaunchRule = this.createRuleForWorkflowRunStateChangeEvents( + eventBusObj, + 'workflow-sync-send-task-token-event-rule' + ); + + // Add targets to rules + workflowRunStateChangeLaunchRule.addTarget( + new eventsTargets.SfnStateMachine(launchWorkflowRunStateChangeSfnObj, { + input: events.RuleTargetInput.fromEventPath('$.detail'), + }) + ); + + // Add the target to the task token rule + sendTaskTokenLaunchRule.addTarget( + new events_targets.SfnStateMachine(sendTaskTokenStateChangeSfnObj, { + input: events.RuleTargetInput.fromEventPath('$.detail'), + }) + ); + } +} diff --git a/lib/workload/stateless/stacks/workflow-task-token-manager/step_functions_templates/launch_workflow_run_state_change_event.asl.json b/lib/workload/stateless/stacks/workflow-task-token-manager/step_functions_templates/launch_workflow_run_state_change_event.asl.json new file mode 100644 index 000000000..bbd902077 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-task-token-manager/step_functions_templates/launch_workflow_run_state_change_event.asl.json @@ -0,0 +1,61 @@ +{ + "QueryLanguage": "JSONata", + "Comment": "A description of my state machine", + "StartAt": "Set variables from inputs", + "States": { + "Set variables from inputs": { + "Type": "Pass", + "Next": "Launch event and write to db", + "Assign": { + "portal_run_id": "{% $states.input.portalRunId %}", + "task_token": "{% $states.input.taskToken %}" + } + }, + "Launch event and write to db": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Save Portal Run ID and Task Token", + "States": { + "Save Portal Run ID and Task Token": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:putItem", + "Arguments": { + "TableName": "${__table_name__}", + "Item": { + "id": "{% $portal_run_id %}", + "id_type": "${__portal_run_id_table_partition_name__}", + "task_token": { + "S": "{% $task_token %}" + } + } + }, + "End": true + } + } + }, + { + "StartAt": "Launch Workflow Run State Change Event", + "States": { + "Launch Workflow Run State Change Event": { + "Type": "Task", + "Resource": "arn:aws:states:::events:putEvents", + "Arguments": { + "Entries": [ + { + "Detail": "{% (\n /*\n Remove TaskToken from Detail before providing detail object\n */\n $each(\n $states.input, \n function($v, $k){\n $k = \"taskToken\" ? {} : {$k: $v}\n }\n ) ~> $merge\n) %}", + "DetailType": "${__detail_type__}", + "EventBusName": "${__event_bus_name__}", + "Source": "${__source__}" + } + ] + }, + "End": true + } + } + } + ], + "End": true + } + } +} diff --git a/lib/workload/stateless/stacks/workflow-task-token-manager/step_functions_templates/send_task_token.asl.json b/lib/workload/stateless/stacks/workflow-task-token-manager/step_functions_templates/send_task_token.asl.json new file mode 100644 index 000000000..f03d2558f --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-task-token-manager/step_functions_templates/send_task_token.asl.json @@ -0,0 +1,90 @@ +{ + "QueryLanguage": "JSONata", + "Comment": "A description of my state machine", + "StartAt": "Get Inputs from Payload", + "States": { + "Get Inputs from Payload": { + "Type": "Pass", + "Next": "Get Portal Run ID from DB", + "Assign": { + "portal_run_id": "{% $states.input.portalRunId %}", + "status": "{% $states.input.status %}" + } + }, + "Get Portal Run ID from DB": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Arguments": { + "TableName": "${__table_name__}", + "Key": { + "id": "{% $portal_run_id %}", + "id_type": "${__portal_run_id_table_partition_name__}" + } + }, + "Next": "Portal Run ID in DB", + "Assign": { + "portal_run_id_in_db": "{% $states.result.Item != null ? true : false %}", + "task_token": "{% $states.result.Item != null ? $states.result.Item.task_token.S : null %}" + } + }, + "Portal Run ID in DB": { + "Type": "Choice", + "Choices": [ + { + "Next": "Workflow Run Status", + "Condition": "{% $portal_run_id_in_db %}", + "Comment": "Portal Run ID in DataBase" + } + ], + "Default": "Not in DB" + }, + "Workflow Run Status": { + "Type": "Choice", + "Choices": [ + { + "Next": "Send Task Success", + "Condition": "{% $status = 'SUCCEEDED' %}", + "Comment": "Workflow Run Succeeded" + } + ], + "Default": "Send Task Failure" + }, + "Send Task Success": { + "Type": "Task", + "Arguments": { + "Output": { + "status": "{% $status %}" + }, + "TaskToken": "{% $task_token %}" + }, + "Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess", + "Next": "Delete Portal Run ID from Database" + }, + "Delete Portal Run ID from Database": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:deleteItem", + "Arguments": { + "TableName": "${__table_name__}", + "Key": { + "id": "{% $portal_run_id %}", + "id_type": "${__portal_run_id_table_partition_name__}" + } + }, + "End": true + }, + "Send Task Failure": { + "Type": "Task", + "Arguments": { + "Output": { + "status": "{% $status %}" + }, + "TaskToken": "{% $task_token %}" + }, + "Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskFailure", + "Next": "Delete Portal Run ID from Database" + }, + "Not in DB": { + "Type": "Succeed" + } + } +} diff --git a/lib/workload/stateless/statelessStackCollectionClass.ts b/lib/workload/stateless/statelessStackCollectionClass.ts index b0ac853d4..269b566ee 100644 --- a/lib/workload/stateless/statelessStackCollectionClass.ts +++ b/lib/workload/stateless/statelessStackCollectionClass.ts @@ -82,6 +82,11 @@ import { import { PgDDStack, PgDDStackProps } from './stacks/pg-dd/deploy/stack'; import { DataMigrateStack, DataMigrateStackProps } from './stacks/data-migrate/deploy/stack'; import { HtsgetStack, HtsgetStackConfigurableProps } from './stacks/htsget/stack'; +import { getWorkflowTaskTokenManagerTableStackProps } from '../../../config/stacks/workflowTaskTokenManager'; +import { + WorkflowTaskTokenManagerStack, + WorkflowTaskTokenManagerStackProps, +} from './stacks/workflow-task-token-manager/deploy'; export interface StatelessStackCollectionProps { metadataManagerStackProps: MetadataManagerStackProps; @@ -110,6 +115,7 @@ export interface StatelessStackCollectionProps { dataMigrateProps: DataMigrateStackProps; htsgetProps: HtsgetStackConfigurableProps; pgDDProps?: PgDDStackProps; + workflowTaskTokenManagerStackProps: WorkflowTaskTokenManagerStackProps; } export class StatelessStackCollection { @@ -140,6 +146,7 @@ export class StatelessStackCollection { readonly dataMigrate: Stack; readonly htsgetStack: Stack; readonly pgDDStack: Stack; + readonly workflowTaskTokenManagerStack: Stack; constructor( scope: Construct, @@ -334,6 +341,14 @@ export class StatelessStackCollection { ...statelessConfiguration.pgDDProps, }); } + this.workflowTaskTokenManagerStack = new WorkflowTaskTokenManagerStack( + scope, + 'WorkflowTaskTokenManagerStack', + { + ...this.createTemplateProps(env, 'WorkflowTaskTokenManagerStack'), + ...statelessConfiguration.workflowTaskTokenManagerStackProps, + } + ); } /**