Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ETL-535] Set up SQS to receive S3 event notification #75

Merged
merged 5 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/develop/namespaced/s3-event-config-lambda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig'
stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
Namespace: {{ stack_group_config.namespace }}
S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn"
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn"
S3ToGlueDestinationType: "Queue"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
13 changes: 13 additions & 0 deletions config/develop/namespaced/sqs-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
template:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want an analogous config file in config/prod?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise for the s3-event-config-lambda

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess when I originally created this PR, I wasn't sure if we were going to push this right away or leave it in dev branch to avoid messing with production. Based on our recent discussion, it seems like we should be okay to also have it in prod so will do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. It will exist independently of everything else for now. We do modify the behavior of the event config lambda, but we aren't using it right now so it shouldn't matter.

path: sqs-queue.yaml
parameters:
MessageRetentionPeriod: '86400'
ReceiveMessageWaitTimeSeconds: '20'
VisibilityTimeout: '120'
S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn
dependencies:
- develop/namespaced/s3-to-glue-lambda.yaml
- develop/s3-input-bucket.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-S3ToLambda'
stack_tags:
{{ stack_group_config.default_stack_tags }}
9 changes: 8 additions & 1 deletion src/lambda_function/s3_event_config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ The s3_event_config lambda is triggered by a github action during deployment or

It will then put a S3 event notification configuration into
the input data bucket which allows the input data bucket to
trigger the S3 to JSON lambda with S3 new object notifications whenever new objects are added
trigger a specific destination type with S3 new object notifications whenever new objects are added
to it and eventually lead to the start of the S3-to-JSON workflow.

Currently **only** the following destination types are supported:

- Lambda Function
- SQS queue

## Event format

The events that will trigger the s3-event-config-lambda
Expand All @@ -23,6 +28,8 @@ Where the allowed RequestType values are:
- "Update"
- "Delete"

You can test the lambda by going to the AWS console for the lambda function, pasting the above sample event in and triggering the function. Any updates should then be visible in the input bucket's event config to confirm it was successful.

## Launching Lambda stack in AWS

There are two main stacks involved in the s3_event_config lambda. They are the
Expand Down
15 changes: 9 additions & 6 deletions src/lambda_function/s3_event_config/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def lambda_handler(event, context):
logger.info(f'Request Type: {event["RequestType"]}')
add_notification(
s3,
lambda_arn=os.environ["S3_TO_GLUE_FUNCTION_ARN"],
destination_arn=os.environ["S3_TO_GLUE_DESTINATION_ARN"],
destination_type=os.environ["S3_TO_GLUE_DESTINATION_TYPE"],
bucket=os.environ["S3_SOURCE_BUCKET_NAME"],
bucket_key_prefix=os.environ["BUCKET_KEY_PREFIX"],
)
Expand All @@ -41,29 +42,31 @@ def lambda_handler(event, context):

def add_notification(
s3_client: boto3.client,
lambda_arn: str,
destination_type : str,
destination_arn: str,
bucket: str,
bucket_key_prefix: str,
):
"""Adds the S3 notification configuration to an existing bucket

Args:
s3_client (boto3.client) : s3 client to use for s3 event config
lambda_arn (str): Arn of the lambda s3 event config function
destination_type (str): String name of the destination type for the configuration
destination_arn (str): Arn of the destination's s3 event config
bucket (str): bucket name of the s3 bucket to add the config to
bucket_key_prefix (str): bucket key prefix for where to look for s3 object notifications
"""
s3_client.put_bucket_notification_configuration(
Bucket=bucket,
NotificationConfiguration={
"LambdaFunctionConfigurations": [
f"{destination_type}Configurations": [
{
"LambdaFunctionArn": lambda_arn,
f"{destination_type}Arn": destination_arn,
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": bucket_key_prefix}
{"Name": "prefix", "Value": f"{bucket_key_prefix}/"}
]
}
},
Expand Down
11 changes: 8 additions & 3 deletions src/lambda_function/s3_event_config/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ Parameters:
Description: >-
The namespace string used for the bucket key prefix

S3ToGlueFunctionArn:
S3ToGlueDestinationArn:
Type: String
Description: Arn for the S3 Event Config Lambda Function
Description: Arn for the S3 Event Config Destination

S3ToGlueDestinationType:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add an AllowedValues property to this.

Type: String
Description: The S3 Event Config Destination Type

S3EventConfigRoleArn:
Type: String
Expand Down Expand Up @@ -43,7 +47,8 @@ Resources:
Environment:
Variables:
S3_SOURCE_BUCKET_NAME: !Ref S3SourceBucketName
S3_TO_GLUE_FUNCTION_ARN: !Ref S3ToGlueFunctionArn
S3_TO_GLUE_DESTINATION_ARN: !Ref S3ToGlueDestinationArn
S3_TO_GLUE_DESTINATION_TYPE: !Ref S3ToGlueDestinationType
BUCKET_KEY_PREFIX: !Ref Namespace

Outputs:
Expand Down
69 changes: 69 additions & 0 deletions templates/sqs-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
AWSTemplateFormatVersion: '2010-09-09'

Description: >
Creates an SQS queue that gets S3 notifications

Parameters:

MessageRetentionPeriod:
Type: Number
Default: 1209600
Description: How long to retain messages in the primary queue.

ReceiveMessageWaitTimeSeconds:
Type: Number
Default: 20
Description: The delay between SQS receiving a message and making it available for others to poll

VisibilityTimeout:
Type: Number
Default: 120
Description: >-
How long our lambda has to submit the messages to Glue and
delete the message from the SQS queue

S3SourceBucketArn:
Type: String
Description: Arn of the S3 bucket where source data are stored.

Resources:

PrimaryQueue:
Type: AWS::SQS::Queue
Properties:
DelaySeconds: 0
MessageRetentionPeriod: !Ref MessageRetentionPeriod
QueueName: !Sub '${AWS::StackName}-Queue'
ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
VisibilityTimeout: !Ref VisibilityTimeout

PrimaryQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: Send_Permission
Effect: Allow
Principal:
AWS: !Sub '${AWS::AccountId}'
Action:
- SQS:SendMessage
Resource: !GetAtt PrimaryQueue.Arn
Condition:
ArnLike:
"aws:SourceArn": !Ref S3SourceBucketArn
Queues:
- !Ref PrimaryQueue

Outputs:

PrimaryQueueArn:
Value: !GetAtt PrimaryQueue.Arn
Export:
Name: !Sub '${AWS::Region}-${AWS::StackName}-PrimaryQueueArn'

PrimaryQueueUrl:
Value: !Ref PrimaryQueue
Export:
Name: !Sub '${AWS::Region}-${AWS::StackName}-PrimaryQueueUrl'
63 changes: 57 additions & 6 deletions tests/test_s3_event_config_lambda.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import zipfile
import io
import boto3
from moto import mock_s3, mock_lambda, mock_iam, mock_logs
from moto import mock_s3, mock_lambda, mock_iam, mock_sqs
import pytest

from src.lambda_function.s3_event_config import app


@pytest.fixture(scope="function")
def mock_iam_role(mock_aws_credentials):
def mock_iam_role():
with mock_iam():
iam = boto3.client("iam")
yield iam.create_role(
Expand All @@ -19,7 +19,7 @@ def mock_iam_role(mock_aws_credentials):


@pytest.fixture(scope="function")
def mock_lambda_function(mock_aws_credentials, mock_iam_role):
def mock_lambda_function(mock_iam_role):
with mock_lambda():
client = boto3.client("lambda")
client.create_function(
Expand All @@ -31,11 +31,23 @@ def mock_lambda_function(mock_aws_credentials, mock_iam_role):
yield client.get_function(FunctionName="some_function")


@pytest.fixture(scope="function")
def mock_sqs_queue(mock_aws_credentials):
with mock_sqs():
client = boto3.client("sqs")
client.create_queue(QueueName="test_sqs")
queue_url = client.get_queue_url(QueueName="test_sqs")
yield client.get_queue_attributes(
QueueUrl=queue_url["QueueUrl"], AttributeNames=["QueueArn"]
)


@mock_s3
def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function):
def test_that_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_function):
s3.create_bucket(Bucket="some_bucket")
set_config = app.add_notification(
s3,
"LambdaFunction",
mock_lambda_function["Configuration"]["FunctionArn"],
"some_bucket",
"test_folder",
Expand All @@ -49,19 +61,58 @@ def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function):
"s3:ObjectCreated:*"
]
assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == {
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder"}]}
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]}
}


@mock_s3
def test_that_delete_notification_is_successful(s3, mock_lambda_function):
def test_that_delete_notification_is_successful_for_lambda(s3, mock_lambda_function):
s3.create_bucket(Bucket="some_bucket")
app.add_notification(
s3,
"LambdaFunction",
mock_lambda_function["Configuration"]["FunctionArn"],
"some_bucket",
"test_folder",
)
app.delete_notification(s3, "some_bucket")
get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")
assert "LambdaFunctionConfigurations" not in get_config


@mock_s3
def test_that_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue):
s3.create_bucket(Bucket="some_bucket")
set_config = app.add_notification(
s3,
"Queue",
mock_sqs_queue['Attributes']['QueueArn'],
"some_bucket",
"test_folder",
)
get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")
assert (
get_config["QueueConfigurations"][0]["QueueArn"]
== mock_sqs_queue['Attributes']['QueueArn']
)
assert get_config["QueueConfigurations"][0]["Events"] == [
"s3:ObjectCreated:*"
]
assert get_config["QueueConfigurations"][0]["Filter"] == {
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]}
}


@mock_s3
def test_that_delete_notification_is_successful_for_sqs(s3, mock_sqs_queue):
s3.create_bucket(Bucket="some_bucket")
app.add_notification(
s3,
"Queue",
mock_sqs_queue['Attributes']['QueueArn'],
"some_bucket",
"test_folder",
)
app.delete_notification(s3, "some_bucket")
get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")
assert "QueueConfigurations" not in get_config
Loading