diff --git a/config/develop/namespaced/s3-event-config-lambda.yaml b/config/develop/namespaced/s3-event-config-lambda.yaml index 98f29daf..d8950bd8 100644 --- a/config/develop/namespaced/s3-event-config-lambda.yaml +++ b/config/develop/namespaced/s3-event-config-lambda.yaml @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn" + S3ToGlueDestinationType: "Queue" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/config/develop/namespaced/sqs-queue.yaml b/config/develop/namespaced/sqs-queue.yaml new file mode 100644 index 00000000..838902b5 --- /dev/null +++ b/config/develop/namespaced/sqs-queue.yaml @@ -0,0 +1,13 @@ +template: + path: sqs-queue.yaml +parameters: + MessageRetentionPeriod: '86400' + ReceiveMessageWaitTimeSeconds: '20' + VisibilityTimeout: '120' + S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn +dependencies: + - develop/namespaced/s3-to-glue-lambda.yaml + - develop/s3-input-bucket.yaml +stack_name: '{{ stack_group_config.namespace }}-sqs-S3ToLambda' +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/s3-event-config-lambda.yaml b/config/prod/namespaced/s3-event-config-lambda.yaml index b560afe0..4ed2983b 100644 --- a/config/prod/namespaced/s3-event-config-lambda.yaml +++ b/config/prod/namespaced/s3-event-config-lambda.yaml @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ 
stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn" + S3ToGlueDestinationType: "Queue" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/config/prod/namespaced/sqs-queue.yaml b/config/prod/namespaced/sqs-queue.yaml new file mode 100644 index 00000000..8c31998a --- /dev/null +++ b/config/prod/namespaced/sqs-queue.yaml @@ -0,0 +1,13 @@ +template: + path: sqs-queue.yaml +parameters: + MessageRetentionPeriod: '86400' + ReceiveMessageWaitTimeSeconds: '20' + VisibilityTimeout: '120' + S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn +dependencies: + - prod/namespaced/s3-to-glue-lambda.yaml + - prod/s3-input-bucket.yaml +stack_name: '{{ stack_group_config.namespace }}-sqs-S3ToLambda' +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/src/lambda_function/s3_event_config/README.md b/src/lambda_function/s3_event_config/README.md index ccef795e..ca748f7d 100644 --- a/src/lambda_function/s3_event_config/README.md +++ b/src/lambda_function/s3_event_config/README.md @@ -4,9 +4,14 @@ The s3_event_config lambda is triggered by a github action during deployment or It will then put a S3 event notification configuration into the input data bucket which allows the input data bucket to -trigger the S3 to JSON lambda with S3 new object notifications whenever new objects are added +trigger a specific destination type with S3 new object notifications whenever new objects are added to it and eventually lead to the start of the S3-to-JSON workflow. 
+Currently **only** the following destination types are supported: + +- Lambda Function +- SQS queue + ## Event format The events that will trigger the s3-event-config-lambda @@ -23,6 +28,8 @@ Where the allowed RequestType values are: - "Update" - "Delete" +You can test the lambda by opening the lambda function in the AWS console, pasting in the sample event above, and triggering the function. If the call succeeded, the change will be visible in the input bucket's event notification configuration. + ## Launching Lambda stack in AWS There are two main stacks involved in the s3_event_config lambda. They are the diff --git a/src/lambda_function/s3_event_config/app.py b/src/lambda_function/s3_event_config/app.py index 5fb83ea4..307b2ce2 100644 --- a/src/lambda_function/s3_event_config/app.py +++ b/src/lambda_function/s3_event_config/app.py @@ -29,7 +29,8 @@ def lambda_handler(event, context): logger.info(f'Request Type: {event["RequestType"]}') add_notification( s3, - lambda_arn=os.environ["S3_TO_GLUE_FUNCTION_ARN"], + destination_arn=os.environ["S3_TO_GLUE_DESTINATION_ARN"], + destination_type=os.environ["S3_TO_GLUE_DESTINATION_TYPE"], bucket=os.environ["S3_SOURCE_BUCKET_NAME"], bucket_key_prefix=os.environ["BUCKET_KEY_PREFIX"], ) @@ -41,7 +42,8 @@ def lambda_handler(event, context): def add_notification( s3_client: boto3.client, - lambda_arn: str, + destination_type : str, + destination_arn: str, bucket: str, bucket_key_prefix: str, ): @@ -49,21 +51,22 @@ def add_notification( Args: s3_client (boto3.client) : s3 client to use for s3 event config - lambda_arn (str): Arn of the lambda s3 event config function + destination_type (str): Destination type for the notification configuration, e.g. "LambdaFunction" or "Queue" + destination_arn (str): Arn of the destination (e.g. the lambda function or SQS queue) that receives the s3 event notifications bucket (str): bucket name of the s3 bucket to add the config to bucket_key_prefix (str): bucket key prefix for where to look for s3 object notifications """ s3_client.put_bucket_notification_configuration( Bucket=bucket, 
NotificationConfiguration={ - "LambdaFunctionConfigurations": [ + f"{destination_type}Configurations": [ { - "LambdaFunctionArn": lambda_arn, + f"{destination_type}Arn": destination_arn, "Events": ["s3:ObjectCreated:*"], "Filter": { "Key": { "FilterRules": [ - {"Name": "prefix", "Value": bucket_key_prefix} + {"Name": "prefix", "Value": f"{bucket_key_prefix}/"} ] } }, diff --git a/src/lambda_function/s3_event_config/template.yaml b/src/lambda_function/s3_event_config/template.yaml index 467d2754..c73d2888 100644 --- a/src/lambda_function/s3_event_config/template.yaml +++ b/src/lambda_function/s3_event_config/template.yaml @@ -11,9 +11,16 @@ Parameters: Description: >- The namespace string used for the bucket key prefix - S3ToGlueFunctionArn: + S3ToGlueDestinationArn: Type: String - Description: Arn for the S3 Event Config Lambda Function + Description: Arn for the S3 Event Config Destination + + S3ToGlueDestinationType: + Type: String + Description: The S3 Event Config Destination Type + AllowedValues: + - "Queue" + - "LambdaFunction" S3EventConfigRoleArn: Type: String @@ -43,7 +50,8 @@ Resources: Environment: Variables: S3_SOURCE_BUCKET_NAME: !Ref S3SourceBucketName - S3_TO_GLUE_FUNCTION_ARN: !Ref S3ToGlueFunctionArn + S3_TO_GLUE_DESTINATION_ARN: !Ref S3ToGlueDestinationArn + S3_TO_GLUE_DESTINATION_TYPE: !Ref S3ToGlueDestinationType BUCKET_KEY_PREFIX: !Ref Namespace Outputs: diff --git a/templates/sqs-queue.yaml b/templates/sqs-queue.yaml new file mode 100644 index 00000000..5081e001 --- /dev/null +++ b/templates/sqs-queue.yaml @@ -0,0 +1,70 @@ +AWSTemplateFormatVersion: '2010-09-09' + +Description: > + Creates an SQS queue that gets S3 notifications + +Parameters: + + MessageRetentionPeriod: + Type: Number + Default: 1209600 + Description: How long to retain messages in the primary queue. 
+ + ReceiveMessageWaitTimeSeconds: + Type: Number + Default: 20 + Description: How long, in seconds, a ReceiveMessage call waits for a message to arrive before returning (long polling) + + VisibilityTimeout: + Type: Number + Default: 120 + Description: >- + How long our lambda has to submit the messages to Glue and + delete the message from the SQS queue + + S3SourceBucketArn: + Type: String + Description: Arn of the S3 bucket where source data are stored. + +Resources: + + PrimaryQueue: + Type: AWS::SQS::Queue + Properties: + DelaySeconds: 0 + MessageRetentionPeriod: !Ref MessageRetentionPeriod + QueueName: !Sub '${AWS::StackName}-Queue' + ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds + VisibilityTimeout: !Ref VisibilityTimeout + + PrimaryQueuePolicy: + Type: AWS::SQS::QueuePolicy + Properties: + PolicyDocument: + Version: '2012-10-17' + Statement: + - Sid: Send_Permission + Effect: Allow + Principal: + Service: s3.amazonaws.com + AWS: !Sub '${AWS::AccountId}' + Action: + - SQS:SendMessage + Resource: !GetAtt PrimaryQueue.Arn + Condition: + ArnLike: + "aws:SourceArn": !Ref S3SourceBucketArn + Queues: + - !Ref PrimaryQueue + +Outputs: + + PrimaryQueueArn: + Value: !GetAtt PrimaryQueue.Arn + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-PrimaryQueueArn' + + PrimaryQueueUrl: + Value: !Ref PrimaryQueue + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-PrimaryQueueUrl' diff --git a/tests/test_s3_event_config_lambda.py b/tests/test_s3_event_config_lambda.py index 5ccd5e7f..cbe1032e 100644 --- a/tests/test_s3_event_config_lambda.py +++ b/tests/test_s3_event_config_lambda.py @@ -1,14 +1,14 @@ import zipfile import io import boto3 -from moto import mock_s3, mock_lambda, mock_iam, mock_logs +from moto import mock_s3, mock_lambda, mock_iam, mock_sqs import pytest from src.lambda_function.s3_event_config import app @pytest.fixture(scope="function") -def mock_iam_role(mock_aws_credentials): +def mock_iam_role(): with mock_iam(): iam = boto3.client("iam") yield 
iam.create_role( @@ -19,7 +19,7 @@ def mock_iam_role(mock_aws_credentials): @pytest.fixture(scope="function") -def mock_lambda_function(mock_aws_credentials, mock_iam_role): +def mock_lambda_function(mock_iam_role): with mock_lambda(): client = boto3.client("lambda") client.create_function( @@ -31,11 +31,23 @@ def mock_lambda_function(mock_aws_credentials, mock_iam_role): yield client.get_function(FunctionName="some_function") +@pytest.fixture(scope="function") +def mock_sqs_queue(mock_aws_credentials): + with mock_sqs(): + client = boto3.client("sqs") + client.create_queue(QueueName="test_sqs") + queue_url = client.get_queue_url(QueueName="test_sqs") + yield client.get_queue_attributes( + QueueUrl=queue_url["QueueUrl"], AttributeNames=["QueueArn"] + ) + + @mock_s3 -def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function): +def test_that_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_function): s3.create_bucket(Bucket="some_bucket") set_config = app.add_notification( s3, + "LambdaFunction", mock_lambda_function["Configuration"]["FunctionArn"], "some_bucket", "test_folder", @@ -49,15 +61,16 @@ def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function): "s3:ObjectCreated:*" ] assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder"}]} + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} } @mock_s3 -def test_that_delete_notification_is_successful(s3, mock_lambda_function): +def test_that_delete_notification_is_successful_for_lambda(s3, mock_lambda_function): s3.create_bucket(Bucket="some_bucket") app.add_notification( s3, + "LambdaFunction", mock_lambda_function["Configuration"]["FunctionArn"], "some_bucket", "test_folder", @@ -65,3 +78,41 @@ def test_that_delete_notification_is_successful(s3, mock_lambda_function): app.delete_notification(s3, "some_bucket") get_config = 
s3.get_bucket_notification_configuration(Bucket="some_bucket") assert "LambdaFunctionConfigurations" not in get_config + + +@mock_s3 +def test_that_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue): + s3.create_bucket(Bucket="some_bucket") + set_config = app.add_notification( + s3, + "Queue", + mock_sqs_queue['Attributes']['QueueArn'], + "some_bucket", + "test_folder", + ) + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert ( + get_config["QueueConfigurations"][0]["QueueArn"] + == mock_sqs_queue['Attributes']['QueueArn'] + ) + assert get_config["QueueConfigurations"][0]["Events"] == [ + "s3:ObjectCreated:*" + ] + assert get_config["QueueConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} + } + + +@mock_s3 +def test_that_delete_notification_is_successful_for_sqs(s3, mock_sqs_queue): + s3.create_bucket(Bucket="some_bucket") + app.add_notification( + s3, + "Queue", + mock_sqs_queue['Attributes']['QueueArn'], + "some_bucket", + "test_folder", + ) + app.delete_notification(s3, "some_bucket") + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert "QueueConfigurations" not in get_config