From 3d224484b24b0dde7d8b17f05b7ab8264c7ef8bb Mon Sep 17 00:00:00 2001 From: Rixing Xu Date: Fri, 1 Sep 2023 16:38:25 -0700 Subject: [PATCH 1/5] add initial sqs queue setup --- config/develop/namespaced/sqs-queue.yaml | 13 +++++ templates/sqs-queue.yaml | 69 ++++++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 config/develop/namespaced/sqs-queue.yaml create mode 100644 templates/sqs-queue.yaml diff --git a/config/develop/namespaced/sqs-queue.yaml b/config/develop/namespaced/sqs-queue.yaml new file mode 100644 index 00000000..b0eb9cb7 --- /dev/null +++ b/config/develop/namespaced/sqs-queue.yaml @@ -0,0 +1,13 @@ +template: + path: sqs-queue.yaml +parameters: + MessageRetentionPeriod: '86400' + ReceiveMessageWaitTimeSeconds: 20 + VisibilityTimeout: 120 + S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn +dependencies: + - develop/namespaced/s3-to-glue-lambda.yaml + - develop/s3-input-bucket.yaml +stack_name: '{{ stack_group_config.namespace }}-sqs-S3ToLambda' +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/templates/sqs-queue.yaml b/templates/sqs-queue.yaml new file mode 100644 index 00000000..d0b5f576 --- /dev/null +++ b/templates/sqs-queue.yaml @@ -0,0 +1,69 @@ +AWSTemplateFormatVersion: '2010-09-09' + +Description: > + Creates an SQS queue that gets S3 notifications + +Parameters: + + MessageRetentionPeriod: + Type: Number + Default: 1209600 + Description: How long to retain messages in the primary queue. + + ReceiveMessageWaitTimeSeconds: + Type: Number + Default: 20 + Description: The delay between SQS receiving a message and making it available for others to poll + + VisibilityTimeout: + Type: Number + Default: 120 + Description: >- + How long our lambda has to submit the messages to Glue and + delete the message from the SQS queue + + S3SourceBucketArn: + Type: String + Description: Arn of the S3 bucket where source data are stored. + +Resources: + + PrimaryQueue: + Type: AWS::SQS::Queue + Properties: + DelaySeconds: 0 + MessageRetentionPeriod: !Ref MessageRetentionPeriod + QueueName: !Sub '${AWS::StackName}-Queue' + ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds + VisibilityTimeout: !Ref VisibilityTimeout + + PrimaryQueuePolicy: + Type: AWS::SQS::QueuePolicy + Properties: + PolicyDocument: + Version: '2012-10-17' + Statement: + - Sid: Send_Permission + Effect: Allow + Principal: + AWS: !Sub '${AWS::AccountId}' + Action: + - SQS:SendMessage + Resource: !GetAtt PrimaryQueue.Arn + Condition: + ArnLike: + "aws:SourceArn": !Ref S3SourceBucketArn + Queues: + - !Ref PrimaryQueue + +Outputs: + + PrimaryQueueArn: + Value: !GetAtt PrimaryQueue.Arn + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-PrimaryQueueArn' + + PrimaryQueueUrl: + Value: !Ref PrimaryQueue + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-PrimaryQueueUrl' From 6c29f29afa1724bf9a6a92700add2b7d03473b85 Mon Sep 17 00:00:00 2001 From: Rixing Xu Date: Tue, 5 Sep 2023 13:15:46 -0700 Subject: [PATCH 2/5] change to str --- config/develop/namespaced/sqs-queue.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/develop/namespaced/sqs-queue.yaml b/config/develop/namespaced/sqs-queue.yaml index b0eb9cb7..838902b5 100644 --- a/config/develop/namespaced/sqs-queue.yaml +++ b/config/develop/namespaced/sqs-queue.yaml @@ -2,8 +2,8 @@ template: path: sqs-queue.yaml parameters: MessageRetentionPeriod: '86400' - ReceiveMessageWaitTimeSeconds: 20 - VisibilityTimeout: 120 + ReceiveMessageWaitTimeSeconds: '20' + VisibilityTimeout: '120' S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn dependencies: - develop/namespaced/s3-to-glue-lambda.yaml From f5c5f811de121daf4c6604b246c1760a0837e3b6 Mon Sep 17 00:00:00 2001 From: Rixing Xu Date: Tue, 5 Sep 2023 13:55:45 -0700 Subject: [PATCH 3/5] add changes to event config lambda to support sending to SQS queues --- .../namespaced/s3-event-config-lambda.yaml | 3 +- src/lambda_function/s3_event_config/README.md | 9 ++- src/lambda_function/s3_event_config/app.py | 15 +++-- .../s3_event_config/template.yaml | 11 +++- tests/test_s3_event_config_lambda.py | 63 +++++++++++++++++-- 5 files changed, 84 insertions(+), 17 deletions(-) diff --git a/config/develop/namespaced/s3-event-config-lambda.yaml b/config/develop/namespaced/s3-event-config-lambda.yaml index 98f29daf..ad6442e0 100644 --- a/config/develop/namespaced/s3-event-config-lambda.yaml +++ b/config/develop/namespaced/s3-event-config-lambda.yaml @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn" + S3ToGlueDestinationType: "Queue" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/src/lambda_function/s3_event_config/README.md b/src/lambda_function/s3_event_config/README.md index ccef795e..ca748f7d 100644 --- a/src/lambda_function/s3_event_config/README.md +++ b/src/lambda_function/s3_event_config/README.md @@ -4,9 +4,14 @@ The s3_event_config lambda is triggered by a github action during deployment or It will then put a S3 event notification configuration into the input data bucket which allows the input data bucket to -trigger the S3 to JSON lambda with S3 new object notifications whenever new objects are added +trigger a specific destination type with S3 new object notifications whenever new objects are added to it and eventually lead to the start of the S3-to-JSON workflow. +Currently **only** the following destination types are supported: + +- Lambda Function +- SQS queue + ## Event format The events that will trigger the s3-event-config-lambda @@ -23,6 +28,8 @@ Where the allowed RequestType values are: - "Update" - "Delete" +You can test the lambda by going to the AWS console for the lambda function, pasting the above sample event in and triggering the function. Any updates should then be visible in the input bucket's event config to confirm it was successful. + ## Launching Lambda stack in AWS There are two main stacks involved in the s3_event_config lambda. They are the diff --git a/src/lambda_function/s3_event_config/app.py b/src/lambda_function/s3_event_config/app.py index 5fb83ea4..307b2ce2 100644 --- a/src/lambda_function/s3_event_config/app.py +++ b/src/lambda_function/s3_event_config/app.py @@ -29,7 +29,8 @@ def lambda_handler(event, context): logger.info(f'Request Type: {event["RequestType"]}') add_notification( s3, - lambda_arn=os.environ["S3_TO_GLUE_FUNCTION_ARN"], + destination_arn=os.environ["S3_TO_GLUE_DESTINATION_ARN"], + destination_type=os.environ["S3_TO_GLUE_DESTINATION_TYPE"], bucket=os.environ["S3_SOURCE_BUCKET_NAME"], bucket_key_prefix=os.environ["BUCKET_KEY_PREFIX"], ) @@ -41,7 +42,8 @@ def lambda_handler(event, context): def add_notification( s3_client: boto3.client, - lambda_arn: str, + destination_type : str, + destination_arn: str, bucket: str, bucket_key_prefix: str, ): @@ -49,21 +51,22 @@ def add_notification( Args: s3_client (boto3.client) : s3 client to use for s3 event config - lambda_arn (str): Arn of the lambda s3 event config function + destination_type (str): String name of the destination type for the configuration + destination_arn (str): Arn of the destination's s3 event config bucket (str): bucket name of the s3 bucket to add the config to bucket_key_prefix (str): bucket key prefix for where to look for s3 object notifications """ s3_client.put_bucket_notification_configuration( Bucket=bucket, NotificationConfiguration={ - "LambdaFunctionConfigurations": [ + f"{destination_type}Configurations": [ { - "LambdaFunctionArn": lambda_arn, + f"{destination_type}Arn": destination_arn, "Events": ["s3:ObjectCreated:*"], "Filter": { "Key": { "FilterRules": [ - {"Name": "prefix", "Value": bucket_key_prefix} + {"Name": "prefix", "Value": f"{bucket_key_prefix}/"} ] } }, diff --git a/src/lambda_function/s3_event_config/template.yaml b/src/lambda_function/s3_event_config/template.yaml index 467d2754..e8487f51 100644 --- a/src/lambda_function/s3_event_config/template.yaml +++ b/src/lambda_function/s3_event_config/template.yaml @@ -11,9 +11,13 @@ Parameters: Description: >- The namespace string used for the bucket key prefix - S3ToGlueFunctionArn: + S3ToGlueDestinationArn: Type: String - Description: Arn for the S3 Event Config Lambda Function + Description: Arn for the S3 Event Config Destination + + S3ToGlueDestinationType: + Type: String + Description: The S3 Event Config Destination Type S3EventConfigRoleArn: Type: String @@ -43,7 +47,8 @@ Resources: Environment: Variables: S3_SOURCE_BUCKET_NAME: !Ref S3SourceBucketName - S3_TO_GLUE_FUNCTION_ARN: !Ref S3ToGlueFunctionArn + S3_TO_GLUE_DESTINATION_ARN: !Ref S3ToGlueDestinationArn + S3_TO_GLUE_DESTINATION_TYPE: !Ref S3ToGlueDestinationType BUCKET_KEY_PREFIX: !Ref Namespace Outputs: diff --git a/tests/test_s3_event_config_lambda.py b/tests/test_s3_event_config_lambda.py index 5ccd5e7f..cbe1032e 100644 --- a/tests/test_s3_event_config_lambda.py +++ b/tests/test_s3_event_config_lambda.py @@ -1,14 +1,14 @@ import zipfile import io import boto3 -from moto import mock_s3, mock_lambda, mock_iam, mock_logs +from moto import mock_s3, mock_lambda, mock_iam, mock_sqs import pytest from src.lambda_function.s3_event_config import app @pytest.fixture(scope="function") -def mock_iam_role(mock_aws_credentials): +def mock_iam_role(): with mock_iam(): iam = boto3.client("iam") yield iam.create_role( @@ -19,7 +19,7 @@ def mock_iam_role(mock_aws_credentials): @pytest.fixture(scope="function") -def mock_lambda_function(mock_aws_credentials, mock_iam_role): +def mock_lambda_function(mock_iam_role): with mock_lambda(): client = boto3.client("lambda") client.create_function( @@ -31,11 +31,23 @@ def mock_lambda_function(mock_aws_credentials, mock_iam_role): yield client.get_function(FunctionName="some_function") +@pytest.fixture(scope="function") +def mock_sqs_queue(mock_aws_credentials): + with mock_sqs(): + client = boto3.client("sqs") + client.create_queue(QueueName="test_sqs") + queue_url = client.get_queue_url(QueueName="test_sqs") + yield client.get_queue_attributes( + QueueUrl=queue_url["QueueUrl"], AttributeNames=["QueueArn"] + ) + + @mock_s3 -def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function): +def test_that_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_function): s3.create_bucket(Bucket="some_bucket") set_config = app.add_notification( s3, + "LambdaFunction", mock_lambda_function["Configuration"]["FunctionArn"], "some_bucket", "test_folder", @@ -49,15 +61,16 @@ def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function): "s3:ObjectCreated:*" ] assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder"}]} + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} } @mock_s3 -def test_that_delete_notification_is_successful(s3, mock_lambda_function): +def test_that_delete_notification_is_successful_for_lambda(s3, mock_lambda_function): s3.create_bucket(Bucket="some_bucket") app.add_notification( s3, + "LambdaFunction", mock_lambda_function["Configuration"]["FunctionArn"], "some_bucket", "test_folder", @@ -65,3 +78,41 @@ def test_that_delete_notification_is_successful(s3, mock_lambda_function): app.delete_notification(s3, "some_bucket") get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") assert "LambdaFunctionConfigurations" not in get_config + + +@mock_s3 +def test_that_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue): + s3.create_bucket(Bucket="some_bucket") + set_config = app.add_notification( + s3, + "Queue", + mock_sqs_queue['Attributes']['QueueArn'], + "some_bucket", + "test_folder", + ) + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert ( + get_config["QueueConfigurations"][0]["QueueArn"] + == mock_sqs_queue['Attributes']['QueueArn'] + ) + assert get_config["QueueConfigurations"][0]["Events"] == [ + "s3:ObjectCreated:*" + ] + assert get_config["QueueConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} + } + + +@mock_s3 +def test_that_delete_notification_is_successful_for_sqs(s3, mock_sqs_queue): + s3.create_bucket(Bucket="some_bucket") + app.add_notification( + s3, + "Queue", + mock_sqs_queue['Attributes']['QueueArn'], + "some_bucket", + "test_folder", + ) + app.delete_notification(s3, "some_bucket") + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert "QueueConfigurations" not in get_config From db1b247a44f744fe6def2f63776bb7f9549033c7 Mon Sep 17 00:00:00 2001 From: Rixing Xu Date: Wed, 6 Sep 2023 16:27:00 -0700 Subject: [PATCH 4/5] add missing aws service param and change destination arn to sqs queue --- config/develop/namespaced/s3-event-config-lambda.yaml | 2 +- templates/sqs-queue.yaml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/config/develop/namespaced/s3-event-config-lambda.yaml b/config/develop/namespaced/s3-event-config-lambda.yaml index ad6442e0..d8950bd8 100644 --- a/config/develop/namespaced/s3-event-config-lambda.yaml +++ b/config/develop/namespaced/s3-event-config-lambda.yaml @@ -10,7 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn" S3ToGlueDestinationType: "Queue" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/templates/sqs-queue.yaml b/templates/sqs-queue.yaml index d0b5f576..5081e001 100644 --- a/templates/sqs-queue.yaml +++ b/templates/sqs-queue.yaml @@ -46,6 +46,7 @@ Resources: - Sid: Send_Permission Effect: Allow Principal: + Service: s3.amazonaws.com AWS: !Sub '${AWS::AccountId}' Action: - SQS:SendMessage From ff4fbc87e7585748cac7b284bb82ad2a5cad56ab Mon Sep 17 00:00:00 2001 From: Rixing Xu Date: Wed, 6 Sep 2023 16:34:10 -0700 Subject: [PATCH 5/5] add allowed values, add prod ver of sqs queue and event conifg lambda --- config/prod/namespaced/s3-event-config-lambda.yaml | 3 ++- config/prod/namespaced/sqs-queue.yaml | 13 +++++++++++++ src/lambda_function/s3_event_config/template.yaml | 3 +++ 3 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 config/prod/namespaced/sqs-queue.yaml diff --git a/config/prod/namespaced/s3-event-config-lambda.yaml b/config/prod/namespaced/s3-event-config-lambda.yaml index b560afe0..4ed2983b 100644 --- a/config/prod/namespaced/s3-event-config-lambda.yaml +++ b/config/prod/namespaced/s3-event-config-lambda.yaml @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn" + S3ToGlueDestinationType: "Queue" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/config/prod/namespaced/sqs-queue.yaml b/config/prod/namespaced/sqs-queue.yaml new file mode 100644 index 00000000..8c31998a --- /dev/null +++ b/config/prod/namespaced/sqs-queue.yaml @@ -0,0 +1,13 @@ +template: + path: sqs-queue.yaml +parameters: + MessageRetentionPeriod: '86400' + ReceiveMessageWaitTimeSeconds: '20' + VisibilityTimeout: '120' + S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn +dependencies: + - prod/namespaced/s3-to-glue-lambda.yaml + - prod/s3-input-bucket.yaml +stack_name: '{{ stack_group_config.namespace }}-sqs-S3ToLambda' +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/src/lambda_function/s3_event_config/template.yaml b/src/lambda_function/s3_event_config/template.yaml index e8487f51..c73d2888 100644 --- a/src/lambda_function/s3_event_config/template.yaml +++ b/src/lambda_function/s3_event_config/template.yaml @@ -18,6 +18,9 @@ Parameters: S3ToGlueDestinationType: Type: String Description: The S3 Event Config Destination Type + AllowedValues: + - "Queue" + - "LambdaFunction" S3EventConfigRoleArn: Type: String