Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(srm) update SRM to handle BSSH events from orcabus ICAv2 event pipe #748

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ Hot-deploy against dev:
export AWS_PROFILE=umccr-dev-admin

yarn cdk-stateless list
yarn cdk-stateless synth -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack
yarn cdk-stateless diff -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack
yarn cdk-stateless deploy -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack
yarn cdk-stateless destroy -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack
yarn cdk-stateless synth -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack
yarn cdk-stateless diff -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack
yarn cdk-stateless deploy -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack
yarn cdk-stateless destroy -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack
```

CloudFormation template:
Expand Down
53 changes: 37 additions & 16 deletions lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import * as cdk from 'aws-cdk-lib';
import { aws_lambda, aws_secretsmanager, Duration, Stack } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { ISecurityGroup, IVpc, SecurityGroup, Vpc, VpcLookupOptions } from 'aws-cdk-lib/aws-ec2';
import { EventBus, IEventBus } from 'aws-cdk-lib/aws-events';
import { EventBus, IEventBus, Rule } from 'aws-cdk-lib/aws-events';
import { LambdaFunction } from 'aws-cdk-lib/aws-events-targets';
import { PythonFunction, PythonLayerVersion } from '@aws-cdk/aws-lambda-python-alpha';
import { HttpLambdaIntegration } from 'aws-cdk-lib/aws-apigatewayv2-integrations';
import { HttpMethod, HttpRoute, HttpRouteKey } from 'aws-cdk-lib/aws-apigatewayv2';
Expand Down Expand Up @@ -133,27 +134,47 @@ export class SequenceRunManagerStack extends Stack {
*/
const procSqsFn = this.createPythonFunction('ProcHandler', {
index: 'sequence_run_manager_proc/lambdas/bssh_event.py',
handler: 'sqs_handler',
handler: 'event_handler',
timeout: Duration.minutes(2),
memorySize: 512,
reservedConcurrentExecutions: 1,
});

this.mainBus.grantPutEventsTo(procSqsFn);
// this.setupEventRule(procSqsFn); // TODO comment this out for now
this.setupEventRule(procSqsFn); // TODO comment this out for now
}

// private setupEventRule(fn: aws_lambda.Function) {
// const eventRule = new Rule(this, this.id + 'EventRule', {
// ruleName: this.id + 'EventRule',
// description: 'Rule to send {event_type.value} events to the {handler.function_name} Lambda',
// eventBus: this.props.mainBus,
// });
//
// eventRule.addTarget(new aws_events_targets.LambdaFunction(fn));
// eventRule.addEventPattern({
// source: ['ORCHESTRATOR'], // FIXME complete source to destination event mapping
// detailType: ['SequenceRunStateChange'],
// });
// }
/**
 * Create the EventBridge rule on the main event bus that forwards BSSH run events to the
 * given Lambda function.
 *
 * The sequence run manager is triggered by OrcaBus events that originate from the BSSH ENS
 * event pipe. This rule filters for the events we are interested in (see the event pattern
 * below). Per the design note, the processing Lambda records the event to the database and
 * emits the 'SequenceRunStateChange' event to the event bus.
 *
 * NOTE: in an EventBridge event pattern every listed field must match for the event to be
 * delivered, so all fields named in the pattern below act as required filters.
 *
 * @param fn - the Lambda function to register as the target of the rule
 */
private setupEventRule(fn: aws_lambda.Function) {
  const eventRule = new Rule(this, this.stackName + 'EventRule', {
    ruleName: this.stackName + 'EventRule',
    // Plain static text: the previous value carried un-interpolated template placeholders
    // ('{event_type.value}', '{handler.function_name}') that were deployed verbatim.
    description:
      'Rule to send BSSH run events (from the ICA event pipe) to the SequenceRunManager event handler Lambda',
    eventBus: this.mainBus,
  });

  eventRule.addEventPattern({
    detailType: ['Event from aws:sqs'],
    detail: {
      'ica-event': {
        // fields identifying a BSSH run event:
        // gdsFolderPath, gdsVolumeName (must start with 'bssh'), instrumentRunId, dateModified
        gdsFolderPath: [{ exists: true }],
        gdsVolumeName: [{ prefix: 'bssh' }],
        instrumentRunId: [{ exists: true }],
        dateModified: [{ exists: true }],

        // additional filters: these must also match for the rule to fire.
        // acl matches when it contains a value starting with 'wid:' or 'tid:'.
        acl: [{ prefix: 'wid:' }, { prefix: 'tid:' }],
        id: [{ exists: true }],
        status: [{ exists: true }],
      },
    },
  });

  eventRule.addTarget(new LambdaFunction(fn));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def from_value(cls, value):
def from_seq_run_status(cls, value):
"""
See Run Status
https://support.illumina.com/help/BaseSpace_Sequence_Hub/Source/Informatics/BS/Statuses_swBS.htm
https://help.basespace.illumina.com/automate/statuses
https://support.illumina.com/help/BaseSpace_Sequence_Hub/Source/Informatics/BS/Statuses_swBS.htm (deprecated)

Note that we don't necessarily support all these statuses. In the following check, those values come
from observed values from our BSSH run events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,65 @@

from libumccr import libjson
from libumccr.aws import libeb
from libica.app import ENSEventType
# from libica.app import ENSEventType

logger = logging.getLogger()
logger.setLevel(logging.INFO)

IMPLEMENTED_ENS_TYPES = [
ENSEventType.BSSH_RUNS.value,
]
# IMPLEMENTED_ENS_TYPES = [
# ENSEventType.BSSH_RUNS.value,
# ]

PRODUCED_BY_BSSH = ["BaseSpaceSequenceHub"]
# PRODUCED_BY_BSSH = ["BaseSpaceSequenceHub"]


def sqs_handler(event, context):
def event_handler(event, context):
"""event payload dict

Here is how to generate an example event. See README for more.
python manage.py generate_mock_bssh_event | jq

example event:
{
"version": "0",
"id": "f8c3de3d-1fea-4d7c-a8b0-29f63c4c3454", # Random UUID
"detail-type": "Event from aws:sqs",
"source": "Pipe IcaEventPipeConstru-xxxxxxxx",
"account": "444444444444",
"time": "2024-11-02T21:58:22Z",
"region": "ap-southeast-2",
"resources": [],
"detail": {
"ica-event": {
"gdsFolderPath": "",
"gdsVolumeName": "bssh.123456789fabcdefghijkl",
"v1pre3Id": "444555555555",
"dateModified": "2024-11-02T21:58:13.7451620Z",
"acl": [
"wid:12345678-debe-3f9f-8b92-21244f46822c",
"tid:Yxmm......"
],
"flowcellBarcode": "HVJJJJJJ",
"icaProjectId": "12345678-53ba-47a5-854d-e6b53101adb7",
"sampleSheetName": "SampleSheet.V2.134567.csv",
"apiUrl": "https://api.aps2.sh.basespace.illumina.com/v2/runs/r.4Wz-ABCDEFGHIJKLM-A",
"name": "222222_A01052_1234_BHVJJJJJJ",
"id": "r.4Wz-ABCDEFGHIJKLMN-A",
"instrumentRunId": "222222_A01052_1234_BHVJJJJJJ",
"status": "PendingAnalysis"
}
}
}

This Lambda is to be subscribed to SQS for BSSH event through ICA v1 ENS
https://illumina.gitbook.io/ica-v1/events/e-deliverytargets
This Lambda is to be subscribed to Orcabus Eventbridge rule for BSSH event through ICA v2 sqs event pipe
https://help.ica.illumina.com/project/p-notifications#delivery-targets
https://illumina.gitbook.io/ica-v1/events/e-deliverytargets (deprecated)

OrcaBus SRM BSSH Event High Level:
- through ICA v1 ENS, we subscribe to `bssh.runs` using SQS queue created at our AWS
- in our SQS queue, we hook this Lambda as event trigger and process the event
- now, when `bssh.runs` event with `statuschanged` status arrive...
- this SQS event envelope may contain multiple `Records`
- we parse these `Records`, transform and persist them into our internal OrcaBus SRM `Sequence` entity model
- through ICA v2 sqs event pipe, we subscribe to Orcabus Eventbridge with specific rule
- this Lambda is to be hooked to this Eventbridge rule to process the event
- now, when an `ica-event` event with `instrumentRunId` and `statuschanged` status arrives...
- we parse the `ica-event` payload, then transform and persist it into our internal OrcaBus SRM `Sequence` entity model
- after persisted into database, we again transform into our internal `SequenceRunStateChange` domain event
- this domain event schema is what we consented and published in our EventBus event schema registry
- we then dispatch our domain events into the channel in batching manner for efficiency
Expand All @@ -58,51 +90,38 @@ def sqs_handler(event, context):
:param context:
:return:
"""
assert os.environ["EVENT_BUS_NAME"] is not None, "EVENT_BUS_NAME must be set"

logger.info("Start processing BSSH ENS event")
logger.info(libjson.dumps(event))

messages = event["Records"]
event_bus_name = os.environ["EVENT_BUS_NAME"]

entries = list()

for message in messages:
event_type = message["messageAttributes"]["type"]["stringValue"]
produced_by = message["messageAttributes"]["producedby"]["stringValue"]

if event_type not in IMPLEMENTED_ENS_TYPES:
logger.warning(f"Skipping unsupported ENS type: {event_type}")
continue

if produced_by not in PRODUCED_BY_BSSH:
raise ValueError(f"Unrecognised BSSH event produced_by: {produced_by}")

if event_type == ENSEventType.BSSH_RUNS.value:
payload = {}
payload.update(libjson.loads(message["body"]))

# Create or update Sequence record from BSSH Run event payload
sequence_domain: SequenceDomain = (
sequence_srv.create_or_update_sequence_from_bssh_event(payload)
# Extract relevant fields from the event payload
event_details = event.get("detail", {}).get("ica-event", {})

# Create or update Sequence record from BSSH Run event payload
sequence_domain: SequenceDomain = (
sequence_srv.create_or_update_sequence_from_bssh_event(event_details)
)
entry = None

# Detect SequenceRunStateChange
if sequence_domain.state_has_changed:
try:
SequenceRule(sequence_domain.sequence).must_not_emergency_stop()
entry = sequence_domain.to_put_events_request_entry(
event_bus_name=event_bus_name,
)

# Detect SequenceRunStateChange
if sequence_domain.state_has_changed:
try:
SequenceRule(sequence_domain.sequence).must_not_emergency_stop()
entry = sequence_domain.to_put_events_request_entry(
event_bus_name=event_bus_name,
)
entries.append(entry)
except SequenceRuleError as se:
# FIXME emit custom event for this? something to tackle later. log & skip for now
reason = f"Aborted pipeline due to {se}"
logger.warning(reason)
continue

except SequenceRuleError as se:
# FIXME emit custom event for this? something to tackle later. log & skip for now
reason = f"Aborted pipeline due to {se}"
logger.warning(reason)

# Dispatch all event entries in one-go! libeb will take care of batching them up for efficiency.
if entries:
libeb.dispatch_events(entries)
if entry:
libeb.emit_event(entry)

resp_msg = {
"message": f"BSSH ENS event processing complete",
Expand Down
Loading
Loading