Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialise workflow task token manager #789

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ import { getOraDecompressionManagerStackProps } from './stacks/oraDecompressionP
import { getPgDDProps } from './stacks/pgDD';
import { getDataMigrateStackProps } from './stacks/dataMigrate';
import { getHtsgetProps } from './stacks/htsget';
import {
getWorkflowTaskTokenManagerStackProps,
getWorkflowTaskTokenManagerTableStackProps,
} from './stacks/workflowTaskTokenManager';

interface EnvironmentConfig {
name: string;
Expand Down Expand Up @@ -106,6 +110,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
pierianDxPipelineTableStackProps: getPierianDxPipelineTableStackProps(),
oncoanalyserPipelineTableStackProps: getOncoanalyserPipelineTableStackProps(),
sashPipelineTableStackProps: getSashPipelineTableStackProps(),
workflowTaskTokenTableStackProps: getWorkflowTaskTokenManagerTableStackProps(),
},
statelessConfig: {
metadataManagerStackProps: getMetadataManagerStackProps(stage),
Expand Down Expand Up @@ -136,6 +141,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
dataMigrateProps: getDataMigrateStackProps(stage),
htsgetProps: getHtsgetProps(stage),
pgDDProps: getPgDDProps(stage),
workflowTaskTokenManagerStackProps: getWorkflowTaskTokenManagerStackProps(),
},
};

Expand Down
4 changes: 4 additions & 0 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -886,3 +886,7 @@ export const oraDecompressionIcav2ReadyEventSource = 'orcabus.workflowmanager';
export const oraDecompressionIcav2EventSource = 'orcabus.oradecompression';
export const oraDecompressionIcav2EventDetailType = 'FastqListRowDecompressed';
export const oraDecompressionStateMachinePrefix = 'oraDecompressionSfn';
/*
Workflow Task Token Manager stack
*/
export const workflowTaskTokenManagerDynamodbTableName = 'workflowTaskTokenManagerDynamoDBTable';
19 changes: 19 additions & 0 deletions config/stacks/workflowTaskTokenManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AppStage, eventBusName, workflowTaskTokenManagerDynamodbTableName } from '../constants';
import { WorkflowTaskTokenManagerConfig } from '../../lib/workload/stateless/stacks/workflow-task-token-manager/deploy';
import { WorkflowTaskTokenTableConfig } from '../../lib/workload/stateful/stacks/workflow-task-token-manager-dynamo-db/deploy';

// Stateful
export const getWorkflowTaskTokenManagerTableStackProps = (): WorkflowTaskTokenTableConfig => {
return {
dynamodbTableName: workflowTaskTokenManagerDynamodbTableName,
};
};

// Stateless
export const getWorkflowTaskTokenManagerStackProps = (): WorkflowTaskTokenManagerConfig => {
return {
dynamodbTableName: workflowTaskTokenManagerDynamodbTableName,
eventBusName: eventBusName,
stateMachinePrefix: 'workflow-task-token',
};
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { DynamodbPartitionedPipelineConstruct } from '../../../../components/dynamodb-partitioned-table';

export interface WorkflowTaskTokenTableConfig {
dynamodbTableName: string;
}

export type WorkflowTaskTokenTableStackProps = WorkflowTaskTokenTableConfig & cdk.StackProps;

export class WorkflowTaskTokenTable extends cdk.Stack {
constructor(scope: Construct, id: string, props: WorkflowTaskTokenTableStackProps) {
super(scope, id, props);

/*
Initialise dynamodb table with id and sort keys
*/
new DynamodbPartitionedPipelineConstruct(this, props.dynamodbTableName, {
tableName: props.dynamodbTableName,
});
}
}
15 changes: 15 additions & 0 deletions lib/workload/stateful/statefulStackCollectionClass.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ import {
OraCompressionIcav2PipelineTable,
OraCompressionIcav2PipelineTableStackProps,
} from './stacks/ora-decompression-dynamodb/deploy/stack';
import {
WorkflowTaskTokenTable,
WorkflowTaskTokenTableStackProps,
} from './stacks/workflow-task-token-manager-dynamo-db/deploy';

export interface StatefulStackCollectionProps {
dataBucketStackProps: DataBucketStackProps;
Expand All @@ -86,6 +90,7 @@ export interface StatefulStackCollectionProps {
pierianDxPipelineTableStackProps: PierianDxPipelineTableStackProps;
oncoanalyserPipelineTableStackProps: OncoanalyserNfPipelineTableStackProps;
sashPipelineTableStackProps: SashNfPipelineTableStackProps;
workflowTaskTokenTableStackProps: WorkflowTaskTokenTableStackProps;
}

export class StatefulStackCollection {
Expand All @@ -110,6 +115,7 @@ export class StatefulStackCollection {
readonly pierianDxPipelineTableStack: Stack;
readonly oncoanalyserPipelineTableStack: Stack;
readonly sashPipelineTableStack: Stack;
readonly workflowTaskTokenTableStack: Stack;

constructor(
scope: Construct,
Expand Down Expand Up @@ -258,6 +264,15 @@ export class StatefulStackCollection {
...this.createTemplateProps(env, 'SashNfPipelineTableStack'),
...statefulConfiguration.sashPipelineTableStackProps,
});

this.workflowTaskTokenTableStack = new WorkflowTaskTokenTable(
scope,
'WorkflowTaskTokenTableStack',
{
...this.createTemplateProps(env, 'WorkflowTaskTokenTableStack'),
...statefulConfiguration.workflowTaskTokenTableStackProps,
}
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Workflow Task Token Manager

This is a simple set of step functions:

## Service 1: TaskTokenManager

1. Listens to any 'WorkflowRunStateChangeSync' event from the OrcaBus
2. Splits the task token out from the event details and stores it in a DynamoDB table indexed by the portal run id
3. Publishes a WorkflowRunStateChange event, with the task token removed

## Service 2: Send Task Token Success Events

1. Listens to any 'WorkflowRunStateChange' event with a terminal `Status` from the OrcaBus
2. Looks up the portal run id in the DynamoDB table to get the task token (if it exists)
3. Sends either a `TaskTokenSuccess` or `TaskTokenFailure` event

Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
Workflow Step Functions Sync Manager

Allow a step function to wait while a workflow completes

*/

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as events from 'aws-cdk-lib/aws-events';
import path from 'path';
import { IEventBus } from 'aws-cdk-lib/aws-events';
import { StateMachine } from 'aws-cdk-lib/aws-stepfunctions';
import * as eventsTargets from 'aws-cdk-lib/aws-events-targets';
import * as events_targets from 'aws-cdk-lib/aws-events-targets';

export interface WorkflowTaskTokenManagerConfig {
/*
Tables
*/
dynamodbTableName: string;

/*
Event handling
*/
eventBusName: string;

/*
Names for statemachines
*/
stateMachinePrefix: string;
}

export type WorkflowTaskTokenManagerStackProps = WorkflowTaskTokenManagerConfig & cdk.StackProps;

export class WorkflowTaskTokenManagerStack extends cdk.Stack {
private globals = {
source: 'orcabus.workflowsync',
triggerDetailType: 'WorkflowRunStateChangeSync',
outputDetailType: 'WorkflowRunStateChange',
tablePartitionName: 'portal_run_id_task_token',
};

private createLaunchWorkflowRunStateChangeStepFunction(
eventBusObj: IEventBus,
tableObj: dynamodb.ITableV2,
stateMachineName: string
): StateMachine {
/*
Build the state machine
*/
const stateMachineObj = new sfn.StateMachine(this, stateMachineName, {
stateMachineName: stateMachineName,
definitionBody: sfn.DefinitionBody.fromFile(
path.join(
__dirname,
'../step_functions_templates/launch_workflow_run_state_change_event.asl.json'
)
),
definitionSubstitutions: {
/* Table */
__table_name__: tableObj.tableName,
__portal_run_id_table_partition_name__: this.globals.tablePartitionName,
/* Events */
__detail_type__: this.globals.triggerDetailType,
__event_bus_name__: eventBusObj.eventBusName,
__source__: this.globals.source,
},
});

// Give state machine permissions to read/write to db
tableObj.grantReadWriteData(stateMachineObj);

// Give state machine permissions to put events to event bus
eventBusObj.grantPutEventsTo(stateMachineObj);

// Return state machine
return stateMachineObj;
}

private createSendTaskTokenStepFunction(
eventBusObj: IEventBus,
tableObj: dynamodb.ITableV2,
stateMachineName: string
): StateMachine {
const stateMachineObj = new sfn.StateMachine(this, stateMachineName, {
stateMachineName: stateMachineName,
definitionBody: sfn.DefinitionBody.fromFile(
path.join(__dirname, '../step_functions_templates/send_task_token.asl.json')
),
definitionSubstitutions: {
/* Table */
__table_name__: tableObj.tableName,
__portal_run_id_table_partition_name__: this.globals.tablePartitionName,
/* Events */
__detail_type__: this.globals.triggerDetailType,
__event_bus_name__: eventBusObj.eventBusName,
__source__: this.globals.source,
},
});

// Grant permissions to read/write data
tableObj.grantReadWriteData(stateMachineObj);

// Return the state machine object
return stateMachineObj;
}

private createRuleForWorkflowRunStateChangeSyncEvents(
eventBusObj: events.IEventBus,
ruleName: string
): events.Rule {
/*
Can't use $or over detailType and detail so use two rules instead
*/
return new events.Rule(this, ruleName, {
eventBus: eventBusObj,
ruleName: ruleName,
eventPattern: {
detailType: [this.globals.triggerDetailType],
detail: {
portalRunId: { exists: true },
taskToken: { exists: true },
},
},
});
}

private createRuleForWorkflowRunStateChangeEvents(
eventBusObj: events.IEventBus,
ruleName: string
): events.Rule {
/*
Can't use $or over detailType and detail so use two rules instead
*/
return new events.Rule(this, ruleName, {
eventBus: eventBusObj,
ruleName: ruleName,
eventPattern: {
detailType: [this.globals.outputDetailType],
detail: {
portalRunId: { exists: true },
// One of SUCCEEDED, ABORTED, FAILED, DEPRECATED
status: [
{ 'equals-ignore-case': 'SUCCEEDED' },
{ 'equals-ignore-case': 'FAILED' },
{ 'equals-ignore-case': 'ABORTED' },
{ 'equals-ignore-case': 'DEPRECATED' },
],
},
},
});
}

constructor(scope: Construct, id: string, props: WorkflowTaskTokenManagerStackProps) {
super(scope, id, props);

// Get dynamodb table for construct
const tableObj = dynamodb.TableV2.fromTableName(this, 'tableObj', props.dynamodbTableName);

// Get the event bus object
const eventBusObj = events.EventBus.fromEventBusName(this, 'event_bus', props.eventBusName);

// Add launch workflow run state change step function object
const launchWorkflowRunStateChangeSfnObj = this.createLaunchWorkflowRunStateChangeStepFunction(
eventBusObj,
tableObj,
`${props.stateMachinePrefix}-launch-state-machine-sfn`
);

// Add send task token state change step function object
const sendTaskTokenStateChangeSfnObj = this.createSendTaskTokenStepFunction(
eventBusObj,
tableObj,
`${props.stateMachinePrefix}-send-task-token-sfn`
);

// Rules
const workflowRunStateChangeLaunchRule = this.createRuleForWorkflowRunStateChangeSyncEvents(
eventBusObj,
'workflow-sync-launch-wrsc-rule'
);

const sendTaskTokenLaunchRule = this.createRuleForWorkflowRunStateChangeEvents(
eventBusObj,
'workflow-sync-send-task-token-event-rule'
);

// Add targets to rules
workflowRunStateChangeLaunchRule.addTarget(
new eventsTargets.SfnStateMachine(launchWorkflowRunStateChangeSfnObj, {
input: events.RuleTargetInput.fromEventPath('$.detail'),
})
);

// Add the target to the task token rule
sendTaskTokenLaunchRule.addTarget(
new events_targets.SfnStateMachine(sendTaskTokenStateChangeSfnObj, {
input: events.RuleTargetInput.fromEventPath('$.detail'),
})
);
}
}
Loading