Skip to content

Commit

Permalink
[ETL-535] Set up SQS to receive S3 event notification (#75)
Browse files Browse the repository at this point in the history
* add initial sqs queue setup
* add changes to event config lambda to support sending to SQS queues
* add missing aws service param and change destination arn to sqs queue
* add allowed values, add prod ver of sqs queue and event config lambda

---------

Co-authored-by: Rixing Xu <[email protected]>
Co-authored-by: Rixing Xu <[email protected]>
  • Loading branch information
3 people authored Sep 7, 2023
1 parent 03abdda commit f729901
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 18 deletions.
3 changes: 2 additions & 1 deletion config/develop/namespaced/s3-event-config-lambda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig'
stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
Namespace: {{ stack_group_config.namespace }}
S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn"
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
S3ToGlueDestinationType: "Queue"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
13 changes: 13 additions & 0 deletions config/develop/namespaced/sqs-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
template:
path: sqs-queue.yaml
parameters:
MessageRetentionPeriod: '86400'
ReceiveMessageWaitTimeSeconds: '20'
VisibilityTimeout: '120'
S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn
dependencies:
- develop/namespaced/s3-to-glue-lambda.yaml
- develop/s3-input-bucket.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-S3ToLambda'
stack_tags:
{{ stack_group_config.default_stack_tags }}
3 changes: 2 additions & 1 deletion config/prod/namespaced/s3-event-config-lambda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig'
stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
Namespace: {{ stack_group_config.namespace }}
S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn"
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
S3ToGlueDestinationType: "Queue"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
13 changes: 13 additions & 0 deletions config/prod/namespaced/sqs-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
template:
path: sqs-queue.yaml
parameters:
MessageRetentionPeriod: '86400'
ReceiveMessageWaitTimeSeconds: '20'
VisibilityTimeout: '120'
S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn
dependencies:
- prod/namespaced/s3-to-glue-lambda.yaml
- prod/s3-input-bucket.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-S3ToLambda'
stack_tags:
{{ stack_group_config.default_stack_tags }}
9 changes: 8 additions & 1 deletion src/lambda_function/s3_event_config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ The s3_event_config lambda is triggered by a github action during deployment or

It will then put an S3 event notification configuration into
the input data bucket which allows the input data bucket to
trigger the S3 to JSON lambda with S3 new object notifications whenever new objects are added
send S3 new object notifications to a specific destination type whenever new objects are added
to it, eventually leading to the start of the S3-to-JSON workflow.

Currently **only** the following destination types are supported:

- Lambda Function
- SQS queue

## Event format

The events that will trigger the s3-event-config-lambda
Expand All @@ -23,6 +28,8 @@ Where the allowed RequestType values are:
- "Update"
- "Delete"

You can test the lambda by going to the AWS console for the lambda function, pasting the above sample event in and triggering the function. Any updates should then be visible in the input bucket's event config to confirm it was successful.

## Launching Lambda stack in AWS

There are two main stacks involved in the s3_event_config lambda. They are the
Expand Down
15 changes: 9 additions & 6 deletions src/lambda_function/s3_event_config/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def lambda_handler(event, context):
logger.info(f'Request Type: {event["RequestType"]}')
add_notification(
s3,
lambda_arn=os.environ["S3_TO_GLUE_FUNCTION_ARN"],
destination_arn=os.environ["S3_TO_GLUE_DESTINATION_ARN"],
destination_type=os.environ["S3_TO_GLUE_DESTINATION_TYPE"],
bucket=os.environ["S3_SOURCE_BUCKET_NAME"],
bucket_key_prefix=os.environ["BUCKET_KEY_PREFIX"],
)
Expand All @@ -41,29 +42,31 @@ def lambda_handler(event, context):

def add_notification(
s3_client: boto3.client,
lambda_arn: str,
destination_type : str,
destination_arn: str,
bucket: str,
bucket_key_prefix: str,
):
"""Adds the S3 notification configuration to an existing bucket
Args:
s3_client (boto3.client) : s3 client to use for s3 event config
lambda_arn (str): Arn of the lambda s3 event config function
destination_type (str): String name of the destination type for the configuration
destination_arn (str): Arn of the destination's s3 event config
bucket (str): bucket name of the s3 bucket to add the config to
bucket_key_prefix (str): bucket key prefix for where to look for s3 object notifications
"""
s3_client.put_bucket_notification_configuration(
Bucket=bucket,
NotificationConfiguration={
"LambdaFunctionConfigurations": [
f"{destination_type}Configurations": [
{
"LambdaFunctionArn": lambda_arn,
f"{destination_type}Arn": destination_arn,
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": bucket_key_prefix}
{"Name": "prefix", "Value": f"{bucket_key_prefix}/"}
]
}
},
Expand Down
14 changes: 11 additions & 3 deletions src/lambda_function/s3_event_config/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@ Parameters:
Description: >-
The namespace string used for the bucket key prefix
S3ToGlueFunctionArn:
S3ToGlueDestinationArn:
Type: String
Description: Arn for the S3 Event Config Lambda Function
Description: Arn for the S3 Event Config Destination

S3ToGlueDestinationType:
Type: String
Description: The S3 Event Config Destination Type
AllowedValues:
- "Queue"
- "LambdaFunction"

S3EventConfigRoleArn:
Type: String
Expand Down Expand Up @@ -43,7 +50,8 @@ Resources:
Environment:
Variables:
S3_SOURCE_BUCKET_NAME: !Ref S3SourceBucketName
S3_TO_GLUE_FUNCTION_ARN: !Ref S3ToGlueFunctionArn
S3_TO_GLUE_DESTINATION_ARN: !Ref S3ToGlueDestinationArn
S3_TO_GLUE_DESTINATION_TYPE: !Ref S3ToGlueDestinationType
BUCKET_KEY_PREFIX: !Ref Namespace

Outputs:
Expand Down
70 changes: 70 additions & 0 deletions templates/sqs-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
AWSTemplateFormatVersion: '2010-09-09'

Description: >
Creates an SQS queue that gets S3 notifications
Parameters:

MessageRetentionPeriod:
Type: Number
Default: 1209600
Description: How long to retain messages in the primary queue.

ReceiveMessageWaitTimeSeconds:
Type: Number
Default: 20
Description: How long (in seconds) a ReceiveMessage call waits for a message to arrive before returning an empty response (long polling)

VisibilityTimeout:
Type: Number
Default: 120
Description: >-
How long our lambda has to submit the messages to Glue and
delete the message from the SQS queue
S3SourceBucketArn:
Type: String
Description: Arn of the S3 bucket where source data are stored.

Resources:

PrimaryQueue:
Type: AWS::SQS::Queue
Properties:
DelaySeconds: 0
MessageRetentionPeriod: !Ref MessageRetentionPeriod
QueueName: !Sub '${AWS::StackName}-Queue'
ReceiveMessageWaitTimeSeconds: !Ref ReceiveMessageWaitTimeSeconds
VisibilityTimeout: !Ref VisibilityTimeout

PrimaryQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: Send_Permission
Effect: Allow
Principal:
Service: s3.amazonaws.com
AWS: !Sub '${AWS::AccountId}'
Action:
- SQS:SendMessage
Resource: !GetAtt PrimaryQueue.Arn
Condition:
ArnLike:
"aws:SourceArn": !Ref S3SourceBucketArn
Queues:
- !Ref PrimaryQueue

Outputs:

PrimaryQueueArn:
Value: !GetAtt PrimaryQueue.Arn
Export:
Name: !Sub '${AWS::Region}-${AWS::StackName}-PrimaryQueueArn'

PrimaryQueueUrl:
Value: !Ref PrimaryQueue
Export:
Name: !Sub '${AWS::Region}-${AWS::StackName}-PrimaryQueueUrl'
63 changes: 57 additions & 6 deletions tests/test_s3_event_config_lambda.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import zipfile
import io
import boto3
from moto import mock_s3, mock_lambda, mock_iam, mock_logs
from moto import mock_s3, mock_lambda, mock_iam, mock_sqs
import pytest

from src.lambda_function.s3_event_config import app


@pytest.fixture(scope="function")
def mock_iam_role(mock_aws_credentials):
def mock_iam_role():
with mock_iam():
iam = boto3.client("iam")
yield iam.create_role(
Expand All @@ -19,7 +19,7 @@ def mock_iam_role(mock_aws_credentials):


@pytest.fixture(scope="function")
def mock_lambda_function(mock_aws_credentials, mock_iam_role):
def mock_lambda_function(mock_iam_role):
with mock_lambda():
client = boto3.client("lambda")
client.create_function(
Expand All @@ -31,11 +31,23 @@ def mock_lambda_function(mock_aws_credentials, mock_iam_role):
yield client.get_function(FunctionName="some_function")


@pytest.fixture(scope="function")
def mock_sqs_queue(mock_aws_credentials):
    """Yield the attributes of a mocked SQS queue named "test_sqs".

    The queue is created inside moto's ``mock_sqs`` context and the yielded
    value is the ``get_queue_attributes`` response restricted to the
    ``QueueArn`` attribute, so callers read the ARN from
    ``["Attributes"]["QueueArn"]``.

    Args:
        mock_aws_credentials: fixture defined outside this file; presumably
            sets fake AWS credentials -- TODO confirm.
    """
    queue_name = "test_sqs"
    with mock_sqs():
        sqs = boto3.client("sqs")
        sqs.create_queue(QueueName=queue_name)
        # Look the URL up by name; get_queue_attributes is keyed on URL.
        url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]
        attributes = sqs.get_queue_attributes(
            QueueUrl=url, AttributeNames=["QueueArn"]
        )
        yield attributes


@mock_s3
def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function):
def test_that_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_function):
s3.create_bucket(Bucket="some_bucket")
set_config = app.add_notification(
s3,
"LambdaFunction",
mock_lambda_function["Configuration"]["FunctionArn"],
"some_bucket",
"test_folder",
Expand All @@ -49,19 +61,58 @@ def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function):
"s3:ObjectCreated:*"
]
assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == {
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder"}]}
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]}
}


@mock_s3
def test_that_delete_notification_is_successful(s3, mock_lambda_function):
def test_that_delete_notification_is_successful_for_lambda(s3, mock_lambda_function):
s3.create_bucket(Bucket="some_bucket")
app.add_notification(
s3,
"LambdaFunction",
mock_lambda_function["Configuration"]["FunctionArn"],
"some_bucket",
"test_folder",
)
app.delete_notification(s3, "some_bucket")
get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")
assert "LambdaFunctionConfigurations" not in get_config


@mock_s3
def test_that_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue):
    """Check that a "Queue" destination yields the expected bucket config.

    Adds a notification pointing at the mocked SQS queue and verifies the
    queue ARN, the subscribed events and the key-prefix filter stored on the
    bucket.
    """
    queue_arn = mock_sqs_queue["Attributes"]["QueueArn"]
    s3.create_bucket(Bucket="some_bucket")
    # Pass the destination by keyword: add_notification takes the type before
    # the ARN, and both are plain strings, so a positional swap would not fail
    # loudly.
    app.add_notification(
        s3,
        destination_type="Queue",
        destination_arn=queue_arn,
        bucket="some_bucket",
        bucket_key_prefix="test_folder",
    )
    get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")
    queue_config = get_config["QueueConfigurations"][0]
    assert queue_config["QueueArn"] == queue_arn
    assert queue_config["Events"] == ["s3:ObjectCreated:*"]
    # add_notification appends "/" to the prefix it is given.
    assert queue_config["Filter"] == {
        "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]}
    }


@mock_s3
def test_that_delete_notification_is_successful_for_sqs(s3, mock_sqs_queue):
    """Check that deleting the notification removes the queue configuration.

    A "Queue" notification is first added to the bucket, then deleted; the
    bucket's notification configuration must no longer contain a
    ``QueueConfigurations`` entry.
    """
    bucket_name = "some_bucket"
    queue_arn = mock_sqs_queue['Attributes']['QueueArn']
    s3.create_bucket(Bucket=bucket_name)
    app.add_notification(s3, "Queue", queue_arn, bucket_name, "test_folder")
    app.delete_notification(s3, bucket_name)
    remaining_config = s3.get_bucket_notification_configuration(
        Bucket=bucket_name
    )
    assert "QueueConfigurations" not in remaining_config

0 comments on commit f729901

Please sign in to comment.