From 2d4dab43606533b273954aacc725658f84037e20 Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Tue, 28 May 2024 13:05:47 -0700 Subject: [PATCH] [ETL-634] Implement Dispatch Lambda (#114) * Refactor stacks related to input SNS In preparation for dispatch related stacks * Add Dispatch Lambda code and tests * Add stacks related to dispatch Lambda * Add `filter_object_info` function to dispatch Lambda --- .../namespaced/lambda-dispatch-role.yaml | 13 + .../develop/namespaced/lambda-dispatch.yaml | 15 ++ .../namespaced/lambda-s3-event-config.yaml | 4 +- .../namespaced/sns-dispatch-policy.yaml | 11 + config/develop/namespaced/sns-dispatch.yaml | 5 + .../develop/namespaced/sns-input-policy.yaml | 11 + config/develop/namespaced/sns-input.yaml | 5 + config/develop/namespaced/sns-topic.yaml | 9 - ...to-raw.yaml => sqs-input-to-dispatch.yaml} | 6 +- .../namespaced/sqs-input-to-intermediate.yaml | 4 +- .../prod/namespaced/lambda-dispatch-role.yaml | 13 + config/prod/namespaced/lambda-dispatch.yaml | 15 ++ .../namespaced/lambda-s3-event-config.yaml | 4 +- .../prod/namespaced/sns-dispatch-policy.yaml | 11 + config/prod/namespaced/sns-dispatch.yaml | 5 + config/prod/namespaced/sns-input-policy.yaml | 11 + config/prod/namespaced/sns-input.yaml | 5 + config/prod/namespaced/sns-topic.yaml | 9 - ...to-raw.yaml => sqs-input-to-dispatch.yaml} | 6 +- .../namespaced/sqs-input-to-intermediate.yaml | 4 +- src/lambda_function/dispatch/README.md | 35 +++ src/lambda_function/dispatch/app.py | 190 +++++++++++++++ src/lambda_function/dispatch/template.yaml | 72 ++++++ templates/lambda-dispatch-role.yaml | 78 ++++++ templates/sns-topic-policy.yaml | 73 ++++++ templates/sns-topic.yaml | 28 +-- tests/test_lambda_dispatch.py | 228 ++++++++++++++++++ 27 files changed, 811 insertions(+), 59 deletions(-) create mode 100644 config/develop/namespaced/lambda-dispatch-role.yaml create mode 100644 config/develop/namespaced/lambda-dispatch.yaml create mode 100644 
config/develop/namespaced/sns-dispatch-policy.yaml create mode 100644 config/develop/namespaced/sns-dispatch.yaml create mode 100644 config/develop/namespaced/sns-input-policy.yaml create mode 100644 config/develop/namespaced/sns-input.yaml delete mode 100644 config/develop/namespaced/sns-topic.yaml rename config/develop/namespaced/{sqs-input-to-raw.yaml => sqs-input-to-dispatch.yaml} (64%) create mode 100644 config/prod/namespaced/lambda-dispatch-role.yaml create mode 100644 config/prod/namespaced/lambda-dispatch.yaml create mode 100644 config/prod/namespaced/sns-dispatch-policy.yaml create mode 100644 config/prod/namespaced/sns-dispatch.yaml create mode 100644 config/prod/namespaced/sns-input-policy.yaml create mode 100644 config/prod/namespaced/sns-input.yaml delete mode 100644 config/prod/namespaced/sns-topic.yaml rename config/prod/namespaced/{sqs-input-to-raw.yaml => sqs-input-to-dispatch.yaml} (65%) create mode 100644 src/lambda_function/dispatch/README.md create mode 100644 src/lambda_function/dispatch/app.py create mode 100644 src/lambda_function/dispatch/template.yaml create mode 100644 templates/lambda-dispatch-role.yaml create mode 100644 templates/sns-topic-policy.yaml create mode 100644 tests/test_lambda_dispatch.py diff --git a/config/develop/namespaced/lambda-dispatch-role.yaml b/config/develop/namespaced/lambda-dispatch-role.yaml new file mode 100644 index 00000000..01794860 --- /dev/null +++ b/config/develop/namespaced/lambda-dispatch-role.yaml @@ -0,0 +1,13 @@ +template: + path: lambda-dispatch-role.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-dispatch-role" +dependencies: + - develop/namespaced/sqs-input-to-dispatch.yaml + - develop/namespaced/sns-dispatch.yaml + - develop/s3-cloudformation-bucket.yaml +parameters: + SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-dispatch::PrimaryQueueArn" + S3SourceBucketName: {{ stack_group_config.input_bucket_name }} + SNSTopicArn: !stack_output_external 
"{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/lambda-dispatch.yaml b/config/develop/namespaced/lambda-dispatch.yaml new file mode 100644 index 00000000..acbd4d37 --- /dev/null +++ b/config/develop/namespaced/lambda-dispatch.yaml @@ -0,0 +1,15 @@ +template: + type: sam + path: src/lambda_function/dispatch/template.yaml + artifact_bucket_name: {{ stack_group_config.template_bucket_name }} + artifact_prefix: "{{ stack_group_config.namespace }}/src/lambda" +dependencies: + - develop/namespaced/lambda-dispatch-role.yaml + - develop/namespaced/sqs-input-to-dispatch.yaml + - develop/s3-cloudformation-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-dispatch" +parameters: + RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-dispatch-role::RoleArn" + SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-dispatch::PrimaryQueueArn" + DispatchSnsArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" +stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/lambda-s3-event-config.yaml b/config/develop/namespaced/lambda-s3-event-config.yaml index 69ab6a78..86dd0803 100644 --- a/config/develop/namespaced/lambda-s3-event-config.yaml +++ b/config/develop/namespaced/lambda-s3-event-config.yaml @@ -6,12 +6,12 @@ template: dependencies: - develop/namespaced/lambda-s3-event-config-role.yaml - develop/s3-cloudformation-bucket.yaml - - develop/namespaced/sns-topic.yaml + - develop/namespaced/sns-input.yaml stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + S3ToGlueDestinationArn: 
!stack_output_external "{{ stack_group_config.namespace }}-sns-input::SnsTopicArn" S3ToGlueDestinationType: "Topic" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-s3-event-config-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/config/develop/namespaced/sns-dispatch-policy.yaml b/config/develop/namespaced/sns-dispatch-policy.yaml new file mode 100644 index 00000000..050c2ef3 --- /dev/null +++ b/config/develop/namespaced/sns-dispatch-policy.yaml @@ -0,0 +1,11 @@ +template: + path: sns-topic-policy.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-dispatch-policy" +dependencies: + - develop/namespaced/lambda-dispatch.yaml + - develop/namespaced/sns-dispatch.yaml +parameters: + SnsTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" + LambdaSourceArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-dispatch::DispatchFunctionArn" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sns-dispatch.yaml b/config/develop/namespaced/sns-dispatch.yaml new file mode 100644 index 00000000..36d68ff7 --- /dev/null +++ b/config/develop/namespaced/sns-dispatch.yaml @@ -0,0 +1,5 @@ +template: + path: sns-topic.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-dispatch" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sns-input-policy.yaml b/config/develop/namespaced/sns-input-policy.yaml new file mode 100644 index 00000000..eb19577a --- /dev/null +++ b/config/develop/namespaced/sns-input-policy.yaml @@ -0,0 +1,11 @@ +template: + path: sns-topic-policy.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-input-policy" +dependencies: + - develop/s3-input-bucket.yaml + - develop/namespaced/sns-input.yaml +parameters: + SnsTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input::SnsTopicArn" + 
S3SourceBucketArn: !stack_output_external "recover-dev-input-bucket::BucketArn" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sns-input.yaml b/config/develop/namespaced/sns-input.yaml new file mode 100644 index 00000000..f1f0a884 --- /dev/null +++ b/config/develop/namespaced/sns-input.yaml @@ -0,0 +1,5 @@ +template: + path: sns-topic.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-input" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sns-topic.yaml b/config/develop/namespaced/sns-topic.yaml deleted file mode 100644 index 5c407963..00000000 --- a/config/develop/namespaced/sns-topic.yaml +++ /dev/null @@ -1,9 +0,0 @@ -template: - path: sns-topic.yaml -parameters: - S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn -dependencies: - - develop/s3-input-bucket.yaml -stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs" -stack_tags: - {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sqs-input-to-raw.yaml b/config/develop/namespaced/sqs-input-to-dispatch.yaml similarity index 64% rename from config/develop/namespaced/sqs-input-to-raw.yaml rename to config/develop/namespaced/sqs-input-to-dispatch.yaml index 65cbf6f8..634979d1 100644 --- a/config/develop/namespaced/sqs-input-to-raw.yaml +++ b/config/develop/namespaced/sqs-input-to-dispatch.yaml @@ -4,9 +4,9 @@ parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input::SnsTopicArn" dependencies: - - develop/namespaced/sns-topic.yaml -stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw' + - develop/namespaced/sns-input.yaml +stack_name: "{{ stack_group_config.namespace 
}}-sqs-input-to-dispatch" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sqs-input-to-intermediate.yaml b/config/develop/namespaced/sqs-input-to-intermediate.yaml index ab2a5b12..7759af46 100644 --- a/config/develop/namespaced/sqs-input-to-intermediate.yaml +++ b/config/develop/namespaced/sqs-input-to-intermediate.yaml @@ -4,9 +4,9 @@ parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input::SnsTopicArn" dependencies: - - develop/namespaced/sns-topic.yaml + - develop/namespaced/sns-input.yaml stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/lambda-dispatch-role.yaml b/config/prod/namespaced/lambda-dispatch-role.yaml new file mode 100644 index 00000000..7c9b702b --- /dev/null +++ b/config/prod/namespaced/lambda-dispatch-role.yaml @@ -0,0 +1,13 @@ +template: + path: lambda-dispatch-role.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-dispatch-role" +dependencies: + - prod/namespaced/sqs-input-to-dispatch.yaml + - prod/namespaced/sns-dispatch.yaml + - prod/s3-cloudformation-bucket.yaml +parameters: + SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-dispatch::PrimaryQueueArn" + S3SourceBucketName: {{ stack_group_config.input_bucket_name }} + SNSTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/lambda-dispatch.yaml b/config/prod/namespaced/lambda-dispatch.yaml new file mode 100644 index 00000000..3320b63b --- /dev/null +++ b/config/prod/namespaced/lambda-dispatch.yaml 
@@ -0,0 +1,15 @@ +template: + type: sam + path: src/lambda_function/dispatch/template.yaml + artifact_bucket_name: {{ stack_group_config.template_bucket_name }} + artifact_prefix: "{{ stack_group_config.namespace }}/src/lambda" +dependencies: + - prod/namespaced/lambda-dispatch-role.yaml + - prod/namespaced/sqs-input-to-dispatch.yaml + - prod/s3-cloudformation-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-dispatch" +parameters: + RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-dispatch-role::RoleArn" + SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-dispatch::PrimaryQueueArn" + DispatchSnsArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" +stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/lambda-s3-event-config.yaml b/config/prod/namespaced/lambda-s3-event-config.yaml index 1d26eca1..c6ab6263 100644 --- a/config/prod/namespaced/lambda-s3-event-config.yaml +++ b/config/prod/namespaced/lambda-s3-event-config.yaml @@ -6,12 +6,12 @@ template: dependencies: - prod/namespaced/lambda-s3-event-config-role.yaml - prod/s3-cloudformation-bucket.yaml - - prod/namespaced/sns-topic.yaml + - prod/namespaced/sns-input.yaml stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input::SnsTopicArn" S3ToGlueDestinationType: "Topic" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-s3-event-config-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/config/prod/namespaced/sns-dispatch-policy.yaml 
b/config/prod/namespaced/sns-dispatch-policy.yaml new file mode 100644 index 00000000..8cee0459 --- /dev/null +++ b/config/prod/namespaced/sns-dispatch-policy.yaml @@ -0,0 +1,11 @@ +template: + path: sns-topic-policy.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-dispatch-policy" +dependencies: + - prod/namespaced/lambda-dispatch.yaml + - prod/namespaced/sns-dispatch.yaml +parameters: + SnsTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" + LambdaSourceArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-dispatch::DispatchFunctionArn" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sns-dispatch.yaml b/config/prod/namespaced/sns-dispatch.yaml new file mode 100644 index 00000000..36d68ff7 --- /dev/null +++ b/config/prod/namespaced/sns-dispatch.yaml @@ -0,0 +1,5 @@ +template: + path: sns-topic.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-dispatch" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sns-input-policy.yaml b/config/prod/namespaced/sns-input-policy.yaml new file mode 100644 index 00000000..df5be3f9 --- /dev/null +++ b/config/prod/namespaced/sns-input-policy.yaml @@ -0,0 +1,11 @@ +template: + path: sns-topic-policy.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-input-policy" +dependencies: + - prod/s3-input-bucket.yaml + - prod/namespaced/sns-input.yaml +parameters: + SnsTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input::SnsTopicArn" + S3SourceBucketArn: !stack_output_external "recover-input-bucket::BucketArn" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sns-input.yaml b/config/prod/namespaced/sns-input.yaml new file mode 100644 index 00000000..f1f0a884 --- /dev/null +++ b/config/prod/namespaced/sns-input.yaml @@ -0,0 +1,5 @@ +template: + path: sns-topic.yaml +stack_name: "{{ 
stack_group_config.namespace }}-sns-input" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sns-topic.yaml b/config/prod/namespaced/sns-topic.yaml deleted file mode 100644 index 8b5a5668..00000000 --- a/config/prod/namespaced/sns-topic.yaml +++ /dev/null @@ -1,9 +0,0 @@ -template: - path: sns-topic.yaml -parameters: - S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn -dependencies: - - prod/s3-input-bucket.yaml -stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs" -stack_tags: - {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sqs-input-to-raw.yaml b/config/prod/namespaced/sqs-input-to-dispatch.yaml similarity index 65% rename from config/prod/namespaced/sqs-input-to-raw.yaml rename to config/prod/namespaced/sqs-input-to-dispatch.yaml index ad10199f..d1b539ae 100644 --- a/config/prod/namespaced/sqs-input-to-raw.yaml +++ b/config/prod/namespaced/sqs-input-to-dispatch.yaml @@ -4,9 +4,9 @@ parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input::SnsTopicArn" dependencies: - - prod/namespaced/sns-topic.yaml -stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw' + - prod/namespaced/sns-input.yaml +stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-dispatch' stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sqs-input-to-intermediate.yaml b/config/prod/namespaced/sqs-input-to-intermediate.yaml index ccbf268e..1f694589 100644 --- a/config/prod/namespaced/sqs-input-to-intermediate.yaml +++ b/config/prod/namespaced/sqs-input-to-intermediate.yaml @@ -4,9 +4,9 @@ parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" 
 VisibilityTimeout: "120" - SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input::SnsTopicArn" dependencies: - - prod/namespaced/sns-topic.yaml + - prod/namespaced/sns-input.yaml stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/src/lambda_function/dispatch/README.md b/src/lambda_function/dispatch/README.md new file mode 100644 index 00000000..2e7b1061 --- /dev/null +++ b/src/lambda_function/dispatch/README.md @@ -0,0 +1,35 @@ +# Dispatch Lambda + +The dispatch Lambda polls the input-to-dispatch SQS queue and publishes to the dispatch SNS topic. +Its purpose is to inspect an export and dispatch each file as a separate job to eventually be consumed +by the dispatch-to-raw Lambda. + +## Development + +The Serverless Application Model Command Line Interface (SAM CLI) is an +extension of the AWS CLI that adds functionality for building and testing +Lambda applications. + +To use the SAM CLI, you need the following tools. + +* SAM CLI - [Install the SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) + +You may need the following for local testing. +* [Python 3 installed](https://www.python.org/downloads/) + +You will also need to configure your AWS credentials, if you have not already done so. + +## Creating a local build + +Use the SAM CLI to build and test your lambda locally. +Build your application with the `sam build` command. + +```bash +cd src/lambda_function/dispatch/ +sam build +``` + +## Tests + +Tests are available in `tests/test_lambda_dispatch.py`.
diff --git a/src/lambda_function/dispatch/app.py b/src/lambda_function/dispatch/app.py new file mode 100644 index 00000000..35f14930 --- /dev/null +++ b/src/lambda_function/dispatch/app.py @@ -0,0 +1,190 @@ +""" +Dispatch Lambda + +This Lambda polls the input-to-dispatch SQS queue and publishes to the dispatch SNS topic. +Its purpose is to inspect each export and dispatch each file as a separate job in which +the file will be decompressed and uploaded to S3. +""" +import json +import logging +import os +import zipfile +from typing import Optional # use | for type hints in 3.10+ +from urllib import parse + +import boto3 + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def filter_object_info(object_info: dict) -> Optional[dict]: + """ + Filter out objects that should not be processed. + + Returns None for: + + - Records containing owner.txt + - Records that don't contain a specific object key like / + - Records that are missing the `Key` field. + - Records that are missing the `Bucket` field. + + Args: + object_info (dict): Object information from source S3 bucket + as formatted by `get_object_info`. + + Returns: + dict: `object_info` if it passes the filter criteria (i.e., acts as + identity function) otherwise returns None. 
+ """ + if not object_info["Key"]: + logger.info( + "This object_info record doesn't contain a source key " + f"and can't be processed.\nMessage: {object_info}", + ) + return None + elif not object_info["Bucket"]: + logger.info( + "This object_info record doesn't contain a source bucket " + f"and can't be processed.\nMessage: {object_info}", + ) + return None + elif "owner.txt" in object_info["Key"]: + logger.info( + f"This object_info record is an owner.txt and can't be processed.\nMessage: {object_info}" + ) + return None + elif object_info["Key"].endswith("/"): + logger.info( + f"This object_info record is a directory and can't be processed.\nMessage: {object_info}" + ) + return None + return object_info + +def get_object_info(s3_event: dict) -> dict: + """ + Derive object info from an S3 event. + + Args: + s3_event (dict): An S3 event + + Returns: + object_info (dict) The S3 object info + """ + bucket_name = s3_event["s3"]["bucket"]["name"] + object_key = parse.unquote(s3_event["s3"]["object"]["key"]) + object_info = { + "Bucket": bucket_name, + "Key": object_key, + } + return object_info + +def get_archive_contents(archive_path: str, bucket: str, key: str) -> list[dict]: + """ + Inspect a ZIP archive for its file contents. + + Args: + archive_path (str): The path of the ZIP archive to inspect. + bucket (str): The S3 bucket where the ZIP archive originates from. + key (str): The S3 key where the ZIP archive originates from. + + Returns: + archive_contents (list) A list of dictionaries. 
Each dictionary contains + the keys: + + * Bucket - The name of the S3 bucket + * Key - The S3 key + * Path - The path within the archive which identifies this file + * FileSize - The uncompressed size in bytes of this file + """ + archive_contents = [] + with zipfile.ZipFile(archive_path, "r") as archive: + for path in archive.infolist(): + if ( + "/" not in path.filename # necessary for pilot data only + and "Manifest" not in path.filename + and path.file_size > 0 + ): + file_info = { + "Bucket": bucket, + "Key": key, + "Path": path.filename, + "FileSize": path.file_size + } + archive_contents.append(file_info) + return archive_contents + +def lambda_handler(event: dict, context:dict) -> None: + """ + This function serves as the entrypoint and will be triggered upon + polling the input-to-dispatch SQS queue. + + Args: + event (dict): The input-to-dispatch SQS event. + context (dict): Information about the runtime environment and + the current invocation + + Returns: + (None): Calls the real workhorse of this module: `main`. + """ + s3_client = boto3.client("s3") + sns_client = boto3.client("sns") + dispatch_sns_arn = os.environ.get("DISPATCH_SNS_ARN", "") + # if there are multiple exports, they will overwrite each other + # since it's not necessary to have access to more than one export at a time. + temp_zip_path = "/tmp/export.zip" + main( + event=event, + context=context, + s3_client=s3_client, + sns_client=sns_client, + dispatch_sns_arn=dispatch_sns_arn, + temp_zip_path=temp_zip_path + ) + +def main( + event: dict, + context: dict, + sns_client: "botocore.client.SNS", + s3_client: "botocore.client.S3", + dispatch_sns_arn: str, + temp_zip_path: str +) -> None: + """ + This function should be invoked by `lambda_handler`. + + Args: + event (dict): The input-to-dispatch SQS event. 
+ context (dict): Information about the runtime environment and + the current invocation + sns_client (botocore.client.SNS): An SNS client + s3_client (botocore.client.S3): An S3 client + dispatch_sns_arn: The ARN of the SNS topic we publish to + temp_zip_path: The path to download the export S3 object to. + + Returns: + (None): Logs and publishes to the dispatch SNS topic. + """ + for sqs_record in event["Records"]: + sns_notification = json.loads(sqs_record["body"]) + sns_message = json.loads(sns_notification["Message"]) + logger.info(f"Received SNS message: {sns_message}") + all_object_info_list = map(get_object_info, sns_message["Records"]) + valid_object_info_list = [ + object_info + for object_info in all_object_info_list + if filter_object_info(object_info) is not None + ] + for object_info in valid_object_info_list: + s3_client.download_file(Filename=temp_zip_path, **object_info) + logger.info(f"Getting archive contents for {object_info}") + archive_contents = get_archive_contents( + archive_path=temp_zip_path, + bucket=object_info["Bucket"], + key=object_info["Key"] + ) + for file_info in archive_contents: + logger.info(f"Publishing {file_info} to {dispatch_sns_arn}") + sns_client.publish( + TopicArn=dispatch_sns_arn, + Message=json.dumps(file_info) + ) diff --git a/src/lambda_function/dispatch/template.yaml b/src/lambda_function/dispatch/template.yaml new file mode 100644 index 00000000..78c56970 --- /dev/null +++ b/src/lambda_function/dispatch/template.yaml @@ -0,0 +1,72 @@ +AWSTemplateFormatVersion: '2010-09-09' + +Transform: AWS::Serverless-2016-10-31 + +Description: > + SAM Template for the dispatch Lambda. The dispatch Lambda polls the input-to-dispatch SQS + queue and publishes to the dispatch SNS topic. Its purpose is to inspect each export + and dispatch each file as a separate job to be consumed by the dispatch-to-raw Lambda. + +Parameters: + + RoleArn: + Type: String + Description: ARN of the dispatch Lambda role.
+ + SQSQueueArn: + Type: String + Description: ARN of the input-to-dispatch SQS queue. + + DispatchSnsArn: + Type: String + Description: > + ARN of the Dispatch SNS Topic. + + LambdaPythonVersion: + Type: String + Description: Python version to use for this lambda function + Default: "3.9" + + LambdaBatchSize: + Type: Number + Default: 10 + Description: >- + The maximum number of S3 messages in a SQS event that Lambda will process in a batch + + LambdaMaximumBatchingWindowInSeconds: + Type: Number + Default: 300 + Description: >- + The maximum amount of time in seconds Lambda will batch messages before polling + the SQS queue and processing them + +Resources: + DispatchFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Zip + CodeUri: ./ + Handler: app.lambda_handler + Runtime: !Sub "python${LambdaPythonVersion}" + Role: !Ref RoleArn + MemorySize: 256 + EphemeralStorage: + Size: 2048 + Timeout: 30 + Environment: + Variables: + DISPATCH_SNS_ARN: !Ref DispatchSnsArn + Events: + SQSEvent: + Type: SQS + Properties: + BatchSize: !Ref LambdaBatchSize + MaximumBatchingWindowInSeconds: !Ref LambdaMaximumBatchingWindowInSeconds + Queue: !Ref SQSQueueArn + +Outputs: + DispatchFunctionArn: + Description: Arn of the dispatch Lambda. + Value: !GetAtt DispatchFunction.Arn + Export: + Name: !Sub "${AWS::Region}-${AWS::StackName}-DispatchFunctionArn" diff --git a/templates/lambda-dispatch-role.yaml b/templates/lambda-dispatch-role.yaml new file mode 100644 index 00000000..30913b5f --- /dev/null +++ b/templates/lambda-dispatch-role.yaml @@ -0,0 +1,78 @@ +AWSTemplateFormatVersion: '2010-09-09' + +Transform: AWS::Serverless-2016-10-31 + +Description: > + An IAM Role for the dispatch lambda + +Parameters: + SQSQueueArn: + Type: String + Description: ARN of the SQS queue for lambda to poll messages from. + + S3SourceBucketName: + Type: String + Description: Name of the S3 bucket where exports are deposited. 
+ + SNSTopicArn: + Type: String + Description: ARN of the SNS topic which dispatched jobs will be published to. + +Resources: + DispatchRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: PollSQSQueue + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - sqs:DeleteMessage + - sqs:GetQueueAttributes + - sqs:ReceiveMessage + Resource: + - !Ref SQSQueueArn + - PolicyName: ReadS3 + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - s3:Get* + - s3:List* + Resource: + - !Sub arn:aws:s3:::${S3SourceBucketName} + - !Sub arn:aws:s3:::${S3SourceBucketName}/* + - PolicyName: PublishToSNS + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - sns:Publish + Resource: + - !Ref SNSTopicArn + +Outputs: + RoleName: + Value: !Ref DispatchRole + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-RoleName' + + RoleArn: + Value: !GetAtt DispatchRole.Arn + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-RoleArn' diff --git a/templates/sns-topic-policy.yaml b/templates/sns-topic-policy.yaml new file mode 100644 index 00000000..dfead1fc --- /dev/null +++ b/templates/sns-topic-policy.yaml @@ -0,0 +1,73 @@ +AWSTemplateFormatVersion: "2010-09-09" + +Description: > + Allows an SNS Topic to be used with S3 event notifications and/or Lambda triggers. + +Parameters: + + SnsTopicArn: + Type: String + Description: The ARN of the SNS topic to apply this policy to. + + S3SourceBucketArn: + Type: String + Default: "" + Description: > + ARN of an S3 bucket where source data are stored. Specifying this + parameter enables this SNS topic to have S3 event notifications published to it. 
+ + LambdaSourceArn: + Type: String + Default: "" + Description: ARN of the Lambda function which can trigger this SNS topic. Specifying this + parameter enables this SNS topic to have a Lambda publish to it. + +Conditions: + + HasS3SourceBucketArn: + !Not [!Equals [!Ref S3SourceBucketArn, ""]] + + HasLambdaSourceArn: + !Not [!Equals [!Ref LambdaSourceArn, ""]] + +Resources: + + SnsTopicPolicyForS3: + Type: AWS::SNS::TopicPolicy + Condition: HasS3SourceBucketArn + Properties: + PolicyDocument: + Version: "2012-10-17" + Statement: + - Sid: Input S3 bucket event notification to SNS + Effect: Allow + Principal: + Service: s3.amazonaws.com + Action: + - sns:Publish + Resource: !Ref SnsTopicArn + Condition: + ArnLike: + "aws:SourceArn": !Ref S3SourceBucketArn + Topics: + - !Ref SnsTopicArn + + SnsTopicPolicyForLambda: + Type: AWS::SNS::TopicPolicy + Condition: HasLambdaSourceArn + Properties: + PolicyDocument: + Version: "2012-10-17" + Statement: + - Sid: Lambda trigger to SNS + Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: + - sns:Publish + Resource: !Ref SnsTopicArn + Condition: + ArnLike: + "aws:SourceArn": !Ref LambdaSourceArn + Topics: + - !Ref SnsTopicArn diff --git a/templates/sns-topic.yaml b/templates/sns-topic.yaml index be3ddb32..5c2015b7 100644 --- a/templates/sns-topic.yaml +++ b/templates/sns-topic.yaml @@ -1,13 +1,7 @@ AWSTemplateFormatVersion: "2010-09-09" Description: > - Creates a Standard SNS queue which can be used with S3 event notifications. - -Parameters: - - S3SourceBucketArn: - Type: String - Description: Arn of the S3 bucket where source data are stored. + Creates a Standard SNS topic. 
Resources: @@ -17,26 +11,6 @@ Resources: TopicName: !Sub "${AWS::StackName}-Topic" FifoTopic: false - SnsTopicPolicy: - Type: AWS::SNS::TopicPolicy - Properties: - PolicyDocument: - Version: "2012-10-17" - Statement: - - Sid: Input S3 bucket event notification to SNS - Effect: Allow - Principal: - Service: s3.amazonaws.com - AWS: !Sub '${AWS::AccountId}' - Action: - - sns:Publish - Resource: !Ref SnsTopic - Condition: - ArnLike: - "aws:SourceArn": !Ref S3SourceBucketArn - Topics: - - !Ref SnsTopic - Outputs: SnsTopicArn: diff --git a/tests/test_lambda_dispatch.py b/tests/test_lambda_dispatch.py new file mode 100644 index 00000000..5de358a5 --- /dev/null +++ b/tests/test_lambda_dispatch.py @@ -0,0 +1,228 @@ +import json +import os +from pathlib import Path +import shutil +import tempfile +import zipfile + +import boto3 +import pytest +from moto import mock_sns, mock_s3 +from src.lambda_function.dispatch import app +from unittest import mock + +@pytest.fixture +def s3_event(): + s3_event = { + "eventVersion": "2.0", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "1970-01-01T00:00:00.000Z", + "eventName": "ObjectCreated:Put", + "userIdentity": {"principalId": "EXAMPLE"}, + "requestParameters": {"sourceIPAddress": "127.0.0.1"}, + "responseElements": { + "x-amz-request-id": "EXAMPLE123456789", + "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "testConfigRule", + "bucket": { + "name": "recover-dev-input-data", + "ownerIdentity": {"principalId": "EXAMPLE"}, + "arn": "arn:aws:s3:::bucket_arn", + }, + "object": { + "key": "main/2023-01-12T22--02--17Z_77fefff8-b0e2-4c1b-b0c5-405554c92368", + "size": 1024, + "eTag": "0123456789abcdef0123456789abcdef", + "sequencer": "0A1B2C3D4E5F678901", + }, + }, + } + return s3_event + +@pytest.fixture +def sns_message_template(): + sns_message_template = { + "Type": "string", + "MessageId": "string", + "TopicArn": 
"string", + "Subject": "string", + "Message": "string", + "Timestamp": "string" + } + return sns_message_template + +@pytest.fixture +def sqs_message_template(): + sqs_msg = { + "Records": [ + { + "MessageId": "string", + "receiptHandle": "string", + "MD5OfBody": "string", + "body": "string", + "Attributes": { + "string": "string", + }, + "MD5OfMessageAttributes": "string", + "MessageAttributes": { + "string": { + "DataType": "string", + "StringValue": "string", + "BinaryValue": "string", + } + }, + } + ] + } + yield sqs_msg + +@pytest.fixture +def event(s3_event, sns_message_template, sqs_message_template): + sns_message_template["Message"] = json.dumps({"Records": [s3_event]}) + sqs_message_template["Records"][0]["body"] = json.dumps(sns_message_template) + return sqs_message_template + +@pytest.fixture +def archive_json_paths(): + archive_json_paths = [ + "HealthKitV2Workouts_20240508-20240509.json", # normal file + "empty.json", # should have file size 0 and be ignored + "Manifest.json", # should be ignored + "dir/containing/stuff.json" # should be ignored + ] + return archive_json_paths + +@pytest.fixture +def temp_zip_file(): + temp_zip_file = tempfile.NamedTemporaryFile( + delete=False, + suffix='.zip' + ) + return temp_zip_file + +@pytest.fixture +def archive_path(archive_json_paths, temp_zip_file): + with zipfile.ZipFile(temp_zip_file.name, 'w', zipfile.ZIP_DEFLATED) as zip_file: + for file_path in archive_json_paths: + if "/" in file_path: + os.makedirs(os.path.dirname(file_path)) + if file_path != "empty.json": + with open(file_path, "w") as this_file: + this_file.write("test") + else: + Path(file_path).touch() + zip_file.write(file_path) + if "/" in file_path: + shutil.rmtree(file_path.split("/")[0]) + else: + os.remove(file_path) + yield temp_zip_file.name + os.remove(temp_zip_file.name) + +def test_get_object_info(s3_event): + object_info = app.get_object_info(s3_event=s3_event) + assert object_info["Bucket"] == s3_event["s3"]["bucket"]["name"] + 
assert object_info["Key"] == s3_event["s3"]["object"]["key"] + +def test_get_object_info_unicode_characters_in_key(s3_event): + s3_event["s3"]["object"]["key"] = \ + "main/2023-09-26T00%3A06%3A39Z_d873eafb-554f-4f8a-9e61-cdbcb7de07eb" + object_info = app.get_object_info(s3_event=s3_event) + assert object_info["Key"] == \ + "main/2023-09-26T00:06:39Z_d873eafb-554f-4f8a-9e61-cdbcb7de07eb" + +@pytest.mark.parametrize( + "object_info,expected", + [ + ( + { + "Bucket": "recover-dev-input-data", + "Key": "main/2023-01-12T22--02--17Z_77fefff8-b0e2-4c1b-b0c5-405554c92368", + }, + { + "Bucket": "recover-dev-input-data", + "Key": "main/2023-01-12T22--02--17Z_77fefff8-b0e2-4c1b-b0c5-405554c92368", + }, + ), + ( + { + "Bucket": "recover-dev-input-data", + "Key": "main/v1/owner.txt", + }, + None, + ), + ( + { + "Bucket": "recover-dev-input-data", + "Key": "main/adults_v2/", + }, + None, + ), + ( + { + "Bucket": "recover-dev-input-data", + "Key": None, + }, + None, + ), + ( + { + "Bucket": None, + "Key": "main/2023-01-12T22--02--17Z_77fefff8-b0e2-4c1b-b0c5-405554c92368", + }, + None, + ), + ], + ids=[ + "correct_msg_format", + "owner_txt", + "directory", + "missing_key", + "missing_bucket", + ], +) +def test_that_filter_object_info_returns_expected_result(object_info, expected): + assert app.filter_object_info(object_info) == expected + +def test_get_archive_contents(archive_path, archive_json_paths): + dummy_bucket = "dummy_bucket" + dummy_key = "dummy_key" + archive_contents = app.get_archive_contents( + archive_path=archive_path, + bucket=dummy_bucket, + key=dummy_key + ) + assert all([content["Bucket"] == dummy_bucket for content in archive_contents]) + assert all([content["Key"] == dummy_key for content in archive_contents]) + assert all([content["FileSize"] > 0 for content in archive_contents]) + assert set([content["Path"] for content in archive_contents]) == \ + set(["HealthKitV2Workouts_20240508-20240509.json"]) + +@mock_sns +@mock_s3 +def test_main(event, 
temp_zip_file, s3_event, archive_path): + sns_client = boto3.client("sns") + s3_client = boto3.client("s3") + bucket = s3_event["s3"]["bucket"]["name"] + key = s3_event["s3"]["object"]["key"] + s3_client.create_bucket(Bucket=bucket) + s3_client.upload_file( + Filename=archive_path, + Bucket=bucket, + Key=key + ) + dispatch_sns = sns_client.create_topic(Name="test-sns-topic") + with mock.patch.object(sns_client, "publish", wraps=sns_client.publish) as mock_publish: + app.main( + event=event, + context=dict(), + sns_client=sns_client, + s3_client=s3_client, + dispatch_sns_arn=dispatch_sns["TopicArn"], + temp_zip_path=temp_zip_file.name + ) + mock_publish.assert_called()