From 7bcaabdcb14e747c2dd6f5d3a26e1766167f3283 Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Tue, 3 Dec 2024 14:01:38 +1100 Subject: [PATCH 1/4] feat(srm) update SRM to handle BSSH events from orcabus ICAv2 event pipe --- .../sequence-run-manager/deploy/README.md | 8 +- .../sequence-run-manager/deploy/stack.ts | 47 +++-- .../sequence_run_manager/models/sequence.py | 3 +- .../lambdas/bssh_event.py | 114 +++++++----- .../tests/test_bssh_event.py | 172 +++++++++--------- 5 files changed, 189 insertions(+), 155 deletions(-) diff --git a/lib/workload/stateless/stacks/sequence-run-manager/deploy/README.md b/lib/workload/stateless/stacks/sequence-run-manager/deploy/README.md index 8d5577fdb..084f53937 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/deploy/README.md +++ b/lib/workload/stateless/stacks/sequence-run-manager/deploy/README.md @@ -17,10 +17,10 @@ Hot-deploy against dev: export AWS_PROFILE=umccr-dev-admin yarn cdk-stateless list -yarn cdk-stateless synth -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack -yarn cdk-stateless diff -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack -yarn cdk-stateless deploy -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack -yarn cdk-stateless destroy -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack +yarn cdk-stateless synth -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack +yarn cdk-stateless diff -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack +yarn cdk-stateless deploy -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack +yarn cdk-stateless destroy -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack ``` CloudFormation template: diff --git a/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts b/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts index 27f5b0381..6228cc321 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts @@ -3,7 +3,8 @@ import * as cdk from 'aws-cdk-lib'; import { aws_lambda, aws_secretsmanager, Duration, Stack } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import { ISecurityGroup, IVpc, SecurityGroup, Vpc, VpcLookupOptions } from 'aws-cdk-lib/aws-ec2'; -import { EventBus, IEventBus } from 'aws-cdk-lib/aws-events'; +import { EventBus, IEventBus, Rule } from 'aws-cdk-lib/aws-events'; +import { LambdaFunction } from 'aws-cdk-lib/aws-events-targets'; import { PythonFunction, PythonLayerVersion } from '@aws-cdk/aws-lambda-python-alpha'; import { HttpLambdaIntegration } from 'aws-cdk-lib/aws-apigatewayv2-integrations'; import { HttpMethod, HttpRoute, HttpRouteKey } from 'aws-cdk-lib/aws-apigatewayv2'; @@ -140,20 +141,36 @@ export class SequenceRunManagerStack extends Stack { }); this.mainBus.grantPutEventsTo(procSqsFn); - // this.setupEventRule(procSqsFn); // TODO comment this out for now + this.setupEventRule(procSqsFn); // TODO comment this out for now } - // private setupEventRule(fn: aws_lambda.Function) { - // const eventRule = new Rule(this, this.id + 'EventRule', { - // ruleName: this.id + 'EventRule', - // description: 'Rule to send {event_type.value} events to the {handler.function_name} Lambda', - // eventBus: this.props.mainBus, - // }); - // - // eventRule.addTarget(new aws_events_targets.LambdaFunction(fn)); - // eventRule.addEventPattern({ - // source: ['ORCHESTRATOR'], // FIXME complete source to destination event mapping - // detailType: ['SequenceRunStateChange'], - // }); - // } + private setupEventRule(fn: aws_lambda.Function) { + /** + * For + + */ + const eventRule = new Rule(this, this.stackName + 'EventRule', { + ruleName: this.stackName + 'EventRule', + description: 'Rule to send {event_type.value} events to the {handler.function_name} Lambda', + eventBus: this.mainBus, + }); + eventRule.addEventPattern({ + detailType: ['Event from aws:sqs'], + detail: { + 'ica-event': { + // only for mandatory fields + gdsFolderPath: [{ exists: true }], + gdsVolumeName: [{ prefix: 'bssh' }], + instrumentRunId: [{ exists: true }], + dateModified: [{ exists: true }], + // optional fields (flowcell barcode, sample sheet name, reagent barcode, ica project id, api url, name) + acl: [{ prefix: 'wid:' }, { prefix: 'tid:' }], + id: [{ exists: true }], + status: [{ exists: true }], + }, + }, + }); + + eventRule.addTarget(new LambdaFunction(fn)); + } } diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py index ba2c470fa..bfe5e3332 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py @@ -30,7 +30,8 @@ def from_value(cls, value): def from_seq_run_status(cls, value): """ See Run Status - https://support.illumina.com/help/BaseSpace_Sequence_Hub/Source/Informatics/BS/Statuses_swBS.htm + https://help.basespace.illumina.com/automate/statuses + https://support.illumina.com/help/BaseSpace_Sequence_Hub/Source/Informatics/BS/Statuses_swBS.htm (deprecated) Note that we don't necessary support all these statuses. In the following check, those values come from observed values from our BSSH run events. diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py index dbd5063f1..f365f0998 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py @@ -17,33 +17,65 @@ from libumccr import libjson from libumccr.aws import libeb -from libica.app import ENSEventType +# from libica.app import ENSEventType logger = logging.getLogger() logger.setLevel(logging.INFO) -IMPLEMENTED_ENS_TYPES = [ - ENSEventType.BSSH_RUNS.value, -] +# IMPLEMENTED_ENS_TYPES = [ +# ENSEventType.BSSH_RUNS.value, +# ] -PRODUCED_BY_BSSH = ["BaseSpaceSequenceHub"] +# PRODUCED_BY_BSSH = ["BaseSpaceSequenceHub"] -def sqs_handler(event, context): +def event_handler(event, context): """event payload dict Here is how to generate an example event. See README for more. python manage.py generate_mock_bssh_event | jq + + example event: + { + "version": "0", + "id": f8c3de3d-1fea-4d7c-a8b0-29f63c4c3454", # Random UUID + "detail-type": "Event from aws:sqs", + "source": "Pipe IcaEventPipeConstru-xxxxxxxx", + "account": "444444444444", + "time": "2024-11-02T21:58:22Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "ica-event": { + "gdsFolderPath": "", + "gdsVolumeName": "bssh.123456789fabcdefghijkl", + "v1pre3Id": "444555555555", + "dateModified": "2024-11-02T21:58:13.7451620Z", + "acl": [ + "wid:12345678-debe-3f9f-8b92-21244f46822c", + "tid:Yxmm......" + ], + "flowcellBarcode": "HVJJJJJJ", + "icaProjectId": "12345678-53ba-47a5-854d-e6b53101adb7", + "sampleSheetName": "SampleSheet.V2.134567.csv", + "apiUrl": "https://api.aps2.sh.basespace.illumina.com/v2/runs/r.4Wz-ABCDEFGHIJKLM-A", + "name": "222222_A01052_1234_BHVJJJJJJ", + "id": "r.4Wz-ABCDEFGHIJKLMN-A", + "instrumentRunId": "222222_A01052_1234_BHVJJJJJJ", + "status": "PendingAnalysis" + } + } + } - This Lambda is to be subscribed to SQS for BSSH event through ICA v1 ENS - https://illumina.gitbook.io/ica-v1/events/e-deliverytargets + This Lambda is to be subscribed to Orcabus Eventbridge rule for BSSH event through ICA v2 sqs event pipe + https://help.ica.illumina.com/project/p-notifications#delivery-targets + https://illumina.gitbook.io/ica-v1/events/e-deliverytargets (deprecated) OrcaBus SRM BSSH Event High Level: - - through ICA v1 ENS, we subscribe to `bssh.runs` using SQS queue created at our AWS - - in our SQS queue, we hook this Lambda as event trigger and process the event - - now, when `bssh.runs` event with `statuschanged` status arrive... - - this SQS event envelope may contain multiple `Records` - - we parse these `Records`, transform and persist them into our internal OrcaBus SRM `Sequence` entity model + - through ICA v2 sqs event pipe, we subscribe to Orcabus Eventbridge with specific rule + - this Lambda is to be hooked to this Eventbridge rule to process the event + - now, when `ica-event` event with `instrumentRunId` and `statuschanged` status arrive... + - we parse these `ica-event` payload, transform and persist them into our internal OrcaBus SRM `Sequence` entity model - after persisted into database, we again transform into our internal `SequenceRunStateChange` domain event - this domain event schema is what we consented and published in our EventBus event schema registry - we then dispatch our domain events into the channel in batching manner for efficiency @@ -58,51 +90,37 @@ def sqs_handler(event, context): :param context: :return: """ + assert os.environ["EVENT_BUS_NAME"] is not None, "EVENT_BUS_NAME must be set" + logger.info("Start processing BSSH ENS event") logger.info(libjson.dumps(event)) - messages = event["Records"] event_bus_name = os.environ["EVENT_BUS_NAME"] - entries = list() - - for message in messages: - event_type = message["messageAttributes"]["type"]["stringValue"] - produced_by = message["messageAttributes"]["producedby"]["stringValue"] - - if event_type not in IMPLEMENTED_ENS_TYPES: - logger.warning(f"Skipping unsupported ENS type: {event_type}") - continue + # Extract relevant fields from the event payload + event_details = event.get("detail", {}).get("ica-event", {}) - if produced_by not in PRODUCED_BY_BSSH: - raise ValueError(f"Unrecognised BSSH event produced_by: {produced_by}") + # Create or update Sequence record from BSSH Run event payload + sequence_domain: SequenceDomain = ( + sequence_srv.create_or_update_sequence_from_bssh_event(event_details) + ) - if event_type == ENSEventType.BSSH_RUNS.value: - payload = {} - payload.update(libjson.loads(message["body"])) - - # Create or update Sequence record from BSSH Run event payload - sequence_domain: SequenceDomain = ( - sequence_srv.create_or_update_sequence_from_bssh_event(payload) + # Detect SequenceRunStateChange + if sequence_domain.state_has_changed: + try: + SequenceRule(sequence_domain.sequence).must_not_emergency_stop() + entry = sequence_domain.to_put_events_request_entry( + event_bus_name=event_bus_name, ) - - # Detect SequenceRunStateChange - if sequence_domain.state_has_changed: - try: - SequenceRule(sequence_domain.sequence).must_not_emergency_stop() - entry = sequence_domain.to_put_events_request_entry( - event_bus_name=event_bus_name, - ) - entries.append(entry) - except SequenceRuleError as se: - # FIXME emit custom event for this? something to tackle later. log & skip for now - reason = f"Aborted pipeline due to {se}" - logger.warning(reason) - continue + + except SequenceRuleError as se: + # FIXME emit custom event for this? something to tackle later. log & skip for now + reason = f"Aborted pipeline due to {se}" + logger.warning(reason) # Dispatch all event entries in one-go! libeb will take care of batching them up for efficiency. - if entries: - libeb.dispatch_events(entries) + if entry: + libeb.emit_event(entry) resp_msg = { "message": f"BSSH ENS event processing complete", diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/tests/test_bssh_event.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/tests/test_bssh_event.py index 676b6b908..f25747b21 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/tests/test_bssh_event.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/tests/test_bssh_event.py @@ -9,8 +9,41 @@ from sequence_run_manager_proc.lambdas import bssh_event from sequence_run_manager_proc.tests.case import logger, SequenceRunProcUnitTestCase +""" +example event: + { + "version": "0", + "id": f8c3de3d-1fea-4d7c-a8b0-29f63c4c3454", # Random UUID + "detail-type": "Event from aws:sqs", + "source": "Pipe IcaEventPipeConstru-xxxxxxxx", + "account": "444444444444", + "time": "2024-11-02T21:58:22Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "ica-event": { + "gdsFolderPath": "", + "gdsVolumeName": "bssh.123456789fabcdefghijkl", + "v1pre3Id": "444555555555", + "dateModified": "2024-11-02T21:58:13.7451620Z", + "acl": [ + "wid:12345678-debe-3f9f-8b92-21244f46822c", + "tid:Yxmm......" + ], + "flowcellBarcode": "HVJJJJJJ", + "icaProjectId": "12345678-53ba-47a5-854d-e6b53101adb7", + "sampleSheetName": "SampleSheet.V2.134567.csv", + "apiUrl": "https://api.aps2.sh.basespace.illumina.com/v2/runs/r.4Wz-ABCDEFGHIJKLM-A", + "name": "222222_A01052_1234_BHVJJJJJJ", + "id": "r.4Wz-ABCDEFGHIJKLMN-A", + "instrumentRunId": "222222_A01052_1234_BHVJJJJJJ", + "status": "PendingAnalysis" + } + } + } +""" -def sqs_bssh_event_message(): +def bssh_event_message(): mock_instrument_run_id = TestConstant.instrument_run_id.value mock_sequence_run_id = "r.ACGTlKjDgEy099ioQOeOWg" mock_sequence_run_name = mock_instrument_run_id @@ -33,57 +66,21 @@ def sqs_bssh_event_message(): "status": mock_status, } - ens_sqs_message_attributes = { - "action": { - "stringValue": "statuschanged", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "actiondate": { - "stringValue": "2020-05-09T22:17:10.815Z", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "type": { - "stringValue": "bssh.runs", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "producedby": { - "stringValue": "BaseSpaceSequenceHub", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "contenttype": { - "stringValue": "application/json", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", + orcabus_event_message = { + "version": "0", + "id": "f8c3de3d-1fea-4d7c-a8b0-29f63c4c3454", # Random UUID + "detail-type": "Event from aws:sqs", + "source": "Pipe IcaEventPipeConstru-xxxxxxxx", + "account": "444444444444", + "time": "2024-11-02T21:58:22Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "ica-event": sequence_run_message, }, } - sqs_event_message = { - "Records": [ - { - "eventSource": "aws:sqs", - "body": libjson.dumps(sequence_run_message), - "messageAttributes": ens_sqs_message_attributes, - "attributes": { - "ApproximateReceiveCount": "3", - "SentTimestamp": "1589509337523", - "SenderId": "ACTGAGCTI2IGZA4XHGYYY:sender-sender", - "ApproximateFirstReceiveTimestamp": "1589509337535", - }, - "eventSourceARN": "arn:aws:sqs:ap-southeast-2:843407916570:my-queue", - } - ] - } - - return sqs_event_message + return orcabus_event_message class BSSHEventUnitTests(SequenceRunProcUnitTestCase): @@ -96,45 +93,46 @@ def tearDown(self) -> None: super(BSSHEventUnitTests, self).tearDown() del os.environ["EVENT_BUS_NAME"] - def test_unsupported_ens_event_type(self): - """ - python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_unsupported_ens_event_type - """ - ens_sqs_message_attributes = { - "type": { - "stringValue": "tes.runs", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "producedby": { - "stringValue": "BaseSpaceSequenceHub", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - } - - sqs_event_message = { - "Records": [ - { - "eventSource": "aws:sqs", - "body": "does_not_matter", - "messageAttributes": ens_sqs_message_attributes, - } - ] - } - - result = bssh_event.sqs_handler(sqs_event_message, None) - self.assertIsNotNone(result) - - def test_sqs_handler(self): + # comment as eventbridge rule will filter out unsupported event type + # def test_unsupported_ens_event_type(self): + # """ + # python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_unsupported_ens_event_type + # """ + # ens_sqs_message_attributes = { + # "type": { + # "stringValue": "tes.runs", + # "stringListValues": [], + # "binaryListValues": [], + # "dataType": "String", + # }, + # "producedby": { + # "stringValue": "BaseSpaceSequenceHub", + # "stringListValues": [], + # "binaryListValues": [], + # "dataType": "String", + # }, + # } + + # sqs_event_message = { + # "Records": [ + # { + # "eventSource": "aws:sqs", + # "body": "does_not_matter", + # "messageAttributes": ens_sqs_message_attributes, + # } + # ] + # } + + # result = bssh_event.sqs_handler(sqs_event_message, None) + # self.assertIsNotNone(result) + + def test_event_handler(self): """ - python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_sqs_handler + python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_event_handler """ when(libssm).get_ssm_param(...).thenReturn(libjson.dumps([])) - _ = bssh_event.sqs_handler(sqs_bssh_event_message(), None) + _ = bssh_event.event_handler(bssh_event_message(), None) qs = Sequence.objects.filter( instrument_run_id=TestConstant.instrument_run_id.value @@ -144,15 +142,15 @@ def test_sqs_handler(self): self.assertEqual(1, qs.count()) verify(libeb, times=1).eb_client(...) # event should fire - def test_sqs_handler_emergency_stop(self): + def test_event_handler_emergency_stop(self): """ - python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_sqs_handler_emergency_stop + python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_event_handler_emergency_stop """ when(libssm).get_ssm_param(...).thenReturn( libjson.dumps([TestConstant.instrument_run_id.value]) ) - _ = bssh_event.sqs_handler(sqs_bssh_event_message(), None) + _ = bssh_event.event_handler(bssh_event_message(), None) qs = Sequence.objects.filter( instrument_run_id=TestConstant.instrument_run_id.value From 37f7c7cd048e9325f91b6a77052382386742235d Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Tue, 3 Dec 2024 18:28:08 +1100 Subject: [PATCH 2/4] feat(srm) add state and comment modules for sequence --- .../deps/requirements.txt | 1 + .../migrations/0001_initial.py | 95 +++++++++++++++++-- .../migrations/0002_alter_sequence_status.py | 26 ----- .../sequence_run_manager/models/__init__.py | 2 + .../sequence_run_manager/models/base.py | 34 +++++++ .../sequence_run_manager/models/comment.py | 27 ++++++ .../sequence_run_manager/models/sequence.py | 2 +- .../sequence_run_manager/models/state.py | 26 +++++ .../sequence_run_manager/serializers.py | 9 -- .../serializers/__init__.py | 0 .../sequence_run_manager/serializers/base.py | 39 ++++++++ .../serializers/comment.py | 13 +++ .../serializers/sequence.py | 26 +++++ .../sequence_run_manager/serializers/state.py | 12 +++ .../sequence_run_manager/urls/base.py | 5 +- .../sequence_run_manager/viewsets/base.py | 45 +++++++++ .../sequence_run_manager/viewsets/comment.py | 88 +++++++++++++++++ .../sequence_run_manager/viewsets/sequence.py | 23 +++-- .../sequence_run_manager/viewsets/state.py | 14 +++ 19 files changed, 434 insertions(+), 53 deletions(-) delete mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0002_alter_sequence_status.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/comment.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/state.py delete mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/__init__.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/base.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/comment.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/sequence.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/state.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/base.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/comment.py create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/state.py diff --git a/lib/workload/stateless/stacks/sequence-run-manager/deps/requirements.txt b/lib/workload/stateless/stacks/sequence-run-manager/deps/requirements.txt index 81d833f7f..e5c259fde 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/deps/requirements.txt +++ b/lib/workload/stateless/stacks/sequence-run-manager/deps/requirements.txt @@ -22,3 +22,4 @@ serverless-wsgi==3.0.5 # for sequencerunstatechange package six==1.16.0 regex==2024.9.11 +ulid-py==1.1.0 \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0001_initial.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0001_initial.py index b9df3d664..b8aaad8e0 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0001_initial.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0001_initial.py @@ -1,18 +1,66 @@ -# Generated by Django 4.2.1 on 2023-06-14 07:37 +# Generated by Django 5.1.2 on 2024-12-03 07:25 +import django.core.validators +import django.db.models.deletion from django.db import migrations, models class Migration(migrations.Migration): + initial = True dependencies = [] operations = [ + migrations.CreateModel( + name="Comment", + fields=[ + ( + "orcabus_id", + models.CharField( + editable=False, + primary_key=True, + serialize=False, + unique=True, + validators=[ + django.core.validators.RegexValidator( + code="invalid_orcabus_id", + message="ULID is expected to be 26 characters long", + regex="^[\\w]{26}$", + ) + ], + ), + ), + ("comment", models.TextField()), + ("association_id", models.CharField(max_length=255)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("created_by", models.CharField(max_length=255)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("is_deleted", models.BooleanField(default=False)), + ], + options={ + "abstract": False, + }, + ), migrations.CreateModel( name="Sequence", fields=[ - ("id", models.BigAutoField(primary_key=True, serialize=False)), + ( + "orcabus_id", + models.CharField( + editable=False, + primary_key=True, + serialize=False, + unique=True, + validators=[ + django.core.validators.RegexValidator( + code="invalid_orcabus_id", + message="ULID is expected to be 26 characters long", + regex="^[\\w]{26}$", + ) + ], + ), + ), ("instrument_run_id", models.CharField(max_length=255, unique=True)), ("run_volume_name", models.TextField()), ("run_folder_path", models.TextField()), @@ -21,10 +69,10 @@ class Migration(migrations.Migration): "status", models.CharField( choices=[ - ("started", "Started"), - ("failed", "Failed"), - ("succeeded", "Succeeded"), - ("aborted", "Aborted"), + ("STARTED", "Started"), + ("FAILED", "Failed"), + ("SUCCEEDED", "Succeeded"), + ("ABORTED", "Aborted"), ], max_length=255, ), @@ -56,4 +104,39 @@ class Migration(migrations.Migration): "abstract": False, }, ), + migrations.CreateModel( + name="State", + fields=[ + ( + "orcabus_id", + models.CharField( + editable=False, + primary_key=True, + serialize=False, + unique=True, + validators=[ + django.core.validators.RegexValidator( + code="invalid_orcabus_id", + message="ULID is expected to be 26 characters long", + regex="^[\\w]{26}$", + ) + ], + ), + ), + ("status", models.CharField(max_length=255)), + ("timestamp", models.DateTimeField()), + ("comment", models.CharField(blank=True, max_length=255, null=True)), + ( + "sequence", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="states", + to="sequence_run_manager.sequence", + ), + ), + ], + options={ + "abstract": False, + }, + ), ] diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0002_alter_sequence_status.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0002_alter_sequence_status.py deleted file mode 100644 index 68dec7401..000000000 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0002_alter_sequence_status.py +++ /dev/null @@ -1,26 +0,0 @@ -# Generated by Django 5.0.5 on 2024-06-12 01:28 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("sequence_run_manager", "0001_initial"), - ] - - operations = [ - migrations.AlterField( - model_name="sequence", - name="status", - field=models.CharField( - choices=[ - ("STARTED", "Started"), - ("FAILED", "Failed"), - ("SUCCEEDED", "Succeeded"), - ("ABORTED", "Aborted"), - ], - max_length=255, - ), - ), - ] diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/__init__.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/__init__.py index 4660a2f00..d717b8a99 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/__init__.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/__init__.py @@ -1,3 +1,5 @@ # https://docs.djangoproject.com/en/4.1/topics/db/models/#organizing-models-in-a-package from .sequence import Sequence +from .comment import Comment +from .state import State diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/base.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/base.py index f5e6aafb2..276c1d338 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/base.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/base.py @@ -1,9 +1,11 @@ import logging import operator +import ulid from functools import reduce from typing import List from django.core.exceptions import FieldError +from django.core.validators import RegexValidator from django.db import models from django.db.models import ( Q, @@ -23,6 +25,12 @@ logger = logging.getLogger(__name__) +orcabus_id_validator = RegexValidator( + regex=r'^[\w]{26}$', + message='ULID is expected to be 26 characters long', + code='invalid_orcabus_id' + ) + class OrcaBusBaseManager(models.Manager): @staticmethod @@ -78,6 +86,32 @@ def exclude_params(params): class OrcaBusBaseModel(models.Model): class Meta: abstract = True + + orcabus_id_prefix = None + + orcabus_id = models.CharField( + primary_key=True, + unique=True, + editable=False, + blank=False, + null=False, + validators=[orcabus_id_validator] + ) + + def save(self, *args, **kwargs): + # handle the OrcaBus ID + if not self.orcabus_id: + # if no OrcaBus ID was provided, then generate one + self.orcabus_id = ulid.new().str + else: + # check provided OrcaBus ID + if len(self.orcabus_id) > 26: + # assume the OrcaBus ID carries the prefix + # we strip it off and continue to the validation + l = len(self.orcabus_id_prefix) + self.orcabus_id = str(self.orcabus_id)[l:] + self.full_clean() # make sure we are validating the inputs (especially the OrcaBus ID) + return super(OrcaBusBaseModel, self).save(*args, **kwargs) @classmethod def get_fields(cls): diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/comment.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/comment.py new file mode 100644 index 000000000..5e7963d2a --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/comment.py @@ -0,0 +1,27 @@ +import logging + +from django.db import models + +from sequence_run_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager + +logger = logging.getLogger(__name__) + +class CommentManager(OrcaBusBaseManager): + pass + + +class Comment(OrcaBusBaseModel): + # primary key + orcabus_id_prefix = 'cmt.' + + comment = models.TextField(null=False, blank=False) + association_id = models.CharField(max_length=255, null=False, blank=False) # comment association object id + created_at = models.DateTimeField(auto_now_add=True) + created_by = models.CharField(max_length=255, null=False, blank=False) + updated_at = models.DateTimeField(auto_now=True) + is_deleted = models.BooleanField(default=False) + + objects = CommentManager() + + def __str__(self): + return f"ID: {self.orcabus_id}, comment: {self.comment}, from {self.created_by}, for {self.association_id}" \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py index bfe5e3332..a997a0aa4 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py @@ -62,7 +62,7 @@ def get_by_keyword(self, **kwargs) -> QuerySet: class Sequence(OrcaBusBaseModel): # primary key - id = models.BigAutoField(primary_key=True) + orcabus_id_prefix = 'seq.' # mandatory non-nullable base fields instrument_run_id = models.CharField( diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/state.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/state.py new file mode 100644 index 000000000..2beb3fe16 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/state.py @@ -0,0 +1,26 @@ +import logging + +from django.db import models + +from sequence_run_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager +from sequence_run_manager.models.sequence import Sequence + +logger = logging.getLogger(__name__) + +class StateManager(OrcaBusBaseManager): + pass + + +class State(OrcaBusBaseModel): + orcabus_id_prefix = 'sqs.' + + status = models.CharField(max_length=255, null=False, blank=False) + timestamp = models.DateTimeField() + comment = models.CharField(max_length=255, null=True, blank=True) + + sequence = models.ForeignKey(Sequence, on_delete=models.CASCADE, related_name='states') + + objects = StateManager() + + def __str__(self): + return f"ID: {self.orcabus_id}, status: {self.status}, for {self.sequence}" diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers.py deleted file mode 100644 index fffbd48ad..000000000 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers.py +++ /dev/null @@ -1,9 +0,0 @@ -from rest_framework import serializers - -from sequence_run_manager.models import Sequence - - -class SequenceSerializer(serializers.ModelSerializer): - class Meta: - model = Sequence - fields = "__all__" diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/__init__.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/base.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/base.py new file mode 100644 index 000000000..2b57a52a5 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/base.py @@ -0,0 +1,39 @@ +import re +from rest_framework import serializers + + +def to_camel_case(snake_str): + components = re.split(r'[_\-\s]', snake_str) + return components[0].lower() + ''.join(x.title() for x in components[1:]) + + +class SerializersBase(serializers.ModelSerializer): + prefix = '' + + def __init__(self, *args, camel_case_data=False, **kwargs): + super().__init__(*args, **kwargs) + self.use_camel_case = camel_case_data + + def to_representation(self, instance): + representation = super().to_representation(instance) + representation['orcabus_id'] = self.prefix + str(representation['orcabus_id']) + + if self.use_camel_case: + return {to_camel_case(key): value for key, value in representation.items()} + return representation + + +class OptionalFieldsMixin: + def make_fields_optional(self): + # Make all fields optional + for field in self.fields.values(): + field.required = False + + # If the fields are CharField, you might also want to allow them to be blank + for field_name, field in self.fields.items(): + if isinstance(field, serializers.CharField): + field.allow_blank = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.make_fields_optional() diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/comment.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/comment.py new file mode 100644 index 000000000..d1f68eec9 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/comment.py @@ -0,0 +1,13 @@ +from rest_framework import serializers + +from sequence_run_manager.models import Comment +from sequence_run_manager.serializers.base import SerializersBase, OptionalFieldsMixin + +class CommentBaseSerializer(SerializersBase): + orcabus_id_prefix = Comment.orcabus_id_prefix + +class CommentSerializer(CommentBaseSerializer): + class Meta: + model = Comment + fields = "__all__" + diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/sequence.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/sequence.py new file mode 100644 index 000000000..d87dfca5f --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/sequence.py @@ -0,0 +1,26 @@ +from rest_framework import serializers + +from sequence_run_manager.models import Sequence +from sequence_run_manager.serializers.base import SerializersBase, OptionalFieldsMixin + + +class SequenceBaseSerializer(SerializersBase): + orcabus_id_prefix = Sequence.orcabus_id_prefix + + +class SequenceListParamSerializer(OptionalFieldsMixin, SequenceBaseSerializer): + class Meta: + model = Sequence + fields = "__all__" + +class SequenceMinSerializer(SequenceBaseSerializer): + class Meta: + model = Sequence + fields = ["orcabus_id", "instrument_run_id", "start_time", "end_time", "status"] + +class SequenceSerializer(SequenceBaseSerializer): + class Meta: + model = Sequence + fields = "__all__" + + \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/state.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/state.py new file mode 100644 index 000000000..8059473ce --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/state.py @@ -0,0 +1,12 @@ +from rest_framework import serializers + +from sequence_run_manager.models import State +from sequence_run_manager.serializers.base import SerializersBase, OptionalFieldsMixin + +class StateBaseSerializer(SerializersBase): + orcabus_id_prefix = State.orcabus_id_prefix + +class StateSerializer(StateBaseSerializer): + class Meta: + model = State + fields = "__all__" diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/urls/base.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/urls/base.py index f0dd4cc77..0466ad51f 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/urls/base.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/urls/base.py @@ -2,6 +2,8 @@ from sequence_run_manager.routers import OptionalSlashDefaultRouter from sequence_run_manager.viewsets.sequence import SequenceViewSet +from sequence_run_manager.viewsets.state import StateViewSet +from sequence_run_manager.viewsets.comment import CommentViewSet from sequence_run_manager.settings.base import API_VERSION api_namespace = "api" @@ -11,8 +13,9 @@ router = OptionalSlashDefaultRouter() router.register(r"sequence", SequenceViewSet, basename="sequence") +router.register(r"sequence/(?P[^/.]+)/state", StateViewSet, basename="sequence-state") +router.register(r"sequence/(?P[^/.]+)/comment", CommentViewSet, basename="sequence-comment") urlpatterns = [ - # path("iam/", include(router.urls)), path(f"{api_base}", include(router.urls)), ] diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/base.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/base.py new file mode 100644 index 000000000..701a96685 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/base.py @@ -0,0 +1,45 @@ +from abc import ABC +from rest_framework import filters +from django.shortcuts import get_object_or_404 +from sequence_run_manager.pagination import StandardResultsSetPagination +from rest_framework.response import Response +from rest_framework.viewsets import ReadOnlyModelViewSet + + +class BaseViewSet(ReadOnlyModelViewSet, ABC): + lookup_value_regex = "[^/]+" # This is to allow for special characters in the URL + orcabus_id_prefix = '' + ordering_fields = "__all__" + ordering = ["-orcabus_id"] + pagination_class = StandardResultsSetPagination + filter_backends = [filters.OrderingFilter, filters.SearchFilter] + + def retrieve(self, request, *args, **kwargs): + """ + Since we have custom orcabus_id prefix for each model, we need to remove the prefix before retrieving it. + """ + pk = self.kwargs.get('pk') + if pk and pk.startswith(self.orcabus_id_prefix): + pk = pk[len(self.orcabus_id_prefix):] + + obj = get_object_or_404(self.get_queryset(), pk=pk) + serializer = self.serializer_class(obj) + return Response(serializer.data) + + def get_query_params(self): + """ + Sanitize query params if needed + e.g. remove prefixes for each orcabus_id + """ + query_params = self.request.query_params.copy() + orcabus_id = query_params.getlist("orcabus_id", None) + if orcabus_id: + id_list = [] + for key in orcabus_id: + if key.startswith(self.orcabus_id_prefix): + id_list.append(key[len(self.orcabus_id_prefix):]) + else: + id_list.append(key) + query_params.setlist('orcabus_id', id_list) + + return query_params diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/comment.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/comment.py new file mode 100644 index 000000000..d51ac2615 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/comment.py @@ -0,0 +1,88 @@ +from rest_framework import mixins +from rest_framework.viewsets import GenericViewSet +from rest_framework import status +from rest_framework.response import Response +from rest_framework.exceptions import PermissionDenied +from rest_framework.decorators import action + +from sequence_run_manager.models.comment import Comment +from sequence_run_manager.models.sequence import Sequence +from sequence_run_manager.serializers.comment import CommentSerializer + + +class CommentViewSet(mixins.CreateModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin, GenericViewSet): + serializer_class = CommentSerializer + search_fields = Comment.get_base_fields() + orcabus_id_prefix = Comment.orcabus_id_prefix + http_method_names = ['get', 'post', 'patch', 'delete'] + pagination_class = None + + def get_queryset(self): + return Comment.objects.filter( + association_id=self.kwargs["orcabus_id"], + is_deleted=False + ) + + def perform_create(self, serializer): + serializer.save(association_id=self.kwargs["orcabus_id"]) + + def create(self, request, *args, **kwargs): + seq_orcabus_id = self.kwargs["orcabus_id"] + + # Check if the SequenceRun exists + try: + Sequence.objects.get(orcabus_id=seq_orcabus_id) + except Sequence.DoesNotExist: + return Response({"detail": "SequenceRun not found."}, status=status.HTTP_404_NOT_FOUND) + + # Check if created_by and comment are provided + if not request.data.get('created_by') or not request.data.get('comment'): + return Response({"detail": "created_by and comment are required."}, status=status.HTTP_400_BAD_REQUEST) + + # Add workflow_run_id to the request data + mutable_data = request.data.copy() + mutable_data['sequence'] = seq_orcabus_id + + serializer = self.get_serializer(data=mutable_data) + serializer.is_valid(raise_exception=True) + self.perform_create(serializer) + headers = self.get_success_headers(serializer.data) + return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) + + def perform_create(self, serializer): + serializer.save() # Assuming you're using email as the user identifier + + def update(self, request, *args, **kwargs): + partial = kwargs.pop('partial', False) + instance = self.get_object() + + # Check if the user updating the comment is the same as the one who created it + if instance.created_by != request.data.get('created_by'): + raise PermissionDenied("You don't have permission to update this comment.") + + # Ensure only the comment field can be updated + if set(request.data.keys()) - {'comment', 'created_by'}: + return Response({"detail": "Only the comment field can be updated."}, + status=status.HTTP_400_BAD_REQUEST) + + serializer = self.get_serializer(instance, data=request.data, partial=partial) + serializer.is_valid(raise_exception=True) + self.perform_update(serializer) + headers = self.get_success_headers(serializer.data) + return Response(serializer.data, status=status.HTTP_200_OK, headers=headers) + + def perform_update(self, serializer): + serializer.save() + + @action(detail=True, methods=['delete']) + def soft_delete(self, request, *args, **kwargs): + instance = self.get_object() + + # Check if the user deleting the comment is the same as the one who created it + if instance.created_by != request.data.get('created_by'): + raise PermissionDenied("You don't have permission to delete this comment.") + + instance.is_deleted = True + instance.save() + + return Response({"detail": "Comment successfully marked as deleted."}, status=status.HTTP_204_NO_CONTENT) \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/sequence.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/sequence.py index 736bcfc95..6041ae7f6 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/sequence.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/sequence.py @@ -1,18 +1,21 @@ -from rest_framework import filters -from rest_framework.viewsets import ReadOnlyModelViewSet -from sequence_run_manager.models.sequence import Sequence -from sequence_run_manager.pagination import StandardResultsSetPagination -from sequence_run_manager.serializers import SequenceSerializer +from drf_spectacular.utils import extend_schema +from sequence_run_manager.viewsets.base import BaseViewSet +from sequence_run_manager.models.sequence import Sequence +from sequence_run_manager.serializers.sequence import SequenceSerializer, SequenceListParamSerializer, SequenceMinSerializer -class SequenceViewSet(ReadOnlyModelViewSet): +class SequenceViewSet(BaseViewSet): serializer_class = SequenceSerializer - pagination_class = StandardResultsSetPagination - filter_backends = [filters.OrderingFilter, filters.SearchFilter] - ordering_fields = "__all__" - ordering = ["-id"] search_fields = Sequence.get_base_fields() + orcabus_id_prefix = Sequence.orcabus_id_prefix def get_queryset(self): return Sequence.objects.get_by_keyword(**self.request.query_params) + + @extend_schema(parameters=[ + SequenceListParamSerializer + ]) + def list(self, request, *args, **kwargs): + self.serializer_class = SequenceMinSerializer + return super().list(request, *args, **kwargs) diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/state.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/state.py new file mode 100644 index 000000000..aee42a3ab --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/state.py @@ -0,0 +1,14 @@ +from rest_framework.viewsets import GenericViewSet + +from sequence_run_manager.models.state import State +from sequence_run_manager.serializers.state import StateSerializer + + +class StateViewSet(GenericViewSet): + serializer_class = StateSerializer + search_fields = State.get_base_fields() + orcabus_id_prefix = State.orcabus_id_prefix + pagination_class = None + + def get_queryset(self): + return State.objects.filter(sequence=self.kwargs["orcabus_id"]) From 1b99ea9b91220487dfae61ec9ad2840319b9f354 Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Tue, 3 Dec 2024 21:34:16 +1100 Subject: [PATCH 3/4] add sequence state service to record status change --- .../lambdas/bssh_event.py | 5 +-- .../services/sequence_state_srv.py | 35 +++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) create mode 100644 lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_state_srv.py diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py index f365f0998..7c7ae2497 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py @@ -13,7 +13,7 @@ SequenceRule, SequenceRuleError, ) -from sequence_run_manager_proc.services import sequence_srv +from sequence_run_manager_proc.services import sequence_srv, sequence_state_srv from libumccr import libjson from libumccr.aws import libeb @@ -109,6 +109,7 @@ def event_handler(event, context): if sequence_domain.state_has_changed: try: SequenceRule(sequence_domain.sequence).must_not_emergency_stop() + sequence_state_srv.create_sequence_state_from_bssh_event(event_details) entry = sequence_domain.to_put_events_request_entry( event_bus_name=event_bus_name, ) @@ -118,7 +119,7 @@ def event_handler(event, context): reason = f"Aborted pipeline due to {se}" logger.warning(reason) - # Dispatch all event entries in one-go! libeb will take care of batching them up for efficiency. + # Dispatch event entry using libeb. if entry: libeb.emit_event(entry) diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_state_srv.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_state_srv.py new file mode 100644 index 000000000..c70c11020 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_state_srv.py @@ -0,0 +1,35 @@ +import logging + +from django.db import transaction +from django.db.models import QuerySet + +from sequence_run_manager.models.sequence import Sequence +from sequence_run_manager.models.state import State + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +@transaction.atomic +def create_sequence_state_from_bssh_event(payload: dict) -> None: + """ + Create SequenceState record from BSSH Run event payload + + { + "dateModified": "2022-06-24T05:07:53.476767Z", + "instrumentRunId": "200508_A01052_0001_BH5LY7ACGT", + "status": "PendingAnalysis" + ... + } + """ + status = payload["status"] + timestamp = payload["dateModified"] + + # get sequence by instrument_run_id + instrument_run_id = payload["instrumentRunId"] + sequence = Sequence.objects.get(instrument_run_id=instrument_run_id) + + # comment for any future usage, None by default + comment = None + + State.objects.create(status=status, timestamp=timestamp, sequence=sequence, comment=comment) \ No newline at end of file From a1a13e9bafa6d4659f4ce80c1a65deb47841d05a Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Wed, 4 Dec 2024 11:25:35 +1100 Subject: [PATCH 4/4] fix bssh event handler issue --- .../stacks/sequence-run-manager/deploy/stack.ts | 14 +++++++++----- .../lambdas/bssh_event.py | 1 + 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts b/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts index 6228cc321..8dcadd03e 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts @@ -134,7 +134,7 @@ export class SequenceRunManagerStack extends Stack { */ const procSqsFn = this.createPythonFunction('ProcHandler', { index: 'sequence_run_manager_proc/lambdas/bssh_event.py', - handler: 'sqs_handler', + handler: 'event_handler', timeout: Duration.minutes(2), memorySize: 512, reservedConcurrentExecutions: 1, @@ -146,9 +146,12 @@ export class SequenceRunManagerStack extends Stack { private setupEventRule(fn: aws_lambda.Function) { /** - * For - - */ + * For sequence run manager, we are using orcabus events ( source from BSSH ENS event pipe) to trigger the lambda function. + * event rule to filter the events that we are interested in. + * event pattern: see below + * process lambda will record the event to the database, and emit the 'SequenceRunStateChange' event to the event bus. + * + */ const eventRule = new Rule(this, this.stackName + 'EventRule', { ruleName: this.stackName + 'EventRule', description: 'Rule to send {event_type.value} events to the {handler.function_name} Lambda', @@ -158,11 +161,12 @@ export class SequenceRunManagerStack extends Stack { detailType: ['Event from aws:sqs'], detail: { 'ica-event': { - // only for mandatory fields + // mandatory fields (gdsFolderPath, gdsVolumeName(starts with bssh), instrumentRunId, dateModified) gdsFolderPath: [{ exists: true }], gdsVolumeName: [{ prefix: 'bssh' }], instrumentRunId: [{ exists: true }], dateModified: [{ exists: true }], + // optional fields (flowcell barcode, sample sheet name, reagent barcode, ica project id, api url, name) acl: [{ prefix: 'wid:' }, { prefix: 'tid:' }], id: [{ exists: true }], diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py index f365f0998..f0a436214 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py @@ -104,6 +104,7 @@ def event_handler(event, context): sequence_domain: SequenceDomain = ( sequence_srv.create_or_update_sequence_from_bssh_event(event_details) ) + entry = None # Detect SequenceRunStateChange if sequence_domain.state_has_changed: