Skip to content

Commit

Permalink
Merge pull request #81 from Sage-Bionetworks/etl-561
Browse files Browse the repository at this point in the history
[ETL-561] Unquote S3 key name before submitting to Glue
  • Loading branch information
philerooski authored Sep 29, 2023
2 parents 762add7 + d497ed8 commit ab93796
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
32 changes: 22 additions & 10 deletions src/lambda_function/s3_to_glue/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import json
import logging
import boto3
from urllib import parse

logger = logging.getLogger()
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -91,8 +92,25 @@ def is_s3_test_event(record : dict) -> bool:
else:
return False

def get_object_info(s3_event) -> dict:
"""
Derive object info formatted for submission to Glue from an S3 event.
Args:
s3_event (dict): An S3 event
def lambda_handler(event, context) -> None:
Returns:
object_info (dict) The S3 object info
"""
bucket_name = s3_event["s3"]["bucket"]["name"]
object_key = parse.unquote(s3_event["s3"]["object"]["key"])
object_info = {
"source_bucket": bucket_name,
"source_key": object_key,
}
return object_info

def lambda_handler(event, context) -> dict:
"""
This main lambda function will be triggered by a SQS event and will
poll the SQS queue for all available S3 event messages. If the
Expand All @@ -113,27 +131,21 @@ def lambda_handler(event, context) -> None:
logger.info(f"Found AWS default s3:TestEvent. Skipping.")
else:
for s3_event in s3_event_records["Records"]:
bucket_name = s3_event["s3"]["bucket"]["name"]
object_key = s3_event["s3"]["object"]["key"]
object_info = {
"source_bucket": bucket_name,
"source_key": object_key,
}
object_info = get_object_info(s3_event)
if filter_object_info(object_info) is not None:
s3_objects_info.append(object_info)
else:
logger.info(
f"Object doesn't meet the S3 event rules to be processed. Skipping."
)

if len(s3_objects_info) > 0:
logger.info(
"Submitting the following files to "
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
)
submit_s3_to_json_workflow(
objects_info=s3_objects_info,
workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"],
objects_info=s3_objects_info,
workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"]
)
else:
logger.info(
Expand Down
8 changes: 7 additions & 1 deletion tests/test_s3_to_glue_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ def object_info(self):

@pytest.fixture
def set_env_var(self, monkeypatch, sqs_queue):
monkeypatch.setenv("SQS_QUEUE_URL", sqs_queue["QueueUrl"])
monkeypatch.setenv("PRIMARY_WORKFLOW_NAME", "test_workflow")

def test_submit_s3_to_json_workflow(self, object_info, monkeypatch):
Expand Down Expand Up @@ -198,6 +197,13 @@ def test_that_lambda_handler_calls_submit_s3_to_json_workflow_if_queue_has_messa
workflow_name="test_workflow",
)

def test_get_object_info_unicode_characters_in_key(self, s3_event):
s3_event["s3"]["object"]["key"] = \
"main/2023-09-26T00%3A06%3A39Z_d873eafb-554f-4f8a-9e61-cdbcb7de07eb"
object_info = app.get_object_info(s3_event=s3_event)
assert object_info["source_key"] == \
"main/2023-09-26T00:06:39Z_d873eafb-554f-4f8a-9e61-cdbcb7de07eb"

@pytest.mark.parametrize(
"object_info,expected",
[
Expand Down

0 comments on commit ab93796

Please sign in to comment.